...

MirrorMaker 2 is currently implemented on the Kafka Connect framework, more specifically on the Source Connector / Task, which do not provide exactly-once semantics (EOS) out-of-the-box, as discussed in https://github.com/confluentinc/kafka-connect-jdbc/issues/461, https://github.com/apache/kafka/pull/5553, KAFKA-6080 and KAFKA-3821. Therefore MirrorMaker 2 currently does not provide EOS either.

This proposal is to provide an option to enable EOS for MirrorMaker 2, for users who prefer no data loss or duplicate data. The key idea behind this proposal is to extend SinkTask with a brand new MirrorSinkTask implementation, which leverages the consumer from WorkerSinkTask and provides an option to manage the consumer offsets in a transactional way (similar high-level idea as HDFS Sink Connector), such that the messages can be delivered across clusters:

(1) Exactly-once: messages are sent by a transactional producer in MirrorSinkTask, and consumer offsets are committed within the same transaction by that transactional producer

...

  • as mentioned above, Kafka Connect Source Connector / Task do not provide EOS by nature, mostly because of the async and periodic source task offset commit, where in the MirrorMaker case the "offset" is the consumer offset. So this is one of the blockers to enabling EOS without a lot of changes in WorkerSourceTask.
  • Since MirrorSinkTask (which extends SinkTask) is a new implementation, we can fully control how a producer is created (transactional vs. non-transactional) and handle various Exception cases (especially in transactional mode) purely in the new implementation, rather than changing the existing producer in WorkerSourceTask.
  • MirrorSinkTask can intentionally return an empty result from preCommit() (defined in SinkTask) by overriding it, to disable the consumer offset commit done by WorkerSinkTask, so that the transactional producer is able to commit consumer offsets within one transaction (see the sketch after this list).
  • Since HDFS Sink Connector has already achieved EOS, we can correctly implement MirrorSinkTask based on the methods of SinkTask by referring to the best practices from HDFS Sink Connector.
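A condensed, hedged sketch of the preCommit() point above (this is not the final implementation; the field names isTransactional and offsetsMap follow the full pseudocode further down this page):

Code Block: preCommit() sketch (illustrative)
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTask;

// Sketch only: shows how returning an empty map from preCommit() disables the
// offset commit performed by WorkerSinkTask, leaving the transactional producer in charge.
public abstract class TransactionalSinkTaskSketch extends SinkTask {
    protected boolean isTransactional;  // set from the connector config
    protected Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // empty map => WorkerSinkTask skips its own consumer offset commit
        return isTransactional ? new HashMap<TopicPartition, OffsetAndMetadata>() : offsetsMap;
    }
}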

Public Interfaces

New classes and interfaces include:

...

(1) In the MirrorMaker case, there are source and target clusters. Normally the consumer pulls the data and stores its offsets in the source cluster, then the producer takes over the data from the consumer and sends it to the target cluster. However a Kafka transaction cannot span clusters out-of-the-box. If we want EOS across clusters, what modifications need to be done?

A: The short answer is that the consumer group offsets, which would normally be stored on the source cluster, are instead managed and committed by the transactional producer and stored on the target cluster.
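To make the idea concrete outside of the Connect framework, the following minimal sketch (plain Kafka clients; broker addresses, topic name and group id are placeholders, not MirrorMaker 2 code) shows a consumer polling the source cluster while a transactional producer on the target cluster writes both the data records and the consumer group offsets in a single transaction. The offsets land in __consumer_offsets on the target cluster under the "fake" group described below.

Code Block: cross-cluster transactional delivery sketch (illustrative)
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class CrossClusterEosSketch {
    public static void main(String[] args) {
        // Consumer lives on the SOURCE cluster (placeholder addresses and group id).
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "source-cluster:9092");
        consumerProps.put("group.id", "mm2-eos-demo");
        consumerProps.put("enable.auto.commit", "false");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Collections.singletonList("topic1"));

        // Transactional producer writes to the TARGET cluster.
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "target-cluster:9092");
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "mm2-eos-demo-txn");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
        producer.initTransactions();

        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
            if (records.isEmpty()) continue;

            producer.beginTransaction();
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<byte[], byte[]> r : records) {
                producer.send(new ProducerRecord<>(r.topic(), r.key(), r.value()));
                offsets.put(new TopicPartition(r.topic(), r.partition()),
                        new OffsetAndMetadata(r.offset() + 1));
            }
            // Offsets are committed to __consumer_offsets on the TARGET cluster, under the
            // "fake" consumer group, atomically with the data records.
            producer.sendOffsetsToTransaction(offsets, "mm2-eos-demo");
            producer.commitTransaction();
        }
    }
}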

However the consumer still has to live on the source cluster in order to pull the data, yet its offsets are not stored on the source cluster (or, if stored there, they are not accurate). We propose the following approach to rewind the consumer to the correct position when a task restarts or rebalances, while its offsets are stored on the target cluster (the pseudocode is shown below):

  • MirrorSinkTask does not rely on Connect's internal offsets tracking or __consumer_offsets on the source cluster.
  • the offsets are only written by the transactional producer to the target cluster.
  • offsets are stored on the target cluster using a "fake" consumer group. "Fake" means no actual records are consumed by the group; only offsets are stored in the __consumer_offsets topic. However, the __consumer_offsets topic on the target cluster (managed by the "fake" consumer group) is the "source of truth" for offsets.
  • all records are written in a transaction.
  • when MirrorSinkTask starts or rebalances, it loads the initial offsets from __consumer_offsets on the target cluster.

...

  • if the transaction succeeds, the __consumer_offsets topic on the target cluster is updated.
  • if the transaction aborts, all data records are dropped, and the __consumer_offsets topic on the target cluster is not updated.
  • when MirrorSinkTask starts/restarts, it resumes at the last committed offsets, as stored in the target cluster.

...

  • If the consumer group already exists on the source cluster while the "fake" consumer group (with the same group id) on the target cluster does not exist or its offsets are lower than the high watermark, then to avoid duplicate data a one-time offline job may be needed to sync the offsets from the source cluster to the target cluster (see the sketch below).
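One possible shape of that one-time job, sketched with standard AdminClient calls (listConsumerGroupOffsets / alterConsumerGroupOffsets, available in recent client versions). Cluster addresses and the group id are placeholders, and a real job would translate upstream offsets to their downstream equivalents rather than copying them verbatim:

Code Block: one-time offset sync sketch (illustrative)
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OneTimeOffsetSync {
    public static void main(String[] args) throws Exception {
        Properties sourceProps = new Properties();
        sourceProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "source-cluster:9092");
        Properties targetProps = new Properties();
        targetProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "target-cluster:9092");

        String groupId = "my-consumer-group"; // same group id on both clusters

        try (AdminClient sourceAdmin = AdminClient.create(sourceProps);
             AdminClient targetAdmin = AdminClient.create(targetProps)) {

            // Read the real group's committed offsets from the source cluster.
            Map<TopicPartition, OffsetAndMetadata> sourceOffsets =
                    sourceAdmin.listConsumerGroupOffsets(groupId)
                               .partitionsToOffsetAndMetadata().get();

            // Seed the "fake" group on the target cluster. NOTE: in practice the source
            // offsets must first be mapped to the corresponding downstream offsets
            // (e.g. via MirrorMaker 2 checkpoints); copying them verbatim is only valid
            // when upstream and downstream offsets happen to line up.
            targetAdmin.alterConsumerGroupOffsets(groupId, sourceOffsets).all().get();
        }
    }
}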

(2) Since the offsets of the consumer group on the source cluster are NOT the "source of truth" and are not involved in the transaction, are they still being updated? Do we still need them in some cases?

A: The offsets of the consumer group on the source cluster are still updated periodically and independently by the logic in WorkerSinkTask. However, they may lag behind a little, since (1) they are not involved in the transaction, and (2) they are committed periodically.

However, they may still be useful in some cases: (1) to measure the replication lag between the upstream producer on the source cluster and MirrorMaker's consumption; (2) to restore a lost "fake" consumer group with a small amount of duplicate data.
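For use case (1), a small monitoring sketch against the source cluster only could look like the following (bootstrap servers and the MirrorMaker consumer group id are placeholders; listOffsets requires a recent AdminClient). It compares the upstream log end offsets with what the MirrorMaker consumer group has committed:

Code Block: replication lag check sketch (illustrative)
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ReplicationLagCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "source-cluster:9092");
        String mirrorGroupId = "mm2-consumer-group"; // placeholder for MirrorMaker's consumer group

        try (AdminClient admin = AdminClient.create(props)) {
            // Offsets MirrorMaker has consumed (committed periodically by WorkerSinkTask).
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(mirrorGroupId)
                         .partitionsToOffsetAndMetadata().get();

            // Latest offsets produced upstream on the same partitions.
            Map<TopicPartition, OffsetSpec> request = committed.keySet().stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> logEnd =
                    admin.listOffsets(request).all().get();

            // Lag per partition = upstream log end offset minus MirrorMaker's committed offset.
            Map<TopicPartition, Long> lag = new HashMap<>();
            committed.forEach((tp, om) -> lag.put(tp, logEnd.get(tp).offset() - om.offset()));
            lag.forEach((tp, l) -> System.out.println(tp + " lag=" + l));
        }
    }
}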

(3) The consumer in WorkerSinkTask periodically commits the offsets that it maintains itself. How do we disable this behavior so that the transactional producer in MirrorSinkTask can control when and what to commit?

A: SinkTask provides a preCommit() method to let the implementation of SinkTask (here MirrorSinkTask) determine which offsets should be committed by WorkerSinkTask. In transactional mode, we intentionally return an empty Map in preCommit() (see the code below); when the return value of preCommit() is empty, the consumer in WorkerSinkTask skips committing the offsets that it maintains itself.



The following pseudocode illustrates the key implementation at a high level:

Code Block: MirrorSinkTask
    private boolean isTransactional = config.getTransactionalProducer();
    private boolean transactionInProgress = false;
    protected Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
    private Set<TopicPartition> taskTopicPartitions;
    private KafkaProducer<byte[], byte[]> producer;
    private String connectConsumerGroup;

    @Override
    public void start(Map<String, String> props) {
        config = new MirrorTaskConfig(props);
        taskTopicPartitions = config.taskTopicPartitions();
        isTransactional = config.transactionalProducer();
        producer = initProducer(isTransactional);
        connectConsumerGroup = getSourceConsumerGroupId();
        if (isTransactional) {
            loadContextOffsets();
        }
    }

    @Override
    public void open(Collection<TopicPartition> partitions) {
        if (isTransactional) {
            loadContextOffsets();
        }
    }

    private void loadContextOffsets() {
        // the "fake" consumer group on the target cluster is the source of truth for offsets
        Map<TopicPartition, OffsetAndMetadata> initOffsetsOnTarget = listTargetConsumerGroupOffsets(connectConsumerGroup);

        Set<TopicPartition> assignments = context.assignment();

        // only keep the offsets of the partitions assigned to this task,
        // and resume consumption right after the last offset committed on the target cluster
        Map<TopicPartition, Long> contextOffsets = assignments.stream()
                .filter(x -> initOffsetsOnTarget.containsKey(x))
                .collect(Collectors.toMap(
                        x -> x, x -> initOffsetsOnTarget.get(x).offset() + 1));

        context.offset(contextOffsets);
    }

    protected KafkaProducer<byte[], byte[]> initProducer(boolean isTransactional) {
        Map<String, Object> producerConfig = config.targetProducerConfig();
        if (isTransactional) {
            String transactionId = getTransactionId();
            log.info("use transactional producer with Id: {} ", transactionId);
            producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
            producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
            producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
            producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, transactionId);
        }
        return MirrorUtils.newProducer(producerConfig);
    }

    /**
     * Per some articles, to avoid ProducerFencedException, the transaction id is suggested to be set to application name + hostname.
     * Each MirrorSinkTask is also assigned a different set of <topic, partition>. To get a unique transaction id,
     * one way is to append the connector name, hostname and a string of each <topic, partition> pair.
     */
    protected String getTransactionId() {
        return getHostName() + "-" + getUniquePredictableStr();
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        log.info("receive {} messages from consumer", records.size());
        if (records.size() == 0) {
            return;
        }
        try {
            sendBatch(records, producer);
        } catch (RebalanceException e) {
            producer.close();
            producer = initProducer(isTransactional);
        } catch (ResendRecordsException e) {
            abortTransaction(producer);
            //TODO: add limited retry
            sendBatch(e.getRemainingRecords(), producer);
        } catch (Throwable e) {
            log.error(getHostName() + " terminating on exception: {}", e);
            return;
        }
    }

    private void sendBatch(Collection<SinkRecord> records, KafkaProducer<byte[], byte[]> producer) {
        Map<TopicPartition, List<SinkRecord>> remainingRecordsMap = new HashMap<>();
        List<Future<RecordMetadata>> futures = new ArrayList<>();
        Map<Future<RecordMetadata>, SinkRecord> futureMap = new HashMap<>();
        try {
            offsetsMap.clear();
            beginTransaction(producer);
            SinkRecord record;
            while ((record = records.peek()) != null) {
                ProducerRecord<byte[], byte[]> producerRecord = convertToProducerRecord(record);
                offsetsMap.compute(new TopicPartition(record.topic(), record.kafkaPartition()),
                        (tp, curOffsetMetadata) ->
                                (curOffsetMetadata == null || record.kafkaOffset() > curOffsetMetadata.offset())
                                        ? new OffsetAndMetadata(record.kafkaOffset())
                                        : curOffsetMetadata);
                Future<RecordMetadata> future = producer.send(producerRecord, (recordMetadata, e) -> {
                    if (e != null) {
                        log.error("{} failed to send record to {}: ", MirrorSinkTask.this, producerRecord.topic(), e);
                        log.debug("{} Failed record: {}", MirrorSinkTask.this, producerRecord);
                        throw new KafkaException(e);
                    } else {
                        log.info("{} Wrote record successfully: topic {} partition {} offset {}", //log.trace
                                MirrorSinkTask.this,
                                recordMetadata.topic(), recordMetadata.partition(),
                                recordMetadata.offset());
                        commitRecord(record, recordMetadata);
                    }
                });
                futures.add(future);
                futureMap.put(future, record);
                records.poll();
            }

        } catch (KafkaException e) {
            // Any unsent messages are added to the remainingRecordsMap for re-send
            for (SinkRecord record = records.poll(); record != null; record = records.poll()) {
                addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap);
            }
        } finally {  //TODO: may add more exception handling cases
            for (Future<RecordMetadata> future : futures) {
                try {
                    future.get();
                } catch (Exception e) {
                    SinkRecord record = futureMap.get(future);
                    // Any record that failed to send is added to the remainingRecordsMap
                    addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap);
                }
            }
        }

        if (isTransactional && remainingRecordsMap.size() == 0) {
            producer.sendOffsetsToTransaction(offsetsMap, connectConsumerGroup);
            commitTransaction(producer);
        }

        if (remainingRecordsMap.size() > 0) {
            // For the transactional case, all records should be put into remainingRecords, as the whole transaction should be redone
            Collection<SinkRecord> recordsToReSend;
            if (isTransactional) {
                // transactional: retry all records, as the transaction will cancel all successful and failed records.
                recordsToReSend = records;
            } else {
                // non-transactional: only retry failed records, others were finished and sent.
                recordsToReSend = remainingRecordsMap.values().stream()
                        .flatMap(List::stream).collect(Collectors.toList());
            }
            throw new ResendRecordsException(recordsToReSend);
        }
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // if transactional, return an empty Map intentionally to disable the offset commit by commitOffsets() in WorkerSinkTask.java,
        // so that the transactional producer is able to commit the consumer offsets to the target cluster in a transaction at its own pace
        if (isTransactional)
            return new HashMap<TopicPartition, OffsetAndMetadata>();
        else // otherwise, return offsetsMap to let commitOffsets() in WorkerSinkTask.java commit the offsets
            return offsetsMap;
    }

    // This commitRecord() follows the same logic as commitRecord() in MirrorSourceTask, to update metrics and sync the offsets.
    public void commitRecord(SinkRecord record, RecordMetadata metadata) {
        try {
            if (stopping) {
                return;
            }
            if (!metadata.hasOffset()) {
                log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
                return;
            }
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
            long latency = System.currentTimeMillis() - record.timestamp();
            metrics.countRecord(topicPartition);
            metrics.replicationLatency(topicPartition, latency);
            TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
            long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
            long downstreamOffset = metadata.offset();
            maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
        } catch (Throwable e) {
            log.warn("Failure committing record.", e);
        }
    }

    private void beginTransaction(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional) {
            producer.beginTransaction();
            transactionInProgress = true;
        }
    }
	
    private void initTransactions(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional) {
            producer.initTransactions();
        }
    }
    
    private void commitTransaction(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional) {
            producer.commitTransaction();
            transactionInProgress = false;
        }
    }
    
    private void abortTransaction(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional && transactionInProgress) {
            producer.abortTransaction();
            transactionInProgress = false;
        }
    }

    public static class ResendRecordsException extends Exception {
        private Collection<SinkRecord> remainingRecords;

        public ResendRecordsException(Collection<SinkRecord> remainingRecords) {
            super("some records were not sent and need to be re-sent");
            this.remainingRecords = remainingRecords;
        }

        public Collection<SinkRecord> getRemainingRecords() {
            return remainingRecords;
        }
    }

...