...
Kafka Connect has extensive error handling and retry capabilities (KIP-298, KIP-458), but none of them apply when the actual write to Kafka fails.
Public Interfaces
...
SourceTask.java
```java
/**
 * <p>
 * Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
 * also called when a record is filtered by a transformation, or when the write to Kafka fails and
 * {@link ConnectorConfig} "errors.tolerance" is set to "all"; in those cases the record will never be ACK'd by a broker.
 * In both cases {@code metadata} will be null.
 * </p>
 * <p>
 * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
 * automatically. This hook is provided for systems that also need to store offsets internally
 * in their own system.
 * </p>
 * <p>
 * The default implementation just calls {@link #commitRecord(SourceRecord)}, which is a nop by default. It is
 * not necessary to implement both methods.
 * </p>
 *
 * @param record {@link SourceRecord} that was successfully sent via the producer, filtered by a transformation, or dropped on producer exception
 * @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered or if producer exceptions are ignored
 * @throws InterruptedException
 */
public void commitRecord(SourceRecord record, RecordMetadata metadata)
        throws InterruptedException {
    // by default, just call other method for backwards compatibility
    commitRecord(record);
}
```
WorkerSourceTask.java
```java
producer.send(
        producerRecord,
        (recordMetadata, e) -> {
            if (e != null) {
                log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
                log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
                if (retryWithToleranceOperator.withinToleranceLimits()) {
                    // Tolerated failure: skip the record and hand it to commitRecord with null metadata
                    commitTaskRecord(preTransformRecord, null);
                } else {
                    // Current behavior: remember the first failure so the task is killed
                    producerSendException.compareAndSet(null, e);
                }
            }
            // ...
        });
```
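The sketch below shows the commitRecord contract from a connector's point of view. It is minimal and self-contained, and it does not use the real Kafka Connect classes: FakeSourceRecord, FakeRecordMetadata and AckingSourceTask are hypothetical stand-ins for SourceRecord, RecordMetadata and a SourceTask subclass. The bookkeeping lists are an assumed example of what a source system might track. The point it illustrates is that a null metadata means the record never reached a broker, either because it was filtered or because its write failed and was skipped. A connector should therefore not treat null metadata as a broker acknowledgement.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SourceRecord: carries the source system's id for this record.
final class FakeSourceRecord {
    final String sourceId;
    FakeSourceRecord(String sourceId) { this.sourceId = sourceId; }
}

// Hypothetical stand-in for RecordMetadata: the broker-assigned topic, partition and offset.
final class FakeRecordMetadata {
    final String topic;
    final int partition;
    final long offset;
    FakeRecordMetadata(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical connector task that tracks, per record, whether it actually landed in Kafka.
class AckingSourceTask {
    final List<String> delivered = new ArrayList<>(); // written to Kafka and ACK'd by a broker
    final List<String> skipped = new ArrayList<>();   // filtered, or dropped on producer exception

    // Same shape as commitRecord(SourceRecord, RecordMetadata); metadata is null when no broker ACK exists.
    public void commitRecord(FakeSourceRecord record, FakeRecordMetadata metadata) {
        if (metadata == null) {
            // Never written to Kafka: whether to ack, dead-letter or alert is the connector's choice.
            skipped.add(record.sourceId);
        } else {
            delivered.add(record.sourceId + "@" + metadata.topic + "-" + metadata.partition + ":" + metadata.offset);
        }
    }
}
```

A connector that only needs to ack its source system can treat both branches the same way. A connector that must not lose data silently can use the null branch to route the record elsewhere.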
...
Proposed Changes
On Kafka producer failure, WorkerSourceTask will check retryWithToleranceOperator.withinToleranceLimits() to decide whether it should ignore the exception. If so, it calls commitRecord(SourceRecord, RecordMetadata) with null metadata so that the connector can acknowledge the record in its source system, and processing continues. Otherwise it sets producerSendException and the current behavior applies.
Setting "errors.tolerance" to "all" enables skipping records on producer write failure.
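The callback decision described above can be sketched as a small, self-contained model. The proposal expresses the check as retryWithToleranceOperator.withinToleranceLimits(). This sketch approximates that check by comparing the connector's errors.tolerance value to "all", following the statement that this setting enables the skip. ProducerCallbackSketch and its fields are hypothetical. The list of committed ids stands in for calls to commitRecord(record, null), and the AtomicReference mirrors producerSendException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the WorkerSourceTask producer callback under this proposal.
class ProducerCallbackSketch {
    private final String errorsTolerance; // connector's "errors.tolerance": "none" (default) or "all"
    final List<String> committedWithNullMetadata = new ArrayList<>();
    final AtomicReference<Exception> producerSendException = new AtomicReference<>();

    ProducerCallbackSketch(String errorsTolerance) {
        this.errorsTolerance = errorsTolerance;
    }

    // Invoked once per record when the (simulated) send completes; e is null on success.
    void onCompletion(String recordId, Exception e) {
        if (e == null) {
            return; // success path is not modelled here
        }
        if ("all".equals(errorsTolerance)) {
            // Skip the record: stands in for commitRecord(record, null) so the connector can ack its source.
            committedWithNullMetadata.add(recordId);
        } else {
            // Current behavior: keep only the first failure; the task later fails with it.
            producerSendException.compareAndSet(null, e);
        }
    }
}
```

With the default tolerance, compareAndSet(null, e) keeps only the first exception, even if later sends also fail. This preserves the existing fail-fast behavior.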
Compatibility, Deprecation, and Migration Plan
No new configuration item is added. "errors.tolerance" defaults to "none", so any connector that does not set it to "all" will continue with the current behavior of killing the task on a producer exception.
Rejected Alternatives
Adding a new Connect worker configuration item ("errors.producer.ignore"):
Existing configuration ("errors.tolerance") could be used instead. Also, setting this at the worker level would take the choice of how to handle Kafka write errors out of the hands of connector developers.
WorkerConfig.java
```java
public static final String IGNORE_PRODUCER_EXCEPTIONS_CONFIG = "errors.producer.ignore";
public static final boolean IGNORE_PRODUCER_EXCEPTIONS_DEFAULT = false;
private static final String IGNORE_PRODUCER_EXCEPTIONS_DOC = "If true, the connector will ignore producer exceptions. "
        + "The SourceRecord that failed to be written will be handed to commitRecord for handling by the connector.";
// ...
        .define(IGNORE_PRODUCER_EXCEPTIONS_CONFIG, Type.BOOLEAN, IGNORE_PRODUCER_EXCEPTIONS_DEFAULT,
                Importance.MEDIUM, IGNORE_PRODUCER_EXCEPTIONS_DOC)
// ...
```
SourceTask.java
```java
/**
 * <p>
 * Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
 * also called when a record is filtered by a transformation, and thus will never be ACK'd by a broker. It is also
 * called when {@link WorkerConfig} "errors.producer.ignore" is set to true.
 * In both cases {@code metadata} will be null.
 * </p>
 * <p>
 * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
 * automatically. This hook is provided for systems that also need to store offsets internally
 * in their own system.
 * </p>
 * <p>
 * The default implementation just calls {@link #commitRecord(SourceRecord)}, which is a nop by default. It is
 * not necessary to implement both methods.
 * </p>
 *
 * @param record {@link SourceRecord} that was successfully sent via the producer, filtered by a transformation, or dropped on producer exception
 * @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered or if producer exceptions are ignored
 * @throws InterruptedException
 */
public void commitRecord(SourceRecord record, RecordMetadata metadata)
        throws InterruptedException {
    // by default, just call other method for backwards compatibility
    commitRecord(record);
}
```
WorkerSourceTask.java
```java
producer.send(
        producerRecord,
        (recordMetadata, e) -> {
            if (e != null) {
                log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
                log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
                if (workerConfig.getBoolean(WorkerConfig.IGNORE_PRODUCER_EXCEPTIONS_CONFIG)) {
                    // Worker-wide flag set: skip the record and hand it to commitRecord with null metadata
                    commitTaskRecord(preTransformRecord, null);
                } else {
                    // Current behavior: remember the first failure so the task is killed
                    producerSendException.compareAndSet(null, e);
                }
            }
            // ...
        });
```
New interface method to SourceTask:
...