...
However, the consumer still has to read from the source cluster to pull the data, while its offsets are not stored on the source cluster (or are stored there but are not accurate). We propose the following approach to position the consumer correctly while its offsets live on the target cluster (pseudocode is shown below):
- MirrorSinkTask doesn't rely on Connect's internal offset tracking or on __consumer_offsets on the source cluster.
- the offsets are written only by the transactional producer, and only to the target cluster.
- offsets are stored on the target cluster using a "fake" consumer group. "Fake" means that the group never actually consumes any records; it only has offsets stored in the __consumer_offsets topic.
- all records are written in a transaction.
- when MirrorSinkTask starts, it loads initial offsets from __consumer_offsets on the target cluster.
The outcome of the above idea:
- if the transaction succeeds, the __consumer_offsets topic on the target cluster is updated.
- if the transaction aborts, all data records are dropped, and the __consumer_offsets topic is not updated.
- when MirrorSinkTask starts/restarts, it resumes at the last committed offsets, as stored in the target cluster.
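The outcomes listed above can be illustrated with a toy in-memory model. This is only a sketch of the intended semantics, not the Kafka API: the class `TargetClusterModel`, its methods, and the string partition keys are hypothetical names made up for this illustration. It assumes the stored offset is the position of the next record to read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for the target cluster (hypothetical, not the Kafka API). */
public class TargetClusterModel {
    // Committed state: what readers and a restarting task can see.
    private final List<String> committedRecords = new ArrayList<>();
    private final Map<String, Long> committedOffsets = new HashMap<>();

    // State buffered inside the currently open transaction.
    private final List<String> pendingRecords = new ArrayList<>();
    private final Map<String, Long> pendingOffsets = new HashMap<>();

    /** Buffers a mirrored record together with the next source offset to resume from. */
    public void send(String sourcePartition, long sourceOffset, String value) {
        pendingRecords.add(value);
        pendingOffsets.merge(sourcePartition, sourceOffset + 1, Long::max);
    }

    /** Commit: data records and offsets become visible together. */
    public void commit() {
        committedRecords.addAll(pendingRecords);
        committedOffsets.putAll(pendingOffsets);
        clearPending();
    }

    /** Abort: data records are dropped and the offsets are not updated. */
    public void abort() {
        clearPending();
    }

    private void clearPending() {
        pendingRecords.clear();
        pendingOffsets.clear();
    }

    /** Offset a starting or restarting task resumes from; 0 if nothing was committed. */
    public long resumeOffset(String sourcePartition) {
        return committedOffsets.getOrDefault(sourcePartition, 0L);
    }

    public List<String> records() {
        return new ArrayList<>(committedRecords);
    }

    public static void main(String[] args) {
        TargetClusterModel target = new TargetClusterModel();
        target.send("topic1-0", 0, "a");
        target.send("topic1-0", 1, "b");
        target.commit();                  // records a, b and offset 2 land together
        target.send("topic1-0", 2, "c");
        target.abort();                   // record c and its offset are both discarded
        // prints: [a, b] resume at 2
        System.out.println(target.records() + " resume at " + target.resumeOffset("topic1-0"));
    }
}
```

Because the offsets only ever move together with the data, a restarted task that resumes at `resumeOffset` re-reads record `c` from the source and neither skips nor duplicates anything on the target.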
Some points need attention for this approach to work correctly:
- If the consumer group already exists on the source cluster while the "fake" consumer group (with the same group ID) on the target cluster does not exist, or its offsets are lower than the high watermark, a one-time job is needed to sync the offsets from the source cluster to the target cluster.
- Since the "fake" consumer group becomes the single source of truth and is maintained only by MirrorSinkTask, some cases may need special handling, e.g. adding topics to, or deleting topics from, the existing set of replicated topics.
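The decision behind the one-time sync job could look roughly like the sketch below. It rests on an assumption that is not spelled out above: a partition needs seeding when the target-side "fake" group has no offset for it, or has a lower offset than the source-side group. The class `OffsetSeedPlanner`, the method `offsetsToSeed`, and the string partition keys are hypothetical; a real job would read and write the offsets through Kafka's admin or consumer APIs.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical planner for the one-time offset sync (sketch only). */
public class OffsetSeedPlanner {

    /**
     * Returns the offsets a one-time job would copy to the target-side group:
     * every partition whose target offset is missing or behind the source offset
     * (this comparison rule is an assumption made for the illustration).
     */
    public static Map<String, Long> offsetsToSeed(Map<String, Long> sourceGroupOffsets,
                                                  Map<String, Long> targetGroupOffsets) {
        Map<String, Long> toSeed = new HashMap<>();
        for (Map.Entry<String, Long> entry : sourceGroupOffsets.entrySet()) {
            Long onTarget = targetGroupOffsets.get(entry.getKey());
            if (onTarget == null || onTarget < entry.getValue()) {
                toSeed.put(entry.getKey(), entry.getValue());
            }
        }
        return toSeed;
    }

    public static void main(String[] args) {
        Map<String, Long> source = new HashMap<>();
        source.put("topic1-0", 100L);
        source.put("topic1-1", 50L);

        Map<String, Long> target = new HashMap<>();
        target.put("topic1-0", 100L);     // already in sync
                                          // topic1-1 has no offset on the target yet

        // prints: {topic1-1=50}
        System.out.println(offsetsToSeed(source, target));
    }
}
```

Once the seeded offsets are written, the job does not need to run again, since MirrorSinkTask is then the only writer of the target-side group.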
...
```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

// Fields and methods of MirrorSinkTask (pseudocode; helpers not shown here are assumed to exist).

// Assigned in start(), once the config is available.
private boolean isTransactional;
private boolean transactionStart = false;
protected Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
private Set<TopicPartition> taskTopicPartitions;
private KafkaProducer<byte[], byte[]> producer;
private String consumerGroupId;

@Override
public void start(Map<String, String> props) {
    config = new MirrorTaskConfig(props);
    taskTopicPartitions = config.taskTopicPartitions();
    isTransactional = config.transactionalProducer();
    producer = initProducer(isTransactional);
    consumerGroupId = getSourceConsumerGroupId();
    if (isTransactional) {
        // The "read-process-write" loop that Kafka EOS provides out of the box requires
        // all topics to be in one cluster. In the MM case the consumer pulls from the source
        // cluster and the producer sends to the target cluster, so to work around this
        // restriction the consumer offsets are stored in the target cluster under the same group ID:
        // (1) when MirrorSinkTask starts, it loads initial offsets from __consumer_offsets on the target cluster
        // (2) it passes the initial offsets to SinkTaskContext, which supplies them to WorkerSinkTask
        // (3) the consumer in WorkerSinkTask rewinds and starts consuming from the initial offsets
        Map<TopicPartition, OffsetAndMetadata> offsetsOnTarget = listTargetConsumerGroupOffsets(consumerGroupId);
        Map<TopicPartition, Long> offsets = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> offset : offsetsOnTarget.entrySet()) {
            offsets.put(offset.getKey(), offset.getValue().offset());
        }
        Map<TopicPartition, Long> taskOffsets = loadOffsets(taskTopicPartitions, offsets);
        context.offset(taskOffsets);
    }
}

protected KafkaProducer<byte[], byte[]> initProducer(boolean isTransactional) {
    Map<String, Object> producerConfig = config.targetProducerConfig();
    if (isTransactional) {
        log.info("use transactional producer");
        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
        producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, genTransactionId());
        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer");
    } else {
        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "non-transactional-producer");
    }
    KafkaProducer<byte[], byte[]> newProducer = MirrorUtils.newProducer(producerConfig);
    // A transactional producer must call initTransactions() once before its first beginTransaction().
    initTransactions(newProducer);
    return newProducer;
}

@Override
public void put(Collection<SinkRecord> records) {
    log.info("receive {} messages from consumer", records.size());
    if (records.isEmpty()) {
        return;
    }
    try {
        sendBatch(records, producer);
    } catch (RebalanceException e) {
        producer.close();
        producer = initProducer(isTransactional);
    } catch (ResendRecordsException e) {
        abortTransaction(producer);
        // TODO: add limited retry
        try {
            sendBatch(e.getRemainingRecords(), producer);
        } catch (ResendRecordsException retryFailure) {
            abortTransaction(producer);
            throw new ConnectException("Failed to re-send records", retryFailure);
        }
    } catch (Throwable e) {
        log.error("{} terminating on exception", getHostName(), e);
        return;
    }
}

private void sendBatch(Collection<SinkRecord> records, KafkaProducer<byte[], byte[]> producer)
        throws ResendRecordsException {
    Map<TopicPartition, List<SinkRecord>> remainingRecordsMap = new HashMap<>();
    // Maps each in-flight send to its record, so that a failed future can be traced back.
    Map<Future<RecordMetadata>, SinkRecord> futureMap = new LinkedHashMap<>();
    Deque<SinkRecord> pending = new ArrayDeque<>(records);
    offsetsMap.clear();
    beginTransaction(producer);
    try {
        while (!pending.isEmpty()) {
            final SinkRecord record = pending.peek();
            ProducerRecord<byte[], byte[]> producerRecord = convertToProducerRecord(record);
            // The committed offset is the position of the next record to consume, hence the + 1.
            final long nextOffset = record.kafkaOffset() + 1;
            offsetsMap.compute(new TopicPartition(record.topic(), record.kafkaPartition()),
                (tp, curOffsetMetadata) ->
                    (curOffsetMetadata == null || nextOffset > curOffsetMetadata.offset())
                        ? new OffsetAndMetadata(nextOffset)
                        : curOffsetMetadata);
            Future<RecordMetadata> future = producer.send(producerRecord, (recordMetadata, e) -> {
                if (e != null) {
                    // Do not throw here: the callback runs on the producer's I/O thread.
                    // The failure is picked up below through future.get().
                    log.error("{} failed to send record to {}: ", MirrorSinkTask.this, producerRecord.topic(), e);
                    log.debug("{} Failed record: {}", MirrorSinkTask.this, producerRecord);
                } else {
                    // TODO: lower to log.trace
                    log.info("{} Wrote record successfully: topic {} partition {} offset {}",
                        MirrorSinkTask.this, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                    commitRecord(record, recordMetadata);
                }
            });
            futureMap.put(future, record);
            pending.poll();
        }
    } catch (KafkaException e) {
        // Any unsent records, including the one whose send() threw, are added to remainingRecordsMap for re-send.
        for (SinkRecord record = pending.poll(); record != null; record = pending.poll()) {
            addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap);
        }
    }

    // TODO: may add more exception handling cases
    for (Map.Entry<Future<RecordMetadata>, SinkRecord> entry : futureMap.entrySet()) {
        try {
            entry.getKey().get();
        } catch (Exception e) {
            // Any record that failed to send is added to remainingRecordsMap.
            addConsumerRecordToTopicPartitionRecordsMap(entry.getValue(), remainingRecordsMap);
        }
    }

    if (isTransactional && remainingRecordsMap.isEmpty()) {
        producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);
        commitTransaction(producer);
    }

    if (!remainingRecordsMap.isEmpty()) {
        Collection<SinkRecord> recordsToReSend;
        if (isTransactional) {
            // transactional: retry all records, as the transaction will cancel all successful and failed records.
            recordsToReSend = records;
        } else {
            // non-transactional: only retry failed records; the others were already sent.
            recordsToReSend = remainingRecordsMap.values().stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
        }
        throw new ResendRecordsException(recordsToReSend);
    }
}

@Override
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    if (isTransactional) {
        // Intentionally return an empty map to disable the offset commit done by commitOffsets()
        // in "WorkerSinkTask.java", so that the transactional producer can commit the consumer
        // offsets to the target cluster in a transaction at its own pace.
        return new HashMap<>();
    }
    // Otherwise, return offsetsMap and let commitOffsets() in "WorkerSinkTask.java" commit the offsets.
    return offsetsMap;
}

// This commitRecord() follows the same logic as commitRecord() in MirrorSourceTask.
public void commitRecord(SinkRecord record, RecordMetadata metadata) {
    try {
        if (stopping) {
            return;
        }
        if (!metadata.hasOffset()) {
            log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
            return;
        }
        // A SinkRecord carries the upstream coordinates as topic(), kafkaPartition() and kafkaOffset().
        TopicPartition sourceTopicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
        long latency = System.currentTimeMillis() - record.timestamp();
        metrics.countRecord(sourceTopicPartition);
        metrics.replicationLatency(sourceTopicPartition, latency);
        long upstreamOffset = record.kafkaOffset();
        long downstreamOffset = metadata.offset();
        maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
    } catch (Throwable e) {
        log.warn("Failure committing record.", e);
    }
}

private void beginTransaction(KafkaProducer<byte[], byte[]> producer) {
    if (isTransactional) {
        producer.beginTransaction();
        transactionStart = true;
    }
}

private void initTransactions(KafkaProducer<byte[], byte[]> producer) {
    if (isTransactional) {
        producer.initTransactions();
    }
}

private void commitTransaction(KafkaProducer<byte[], byte[]> producer) {
    if (isTransactional) {
        producer.commitTransaction();
        transactionStart = false;
    }
}

private void abortTransaction(KafkaProducer<byte[], byte[]> producer) {
    if (isTransactional && transactionStart) {
        producer.abortTransaction();
        transactionStart = false;
    }
}

public static class ResendRecordsException extends Exception {
    private final Collection<SinkRecord> remainingRecords;

    public ResendRecordsException(Collection<SinkRecord> remainingRecords) {
        super(remainingRecords.size() + " record(s) need to be re-sent");
        this.remainingRecords = remainingRecords;
    }

    public Collection<SinkRecord> getRemainingRecords() {
        return remainingRecords;
    }
}
```
MirrorSinkConnector
Because a SinkTask can only be created by a SinkConnector, MirrorSinkConnector will be implemented along the lines of the current MirrorSourceConnector. To minimize duplicate code, a new class, e.g. "MirrorCommonConnector", is proposed to host the code shared by MirrorSourceConnector and MirrorSinkConnector.
Migration from MirrorSourceConnector to MirrorSinkConnector w/ EOS
This is only high-level guidance that has not yet been validated in real-world practice, and it is subject to change. Each migration case may also need to be handled differently, depending on its requirements.
...