Options of the kafka() source
This section describes the options of the kafka() source in syslog-ng OSE.
The kafka() source of syslog-ng OSE can directly consume log messages from the Apache Kafka message bus. The source has the following options.
Required options
To use the kafka() source, the following two options are required: bootstrap-servers() and topic(). Both must appear at the beginning of your syslog-ng OSE configuration.
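As a minimal sketch of a source that sets only the two required options, the fragment below may help. The source and destination names, the broker address, the topic name, and the file path are illustrative assumptions, not values taken from this documentation.

```config
# Hypothetical minimal kafka() source: only the two mandatory options are set.
source s_kafka {
    kafka(
        bootstrap-servers("127.0.0.1:9092")    # assumed broker address
        topic("example-topic" => "-1")          # assumed topic; -1 means all partitions
    );
};

# Assumed destination and log path, included only to make the sketch complete.
destination d_kafka_file { file("/var/log/kafka-consumed.log"); };
log { source(s_kafka); destination(d_kafka_file); };
```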
bootstrap-servers()
| Type: | string |
| Default: | N/A |
| Mandatory: | yes |
Description: Specifies the hostname or IP address of the Kafka server. You can use either an IPv4 address (for example, 192.168.0.1) or an IPv6 address (for example, [::1]). To specify the port number of the server, append a colon (:) and the port number to the address. To specify multiple addresses, separate them with commas, for example:
bootstrap-servers(
"127.0.0.1:2525,remote-server-hostname:6464"
)
config()
| Type: | key-value pairs |
| Default: | N/A |
Description: You can use this option to set the properties of the Kafka consumer.
The syslog-ng OSE kafka() source supports all properties of the official Kafka consumer. For details, see the librdkafka documentation.
The syntax of the config() option is the following:
config(
"key1" => "value1"
"key2" => "value2"
)
NOTE: The following Kafka consumer config options are protected and cannot be overridden in the config() list:
- bootstrap.servers
- metadata.broker.list
- enable.auto.offset.store
- auto.offset.reset
- enable.auto.commit
- auto.commit.enable
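The following sketch shows how config() might be combined with the required options. The keys client.id and session.timeout.ms are librdkafka consumer properties that are not on the protected list; the source name, addresses, and values are assumptions chosen only for illustration.

```config
source s_kafka_tuned {
    kafka(
        bootstrap-servers("127.0.0.1:9092")         # assumed broker address
        topic("example-topic" => "-1")               # assumed topic name
        config(
            "client.id" => "syslog-ng-consumer"      # assumed value
            "session.timeout.ms" => "30000"          # assumed value
        )
    );
};
```

Protected keys such as bootstrap.servers are deliberately left out of the config() list, because they cannot be overridden there.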
disable-bookmarks()
| Type: | boolean |
| Default: | no |
Description: This option prevents syslog-ng OSE from storing a bookmark (such as position or offset) in its persist file for the last processed message.
NOTE: This option does not prevent syslog-ng OSE from using a bookmark entry that is already present in the persist file. To ignore existing bookmark entries as well, also specify ignore-saved-bookmarks(yes).
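A source that neither stores new bookmarks nor uses previously saved ones could be sketched as follows. The source name, broker address, and topic name are assumptions.

```config
source s_kafka_no_bookmarks {
    kafka(
        bootstrap-servers("127.0.0.1:9092")    # assumed broker address
        topic("example-topic" => "-1")          # assumed topic name
        disable-bookmarks(yes)                  # do not store new bookmarks in the persist file
        ignore-saved-bookmarks(yes)             # do not use bookmarks that are already saved
    );
};
```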
fetch-limit()
| Type: | integer |
| Default: | 10000 |
Description: Specifies the maximum number of messages the main worker will consume and queue from the Kafka broker. This effectively determines the size of the internally used Kafka message queue. If the limit is reached, the kafka() source stops fetching messages from the broker, logs the situation, and waits the amount of time specified by fetch-queue-full-delay() before attempting to fetch new data again.
NOTE: If more than 2 workers are configured and separated-worker-queues() is set to yes, then all processor workers share this total queue size.
For example, with workers(3) and fetch-limit(100000), the 2 processor workers (remember, the first of the configured 3 is always the main worker) will each receive their own queue, and neither queue will grow beyond 50,000 messages.
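The scenario described above can be written as a configuration sketch. The source name, broker address, and topic name are assumptions; the option values are the ones used in the example.

```config
source s_kafka_split_queues {
    kafka(
        bootstrap-servers("127.0.0.1:9092")    # assumed broker address
        topic("example-topic" => "-1")          # assumed topic name
        workers(3)                              # 1 main worker + 2 processor workers
        separated-worker-queues(yes)            # one queue per processor worker
        fetch-limit(100000)                     # shared total: 100000 / 2 = 50000 per queue
    );
};
```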
fetch-delay()
| Type: | integer [1 second / fetch_retry_delay * 1000000 milliseconds] |
| Default: | 1000 (1 millisecond) |
Description: Specifies the time the main worker will wait between attempts to fetch new data.
fetch-retry-delay()
| Type: | integer [1 second / fetch_retry_delay * 1000000 milliseconds] |
| Default: | 10000 (10 milliseconds) |
Description: Specifies the time the main worker will wait before attempting to fetch new data again when the broker signals no more data is available.
fetch-queue-full-delay()
| Type: | integer in milliseconds |
| Default: | 1000 |
Description: When the main worker reaches the queued message limit defined by fetch-limit(), the kafka() source temporarily stops retrieving messages from the broker. It then waits for the duration specified by fetch-queue-full-delay() before attempting to fetch additional messages.
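As a starting point for tuning, the sketch below spells out the documented default values of the fetch-related options explicitly. The source name, broker address, and topic name are assumptions; changing any of the values is a workload-specific decision that this sketch does not try to make.

```config
source s_kafka_fetch_defaults {
    kafka(
        bootstrap-servers("127.0.0.1:9092")    # assumed broker address
        topic("example-topic" => "-1")          # assumed topic name
        fetch-limit(10000)                      # default queue limit, in messages
        fetch-delay(1000)                       # default (1 millisecond)
        fetch-retry-delay(10000)                # default (10 milliseconds)
        fetch-queue-full-delay(1000)            # default, in milliseconds
    );
};
```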
hook-commands()
Description: This option makes it possible to execute external programs when the relevant driver is initialized or torn down. The hook-commands() can be used with all source and destination drivers with the exception of the usertty() and internal() drivers.
NOTE: The syslog-ng OSE application must be able to start and restart the external program, and have the necessary permissions to do so. For example, if your host is running AppArmor or SELinux, you might have to modify your AppArmor or SELinux configuration to enable syslog-ng OSE to execute external applications.
Using the hook-commands() when syslog-ng OSE starts or stops
To execute an external program when syslog-ng OSE starts or stops, use the following options:
startup()
| Type: | string |
| Default: | N/A |
Description: Defines the external program that is executed as syslog-ng OSE starts.
shutdown()
| Type: | string |
| Default: | N/A |
Description: Defines the external program that is executed as syslog-ng OSE stops.
Using the hook-commands() when syslog-ng OSE reloads
To execute an external program when the syslog-ng OSE configuration is initiated or torn down, for example, on startup/shutdown or during a syslog-ng OSE reload, use the following options:
setup()
| Type: | string |
| Default: | N/A |
Description: Defines an external program that is executed when the syslog-ng OSE configuration is initiated, for example, on startup or during a syslog-ng OSE reload.
teardown()
| Type: | string |
| Default: | N/A |
Description: Defines an external program that is executed when the syslog-ng OSE configuration is stopped or torn down, for example, on shutdown or during a syslog-ng OSE reload.
Example: Using the hook-commands() with a network source
In the following example, the hook-commands() is used with the network() driver and it opens an iptables port automatically as syslog-ng OSE is started/stopped.
The assumption in this example is that the LOGCHAIN chain is part of a larger ruleset that routes traffic to it. Whenever the syslog-ng OSE created rule is there, packets can flow, otherwise the port is closed.
source s_network {
    network(
        transport(udp)
        hook-commands(
            startup("iptables -I LOGCHAIN 1 -p udp --dport 514 -j ACCEPT")
            shutdown("iptables -D LOGCHAIN 1")
        )
    );
};
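The same mechanism can be sketched for a kafka() source using setup() and teardown(), which also run during a reload. The source name, broker address, topic name, and the use of the logger command (assumed to be available on the host) are illustrative assumptions.

```config
source s_kafka_hooks {
    kafka(
        bootstrap-servers("127.0.0.1:9092")    # assumed broker address
        topic("example-topic" => "-1")          # assumed topic name
        hook-commands(
            setup("logger kafka-source-setup")          # runs on startup and on reload
            teardown("logger kafka-source-teardown")    # runs on shutdown and on reload
        )
    );
};
```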
ignore-saved-bookmarks()
| Type: | boolean |
| Default: | no |
Description: By default, syslog-ng OSE continues reading logs from the last remembered position (or offset, etc.) stored in its persist file after a restart or reload. If this option is set to yes, syslog-ng OSE always starts reading from either the beginning or the end of the available log list (depending on the setting of the read-old-records() option).
kafka-logging()
| Accepted values: | disabled, trace, kafka |
| Default: | disabled |
Description: This option allows you to control how internal Kafka logs appear in the syslog-ng OSE logs.
- disabled: Disables internal Kafka log messages in the syslog-ng OSE logs.
- trace: Logs all internal Kafka messages at the trace level of syslog-ng OSE.
- kafka: Logs internal Kafka messages using log levels mapped to those of syslog-ng OSE.
NOTE: The internal Kafka logging level itself can be configured using the config() Kafka options. For details, refer to the librdkafka documentation.
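A possible combination of kafka-logging() and the librdkafka debug property is sketched below. The source name, broker address, topic name, and the chosen debug context (consumer) are assumptions.

```config
source s_kafka_debug {
    kafka(
        bootstrap-servers("127.0.0.1:9092")    # assumed broker address
        topic("example-topic" => "-1")          # assumed topic name
        kafka-logging(kafka)                    # map Kafka log levels to syslog-ng OSE levels
        config(
            "debug" => "consumer"               # librdkafka debug context; assumed choice
        )
    );
};
```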
persist-name()
| Type: | string |
| Default: | N/A |
Description: If you receive the following error message during syslog-ng OSE startup, set the persist-name() option of the duplicate drivers:
Error checking the uniqueness of the persist names, please override it with persist-name option. Shutting down.
or
Automatic assignment of persist names failed, as conflicting persist names were found. Please override the automatically assigned identifier using an explicit persist-name() option or remove the duplicated configuration elements.
This error happens if you use identical drivers in multiple sources, for example, if you configure two file sources to read from the same file. In this case, set the persist-name() of the drivers to a custom string, for example, persist-name("example-persist-name1").
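For kafka() sources, the fix could look like the following sketch, in which two otherwise identical sources receive distinct persist names. All names, addresses, and topics are assumptions.

```config
source s_kafka_a {
    kafka(
        bootstrap-servers("127.0.0.1:9092")       # assumed broker address
        topic("example-topic" => "0")              # assumed topic and partition
        persist-name("kafka-example-topic-a")      # unique name for this driver
    );
};

source s_kafka_b {
    kafka(
        bootstrap-servers("127.0.0.1:9092")       # same assumed broker
        topic("example-topic" => "0")              # same assumed topic and partition
        persist-name("kafka-example-topic-b")      # different name avoids the conflict
    );
};
```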
poll-timeout()
| Type: | integer in milliseconds |
| Default: | 10000 |
Description: Specifies the maximum amount of time syslog-ng OSE waits during a Kafka broker poll request for new messages to become available.
separated-worker-queues()
| Type: | yes, no |
| Default: | no |
Description: When the value of workers() is greater than 2 (meaning multiple processor threads are used to handle queued messages), and separated-worker-queues() is set to yes, the main worker of the kafka() source distributes the consumed messages into separate queues, one for each processor worker.
NOTE: This approach can improve performance, especially in high-throughput scenarios, but may also lead to significantly increased memory usage.
strategy-hint()
| Accepted values: | assign, subscribe |
| Default: | assign |
Description: This option provides a hint about which Kafka consumer strategy the kafka() source should use when the topic() list contains topic/partition definitions that could be handled in either way.
The section "Why is it worth using dual consumer strategies?" describes the differences between the two.
For details about how the resulting topic names, partitions, and Kafka assign/subscribe strategies are determined in different scenarios, see "Basic strategy usage cross-reference of the different topic configuration cases".
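Setting the hint explicitly might look like the sketch below. The source name, broker address, and topic name are assumptions, and whether the hint takes effect for this particular topic() list depends on the cases listed in the cross-reference mentioned above.

```config
source s_kafka_subscribe {
    kafka(
        bootstrap-servers("127.0.0.1:9092")    # assumed broker address
        topic("example-topic" => "-1")          # assumed topic name
        strategy-hint(subscribe)                # the default is assign
    );
};
```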
time-reopen()
| Type: | integer in seconds |
| Default: | 60 |
Description: The time syslog-ng OSE waits between attempts to recover from errors that require re-initialization of the full Kafka connection and its internally used data structures.
topic()
| Type: | key-value pairs |
| Default: | N/A |
| Mandatory: | yes |
Description: The Kafka topic(s) and partition number(s) from which messages are consumed, for example:
topic(
"^topic-name-[13]$" => "-1"
"topic-name-2" => "1"
"topic-name-4" => "-1"
"topic-name-5" => "0,1,4"
)
Valid topic names for the topic() option have the following limitations:
- The topic name must either contain only characters matching the pattern [-._a-zA-Z0-9], or it can be a regular expression. For example: ^topic-name-[13]$ (which expands to topic-name-1 and topic-name-3).
- The length of the topic name must be between 1 and 249 characters.
The partition number must be either a single partition number or a comma-separated list of partition numbers.
Each partition number must be a non-negative integer (for example, 0 or 4), or -1, which means all partitions of the topic.
For details about how the resulting topic names, partitions, and Kafka assign/subscribe strategies are determined in different scenarios, see "Basic strategy usage cross-reference of the different topic configuration cases" and "Why is it worth using dual consumer strategies?".
workers()
| Type: | integer |
| Default: | 2 |
Description: The number of workers the kafka() source uses to consume and process messages from the Kafka broker. By default, the source uses two workers:
- One main worker that fetches messages from the Kafka broker and stores them into an internal queue.
- A second worker that processes the queued messages and forwards them to the configured destination.
Although the source can operate using a single worker, this configuration typically results in a significant performance penalty compared to the default multi-worker setup.
Increasing the number of workers beyond two may further improve throughput, especially when the main worker can fetch messages at high speed. In such cases, you may also need to fine-tune related options such as separated-worker-queues(), fetch-limit(), fetch-delay(), fetch-retry-delay(), and fetch-queue-full-delay().
CAUTION: Only kafka() sources with workers() set to less than 3 can guarantee ordered message forwarding.
NOTE: Kafka clients have their own threadpool, entirely independent from any syslog-ng OSE settings. The workers() option has no effect on this threadpool.
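The trade-off between ordering and throughput can be illustrated with two alternative sketches. The source names, broker address, topic names, and the worker count of 4 are assumptions.

```config
# Ordered forwarding: workers() stays below 3 (2 is the default).
source s_kafka_ordered {
    kafka(
        bootstrap-servers("127.0.0.1:9092")    # assumed broker address
        topic("ordered-topic" => "-1")          # assumed topic name
        workers(2)                              # 1 main worker + 1 processor worker
    );
};

# Higher throughput: more processor workers, message order is not guaranteed.
source s_kafka_throughput {
    kafka(
        bootstrap-servers("127.0.0.1:9092")    # assumed broker address
        topic("bulk-topic" => "-1")             # assumed topic name
        workers(4)                              # 1 main worker + 3 processor workers
        separated-worker-queues(yes)            # one queue per processor worker
    );
};
```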