Versions Compared


  • This line was added.
  • This line was removed.
  • Formatting was changed.


Code Block
    // True when the task writes through a transactional producer. Assigned in start() from the
    // task config; it is deliberately NOT initialised here because `config` itself is only
    // assigned in start(), so reading it at field-initialisation time would dereference null.
    private boolean isTransactional;
    // Set by beginTransaction() and cleared by commitTransaction()/abortTransaction().
    private boolean transactionInProgress = false;
    // Highest source offset seen per topic-partition; updated in sendBatch() and returned by preCommit().
    protected Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
    // Topic-partitions assigned to this task; loaded from the task config in start().
    private Set<TopicPartition> taskTopicPartitions;
    // Producer created by initProducer().
    private KafkaProducer<byte[], byte[]> producer;
    // Consumer group id obtained from getSourceConsumerGroupId() in start().
    private String connectConsumerGroup;

	public void start(Map<String, String> props) {
		config = new MirrorTaskConfig(props);
	    taskTopicPartitions = config.taskTopicPartitions();
        isTransactional = config.transactionalProducer();
	    producer = initProducer(isTransactional);
        connectConsumerGroup = getSourceConsumerGroupId();
		if (isTransactional) {
			Map<TopicPartition, OffsetAndMetadata> initOffsetsOnTarget = listTargetConsumerGroupOffsets(connectConsumerGroup);
			// To rewind consumer to correct offset per <topic, partition> assigned to this task, there are more than one case:
            // (1) offset exists on target, use target offset as the starting point of consumption
            // (2) offset DOES NOT exist on target, presume a new topic, so set offset to 0
            Map<TopicPartition, Long> taskOffsets =
            		x -> x, x -> initOffsetsOnTarget.containsKey(x) 
                    initOffsetsOnTarget.get(x).offset() + 1
                    : 0L));

	protected KafkaProducer<byte[], byte[]> initProducer(boolean isTransactional) {
        Map<String, Object> producerConfig = config.targetProducerConfig();
        if (isTransactional) {
        	String transactionId = getTransactionId();"use transactional producer with Id: {} ", transactionId);
            producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
            producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        	producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, genTransactionId()transactionId);
        	producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "trasactional-producer"transactionId);
        } else
       return 	producerConfigMirrorUtils.put(ProducerConfig.CLIENT_ID_CONFIG, "non-trasactional-producer"newProducer(producerConfig);

    } * Per some articles, to avoid ProducerFencedException, transaction id is suggested to set application name + hostname
     * Each MirrorSinkTask is also assigned with different set of <topic, partition>. To get unique transaction id,
     * one way is to append connector name, hostname and string of each <topic, partition> pair
    protected String getTransactionId() {
    	return connectorName + "-" + getHostName() + "-" + getTopicPartitionStr();
    protected String getTopicPartitionStr() {
    	StringBuffer buf = new StringBuffer();
    	for (TopicPartition tp :  taskTopicPartitions) {
    		buf.append(tp.topic() + "_" + tp.partition() + "-");
    	return MirrorUtils.newProducer(producerConfigbuf.toString().substring(0, buf.length() - 1);

	public void put(Collection<SinkRecord> records) {"receive {} messages from consumer", records.size());
        if (records.size() == 0) {
        try {
            sendBatch(records, producer);
        } catch (RebalanceException e) {
            producer = initProducer(isTransactional);  
        } catch (ResendRecordsException e) {
            //TODO: add limited retry
            sendBatch(e.getRemainingRecords(), producer);
        } catch (Throwable e) {
            log.error(getHostName() + " terminating on exception: {}", e);

    /**
     * Sends every record in {@code records} through {@code producer}.
     *
     * <p>For each record the highest source offset seen per topic-partition is kept in
     * {@code offsetsMap}. The send callback logs and throws a KafkaException on failure and
     * calls commitRecord() on success. Records that could not be sent are collected in
     * {@code remainingRecordsMap}; when that map is non-empty a ResendRecordsException is thrown,
     * and when it is empty in transactional mode the offsets are sent to the transaction.
     *
     * NOTE(review): this block is incomplete as written -- several statements and most closing
     * braces are missing, so it cannot compile. The notes below mark each gap.
     *
     * @param records  records to send
     * @param producer producer used for sending
     */
    private void sendBatch(Collection<SinkRecord> records, KafkaProducer<byte[], byte[]> producer) {
        try {
            // NOTE(review): the generic type below lacks a closing '>' and the map is declared
            // inside the try block although it is also used in catch/finally and afterwards.
            Map<TopicPartition, List<SinkRecord> remainingRecordsMap = new HashMap<>();
            SinkRecord record;
            // NOTE(review): this 'for' header has only a condition, and peek()/poll() are not
            // methods of java.util.Collection -- the intended loop form needs confirming.
            for ((record = records.peek()) != null) {
                ProducerRecord<byte[], byte[]> producerRecord = convertToProducerRecord(record);
                // Keep the larger of the stored offset and this record's offset for its partition.
                // NOTE(review): the '?' of the conditional expression is missing below.
                offsetsMap.compute(new TopicPartition(record.topic(), record.kafkaPartition()),
                       (tp, curOffsetMetadata) ->
                               (curOffsetMetadata == null || record.kafkaOffset() > curOffsetMetadata.offset())
                                       new OffsetAndMetadata(record.kafkaOffset())
                                       : curOffsetMetadata);
                Future<RecordMetadata> future = producer.send(producerRecord, (recordMetadata, e) -> {
                    if (e != null) {
                        log.error("{} failed to send record to {}: ", MirrorSinkTask.this, producerRecord.topic(), e);
                        log.debug("{} Failed record: {}", MirrorSinkTask.this, producerRecord);
                        throw new KafkaException(e);
                    } else {
                        // NOTE(review): the logging call around the message below is missing its
                        // prefix and some of its arguments.
              "{} Wrote record successfully: topic {} partition {} offset {}", //log.trace
                                recordMetadata.topic(), recordMetadata.partition(),
                        commitRecord(record, recordMetadata);

        } catch (KafkaException e) {
            // Any unsent messages are added to the remaining remainingRecordsMap for re-send
            for (SinkRecord record = records.poll(); record != null; record = records.poll()) {
            	addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap);
        } finally {  //TODO: may add more exception handling case
            // NOTE(review): 'futures' and 'futureMap' are not declared or populated anywhere in
            // the visible code, and the try body below is empty (presumably it waited on the future).
            for (Future<RecordMetadata> future : futures) {
                try {
                } catch (Exception e) {
                    SinkRecord record = futureMap.get(future);
                    // Any record failed to send, add to the remainingRecordsMap
                    addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap);

         // Only attach consumer offsets to the transaction when every record was sent.
         // NOTE(review): 'consumerGroupId' is not declared in the visible code; the field set in
         // start() is named connectConsumerGroup -- confirm which is intended.
         if (isTransactional && remainingRecordsMap.size() == 0) {
             producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);

        if (remainingRecordsMap.size() > 0) {
            // For transaction case, all records should be put into remainingRecords, as the whole transaction should be redone
            Collection<SinkRecord> recordsToReSend;
            if (isTransactional) {
                // transactional: retry all records, as the transaction will cancel all successful and failed records.
                recordsToReSend = records;
            } else {
                // non-transactional: only retry failed records, others were finished and sent.
                // NOTE(review): remainingRecordsMap is a Map, which is not assignable to
                // Collection<SinkRecord>; its values would need flattening.
                recordsToReSend = remainingRecordsMap;
            throw new ResendRecordsException(recordsToReSend);

    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    	// if transactional, return empty Map intentionally to disable the offset commit by commitOffsets() in
        // so that the transactional producer is able to commit the consumer offsets to target cluster in a transaction at its pace
    	if (isTransactional) 
    		return new HashMap<TopicPartition, OffsetAndMetadata>();
    	else // otherwise, return offsetsMap to let commitOffsets() in "" to commit offsets
    		return offsetsMap;	

    // This commitRecord() follows the same logics as commitRecord() in MirrorSourceTask, to 
    public void commitRecord(SinkRecord record, RecordMetadata metadata) {
        try {
            if (stopping) {
            if (!metadata.hasOffset()) {
                log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
            long latency = System.currentTimeMillis() - record.timestamp();
            metrics.replicationLatency(topicPartition, latency);
            TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
            long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
            long downstreamOffset = metadata.offset();
            maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
        } catch (Throwable e) {
            log.warn("Failure committing record.", e);

    private void beginTransaction(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional) {
            transactionInProgress = true;
    private void initTransactions(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional) {
    private void commitTransaction(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional) {
            transactionInProgress = false;
    private void abortTransaction(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional && transactionInProgress) {
            transactionInProgress = false;

    public static class ResendRecordsException extends Exception {
        private Collection<SinkRecord> remainingRecords;

        public ResendRecordsException(Collection<SinkRecord> remainingRecords) {
            this.remainingRecords = remainingRecords;

        public Collection<SinkRecord> getRemainingRecords() {
            return remainingRecords;
