You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 17 Next »

Status

Current state: Draft

Discussion thread:

JIRA: 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

MirrorMaker2 is currently implemented on Kafka Connect Framework, more specifically the Source Connector / Task, which do not provide exactly-once semantics (EOS) out-of-the-box, as discussed in https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  https://github.com/apache/kafka/pull/5553 Unable to render Jira issues macro, execution error.  and  Unable to render Jira issues macro, execution error. . Therefore MirrorMaker2 currently does not provide EOS.

This proposal is to provide an option to enable EOS for MirrorMaker 2 if no data loss or duplicate data is preferred. The key idea behind this proposal is to extend SinkTask with a brand new MirrorSinkTask implementation, which leverages the consumer from WorkerSinkTask and provides an option to either deliver messages

(1) Exactly-once: by a transactional producer in MirrorSinkTask and consumer offsets are committed within a transaction by the transactional producer

OR

(2) At-least-once: by a non-transactional producer in MirrorSinkTask and consumer offsets are committed by the consumer from WorkerSinkTask separately

The reasons to implement a brand new MirrorSinkTask that extends SinkTask, rather than working on the existing MirrorSourceTask, are the following:

  • as mentioned above, Kafka Connect Source Connector / Task do not provide EOS by nature, mostly because of async and periodic source task offset commit, which in MirorrMaker case, the "offset" is consumer offset. So this is one of the blockers to enable EOS without a lots of changes in WokerSourceTask.
  • Since MirrorSinkTask (that extends SinkTask) is a new implementation, we can fully control how a producer is created (transactional v.s. non-transactional) and handle various Exception cases (especially in transactional mode), purely in the new implementation, rather than changing the existing producer in WorkerSourceTask
  • MirrorSinkTask can intentionally return an empty result from preCommit() (defined in SinkTask) by overriding it to disable the consumer offset commit done by WorkerSinkTask, so that the transactional producer is able to commit consumer offset within one transaction.

Public Interfaces

New classes and interfaces include:

MirrorMakerConfig

NameTypeDefaultDoc
connector.typestringsourceif "source", the existing MirrorSourceConnector will be launched. If "sink", the new MirrorSinkConnector will be launched with further option to enable EOS

MirrorConnectorConfig

NameTypeDefaultDoc
transaction.producerbooleanfalseif True, EOS is enabled between consumer and producer

Proposed Changes

MirrorSinkTask

There are several key challenges to make EOS happen across clusters. Those challenges are discussed here one-by-one:

(1) in MirrorMaker case, there are source and target clusters. The consumer pulls data and stores its offsets are typically stored in source cluster, while the producer sends to target cluster. However Kafka transaction can not happen across clusters out-of-the-box. What modifications need to be done?

A: The short answer is the consumer offsets, which are supposed to be stored on source cluster, are maintained / committed by transactional producer and stored on the target cluster instead.

But the consumer still has to live on the source cluster in order to pull the data and however the offsets are not stored in source cluster (or stored in source cluster, not not accurate). We propose to use the following idea to position the consumer correctly while its offsets are in target cluster: (the pseudocode are shown in below)

  • MirrorSinkTask don't rely on Connect's internal offsets tracking or __consumer_offsets on the source cluster.
  • the offsets are only written by transaction producer to the target cluster.
  • offsets are stored on the target cluster using a "fake" consumer group. The "fake" means there would be no actual records being consumed by the group, just offsets being stored in __consumer_offsets topic.
  • all records are written in a transaction.
  • when MirrorSourceTask starts, it loads initial offsets from __consumer_offsets on the target cluster.

The outcome of the above idea:

  • if the transaction succeeds, the __consumer_offsets topic on the target cluster is updated.
  • if the transaction aborts, all data records are dropped, and the __consumer_offsets topic is not updated.
  • when MirrorSinkTask starts/restarts, it resumes at the last committed offsets, as stored in the target cluster.

Some items to pay attention in order to make above idea work correctly:

  • If consumer group already exists on source cluster, while the "fake" consumer group (with same Group Id) on the target cluster does not exist or its offsets lower than the high watermark. We need to do a one-time job to sync the offsets from source cluster to target cluster.
  • Since the "fake" consumer group becomes "Single source of truth" and will be only maintained by MirrorSinkTask, some cases may need special handling, e.g. adding or deleting topics for existing set of topics to be replicated.

(2) The consumer in WorkerSinkTask periodically commits the offsets that are maintained by itself, how to disable this behavior so that transactional producer in MirrorSinkTask can control when and what to commit?

A: The SinkTask provides a preCommit() method let the implementation of SinkTask (here is MirrorSinkTask) to determine what offsets should be committed in WorkerSinkTask. If in transaction mode, we will intentionally return empty Map in preCommit() (see the code below), so that if the return of preCommit() is empty, the consumer of WorkerSinkTask will ignore committing the offsets that are maintained by itself.


The following is the pseudocode illustrates the high-level key implementation:

MirrorSinkTask
    private boolean isTransactional = config.getTransactionalProducer();
    private boolean transactionStart = false;
	protected Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
    private Set<TopicPartition> taskTopicPartitions;
	private KafkaProducer<byte[], byte[]> producer;
    private String consumerGroupId;

    @Override
	public void start(Map<String, String> props) {
		config = new MirrorTaskConfig(props);
	    taskTopicPartitions = config.taskTopicPartitions();
        isTransactional = config.transactionalProducer();
	    producer = initProducer(isTransactional);
        consumerGroupId = getSourceConsumerGroupId();
		if (isTransactional) {
            // in order to leverage "read-process-write" loop provided by Kafka EOS out-of-the-box
            // the restriction is that "topics have to been in one cluster"
            // in MM case, since the consumer pulls from source cluster and producer sends to target cluster,
            // to workaround this restriction, the consumer offsets are stored in target cluster with same "group Id"
            // (1) when MirrorSinkTask starts, it loads initial offsets from __consumer_offsets on the target cluster
            // (2) pass the initial offsets to SinkTaskContext, followed by supplying them to WorkerSinkTask
            // (3) the consumer in WorkerSinkTask rewinds and starts consuming from the initial offsets
			Map<TopicPartition, OffsetAndMetadata> offsetsOnTarget = listTargetConsumerGroupOffsets(consumerGroupId);
			Map<TopicPartition, Long> offsets = new HashMap<>();
			
			for (Map.Entry<TopicPartition, OffsetAndMetadata> offset : offsetsOnTarget.entrySet()) {
				offsets.put(offset.getKey(), offset.getValue().offset());
			}
			Map<TopicPartition, Long> taskOffsets = loadOffsets(taskTopicPartitions, offsets);
			context.offset(taskOffsets);
        }
    }

	protected KafkaProducer<byte[], byte[]> initProducer(boolean isTransactional) {
        Map<String, Object> producerConfig = config.targetProducerConfig();
        if (isTransactional) {
        	log.info("use transactional producer");
            producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
            producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        	producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, genTransactionId());
        	producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "trasactional-producer");
        } else {
        	producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "non-trasactional-producer");
        }
        return MirrorUtils.newProducer(producerConfig);
	}

    @Override
	public void put(Collection<SinkRecord> records) {
        log.info("receive {} messages from consumer", records.size());
        if (records.size() == 0) {
        	return;
        }
        try {
            sendBatch(records, producer);
        } catch (RebalanceException e) {
            producer.close();
            producer = initProducer(isTransactional);  
        } catch (ResendRecordsException e) {
            abortTransaction(producer);
            //TODO: add limited retry
            sendBatch(e.getRemainingRecords(), producer);
        } catch (Throwable e) {
            log.error(getHostName() + " terminating on exception: {}", e);
            return;
        }
    }

    private void sendBatch(Collection<SinkRecord> records, KafkaProducer<byte[], byte[]> producer) {
        try {
            Map<TopicPartition, List<SinkRecord> remainingRecordsMap = new HashMap<>();
            offsetsMap.clear();
            beginTransaction(producer);
            SinkRecord record;
            for ((record = records.peek()) != null) {
                ProducerRecord<byte[], byte[]> producerRecord = convertToProducerRecord(record);
                offsetsMap.compute(new TopicPartition(record.topic(), record.kafkaPartition()),
                       (tp, curOffsetMetadata) ->
                               (curOffsetMetadata == null || record.kafkaOffset() > curOffsetMetadata.offset())
                                       ?
                                       new OffsetAndMetadata(record.kafkaOffset())
                                       : curOffsetMetadata);
                Future<RecordMetadata> future = producer.send(producerRecord, (recordMetadata, e) -> {
                    if (e != null) {
                        log.error("{} failed to send record to {}: ", MirrorSinkTask.this, producerRecord.topic(), e);
                        log.debug("{} Failed record: {}", MirrorSinkTask.this, producerRecord);
                        throw new KafkaException(e);
                    } else {
                        log.info("{} Wrote record successfully: topic {} partition {} offset {}", //log.trace
                        		MirrorSinkTask.this,
                                recordMetadata.topic(), recordMetadata.partition(),
                                recordMetadata.offset());
                        commitRecord(record, recordMetadata);
                    }
                });
                futures.add(future);
                records.poll();
            }

        } catch (KafkaException e) {
            // Any unsent messages are added to the remaining remainingRecordsMap for re-send
            for (SinkRecord record = records.poll(); record != null; record = records.poll()) {
            	addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap);
        } finally {  //TODO: may add more exception handling case
            for (Future<RecordMetadata> future : futures) {
                try {
                    future.get();
                } catch (Exception e) {
                    SinkRecord record = futureMap.get(future);
                    // Any record failed to send, add to the remainingRecordsMap
                    addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap);
                }
            }
         }

         if (isTransactional && remainingRecordsMap.size() == 0) {
             producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);
             commitTransaction(producer);
         }

        if (remainingRecordsMap.size() > 0) {
            // For transaction case, all records should be put into remainingRecords, as the whole transaction should be redone
            Collection<SinkRecord> recordsToReSend;
            if (isTransactional) {
                // Transactional: retry all records, the transaction will have cancelled all of our outputs.
                recordsToReSend = records;
            } else {
                // Non-transactional: only retry failed records, others already were finished and sent.
                recordsToReSend = remainingRecordsMap;
            }
            throw new ResendRecordsException(recordsToReSend);
        }
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    	// if transactional, return empty Map intentionally to disable the offset commit by commitOffsets() in "WorkerSinkTask.java"
        // so that the transactional producer is able to commit the consumer offsets in a transaction
    	if (isTransactional) 
    		return new HashMap<TopicPartition, OffsetAndMetadata>();
    	else // otherwise, return offsetsMap to let commitOffsets() in "WorkerSinkTask.java" to commit offsets
    		return offsetsMap;	
    }

    // This commitRecord() follows the same logics as commitRecord() in MirrorSourceTask
    public void commitRecord(SinkRecord record, RecordMetadata metadata) {
        try {
            if (stopping) {
                return;
            }
            if (!metadata.hasOffset()) {
                log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
                return;
            }
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
            long latency = System.currentTimeMillis() - record.timestamp();
            metrics.countRecord(topicPartition);
            metrics.replicationLatency(topicPartition, latency);
            TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
            long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
            long downstreamOffset = metadata.offset();
            maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
        } catch (Throwable e) {
            log.warn("Failure committing record.", e);
        }
    }

    private void beginTransaction(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional) {
            producer.beginTransaction();
            transactionStart = true;
        }
    }
	
    private void initTransactions(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional) {
            producer.initTransactions();
        }
    }
    
    private void commitTransaction(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional) {
            producer.commitTransaction();
            transactionStart = false;
        }
    }
    
    private void abortTransaction(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional && transactionStart) {
            producer.abortTransaction();
            transactionStart = false;
        }
    }

    public static class ResendRecordsException extends Exception {
        private Collection<SinkRecord> remainingRecords;

        public ResendRecordsException(Collection<SinkRecord> remainingRecords) {
            super(cause);
            this.remainingRecords = remainingRecords;
        }

        public Collection<SinkRecord> getRemainingRecords() {
            return remainingRecords;
        }
    }

Migration from MirrorSourceConnector to MirrorSourceConnector /w EOS

This is a simply high-level guidance without real-world practices and is subject to change. Also each migration case may be handled differently with different requirements.

By default, "connector.type" is set to "source", when the latest MirrorMaker2 is deployed, the current mirroring behavior should not be changed.

Next, if there are multiple instances of MirrorMaker2, consider to change "connector.type" to "sink" on one instance and deploy it. Once the config change looks stable, repeat for other instances. The message delivery semantics is still at-least-once, but all instances of MirrorMaker2 are now using MirrorSinkConnector.

Since "Transactional mode" or EOS will inevitably consume more resources and deliver lower throughput, it is always recommended to benchmark the impact and provision the enough capacity before switching to EOS.

If a short downtime is allowed, stopping all MirrorMaker2 instances, setting "transaction.producer" to "true", then starting them again. From now, MirrorMaker2 should mirror the data with EOS.

if expect "no downtime", the migration should be conducted more carefully and the operational experiences could refer to "how to migrate from non-transactional to transactional Kafka producer", which is out of scope of this KIP.

Deprecation

A config "connector.type" is proposed to choose which type of Connector (source or sink) to use in MirrorMaker2. So both MirrorSourceConnector and MIrrorSinkConnector will co-exist in the codebase in the near future.

In the long term, if MirrorSinkConnector covers all use cases of MirrorSourceConnector and the migration is proven seamless, then in the future release, deprecation of MirrorSource Connector could be considered.

Rejected Alternatives

 https://github.com/apache/kafka/pull/5553 Unable to render Jira issues macro, execution error.  and  Unable to render Jira issues macro, execution error.  are relevant efforts in a bigger scope, but it seems none of them proceeded successfully for a quite amount of time.



  • No labels