...

Note that offset checkpointing is managed by Kafka Connect. However, because it is common to monitor consumer lag using consumer groups, the connector also commits the last batch of offsets to the source Kafka cluster before polling the consumer for new data, unless this is turned off via configuration (consumer.enable.auto.commit = false).
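For example, a connector properties file could disable this source-cluster commit as in the minimal sketch below. The name and connector.class values are placeholders used only for illustration; the enable.auto.commit setting is the one described above (the options table below also documents the equivalent source.enable.auto.commit form).

# Sketch of a connector properties file (illustrative values only).
# "name" and "connector.class" are placeholders, not defined by this page.
name=kafka-source-example
connector.class=<source connector class>
tasks.max=1
# Stop committing consumed offsets back to the source Kafka cluster:
consumer.enable.auto.commit=false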

Configuration Options

...

Standard Options

These are the most common options to set when configuring this connector:

...

Configuration Parameter | Default | Description
include.message.headers | true | Indicates whether message headers from source records should be included when delivered to the destination cluster.
topic.list.timeout.ms | 60000 | Amount of time (in milliseconds) the partition monitor thread should wait for the source kafka cluster to return topic information before logging a timeout error.
topic.list.poll.interval.ms | 300000 | Amount of time (in milliseconds) the partition monitor will wait before re-querying the source cluster for a change in the topic partitions to be consumed.
reconfigure.tasks.on.leader.change | false | Indicates whether the partition monitor should request a task reconfiguration when partition leaders have changed. In some cases this may be a minor optimization, as when generating task configurations the connector will try to group partitions to be consumed by each task by the leader node. The downside to this is that it may result in additional rebalances.
poll.loop.timeout.ms | 1000 | Maximum amount of time (in milliseconds) the connector will wait in each poll loop without data before returning control to the kafka connect task thread.
max.shutdown.wait.ms | 2000 | Maximum amount of time (in milliseconds) to wait for the connector to gracefully shut down before forcing the consumer and admin clients to close. Note that any values greater than the kafka connect parameter task.shutdown.graceful.timeout.ms will not have any effect.
source.max.poll.records | 500 | Maximum number of records to return from each poll of the internal KafkaConsumer. When dealing with topics with very large messages, the connector may sometimes spend too long processing each batch of records, causing lag in offset commits or, in serious cases, unnecessary consumer rebalances. Reducing this value can help in these scenarios. Conversely, when processing very small messages, increasing this value may improve overall throughput.
source.key.deserializer | org.apache.kafka.common.serialization.ByteArrayDeserializer | Key deserializer to use for the kafka consumers connecting to the source cluster.
source.value.deserializer | org.apache.kafka.common.serialization.ByteArrayDeserializer | Value deserializer to use for the kafka consumers connecting to the source cluster.
source.enable.auto.commit | true | If true, the consumer's offsets will be periodically committed to the source cluster in the background. Note that these offsets are not used to resume the connector (they are stored in the Kafka Connect offset store), but they may be useful for monitoring the current offset lag of this connector on the source cluster.
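As an illustration, several of the standard options above could be combined in a connector properties file like the sketch below. The name, connector.class, and source.bootstrap.servers values are assumptions used only for the example; the remaining keys are taken from the table above.

# Sketch of a connector configuration using the standard options above.
# name, connector.class and source.bootstrap.servers are illustrative
# placeholders; they are not defined by this page.
name=kafka-source-example
connector.class=<source connector class>
tasks.max=4
source.bootstrap.servers=source-kafka:9092
# Options documented in the table above:
include.message.headers=true
topic.list.poll.interval.ms=300000
# For topics with very large messages, reduce the per-poll batch size
# to keep offset commits timely (the documented default is 500).
source.max.poll.records=250
max.shutdown.wait.ms=5000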

Overriding the internal KafkaConsumer and AdminClient Configuration

Note that standard Kafka client parameters can be passed to the internal KafkaConsumer and AdminClient by prefixing them with "source.".

For cases where the configuration for the KafkaConsumer and AdminClient diverges, you can use the more explicit "consumer." and "admin." configuration parameter prefixes to fine-tune the settings used for each.
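For example, the sketch below shows the three prefixes side by side. The values are illustrative, and security.protocol, max.poll.records, and request.timeout.ms are standard Kafka client settings rather than options defined by this connector.

# Applied to both the internal KafkaConsumer and AdminClient:
source.bootstrap.servers=source-kafka:9092
source.security.protocol=SSL
# Applied only to the internal KafkaConsumer:
consumer.max.poll.records=1000
# Applied only to the internal AdminClient:
admin.request.timeout.ms=30000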

...