...

Code Block: MirrorSinkTask
    private boolean isTransactional = config.getTransactionalProducer();
    private boolean transactionStart = false;
    protected Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();

    protected KafkaProducer<byte[], byte[]> initProducer(boolean isTransactional) {
        Map<String, Object> producerConfig = config.targetProducerConfig();
        if (isTransactional) {
            log.info("using transactional producer");
            producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
            producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
            producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, genTransactionId());
            producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer");
        } else {
            producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "non-transactional-producer");
        }
        return MirrorUtils.newProducer(producerConfig);
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        log.info("received {} messages from consumer", records.size());
        if (records.isEmpty()) {
            return;
        }
        try {
            sendBatch(records, producer);
        } catch (RebalanceException e) {
            // On rebalance, recreate the producer and re-initialize its transactional state
            // before the next batch
            producer.close();
            producer = initProducer(isTransactional);
            initTransactions(producer);
        } catch (ResendRecordsException e) {
            abortTransaction(producer);
            // TODO: add limited retry
            sendBatch(e.getRemainingRecords(), producer);
        } catch (Throwable e) {
            log.error("{} terminating on exception", getHostName(), e);
            return;
        }
    }

    private void sendBatch(Collection<SinkRecord> records, KafkaProducer<byte[], byte[]> producer) {
        // Records are drained from a queue so that, on failure, only the records not yet handed
        // to the producer need to be collected for re-send
        Deque<SinkRecord> pendingRecords = new ArrayDeque<>(records);
        Map<TopicPartition, List<SinkRecord>> remainingRecordsMap = new HashMap<>();
        List<Future<RecordMetadata>> futures = new ArrayList<>();
        Map<Future<RecordMetadata>, SinkRecord> futureMap = new HashMap<>();
        try {
            offsetsMap.clear();
            beginTransaction(producer);
            SinkRecord record;
            while ((record = pendingRecords.peek()) != null) {
                final SinkRecord currentRecord = record;
                ProducerRecord<byte[], byte[]> producerRecord = convertToProducerRecord(record);
                // Track the highest consumed offset per source topic-partition for the offset commit
                offsetsMap.compute(new TopicPartition(record.topic(), record.kafkaPartition()),
                        (tp, curOffsetMetadata) ->
                                (curOffsetMetadata == null || currentRecord.kafkaOffset() > curOffsetMetadata.offset())
                                        ? new OffsetAndMetadata(currentRecord.kafkaOffset())
                                        : curOffsetMetadata);
                Future<RecordMetadata> future = producer.send(producerRecord, (recordMetadata, e) -> {
                    if (e != null) {
                        log.error("{} failed to send record to {}: ", MirrorSinkTask.this, producerRecord.topic(), e);
                        log.debug("{} Failed record: {}", MirrorSinkTask.this, producerRecord);
                        throw new KafkaException(e);
                    } else {
                        log.info("{} Wrote record successfully: topic {} partition {} offset {}", // TODO: change to log.trace
                                MirrorSinkTask.this,
                                recordMetadata.topic(), recordMetadata.partition(),
                                recordMetadata.offset());
                        commitRecord(currentRecord, recordMetadata);
                    }
                });
                futures.add(future);
                futureMap.put(future, record);
                pendingRecords.poll();
            }
        } catch (KafkaException e) {
            // Any records not yet handed to the producer are added to remainingRecordsMap for re-send
            for (SinkRecord record = pendingRecords.poll(); record != null; record = pendingRecords.poll()) {
                addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap);
            }
        } finally {  // TODO: may add more exception handling cases
            for (Future<RecordMetadata> future : futures) {
                try {
                    future.get();
                } catch (Exception e) {
                    SinkRecord record = futureMap.get(future);
                    // Any record that failed to send is added to remainingRecordsMap
                    addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap);
                }
            }
        }

        if (isTransactional && remainingRecordsMap.isEmpty()) {
            // Commit the consumer offsets within the same transaction as the produced records
            producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);
            commitTransaction(producer);
        }

        if (!remainingRecordsMap.isEmpty()) {
            Collection<SinkRecord> recordsToReSend;
            if (isTransactional) {
                // Transactional: retry all records, since the aborted transaction cancels all of our outputs
                recordsToReSend = records;
            } else {
                // Non-transactional: only retry the failed records; the others were already sent successfully
                recordsToReSend = new ArrayList<>();
                for (List<SinkRecord> partitionRecords : remainingRecordsMap.values()) {
                    recordsToReSend.addAll(partitionRecords);
                }
            }
            throw new ResendRecordsException(recordsToReSend);
        }
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (isTransactional) {
            // Return an empty map intentionally to disable the offset commit done by commitOffsets()
            // in "WorkerSinkTask.java", so that the transactional producer can commit the consumer
            // offsets within its own transaction instead
            return new HashMap<>();
        } else {
            // Otherwise, return offsetsMap to let commitOffsets() in "WorkerSinkTask.java" commit the offsets
            return offsetsMap;
        }
    }

    // This commitRecord() follows the same logic as commitRecord() in MirrorSourceTask
    public void commitRecord(SinkRecord record, RecordMetadata metadata) {
        try {
            if (stopping) {
                return;
            }
            if (!metadata.hasOffset()) {
                log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
                return;
            }
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
            long latency = System.currentTimeMillis() - record.timestamp();
            metrics.countRecord(topicPartition);
            metrics.replicationLatency(topicPartition, latency);
            TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
            long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
            long downstreamOffset = metadata.offset();
            maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
        } catch (Throwable e) {
            log.warn("Failure committing record.", e);
        }
    }

    private void beginTransaction(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional) {
            producer.beginTransaction();
            transactionStart = true;
        }
    }
	
    private void initTransactions(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional) {
            producer.initTransactions();
        }
    }
    
    private void commitTransaction(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional) {
            producer.commitTransaction();
            transactionStart = false;
        }
    }
    
    private void abortTransaction(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional && transactionStart) {
            producer.abortTransaction();
            transactionStart = false;
        }
    }

    public static class ResendRecordsException extends RuntimeException {
        private Collection<SinkRecord> remainingRecords;

        public ResendRecordsException(Collection<SinkRecord> remainingRecords) {
            super("Re-send " + remainingRecords.size() + " records");
            this.remainingRecords = remainingRecords;
        }

        public Collection<SinkRecord> getRemainingRecords() {
            return remainingRecords;
        }
    }
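
For reference, the transaction helpers above (initTransactions(), beginTransaction(), sendOffsetsToTransaction(), commitTransaction() and abortTransaction()) are thin wrappers around the standard KafkaProducer transactional API. The following is a minimal, standalone sketch of that underlying flow; the bootstrap server, topic names, transactional.id, offsets and consumer group id are placeholders for illustration and are not values defined by this KIP.

Code Block: TransactionalFlowSketch (illustrative)
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class TransactionalFlowSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "mirror-sink-task-0"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        // Registers the transactional.id and fences off any previous producer instance using it
        producer.initTransactions();

        // Offsets of the records consumed from the source cluster, keyed by source partition
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("source-topic", 0), new OffsetAndMetadata(42L)); // placeholder

        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("target-topic", "key".getBytes(), "value".getBytes()));
            // Committing the consumer offsets inside the same transaction is what ties the
            // replicated records and their source offsets together atomically
            producer.sendOffsetsToTransaction(offsets, "mirror-consumer-group");  // placeholder group id
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal errors: the producer cannot continue and must be closed
            producer.close();
        } catch (KafkaException e) {
            // Recoverable error: abort so the whole batch can be re-sent in a new transaction
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }
}

The String-group-id overload of sendOffsetsToTransaction() shown here mirrors the consumerGroupId usage in sendBatch() above; the fatal-versus-recoverable error split follows the standard KafkaProducer transactional usage pattern.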

Rejected Alternatives

 https://github.com/apache/kafka/pull/5553

KAFKA-6080 and KAFKA-3821 are relevant efforts in a bigger scope, but it seems neither of them has made progress for quite some time.

...