...
- As mentioned above, the Kafka Connect source connector / task framework does not provide EOS by nature, mostly because of the asynchronous and periodic source task offset commits; in the MirrorMaker case, the "offset" is the consumer offset. This seems to be one of the blockers to enabling EOS without a lot of changes in WorkerSourceTask.
- Since MirrorSinkTask (which extends SinkTask) is a new implementation, we can fully control how the producer is created (transactional vs. non-transactional) and handle various exception cases (especially in transactional mode) purely in the new implementation, rather than changing the existing producer in WorkerSourceTask.
- MirrorSinkTask can intentionally return an empty result from preCommit() (defined in SinkTask) by overriding it, which disables the consumer offset commit done by WorkerSinkTask, so that the transactional producer is able to commit the consumer offsets within one transaction. This follows Kafka's standard transactional "read-process-write" pattern, sketched right after this list.
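
For reference, below is a minimal sketch of the plain-Kafka transactional "read-process-write" loop that the design above adapts. Note that both the consumer and the producer point at the same cluster here, which is exactly the restriction MirrorSinkTask works around by storing consumer offsets on the target cluster. The topic names, group id, and client configs are illustrative placeholders, not part of this KIP.

```java
// Minimal sketch of Kafka's transactional read-process-write loop (single cluster).
// "source-topic", "target-topic", "my-group" and the configs are placeholders.
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-group");
consumerProps.put("enable.auto.commit", "false");
consumerProps.put("isolation.level", "read_committed");
consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("transactional.id", "read-process-write-sketch");
producerProps.put("key.serializer", ByteArraySerializer.class.getName());
producerProps.put("value.serializer", ByteArraySerializer.class.getName());

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
producer.initTransactions();
consumer.subscribe(Collections.singleton("source-topic"));

while (true) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
    if (records.isEmpty()) {
        continue;
    }
    producer.beginTransaction();
    try {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<byte[], byte[]> record : records) {
            producer.send(new ProducerRecord<>("target-topic", record.key(), record.value()));
            // commit the offset of the *next* record to consume
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }
        // consumer offsets are committed atomically with the produced records
        producer.sendOffsetsToTransaction(offsets, "my-group");
        producer.commitTransaction();
    } catch (KafkaException e) {
        // the abort cancels the whole batch; rewind the consumer to the last
        // committed offsets before retrying (seek logic omitted in this sketch)
        producer.abortTransaction();
    }
}
```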
...
```java
// Excerpt from the proposed MirrorSinkTask; imports and class declaration omitted.

private boolean isTransactional = config.getTransactionalProducer();
private boolean transactionStart = false;
protected Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
private Set<TopicPartition> taskTopicPartitions;
private KafkaProducer<byte[], byte[]> producer;
private String consumerGroupId;

@Override
public void start(Map<String, String> props) {
    config = new MirrorTaskConfig(props);
    taskTopicPartitions = config.taskTopicPartitions();
    isTransactional = config.transactionalProducer();
    producer = initProducer(isTransactional);
    consumerGroupId = getSourceConsumerGroupId();
    if (isTransactional) {
        // To leverage the "read-process-write" loop that Kafka EOS provides out-of-the-box,
        // all topics have to be in one cluster. In the MirrorMaker case the consumer pulls
        // from the source cluster and the producer sends to the target cluster, so to work
        // around this restriction the consumer offsets are stored on the target cluster
        // under the same group id:
        // (1) when MirrorSinkTask starts, it loads the initial offsets from __consumer_offsets on the target cluster
        // (2) the initial offsets are passed to SinkTaskContext, which supplies them to WorkerSinkTask
        // (3) the consumer in WorkerSinkTask rewinds and starts consuming from the initial offsets
        Map<TopicPartition, OffsetAndMetadata> offsetsOnTarget = listTargetConsumerGroupOffsets(consumerGroupId);
        Map<TopicPartition, Long> offsets = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> offset : offsetsOnTarget.entrySet()) {
            offsets.put(offset.getKey(), offset.getValue().offset());
        }
        Map<TopicPartition, Long> taskOffsets = loadOffsets(taskTopicPartitions, offsets);
        context.offset(taskOffsets);
    }
}

protected KafkaProducer<byte[], byte[]> initProducer(boolean isTransactional) {
    Map<String, Object> producerConfig = config.targetProducerConfig();
    if (isTransactional) {
        log.info("using transactional producer");
        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
        producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, genTransactionId());
        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer");
    } else {
        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "non-transactional-producer");
    }
    KafkaProducer<byte[], byte[]> newProducer = MirrorUtils.newProducer(producerConfig);
    // a transactional producer must call initTransactions() once before its first transaction
    initTransactions(newProducer);
    return newProducer;
}

@Override
public void put(Collection<SinkRecord> records) {
    log.info("received {} messages from consumer", records.size());
    if (records.isEmpty()) {
        return;
    }
    try {
        sendBatch(records, producer);
    } catch (RebalanceException e) {
        producer.close();
        producer = initProducer(isTransactional);
    } catch (ResendRecordsException e) {
        abortTransaction(producer);
        // TODO: add limited retry
        sendBatch(e.getRemainingRecords(), producer);
    } catch (Throwable e) {
        log.error("{} terminating on exception: ", getHostName(), e);
    }
}

private void sendBatch(Collection<SinkRecord> records, KafkaProducer<byte[], byte[]> producer) {
    Map<TopicPartition, List<SinkRecord>> remainingRecordsMap = new HashMap<>();
    List<Future<RecordMetadata>> futures = new ArrayList<>();
    Map<Future<RecordMetadata>, SinkRecord> futureMap = new HashMap<>();
    Deque<SinkRecord> pending = new ArrayDeque<>(records);
    offsetsMap.clear();
    try {
        beginTransaction(producer);
        SinkRecord record;
        while ((record = pending.peek()) != null) {
            final SinkRecord currentRecord = record;
            ProducerRecord<byte[], byte[]> producerRecord = convertToProducerRecord(currentRecord);
            // track the highest offset consumed so far per source partition; commit the offset
            // of the *next* record to consume (kafkaOffset() + 1) with the transaction
            offsetsMap.compute(new TopicPartition(currentRecord.topic(), currentRecord.kafkaPartition()),
                    (tp, curOffsetMetadata) ->
                            (curOffsetMetadata == null || currentRecord.kafkaOffset() + 1 > curOffsetMetadata.offset())
                                    ? new OffsetAndMetadata(currentRecord.kafkaOffset() + 1)
                                    : curOffsetMetadata);
            Future<RecordMetadata> future = producer.send(producerRecord, (recordMetadata, e) -> {
                if (e != null) {
                    log.error("{} failed to send record to {}: ", MirrorSinkTask.this, producerRecord.topic(), e);
                    log.debug("{} failed record: {}", MirrorSinkTask.this, producerRecord);
                    throw new KafkaException(e);
                } else {
                    log.trace("{} wrote record successfully: topic {} partition {} offset {}",
                            MirrorSinkTask.this, recordMetadata.topic(), recordMetadata.partition(),
                            recordMetadata.offset());
                    commitRecord(currentRecord, recordMetadata);
                }
            });
            futures.add(future);
            futureMap.put(future, currentRecord);
            pending.poll();
        }
    } catch (KafkaException e) {
        // any records not yet handed to the producer are added to remainingRecordsMap for re-send
        for (SinkRecord unsent = pending.poll(); unsent != null; unsent = pending.poll()) {
            addConsumerRecordToTopicPartitionRecordsMap(unsent, remainingRecordsMap);
        }
    } finally {
        // TODO: may add more exception handling cases
        for (Future<RecordMetadata> future : futures) {
            try {
                future.get();
            } catch (Exception e) {
                // any record that failed to send is added to remainingRecordsMap
                addConsumerRecordToTopicPartitionRecordsMap(futureMap.get(future), remainingRecordsMap);
            }
        }
    }

    if (isTransactional && remainingRecordsMap.isEmpty()) {
        // commit the consumer offsets atomically with the produced records
        producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);
        commitTransaction(producer);
    }

    if (!remainingRecordsMap.isEmpty()) {
        Collection<SinkRecord> recordsToReSend;
        if (isTransactional) {
            // transactional: retry the whole batch, since aborting the transaction cancels all of its outputs
            recordsToReSend = new ArrayList<>(records);
        } else {
            // non-transactional: only retry the failed records; the others were already sent
            recordsToReSend = remainingRecordsMap.values().stream()
                    .flatMap(List::stream)
                    .collect(Collectors.toList());
        }
        throw new ResendRecordsException(recordsToReSend);
    }
}

@Override
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    if (isTransactional) {
        // return an empty map intentionally to disable the offset commit done by
        // WorkerSinkTask.commitOffsets(), so that the transactional producer can
        // commit the consumer offsets within the transaction instead
        return new HashMap<>();
    } else {
        // otherwise, return offsetsMap to let WorkerSinkTask.commitOffsets() commit the offsets
        return offsetsMap;
    }
}

// this commitRecord() follows the same logic as commitRecord() in MirrorSourceTask
public void commitRecord(SinkRecord record, RecordMetadata metadata) {
    try {
        if (stopping) {
            return;
        }
        if (!metadata.hasOffset()) {
            log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
            return;
        }
        TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
        long latency = System.currentTimeMillis() - record.timestamp();
        metrics.countRecord(topicPartition);
        metrics.replicationLatency(topicPartition, latency);
        // this sketch assumes the source partition/offset are recoverable from the SinkRecord
        TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
        long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
        long downstreamOffset = metadata.offset();
        maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
    } catch (Throwable e) {
        log.warn("Failure committing record.", e);
    }
}

private void beginTransaction(KafkaProducer<byte[], byte[]> producer) {
    if (isTransactional) {
        producer.beginTransaction();
        transactionStart = true;
    }
}

private void initTransactions(KafkaProducer<byte[], byte[]> producer) {
    if (isTransactional) {
        producer.initTransactions();
    }
}

private void commitTransaction(KafkaProducer<byte[], byte[]> producer) {
    if (isTransactional) {
        producer.commitTransaction();
        transactionStart = false;
    }
}

private void abortTransaction(KafkaProducer<byte[], byte[]> producer) {
    // only abort if a transaction is actually in progress
    if (isTransactional && transactionStart) {
        producer.abortTransaction();
        transactionStart = false;
    }
}

// unchecked so that it can propagate out of put() without changing the SinkTask signature
public static class ResendRecordsException extends RuntimeException {
    private final Collection<SinkRecord> remainingRecords;

    public ResendRecordsException(Collection<SinkRecord> remainingRecords) {
        super(remainingRecords.size() + " records failed and need to be re-sent");
        this.remainingRecords = remainingRecords;
    }

    public Collection<SinkRecord> getRemainingRecords() {
        return remainingRecords;
    }
}
```
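
The body of listTargetConsumerGroupOffsets() is not shown above. A plausible sketch, assuming an Admin client built from the target cluster's config (the adminClient field below is hypothetical, not part of this KIP), reads the group's committed offsets from the target cluster so that this lookup and sendOffsetsToTransaction() operate on the same cluster:

```java
// Hypothetical sketch: "adminClient" is assumed to be an
// org.apache.kafka.clients.admin.Admin created from the target cluster config.
private Map<TopicPartition, OffsetAndMetadata> listTargetConsumerGroupOffsets(String groupId) {
    try {
        // reads the group's committed offsets from __consumer_offsets on the target cluster
        return adminClient.listConsumerGroupOffsets(groupId)
                .partitionsToOffsetAndMetadata()
                .get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return Collections.emptyMap();
    } catch (ExecutionException e) {
        // e.g. the group does not exist yet on the target cluster; start from scratch
        log.warn("failed to load consumer group offsets for {} from target cluster", groupId, e);
        return Collections.emptyMap();
    }
}
```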
...