
...

Current state: Draft

Discussion thread:

JIRA

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

This proposal is to provide an option to enable EOS (exactly-once semantics) for MirrorMaker 2, for cases where neither data loss nor duplicate data is acceptable. The key idea behind this proposal is to extend SinkTask with a brand new MirrorSinkTask implementation, which leverages the consumer from WorkerSinkTask and provides an option to deliver messages either:

(1) Exactly-once: messages are produced by a transactional producer in MirrorSinkTask, and consumer offsets are committed within a transaction by that same transactional producer

OR

(2) At-least-once: messages are produced by a non-transactional producer in MirrorSinkTask, and consumer offsets are committed separately by the consumer from WorkerSinkTask

The reasons to implement a brand new MirrorSinkTask that extends SinkTask, rather than working on the existing MirrorSourceTask, are the following:

  • as mentioned above, the Kafka Connect source connector / task does not provide EOS by nature, mostly because of the async and periodic source task offset commit, which in the MirrorMaker case is the consumer offset. This seems to be one of the blockers to enabling EOS without a lot of changes in WorkerSourceTask.
  • since MirrorSinkTask is a new implementation, we can fully control how a producer is created (transactional vs. non-transactional) and handle various exception cases (especially in transactional mode), purely in the new implementation.
  • MirrorSinkTask can return an empty result from preCommit() (defined in SinkTask) by overriding it, to disable the consumer offset commit done by WorkerSinkTask, so that the transactional producer is able to commit the consumer offsets within one transaction (a condensed sketch of this pattern follows this list; the full pseudocode is under Proposed Changes below).
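For reference, the underlying Kafka producer pattern that makes this possible is sketched below, independent of the MirrorSinkTask specifics; the topic name, consumer group id, and record contents are placeholders, not part of this KIP:

    // Distilled transactional pattern: produce records and commit the consumer offsets
    // they were read at within a single producer transaction (placeholder names throughout)
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    producer.initTransactions();
    producer.beginTransaction();
    for (ConsumerRecord<byte[], byte[]> record : polledRecords) {
        producer.send(new ProducerRecord<>("target-topic", record.key(), record.value()));
        // track the next offset to consume for each source partition
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
    }
    // the consumer offsets become visible only if the transaction commits
    producer.sendOffsetsToTransaction(offsets, "mirror-consumer-group");
    producer.commitTransaction();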

...

New classes and interfaces include:

MirrorSinkConnector, MirrorSinkTask classes

MirrorMakerConfig

Name | Type | Default | Doc
connector.type | string | source | if "source", the existing MirrorSourceConnector will be launched. If "sink", the new MirrorSinkConnector will be launched, with the further option to enable EOS

MirrorConnectorConfig

Name | Type | Default | Doc
transaction.producer | boolean | false | if true, EOS is enabled between consumer and producer
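For illustration, a minimal sketch of how these options might appear in an MM2 properties file; the "primary"/"backup" cluster aliases and bootstrap servers are placeholders, connector.type and transaction.producer are the configs proposed in this KIP, and whether they are set globally or prefixed per replication flow is not prescribed by this sketch:

# mm2.properties (sketch)
clusters = primary, backup
primary.bootstrap.servers = primary-broker:9092
backup.bootstrap.servers = backup-broker:9092
primary->backup.enabled = true

# run replication through the new sink connector instead of the source connector
connector.type = sink
# enable EOS between the consumer (source cluster) and the transactional producer (target cluster)
transaction.producer = true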

Proposed Changes

MirrorSinkTask

The following pseudocode snippet illustrates the high-level key implementation:

Code Block: MirrorSinkTask
    private boolean isTransactional = config.getTransactionalProducer();
    private boolean transactionStart = false;
	protected Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();

	protected KafkaProducer<byte[], byte[]> initProducer(boolean isTransactional) {
        Map<String, Object> producerConfig = config.targetProducerConfig();
        if (isTransactional) {
        	log.info("use transactional producer");
            producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
            producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        	producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, genTransactionId());
        	producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "trasactional-producer");
        } else {
        	producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "non-trasactional-producer");
        }
        return MirrorUtils.newProducer(producerConfig);
	}

    @Override
	public void put(Collection<SinkRecord> records) {
        log.info("receive {} messages from consumer", records.size());
        if (records.size() == 0) {
        	return;
        }
        try {
            sendBatch(records, producer);
        } catch (RebalanceException e) {
            producer.close();
            producer = initProducer(isTransactional);  
        } catch (ResendRecordsException e) {
            abortTransaction(producer);
            //TODO: add limited retry
            sendBatch(e.getRemainingRecords(), producer);
        } catch (Throwable e) {
            log.error("{} terminating on exception: ", getHostName(), e);
            return;
        }
    }

    private void sendBatch(Collection<SinkRecord> records, KafkaProducer<byte[], byte[]> producer) {
        // "records" is treated as a queue (peek()/poll()) so that records already handed to the
        // producer can be removed as we iterate
        Map<TopicPartition, List<SinkRecord>> remainingRecordsMap = new HashMap<>();
        List<Future<RecordMetadata>> futures = new ArrayList<>();
        Map<Future<RecordMetadata>, SinkRecord> futureMap = new HashMap<>();
        try {
            offsetsMap.clear();
            beginTransaction(producer);
            SinkRecord record;
            while ((record = records.peek()) != null) {
                ProducerRecord<byte[], byte[]> producerRecord = convertToProducerRecord(record);
                offsetsMap.compute(new TopicPartition(record.topic(), record.kafkaPartition()),
                       (tp, curOffsetMetadata) ->
                               (curOffsetMetadata == null || record.kafkaOffset() > curOffsetMetadata.offset())
                                       ?
                                       new OffsetAndMetadata(record.kafkaOffset())
                                       : curOffsetMetadata);
                Future<RecordMetadata> future = producer.send(producerRecord, (recordMetadata, e) -> {
                    if (e != null) {
                        log.error("{} failed to send record to {}: ", MirrorSinkTask.this, producerRecord.topic(), e);
                        log.debug("{} Failed record: {}", MirrorSinkTask.this, producerRecord);
                        throw new KafkaException(e);
                    } else {
                        log.info("{} Wrote record successfully: topic {} partition {} offset {}", //log.trace
                        		MirrorSinkTask.this,
                                recordMetadata.topic(), recordMetadata.partition(),
                                recordMetadata.offset());
                        commitRecord(record, recordMetadata);
                    }
                });
                futures.add(future);
                futureMap.put(future, record);
                records.poll();
            }

        } catch (KafkaException e) {
            // Any unsent messages are added to remainingRecordsMap for re-send
            for (SinkRecord record = records.poll(); record != null; record = records.poll()) {
                addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap);
            }
        } finally {  //TODO: may add more exception handling cases
            for (Future<RecordMetadata> future : futures) {
                try {
                    future.get();
                } catch (Exception e) {
                    SinkRecord record = futureMap.get(future);
                    // Any record failed to send, add to the remainingRecordsMap
                    addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap);
                }
            }
         }

         if (isTransactional && remainingRecordsMap.size() == 0) {
             producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);
             commitTransaction(producer);
         }

        if (remainingRecordsMap.size() > 0) {
            // For the transactional case, all records should be put into remainingRecords, as the whole transaction should be redone
            Collection<SinkRecord> recordsToReSend;
            if (isTransactional) {
                // Transactional: retry all records, since the aborted transaction has cancelled all of our outputs.
                recordsToReSend = records;
            } else {
                // Non-transactional: only retry the failed records; the others were already sent successfully.
                recordsToReSend = new ArrayList<>();
                for (List<SinkRecord> failed : remainingRecordsMap.values()) {
                    recordsToReSend.addAll(failed);
                }
            }
            throw new ResendRecordsException(recordsToReSend);
        }
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    	// if transactional, return empty Map intentionally to disable the offset commit by commitOffsets() in "WorkerSinkTask.java"
        // so that the transactional producer is able to commit the consumer offsets in a transaction
    	if (isTransactional) 
    		return new HashMap<TopicPartition, OffsetAndMetadata>();
    	else // otherwise, return offsetsMap to let commitOffsets() in "WorkerSinkTask.java" to commit offsets
    		return offsetsMap;	
    }

    // This commitRecord() follows the same logic as commitRecord() in MirrorSourceTask
    public void commitRecord(SinkRecord record, RecordMetadata metadata) {
        try {
            if (stopping) {
                return;
            }
            if (!metadata.hasOffset()) {
                log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
                return;
            }
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
            long latency = System.currentTimeMillis() - record.timestamp();
            metrics.countRecord(topicPartition);
            metrics.replicationLatency(topicPartition, latency);
            TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
            long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
            long downstreamOffset = metadata.offset();
            maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
        } catch (Throwable e) {
            log.warn("Failure committing record.", e);
        }
    }

    private void beginTransaction(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional) {
            producer.beginTransaction();
            transactionStart = true;
        }
    }
	
    private void initTransactions(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional) {
            producer.initTransactions();
        }
    }
    
    private void commitTransaction(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional) {
            producer.commitTransaction();
            transactionStart = false;
        }
    }
    
    private void abortTransaction(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional && transactionStart) {
            producer.abortTransaction();
            transactionStart = false;
        }
    }

    public static class ResendRecordsException extends Exception {
        private Collection<SinkRecord> remainingRecords;

        public ResendRecordsException(Collection<SinkRecord> remainingRecords) {
            super("need to re-send " + remainingRecords.size() + " records");
            this.remainingRecords = remainingRecords;
        }

        public Collection<SinkRecord> getRemainingRecords() {
            return remainingRecords;
        }
    }
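For context, a minimal sketch of how the producer could be wired up when the task starts; the start() body below is an assumption for illustration (it is not part of the snippet above) and simply calls the initProducer()/initTransactions() helpers shown earlier:

    // Illustrative only: how the helpers above might be wired together on task start.
    // The config wiring here is an assumption, not something this KIP prescribes.
    @Override
    public void start(Map<String, String> props) {
        config = new MirrorTaskConfig(props);
        isTransactional = config.getTransactionalProducer();
        producer = initProducer(isTransactional);
        // a transactional producer must have initTransactions() called once before the first beginTransaction()
        initTransactions(producer);
    }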

Migration to EOS


Deprecation

A config "connector.type" is proposed to choose which type of Connector (source or sink) to use in MirrorMaker2. So both types of Connectors will co-exist for the near future.

In the long term, if MirrorSinkConnector covers all use cases of MirrorSourceConnector and migration is seamless, then deprecation of MirrorSourceConnector could be considered in a future release.

Rejected Alternatives

https://github.com/apache/kafka/pull/5553, KAFKA-6080 and KAFKA-3821 are relevant efforts with a bigger scope, but it seems none of them has made progress for quite some time.

...