...

    /**
     * <p>
     * Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
     * also called when a record is filtered by a transformation or when {@link ConnectorConfig} "errors.tolerance" is set to "all"
     * and thus will never be ACK'd by a broker.
     * In both cases {@code metadata} will be null.
     * </p>
     * <p>
     * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
     * automatically. This hook is provided for systems that also need to store offsets internally
     * in their own system.
     * </p>
     * <p>
     * The default implementation just calls {@link #commitRecord(SourceRecord)}, which is a nop by default. It is
     * not necessary to implement both methods.
     * </p>
     *
     * @param record {@link SourceRecord} that was successfully sent via the producer, filtered by a transformation, or dropped on producer exception
     * @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered or if producer exceptions are ignored
     * @throws InterruptedException
     */
    public void commitRecord(SourceRecord record, RecordMetadata metadata)
            throws InterruptedException {
        // by default, just call other method for backwards compatibility
        commitRecord(record);
    }

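As a rough illustration of how a task might use this hook, the sketch below overrides the two-argument `commitRecord` and branches on whether `metadata` is null. It is deliberately self-contained: `SourceRecord`, `RecordMetadata`, `BaseTask`, and `AuditingTask` are simplified, hypothetical stand-ins defined in the example itself, not the real Kafka Connect or producer classes, and the string log is only there to make the behaviour visible.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitRecordSketch {

    // Hypothetical stand-in for Kafka Connect's SourceRecord (assumption, heavily simplified).
    static final class SourceRecord {
        final String topic;
        final String value;
        SourceRecord(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }
    }

    // Hypothetical stand-in for the producer's RecordMetadata (assumption, heavily simplified).
    static final class RecordMetadata {
        final String topic;
        final int partition;
        final long offset;
        RecordMetadata(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Mirrors the shape of the proposed API: the new overload delegates to the old one.
    static class BaseTask {
        public void commitRecord(SourceRecord record) throws InterruptedException {
            // no-op by default
        }

        public void commitRecord(SourceRecord record, RecordMetadata metadata)
                throws InterruptedException {
            commitRecord(record);
        }
    }

    // Example task that stores its own bookkeeping and must cope with null metadata.
    static class AuditingTask extends BaseTask {
        final List<String> log = new ArrayList<>();

        @Override
        public void commitRecord(SourceRecord record, RecordMetadata metadata) {
            if (metadata == null) {
                // Record was filtered or dropped: there is no broker offset to store.
                log.add("skipped:" + record.value);
            } else {
                log.add("acked:" + record.value + "@" + metadata.offset);
            }
        }
    }

    public static void main(String[] args) {
        AuditingTask task = new AuditingTask();
        task.commitRecord(new SourceRecord("t", "a"), new RecordMetadata("t", 0, 42L));
        task.commitRecord(new SourceRecord("t", "b"), null);
        System.out.println(task.log);
    }
}
```

The point of the sketch is the null check: an implementation that dereferences `metadata` unconditionally would fail for filtered or dropped records.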
Other Changes
WorkerSourceTask.java

    producer.send(
        producerRecord,
        (recordMetadata, e) -> {
            if (e != null) {
                log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
                log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
                if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
                    commitTaskRecord(preTransformRecord, null);
                } else {
                    producerSendException.compareAndSet(null, e);
                }
...