
...

Name: exactly.once.support
Type: STRING
Default: "requested"
Importance: MEDIUM
Doc: Permitted values are "requested" and "required". If set to "required", forces a preflight check for the connector to ensure that it can provide exactly-once delivery with the given configuration. Some connectors may be capable of providing exactly-once delivery but not signal to Connect that they support this; in that case, documentation for the connector should be consulted carefully before creating it, and the value for this property should be set to "requested". Additionally, if the value is set to "required" but the worker that performs preflight validation does not have exactly-once support enabled for source connectors, requests to create or validate the connector will fail.

Name: transaction.boundary
Type: STRING
Default: "poll"
Importance: MEDIUM
Doc: Permitted values are "poll", "connector", and "interval". If set to "poll", a new producer transaction will be started and committed for every batch of records that each task from this connector provides to Connect. If set to "connector", the connector's own transaction boundary definitions will be relied upon; note that not all connectors are capable of defining their own transaction boundaries, and attempts to create a connector with this property set to "connector" will fail in that case. Finally, if set to "interval", transactions will be committed only after a user-defined time interval has passed.

Name: offsets.storage.topic
Type: STRING
Default: null
Importance: LOW
Doc: The name of a separate offsets topic to use for this connector. If empty or not specified, the worker's global offsets topic name will be used. If specified, the offsets topic will be created if it does not already exist on the Kafka cluster targeted by this connector (which may be different from the one used for the worker's global offsets topic if the bootstrap.servers property of the connector's producer has been overridden from the worker's).

Name: transaction.boundary.interval.ms
Type: LONG
Default: null
Importance: LOW
Doc: If transaction.boundary is set to "interval", determines the interval for producer transaction commits by connector tasks. If unset, defaults to the value of the worker-level offset.flush.interval.ms property.
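
For illustration, a connector configuration that exercises all four of these properties might look like the following when submitted to the REST API. The connector name and class are hypothetical, and the topic and interval values are arbitrary; note that exactly.once.support=required will only pass validation on a worker with exactly-once support enabled for source connectors.

Code Block
languagejs
titleExample connector config (hypothetical)
{
  "name": "example-source",
  "config": {
    "connector.class": "com.example.ExampleSourceConnector",
    "tasks.max": "1",
    "exactly.once.support": "required",
    "transaction.boundary": "interval",
    "transaction.boundary.interval.ms": "60000",
    "offsets.storage.topic": "example-source-offsets"
  }
}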

...

Code Block
languagejava
titleSourceConnector
package org.apache.kafka.connect.source;

import java.util.Map;

import org.apache.kafka.connect.connector.Connector;

public abstract class SourceConnector extends Connector {
    // Existing fields and methods omitted

    /**
     * Signals whether the connector can define its own transaction boundaries with the proposed
     * configuration. Developers must override this method if they wish to add connector-defined
     * transaction boundary support; if they do not, users will be unable to create instances of
     * this connector that use connector-defined transaction boundaries. The default implementation
     * will return {@code false}.
     * @param connectorConfig the configuration that will be used for the connector
     * @return whether the connector can define its own transaction boundaries with the given
     * config
     */
    public boolean canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
        return false;
    }
}
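
To make the contract concrete, here is a minimal sketch of a connector that opts in to connector-defined transaction boundaries. The class name and the "batch.mode" setting it consults are invented for illustration; only SourceConnector and canDefineTransactionBoundaries come from this proposal.

Code Block
languagejava
titleExampleSourceConnector (hypothetical)
package com.example;

import java.util.Map;

import org.apache.kafka.connect.source.SourceConnector;

// Hypothetical connector that can define its own transaction boundaries,
// but only when its (invented) "batch.mode" setting is enabled.
public abstract class ExampleSourceConnector extends SourceConnector {

    @Override
    public boolean canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
        // Only claim support when the proposed configuration actually allows it;
        // returning false causes creation with transaction.boundary=connector to fail.
        return "true".equals(connectorConfig.get("batch.mode"));
    }
}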


REST API pre-flight validation

When a user submits a new connector configuration today, Connect takes steps to ensure that the connector configuration is valid before writing it to the config topic. These steps are performed synchronously and, if any errors are detected or unexpected failures occur, reported to the user in the body and status of the HTTP response for their request.

This validation takes place on the following endpoints:

  • POST /connectors/
  • PUT /connectors/{connector}/config
  • PUT /connector-plugins/{connectorType}/config/validate

New pre-flight validation logic will be added for exactly-once support; it will apply to these endpoints (and to any other endpoints that perform similar pre-flight connector config validation in the future).

When exactly-once support is required

If the exactly.once.support connector configuration is set to "required", the connector's SourceConnector::exactlyOnceSupport method will be queried.

  • If the result is UNSUPPORTED, an error will be reported for the exactly.once.support property stating that the connector does not provide exactly-once support with the given configuration.
  • If the result is null, an error will be reported for the exactly.once.support property stating that Connect cannot determine whether the connector provides exactly-once guarantees, and that the user should carefully consult the connector's documentation before proceeding, possibly setting the property to "requested" instead.
  • If the result is SUPPORTED, no error will be reported.
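
As a rough sketch (not the actual worker implementation), the check described above could be expressed as follows. The helper class, method name, and error strings are illustrative; ExactlyOnceSupport and SourceConnector::exactlyOnceSupport are the API surface from this proposal.

Code Block
languagejava
titleExactlyOnceSupportCheck (hypothetical sketch)
import java.util.Map;

import org.apache.kafka.connect.source.ExactlyOnceSupport; // package assumed per this KIP
import org.apache.kafka.connect.source.SourceConnector;

public class ExactlyOnceSupportCheck {
    // Returns an error to attach to the exactly.once.support property, or null if the check passes.
    static String validate(SourceConnector connector, Map<String, String> config) {
        if (!"required".equals(config.get("exactly.once.support")))
            return null; // "requested" (the default) never fails this check
        ExactlyOnceSupport support = connector.exactlyOnceSupport(config);
        if (support == null)
            return "Connect cannot determine whether this connector provides exactly-once "
                + "guarantees; consult its documentation and consider \"requested\" instead";
        if (support == ExactlyOnceSupport.UNSUPPORTED)
            return "This connector does not provide exactly-once support with the given configuration";
        return null; // SUPPORTED: no error reported
    }
}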

When connector-defined transaction boundaries are required

If the transaction.boundary connector configuration is set to "connector", the connector's SourceConnector::canDefineTransactionBoundaries method will be queried.

  • If the result is false, an error will be reported for the transaction.boundary property stating that the connector does not support defining its own transaction boundaries, and the user will be encouraged to configure the connector with a different transaction boundary type.
  • If the result is true, no error will be reported.
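
A companion sketch for this check, under the same caveats as the one above:

Code Block
languagejava
titleTransactionBoundaryCheck (hypothetical sketch)
import java.util.Map;

import org.apache.kafka.connect.source.SourceConnector;

public class TransactionBoundaryCheck {
    // Returns an error to attach to the transaction.boundary property, or null if the check passes.
    static String validate(SourceConnector connector, Map<String, String> config) {
        if ("connector".equals(config.get("transaction.boundary"))
                && !connector.canDefineTransactionBoundaries(config)) {
            return "This connector does not support defining its own transaction boundaries; "
                + "configure it with a different transaction boundary type";
        }
        return null; // no error reported
    }
}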

New metrics

Three new task-level JMX metrics will be added:

MBean name: kafka.connect:type=source-task-metrics,connector=([-.\w]+),task=([\d]+)

transaction-size-min: The number of records in the smallest transaction the task has committed so far.
transaction-size-max: The number of records in the largest transaction the task has committed so far.
transaction-size-avg: The average number of records in the transactions the task has committed so far.
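
As a usage sketch, the new metrics can be read over JMX like any other Connect task metric; the connector name and task number below are placeholders.

Code Block
languagejava
titleTransactionMetricsReader (hypothetical)
import java.lang.management.ManagementFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;

// Reads the new transaction-size metrics for a hypothetical task "example-source-0"
// from the local JVM's platform MBean server.
public class TransactionMetricsReader {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName task = new ObjectName(
            "kafka.connect:type=source-task-metrics,connector=example-source,task=0");
        for (String metric : new String[] {"transaction-size-min", "transaction-size-max", "transaction-size-avg"})
            System.out.println(metric + " = " + server.getAttribute(task, metric));
    }
}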

...

When exactly.once.source.enabled is set to true for a worker, all source tasks created by that worker will use a transactional producer to write to Kafka. All source records that they provide to the worker will be written to Kafka inside a transaction. Once it comes time for that transaction to be committed, offset information for the current batch will be written to the offsets topic inside the same transaction. Then, the transaction will be committed. This ensures that source offsets will be committed to Kafka if and only if the source records for that batch were also written to Kafka.
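
Conceptually, the per-batch flow resembles the following simplified sketch using the plain producer API. The actual worker logic also handles record conversion, retries, and task lifecycle; the transactional ID, topic names, and serialized payloads here are stand-ins.

Code Block
languagejava
titleExactlyOnceBatchSketch (hypothetical)
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Simplified sketch: a batch of source records and their offsets are written
// in a single transaction, so offsets are committed if and only if the records are.
public class ExactlyOnceBatchSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "connect-example-source-0"); // stand-in
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("data-topic", serializedRecord())); // the batch
            producer.send(new ProducerRecord<>("example-source-offsets", serializedOffsets()));
            producer.commitTransaction(); // both writes become visible atomically
        }
    }

    static byte[] serializedRecord() { return new byte[0]; }  // stand-in payload
    static byte[] serializedOffsets() { return new byte[0]; } // stand-in offset info
}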

The timing for offset commits will remain the same as it is today; they will be triggered periodically on a fixed interval that users can adjust via the offset.flush.interval.ms property. Users will be able to configure the transaction boundaries for connectors with the newly-introduced transaction.boundary connector configuration property (described in greater detail in the "Public Interfaces" section).

Once an offset commit is complete, if the connector is (implicitly or explicitly) configured with a separate offsets topic, the committed offsets will also be written to the worker’s global offsets topic using a non-transactional producer and the worker’s principal. This will be handled on a separate thread from the task’s work and offset commit threads, and should not block or interfere with the task at all. If the worker fails to write these offsets for some reason, it will retry indefinitely, but not fail the task. This will be done in order to facilitate "hard" downgrades and cases where users switch from per-connector offsets topics back to the global offsets topic.
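
The write-back described above might look roughly like the following; the threading and retry cadence are invented for illustration, and the producer send itself is elided.

Code Block
languagejava
titleGlobalOffsetWriteBackSketch (hypothetical)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: replicate committed offsets to the worker's global offsets
// topic on a separate thread, retrying indefinitely without ever failing the task.
public class GlobalOffsetWriteBackSketch {
    private final ExecutorService writeBackExecutor = Executors.newSingleThreadExecutor();

    void replicateToGlobalTopic(byte[] serializedOffsets) {
        writeBackExecutor.submit(() -> {
            while (true) {
                try {
                    sendToGlobalOffsetsTopic(serializedOffsets); // non-transactional, worker principal
                    return;
                } catch (Exception e) {
                    sleepQuietly(1_000); // back off, then retry; the task keeps running
                }
            }
        });
    }

    void sendToGlobalOffsetsTopic(byte[] offsets) throws Exception { /* producer send elided */ }

    static void sleepQuietly(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}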

...

It may take longer than the transaction timeout for a task to flush all of its records to Kafka. In this case, there are some remedial actions that users can take to nurse their connector back to health: tune the connector's producer configuration for higher throughput, increase the transaction timeout for the producers used by the connector, decrease the offset commit interval (if using interval-based transaction boundaries), or reconfigure the connector to produce records at a lower throughput. We will include these steps in the error message for a task that fails due to a producer transaction timeout.
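
For instance, assuming the worker's connector.client.config.override.policy permits producer overrides, a user might raise the transaction timeout and batching throughput for a single connector along these lines (the values are arbitrary):

Code Block
languagejs
titleExample remediation via producer overrides (hypothetical values)
{
  "producer.override.transaction.timeout.ms": "120000",
  "producer.override.batch.size": "65536",
  "producer.override.linger.ms": "100"
}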

...