...
Current state: Draft
Discussion thread:
JIRA: KAFKA-10339
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block | ||
---|---|---|
| ||
private boolean isTransactional = config.getTransactionalProducer(); private boolean transactionStarttransactionInProgress = false; protected Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>(); private Set<TopicPartition> taskTopicPartitions; private KafkaProducer<byte[], byte[]> producer; private String consumerGroupIdconnectConsumerGroup; @Override public void start(Map<String, String> props) { config = new MirrorTaskConfig(props); taskTopicPartitions = config.taskTopicPartitions(); isTransactional = config.transactionalProducer(); producer = initProducer(isTransactional); consumerGroupIdconnectConsumerGroup = getSourceConsumerGroupId(); if (isTransactional) { Map<TopicPartition, OffsetAndMetadata> offsetsOnTargetinitOffsetsOnTarget = listTargetConsumerGroupOffsets(consumerGroupIdconnectConsumerGroup); Map<TopicPartition, Long> offsets = new HashMap<>(); for (Map.Entry<TopicPartition, OffsetAndMetadata> offset : offsetsOnTarget.entrySet()) { offsets.put(offset.getKey(), offset.getValue().offset()); } Map<TopicPartition, Long> taskOffsets = loadOffsets(taskTopicPartitions, offsets); context.offset(taskOffsets); // To rewind consumer to correct offset per <topic, partition> assigned to this task, there are more than one case: } } protected KafkaProducer<byte[], byte[]> initProducer(boolean isTransactional) { Map<String, Object> producerConfig = config.targetProducerConfig(); // (1) offset exists on target, use target offset as the starting point of consumption if// (isTransactional2) { offset DOES NOT exist on target, presume log.info("use transactional producer");a new topic, so set offset to 0 producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); Map<TopicPartition, Long> taskOffsets = taskTopicPartitions.stream().collect(Collectors.toMap( producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); x -> x, x -> producerConfiginitOffsetsOnTarget.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, genTransactionId());containsKey(x) 
producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "trasactional-producer"); ? } else { producerConfiginitOffsetsOnTarget.putget(ProducerConfig.CLIENT_ID_CONFIG, "non-trasactional-producer"); x).offset() + 1 } return MirrorUtils.newProducer(producerConfig); } : 0L)); context.offset(taskOffsets); } @Override} public void put(Collection<SinkRecord> recordsprotected KafkaProducer<byte[], byte[]> initProducer(boolean isTransactional) { log.info("receive {} messages from consumer", records.size())Map<String, Object> producerConfig = config.targetProducerConfig(); if (records.size() == 0isTransactional) { returnlog.info("use transactional producer"); } try {producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); sendBatch(records, producerproducerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); } catch (RebalanceException e) { producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, genTransactionId()); producer.close( producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "trasactional-producer"); } else { producer = initProducer(isTransactional); producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "non-trasactional-producer"); } catch (ResendRecordsException e) { abortTransaction(producer);return MirrorUtils.newProducer(producerConfig); } @Override public void put(Collection<SinkRecord> records) { //TODO: add limited retry sendBatch(e.getRemainingRecords(), producer log.info("receive {} messages from consumer", records.size()); } catchif (Throwable erecords.size() == 0) { return; log.error(getHostName() + " terminating on exception: {}", e); } try { returnsendBatch(records, producer); } catch (RebalanceException e) }{ private void sendBatch(Collection<SinkRecord> records, KafkaProducer<byte[], byte[]> producer.close() {; try { producer = initProducer(isTransactional); Map<TopicPartition, List<SinkRecord> remainingRecordsMap =} newcatch HashMap<>(ResendRecordsException e); { offsetsMap.clearabortTransaction(producer); beginTransaction(producer);//TODO: add 
limited retry SinkRecord recordsendBatch(e.getRemainingRecords(), producer); } catch (Throwable e) for ((record = records.peek()) != null) {{ log.error(getHostName() + " terminating on exception: {}", e); return; ProducerRecord<byte[], byte[]> producerRecord = convertToProducerRecord(record);} } private void sendBatch(Collection<SinkRecord> records, KafkaProducer<byte[], byte[]> producer) { offsetsMap.compute(new TopicPartition(record.topic(), record.kafkaPartition()), try { Map<TopicPartition, List<SinkRecord> remainingRecordsMap = new HashMap<>(tp, curOffsetMetadata) ->); offsetsMap.clear(); beginTransaction(producer); (curOffsetMetadata == null || record.kafkaOffset() > curOffsetMetadata.offset())SinkRecord record; for ((record = records.peek()) != null) { ProducerRecord<byte[], byte[]> producerRecord ?= convertToProducerRecord(record); offsetsMap.compute(new TopicPartition(record.topic(), record.kafkaPartition()), new OffsetAndMetadata(record.kafkaOffset())(tp, curOffsetMetadata) -> (curOffsetMetadata == null || record.kafkaOffset() > : curOffsetMetadata.offset();) Future<RecordMetadata> future = producer.send(producerRecord, (recordMetadata, e) -> { ? 
if (e != null) { log.error("{} failed to send record to {}: ", MirrorSinkTask.this, producerRecord.topic(), e); new OffsetAndMetadata(record.kafkaOffset()) log.debug("{} Failed record: {}", MirrorSinkTask.this, producerRecord); : curOffsetMetadata); Future<RecordMetadata> future throw new KafkaException(e);= producer.send(producerRecord, (recordMetadata, e) -> { } elseif (e != null) { log.infoerror("{} Wrotefailed to send record successfully: topicto {}: partition {} offset {}", //log.trace", MirrorSinkTask.this, producerRecord.topic(), e); log.debug("{} Failed record: {}", MirrorSinkTask.this, producerRecord); throw new KafkaException(e); recordMetadata.topic(), recordMetadata.partition(), } else { recordMetadata.offset()); log.info("{} Wrote record successfully: topic {} partition {} offset {}", //log.trace commitRecord(record, recordMetadata); MirrorSinkTask.this, } }); futures.add(future); recordMetadata.topic(), recordMetadata.partition(), records.poll recordMetadata.offset()); } } catch commitRecord(KafkaExceptionrecord, erecordMetadata) {; // Any unsent messages are added to the remaining remainingRecordsMap for re-send } for (SinkRecord record = records.poll(}); record != null; record = records.poll()) { futures.add(future); addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap); } finally { //TODO: may add more exception handling case records.poll(); } for (Future<RecordMetadata>} futurecatch :(KafkaException futurese) { // Any unsent messages tryare { added to the remaining remainingRecordsMap for re-send for future.get(SinkRecord record = records.poll(); record != null; record = records.poll()) { } catch addConsumerRecordToTopicPartitionRecordsMap(Exceptionrecord, eremainingRecordsMap) {; } finally { //TODO: may add more exception handling case SinkRecord record = futureMap.get(future); for (Future<RecordMetadata> future : futures) { // Any record failed to send, add to thetry remainingRecordsMap{ 
addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMapfuture.get(); } catch (Exception e) { } } SinkRecord record if (isTransactional && remainingRecordsMap.size() == 0) { = futureMap.get(future); producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId); // Any record failed to send, add to the commitTransaction(producer); remainingRecordsMap } if addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap.size(); > 0) { // For} transaction case, all records should be put into remainingRecords, as the whole} transaction should be redone } Collection<SinkRecord> recordsToReSend; if (isTransactional && remainingRecordsMap.size() == 0) { if (isTransactional) { producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId); // transactional: retry all records, as the transaction will cancel all successful and failed records. commitTransaction(producer); } if (remainingRecordsMap.size() > 0) { recordsToReSend = records; // For transaction case, all records }should elsebe { put into remainingRecords, as the whole transaction should be redone // non-transactional: only retry failed records, others were finished and sent.Collection<SinkRecord> recordsToReSend; if (isTransactional) { recordsToReSend = remainingRecordsMap; } // transactional: retry all records, as the transaction will cancel all successful throwand new ResendRecordsException(recordsToReSend);failed records. 
} } @Override recordsToReSend = records; public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata>} offsets)else { // if transactional, return empty Map intentionally to disable the offset commit by commitOffsets() in WorkerSinkTask.java // so that the transactional producer is able to commit the consumer offsets to target cluster in a transaction at its pace if (isTransactional) return new HashMap<TopicPartition, OffsetAndMetadata>(); else // otherwise, return offsetsMap to let commitOffsets() in "WorkerSinkTask.java" to commit offsets return offsetsMap; // non-transactional: only retry failed records, others were finished and sent. recordsToReSend = remainingRecordsMap; } throw new ResendRecordsException(recordsToReSend); } } //@Override This commitRecord() follows thepublic sameMap<TopicPartition, logicsOffsetAndMetadata> as commitRecord() in MirrorSourceTask, to preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { public // void commitRecord(SinkRecord record, RecordMetadata metadata) { try {if transactional, return empty Map intentionally to disable the offset commit by commitOffsets() in WorkerSinkTask.java // so that the iftransactional (stopping) { return; }producer is able to commit the consumer offsets to target cluster in a transaction at its pace if (isTransactional) return new HashMap<TopicPartition, OffsetAndMetadata>(); else // otherwise, return offsetsMap to if (!metadata.hasOffsetlet commitOffsets()) { in "WorkerSinkTask.java" to commit offsets return offsetsMap; } log.error("RecordMetadata has no// offset -- can't sync offsets for {}.", record.topic()); This commitRecord() follows the same logics as commitRecord() in MirrorSourceTask, to public void commitRecord(SinkRecord record, RecordMetadata metadata) { return; try { } if (stopping) { TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition()); long latency = System.currentTimeMillis() - record.timestamp()return; 
metrics.countRecord(topicPartition);} metrics.replicationLatency(topicPartition, latency);if (!metadata.hasOffset()) { TopicPartition sourceTopicPartition = MirrorUtilslog.unwrapPartition(record.sourcePartition()); long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffseterror("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic()); long downstreamOffset = metadata.offset()return; maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);} } catch (Throwable e) { TopicPartition topicPartition = log.warn("Failure committing record.", enew TopicPartition(record.topic(), record.kafkaPartition()); } } long latency = System.currentTimeMillis() private void beginTransaction(KafkaProducer<byte[], byte[]> producer) { - record.timestamp(); if metrics.countRecord(isTransactionaltopicPartition) {; producermetrics.beginTransactionreplicationLatency(topicPartition, latency); transactionStartTopicPartition sourceTopicPartition = trueMirrorUtils.unwrapPartition(record.sourcePartition()); } } long upstreamOffset = private void initTransactions(KafkaProducer<byte[], byte[]> producer) { MirrorUtils.unwrapOffset(record.sourceOffset()); long downstreamOffset if= metadata.offset(isTransactional) {; producer.initTransactions(maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset); } catch (Throwable e) { log.warn("Failure committing record.", e); } } private void commitTransactionbeginTransaction(KafkaProducer<byte[], byte[]> producer) { if (isTransactional) { producer.commitTransactionbeginTransaction(); transactionStarttransactionInProgress = falsetrue; } } private void abortTransactioninitTransactions(KafkaProducer<byte[], byte[]> producer) { if (isTransactional && transactionStart) { producer.abortTransactioninitTransactions(); } transactionStart = false;} } private void commitTransaction(KafkaProducer<byte[], } public static class ResendRecordsException extends Exception byte[]> producer) { private 
Collection<SinkRecord> remainingRecords; public ResendRecordsException(Collection<SinkRecord> remainingRecordsif (isTransactional) { superproducer.commitTransaction(cause); this.remainingRecordstransactionInProgress = remainingRecordsfalse; } } public Collection<SinkRecord> getRemainingRecords() private void abortTransaction(KafkaProducer<byte[], byte[]> producer) { if (isTransactional && transactionInProgress) return remainingRecords{ producer.abortTransaction(); transactionInProgress = false; } } |
MirrorSinkConnector
public static class ResendRecordsException extends Exception {
private Collection<SinkRecord> remainingRecords;
public ResendRecordsException(Collection<SinkRecord> remainingRecords) {
super(cause);
this.remainingRecords = remainingRecords;
}
public Collection<SinkRecord> getRemainingRecords() {
return remainingRecords;
}
} |
MirrorSinkConnector
A SinkTask can only be created by a SinkConnector, so MirrorSinkConnector will be implemented and will mostly follow the same logic as the current MirrorSourceConnector. To minimize duplicated code, a new class, e.g. "MirrorCommonConnector", may be proposed to host the code shared by MirrorSourceConnector and MirrorSinkConnector. This would be a separate code change, merged before this KIP.
WorkerSinkTask
Because the group offsets on the target cluster are the single source of truth, each time MirrorSinkTask restarts it loads the group offsets from the target cluster. It supplies them to the consumer in WorkerSinkTask, so that the consumer rewinds to the last committed offsets without duplicating or missing data. However, in our test we ran into the following issue:
Jira | ||||||
---|---|---|---|---|---|---|
|
Migration from MirrorSourceConnector to MirrorSinkConnector with EOS
...