Options of the kafka() source
This section describes the options of the kafka() source in syslog-ng OSE.
The kafka() source of syslog-ng OSE can directly consume log messages from the Apache Kafka message bus. The source has the following options.
Required options
To use the kafka() source, the following two options are required: bootstrap-servers() and topic().
bootstrap-servers()
| Type: | string |
| Default: | N/A |
| Mandatory: | yes |
Description: Specifies the hostname or IP address of the Kafka server. When specifying an IP address, IPv4 (for example, 192.168.0.1) or IPv6 (for example, [::1]) can be used as well. Use a colon (:) after the address to specify the port number of the server. When specifying multiple addresses, use a comma to separate the addresses, for example:
bootstrap-servers(
"127.0.0.1:2525,remote-server-hostname:6464"
)
chain-hostname()
| Type: | yes, no |
| Default: | no |
Description: This option can be used to enable or disable the chained hostname format. For more information, see the chain-hostnames() global option.
config()
| Type: | key-value pairs |
| Default: | N/A |
Description: You can use this option to set the properties of the kafka consumer.
The syslog-ng OSE kafka source supports all properties of the official Kafka consumer. For details, see the librdkafka documentation.
The syntax of the config() option is the following:
config(
    "key1" => "value1"
    "key2" => "value2"
)
NOTE: The following kafka consumer config options are protected and cannot be overridden in the config() list: bootstrap.servers, metadata.broker.list, enable.auto.offset.store, auto.offset.reset, enable.auto.commit, auto.commit.enable
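For example, a minimal sketch that tunes a few consumer properties (the property names are standard librdkafka consumer settings; the group ID and values shown here are illustrative assumptions, not recommendations):
config(
    "group.id" => "syslog-ng-consumers"
    "session.timeout.ms" => "30000"
    "fetch.wait.max.ms" => "100"
)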
default-facility()
| Type: | facility string |
| Default: | kern |
Description: This parameter assigns a facility value to the messages received from the kafka source if the message does not specify one.
default-level()
This is just an alias of default-priority().
default-priority()
| Type: | priority string |
| Default: |
Description: This option defines the default priority value used if the PRIORITY entry does not exist in the message received from the kafka source.
For example, default-priority(warning).
default-severity()
This is just an alias of default-priority().
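As an illustration, a minimal kafka() source sketch that assigns default PRI values to messages that carry none (the broker address and topic name are assumptions made up for this example):
source s_kafka_defaults {
    kafka(
        bootstrap-servers("127.0.0.1:9092")   # assumed local broker
        topic("app-logs" => "-1")             # hypothetical topic, all partitions
        default-facility(local0)
        default-priority(warning)
    );
};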
disable-bookmarks()
| Type: | boolean |
| Default: | no |
Description: This option prevents syslog-ng OSE from storing a bookmark (such as position or offset) in its persist file for the last processed message.
NOTE: This option does not prevent the use of an already existing bookmark entry. To ignore existing bookmark entries as well, also specify ignore-saved-bookmarks(yes).
See Bookmarking in the kafka() source for more details.
dns-cache()
| Type: | yes, no |
| Default: | no |
Description: This option enables or disables the DNS cache usage.
hook-commands()
Description: This option makes it possible to execute external programs when the relevant driver is initialized or torn down. The hook-commands() can be used with all source and destination drivers with the exception of the usertty() and internal() drivers.
NOTE: The syslog-ng OSE application must be able to start and restart the external program, and have the necessary permissions to do so. For example, if your host is running AppArmor or SELinux, you might have to modify your AppArmor or SELinux configuration to enable syslog-ng OSE to execute external applications.
Using the hook-commands() when syslog-ng OSE starts or stops
To execute an external program when syslog-ng OSE starts or stops, use the following options:
startup()
| Type: | string |
| Default: | N/A |
Description: Defines the external program that is executed as syslog-ng OSE starts.
shutdown()
| Type: | string |
| Default: | N/A |
Description: Defines the external program that is executed as syslog-ng OSE stops.
Using the hook-commands() when syslog-ng OSE reloads
To execute an external program when the syslog-ng OSE configuration is initiated or torn down, for example, on startup/shutdown or during a syslog-ng OSE reload, use the following options:
setup()
| Type: | string |
| Default: | N/A |
Description: Defines an external program that is executed when the syslog-ng OSE configuration is initiated, for example, on startup or during a syslog-ng OSE reload.
teardown()
| Type: | string |
| Default: | N/A |
Description: Defines an external program that is executed when the syslog-ng OSE configuration is stopped or torn down, for example, on shutdown or during a syslog-ng OSE reload.
Example: Using the hook-commands() with a network source
In the following example, the hook-commands() option is used with the network() driver to open and close an iptables port automatically when syslog-ng OSE is started or stopped.
The assumption in this example is that the LOGCHAIN chain is part of a larger ruleset that routes traffic to it. Whenever the rule created by syslog-ng OSE is present, packets can flow; otherwise, the port is closed.
source s_network {
    network(transport(udp)
        hook-commands(
            startup("iptables -I LOGCHAIN 1 -p udp --dport 514 -j ACCEPT")
            shutdown("iptables -D LOGCHAIN 1")
        )
    );
};
host-override()
| Type: | string |
| Default: |
Description: Replaces the HOST part of the message with the parameter string.
ignore-saved-bookmarks()
| Type: | boolean |
| Default: | no |
Description: By default, syslog-ng OSE continues reading logs from the last remembered position (or offset, etc.) stored in its persist file after a restart or reload. If this option is set to yes, it will always start reading from either the beginning or the end of the available log list (depending on the setting of the read-old-records() option).
See Bookmarking in the kafka() source for more details.
kafka-logging()
| Accepted values: | disabled | trace | kafka |
| Default: | disabled |
Description: This option allows you to control how internal Kafka logs appear in the syslog-ng OSE logs.
- disabled: Disables internal Kafka log messages in the syslog-ng OSE logs.
- trace: Logs all internal Kafka messages at the trace level of syslog-ng OSE.
- kafka: Logs internal Kafka messages using log levels mapped to those of syslog-ng OSE.
NOTE: The internal Kafka logging level itself can be configured using the config() Kafka options. For details, refer to the librdkafka documentation.
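A possible sketch that surfaces librdkafka's own debug output at the trace level of syslog-ng OSE (the broker address and topic are assumptions; the debug contexts and log_level value are librdkafka settings chosen here only for illustration):
kafka(
    bootstrap-servers("127.0.0.1:9092")
    topic("app-logs" => "-1")
    kafka-logging(trace)
    config(
        "debug" => "consumer,cgrp,topic"   # librdkafka debug contexts
        "log_level" => "7"                 # librdkafka internal log level
    )
);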
keep-hostname()
| Type: | yes or no |
| Default: | no |
Description: Enable or disable hostname rewriting.
- If enabled (keep-hostname(yes)), syslog-ng OSE assumes that the incoming log message was sent by the host specified in the HOST field of the message.
- If disabled (keep-hostname(no)), syslog-ng OSE rewrites the HOST field of the message, either to the IP address (if the use-dns() parameter is set to no), or to the hostname (if the use-dns() parameter is set to yes and the IP address can be resolved to a hostname) of the host sending the message to syslog-ng OSE. For details on using name resolution in syslog-ng OSE, see Using name resolution in syslog-ng.
NOTE: If the log message does not contain a hostname in its HOST field, syslog-ng OSE automatically adds a hostname to the message.
- For messages received from the network, this hostname is the address of the host that sent the message (this means the address of the last hop if the message was transferred via a relay).
- For messages received from the local host, syslog-ng OSE adds the name of the host.
This option can be specified globally, and per-source as well. The local setting of the source overrides the global option if available.
NOTE: When relaying messages, enable this option on the syslog-ng OSE server and also on every relay, otherwise syslog-ng OSE will treat incoming messages as if they were sent by the last relay.
keep-timestamp()
| Accepted values: | yes | no |
| Default: | yes |
Description: Specifies whether syslog-ng OSE should accept the timestamp received from the sending application or client. If disabled, the time of reception will be used instead. This option can be specified globally, and per-source as well. The local setting of the source overrides the global option if available.
CAUTION: To use the S_ macros, the keep-timestamp() option must be enabled (this is the default behavior of syslog-ng OSE).
log-fetch-limit()
| Type: | integer |
| Default: | 10000 |
Description: Specifies the maximum number of messages the main worker will consume and queue from the Kafka broker. This effectively determines the size of the internally used Kafka message queue. If the limit is reached, the kafka() source stops fetching messages from the broker, logs the situation, and waits the amount of time specified by log-fetch-queue-full-delay() before attempting to fetch new data again.
NOTE: If more than 2 workers are configured and single-worker-queue() is set to no, then all processor workers share this total queue size. For example, with workers(3) and log-fetch-limit(100000), the 2 processor workers (remember, the first of the configured 3 is always the main worker) will each receive their own queue, and neither queue will grow beyond 50,000 messages.
NOTE: It is worth aligning this option with the Kafka config options queued.min.messages and queued.max.messages.kbytes. For details, refer to the librdkafka documentation.
log-fetch-delay()
| Type: | integer in microseconds |
| Default: | 1000 (1 millisecond) |
Description: Specifies the time the main worker will wait between attempts to fetch new data.
log-fetch-retry-delay()
| Type: | integer in microseconds |
| Default: | 10000 (10 milliseconds) |
Description: Specifies the time the main worker will wait before attempting to fetch new data again when the broker signals no more data is available.
log-fetch-queue-full-delay()
| Type: | integer in milliseconds |
| Default: | 1000 |
Description: When the main worker reaches the queued message limit defined by log-fetch-limit(), the kafka() source temporarily stops retrieving messages from the broker. It then waits for the duration specified by this option before attempting to fetch additional messages.
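The four log-fetch-*() options work together; a hedged tuning sketch (the broker address, topic, and all values are illustrative assumptions, not recommendations) might look like this:
kafka(
    bootstrap-servers("127.0.0.1:9092")
    topic("app-logs" => "-1")
    log-fetch-limit(20000)            # at most 20000 queued messages
    log-fetch-delay(1000)             # wait between fetch attempts
    log-fetch-retry-delay(10000)      # wait after the broker reports no new data
    log-fetch-queue-full-delay(500)   # wait (in milliseconds) while the queue is full
);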
normalize-hostnames()
| Accepted values: | yes | no |
| Default: | no |
Description: If enabled (normalize-hostnames(yes)), syslog-ng OSE converts the hostnames to lowercase.
NOTE: This setting applies only to hostnames resolved from DNS. It has no effect if the keep-hostname() option is enabled, and the message contains a hostname.
persist-name()
| Type: | string |
| Default: | N/A |
Description: If you receive the following error message during syslog-ng OSE startup, set the persist-name() option of the duplicate drivers:
Error checking the uniqueness of the persist names, please override it with persist-name option. Shutting down.
or
Automatic assignment of persist names failed, as conflicting persist names were found. Please override the automatically assigned identifier using an explicit persist-name() option or remove the duplicated configuration elements.
This error happens if you use identical drivers in multiple sources, for example, if you configure two file sources to read from the same file. In this case, set the persist-name() of the drivers to a custom string, for example, persist-name("example-persist-name1").
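For example, if two kafka() sources consume the same topic from the same broker (and would otherwise be assigned identical persist names), you could disambiguate them like this (broker address, topic, and persist names are assumptions made up for this sketch):
source s_kafka_raw {
    kafka(
        bootstrap-servers("127.0.0.1:9092")
        topic("app-logs" => "-1")
        persist-name("kafka-app-logs-raw")
    );
};
source s_kafka_filtered {
    kafka(
        bootstrap-servers("127.0.0.1:9092")
        topic("app-logs" => "-1")
        persist-name("kafka-app-logs-filtered")
    );
};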
persist-store()
| Accepted values: | local | remote |
| Default: | local |
Description: Specifies where syslog-ng OSE stores the offset of the last consumed Kafka message. local means it uses the default syslog-ng OSE persist file, while remote means it uses the remote Kafka offset store.
For more details, see Bookmarking in the kafka() source.
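A sketch of a source that keeps its consumer offsets on the Kafka side instead of in the local persist file (broker address and topic are assumptions; whether this fits your setup depends on your consumer strategy and broker configuration):
kafka(
    bootstrap-servers("127.0.0.1:9092")
    topic("app-logs" => "-1")
    persist-store(remote)   # store the last consumed offset in Kafka, not in the persist file
);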
poll-timeout()
| Type: | integer in milliseconds |
| Default: | 10000 |
Description: Specifies the maximum amount of time syslog-ng OSE waits during a Kafka broker poll request for new messages to become available.
program-override()
| Type: | string |
| Default: |
Description: Replaces the ${PROGRAM} part of the message with the parameter string. For example, to mark every message coming from the kernel, include the program-override("kernel") option in the source containing /proc/kmsg.
read-old-records()
| Type: | boolean |
| Default: | no |
Description: If read-old-records() is set to yes, syslog-ng OSE will start fetching from the oldest available message; otherwise, it will start from the newest one (if no bookmarks are present, or ignore-saved-bookmarks() is set to yes).
See Bookmarking in the kafka() source for more details.
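The bookmarking-related options combine as follows; for example, the sketch below ignores any stored bookmark, starts from the oldest available record, and does not write a new bookmark (broker address and topic are assumptions made up for this example):
kafka(
    bootstrap-servers("127.0.0.1:9092")
    topic("app-logs" => "-1")
    ignore-saved-bookmarks(yes)   # do not resume from the persist file
    read-old-records(yes)         # start from the oldest available message
    disable-bookmarks(yes)        # do not store a new position either
);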
single-worker-queue()
| Type: | yes | no |
| Default: | yes |
Description: When the value of workers() is greater than 2 (meaning multiple processor threads are used to handle queued messages), and single-worker-queue() is set to no, the main worker of the kafka() source distributes the consumed messages into separate queues, one for each processor worker.
NOTE: This approach can improve performance, especially in high-throughput scenarios, but may also lead to significantly increased memory usage.
state-update-timeout()
| Type: | integer in milliseconds |
| Default: | 1000 |
Description: Specifies the maximum amount of time syslog-ng OSE waits during Kafka broker state queries or other requests, such as metadata queries, partition offset queries/seeking, etc.
store-metadata()
| Type: | yes | no |
| Default: | no |
Description: If set to yes, syslog-ng OSE adds the following key-value pairs to the forwarded log messages:
- .kafka.topic => the name of the topic the message came from
- .kafka.partition => the partition of the topic
- .kafka.offset => the offset of the message in the partition
- .kafka.key => the partition key
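Since these are regular name-value pairs, they can be referenced like any other value, for example in a destination template. The sketch below is illustrative only: the broker address, topic, file path, and template format are all assumptions.
source s_kafka_meta {
    kafka(
        bootstrap-servers("127.0.0.1:9092")
        topic("app-logs" => "-1")
        store-metadata(yes)
    );
};
destination d_file {
    file("/var/log/kafka-ingest.log"
        template("${.kafka.topic}[${.kafka.partition}]@${.kafka.offset}: ${MESSAGE}\n")
    );
};
log { source(s_kafka_meta); destination(d_file); };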
strategy-hint()
| Accepted values: | assign, subscribe |
| Default: | assign |
Description: This option provides a hint about which Kafka consumer strategy the kafka() source should use when the topic() list contains topic/partition definitions that could be handled in either way.
The section Why is it worth using dual consumer strategies? describes the differences between the two.
For details on how the resulting topic names, partitions, and Kafka assign/subscribe strategies are determined in different scenarios, see the Basic strategy usage cross-reference of the different topic configuration cases; for information on how the resulting strategy participates in offset storing and bookmarking, refer to Bookmarking in the kafka() source.
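For example, a regular-expression topic that covers all partitions could be handled either way, so the hint decides; the sketch below asks for the subscribe (consumer group) strategy (the broker address, topic pattern, and group ID are assumptions made up for this example):
kafka(
    bootstrap-servers("127.0.0.1:9092")
    topic("^app-logs-.*$" => "-1")
    strategy-hint(subscribe)
    config("group.id" => "syslog-ng-consumers")
);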
tags()
| Type: | string |
| Default: |
Description: Label the messages received from the source with custom tags. Tags must be unique, and enclosed between double quotes. When adding multiple tags, separate them with commas, for example, tags("dmz", "router"). This option is available only in syslog-ng OSE 3.1 and later.
time-reopen()
| Type: | integer in seconds |
| Default: | 60 |
Description: The time syslog-ng OSE waits between attempts to recover from errors that require re-initialization of the full Kafka connection and its internally used data structures.
time-zone()
| Type: | name of the timezone, or the timezone offset |
| Default: |
Description: The default timezone for messages read from the source. Applies only if no timezone is specified within the message itself.
The timezone can be specified by using the name, for example, time-zone("Europe/Budapest"), or as the timezone offset in +/-HH:MM format, for example, +01:00. On Linux and UNIX platforms, the valid timezone names are listed under the /usr/share/zoneinfo directory.
topic()
| Type: | key-value pairs |
| Default: | N/A |
| Mandatory: | yes |
Description: A list of pairs consisting of Kafka topic name(s) and partition number(s) from which messages are consumed, for example:
topic(
    "^topic-name-[13]$" => "-1"
    "topic-name-2" => "1"
    "topic-name-4" => "-1"
    "topic-name-5" => "0,1,4"
)
Valid topic names have the following limitations:
- The topic name must either contain only characters matching the pattern [-._a-zA-Z0-9], or it can be a regular expression, for example, ^topic-name-[13]$ (which expands to topic-name-1 and topic-name-3).
- The length of the topic name must be between 1 and 249 characters.
The partition number must be:
- either a single partition number or a comma-separated list of partition numbers
- a non-negative integer, or -1, which means all partitions of the topic
For details about how the resulting topic names, partitions, and Kafka assign/subscribe strategies are determined in different scenarios, see Basic strategy usage cross-reference of the different topic configuration cases and Why is it worth using dual consumer strategies?
use-dns()
| Accepted values: | yes, no, persist_only |
| Default: | yes |
Description: Enable or disable DNS usage. The persist_only option attempts to resolve hostnames locally from file (for example, from /etc/hosts). The syslog-ng OSE application blocks on DNS queries, so enabling DNS may lead to a Denial of Service attack. To prevent DoS, protect your syslog-ng OSE network endpoint with firewall rules, and make sure that all hosts which may get to syslog-ng OSE are resolvable. This option can be specified globally, and per-source as well. The local setting of the source overrides the global option if available.
NOTE: This option has no effect if the keep-hostname() option is enabled (keep-hostname(yes)) and the message contains a hostname.
use-fqdn()
| Accepted values: | yes | no |
| Default: | no |
Description: Use this option to add a Fully Qualified Domain Name (FQDN) instead of a short hostname. You can specify this option either globally or per-source. The local setting of the source overrides the global option if available.
TIP: Set use-fqdn() to yes if you want to use the custom-domain() global option.
NOTE: This option has no effect if the keep-hostname() option is enabled (keep-hostname(yes)) and the message contains a hostname.
workers()
| Type: | integer |
| Default: | 2 |
Description: The number of workers the kafka() source uses to consume and process messages from the Kafka broker. By default, it uses two workers:
- One main worker that fetches messages from the Kafka broker and stores them into an internal queue.
- A second worker that processes the queued messages and forwards them to the configured destination.
Although the source can operate using a single worker, this configuration typically results in a significant performance penalty compared to the default multi-worker setup.
Increasing the number of workers beyond two may further improve throughput, especially when the main worker can fetch messages at high speed. In such cases, you may also need to fine-tune related options such as single-worker-queue(), log-fetch-limit(), log-fetch-delay(), log-fetch-retry-delay() and log-fetch-queue-full-delay().
CAUTION: Only kafka() sources with workers() set to less than 3 can guarantee ordered message forwarding.
NOTE: Kafka clients have their own threadpool, entirely independent from any syslog-ng OSE settings. The workers() option has no effect on this threadpool.
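A hedged scaling sketch that builds on the defaults described above (the broker address, topic, and all values are illustrative assumptions; whether they help depends on your throughput and memory budget, and message ordering is no longer guaranteed with this many workers):
source s_kafka_scaled {
    kafka(
        bootstrap-servers("127.0.0.1:9092")
        topic("app-logs" => "-1")
        workers(4)                 # 1 main worker + 3 processor workers
        single-worker-queue(no)    # give each processor worker its own queue
        log-fetch-limit(30000)     # total queue size shared by the processor workers
    );
};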