// Whether this task mirrors records inside Kafka transactions (exactly-once delivery); read once from connector config.
private boolean isTransactional = config.getTransactionalProducer();
// True while a producer transaction is open, i.e. between beginTransaction() and commitTransaction()/abortTransaction().
private boolean transactionStart = false;
// Highest offset seen per source TopicPartition for the current batch; consumed by preCommit() (non-transactional)
// or by producer.sendOffsetsToTransaction() (transactional). Cleared at the start of each batch in sendBatch().
protected Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
/**
 * Builds the target-cluster producer for this task.
 *
 * <p>When {@code isTransactional} is true the producer is configured for exactly-once
 * semantics: acks=all, idempotence enabled, and a per-task transactional id from
 * {@code genTransactionId()}.
 *
 * @param isTransactional whether to configure the producer for transactions
 * @return a new producer built from the connector's target producer config
 */
protected KafkaProducer<byte[], byte[]> initProducer(boolean isTransactional) {
    Map<String, Object> producerConfig = config.targetProducerConfig();
    if (isTransactional) {
        log.info("use transactional producer");
        // acks=all and idempotence are required for transactional delivery guarantees.
        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
        producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, genTransactionId());
        // Fixed typo: was "trasactional-producer".
        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer");
    } else {
        // Fixed typo: was "non-trasactional-producer".
        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "non-transactional-producer");
    }
    return MirrorUtils.newProducer(producerConfig);
}
/**
 * Sink entry point: forwards a batch of records to the target cluster.
 *
 * <p>Error handling:
 * <ul>
 *   <li>Rebalance — recreate the producer (a transactional producer is fenced after a
 *       rebalance) and re-run initTransactions() so the next beginTransaction() is legal.</li>
 *   <li>ResendRecordsException — abort the open transaction and retry once with the
 *       remaining records.</li>
 *   <li>Anything else — log and return; the framework decides whether to stop the task.</li>
 * </ul>
 *
 * @param records records delivered by the Connect framework; may be empty
 */
@Override
public void put(Collection<SinkRecord> records) {
    log.info("receive {} messages from consumer", records.size());
    if (records.isEmpty()) {
        return;
    }
    try {
        sendBatch(records, producer);
    } catch (RebalanceException e) {
        producer.close();
        producer = initProducer(isTransactional);
        // A freshly created transactional producer must run initTransactions()
        // before beginTransaction() may be called on it.
        initTransactions(producer);
    } catch (ResendRecordsException e) {
        abortTransaction(producer);
        //TODO: add limited retry
        try {
            sendBatch(e.getRemainingRecords(), producer);
        } catch (ResendRecordsException retryFailure) {
            // Single retry exhausted; surface the failure in the log and give up on this batch.
            log.error("{} failed to resend {} records",
                    getHostName(), retryFailure.getRemainingRecords().size(), retryFailure);
        }
    } catch (Throwable e) {
        // Pass the throwable as the last argument so SLF4J logs the stack trace.
        log.error("{} terminating on exception", getHostName(), e);
    }
}
private void sendBatch(Collection<SinkRecord> records, KafkaProducer<byte[], byte[]> producer) {
try {
Map<TopicPartition, List<SinkRecord> remainingRecordsMap = new HashMap<>();
offsetsMap.clear();
beginTransaction(producer);
SinkRecord record;
for ((record = records.peek()) != null) {
ProducerRecord<byte[], byte[]> producerRecord = convertToProducerRecord(record);
offsetsMap.compute(new TopicPartition(record.topic(), record.kafkaPartition()),
(tp, curOffsetMetadata) ->
(curOffsetMetadata == null || record.kafkaOffset() > curOffsetMetadata.offset())
?
new OffsetAndMetadata(record.kafkaOffset())
: curOffsetMetadata);
Future<RecordMetadata> future = producer.send(producerRecord, (recordMetadata, e) -> {
if (e != null) {
log.error("{} failed to send record to {}: ", MirrorSinkTask.this, producerRecord.topic(), e);
log.debug("{} Failed record: {}", MirrorSinkTask.this, producerRecord);
throw new KafkaException(e);
} else {
log.info("{} Wrote record successfully: topic {} partition {} offset {}", //log.trace
MirrorSinkTask.this,
recordMetadata.topic(), recordMetadata.partition(),
recordMetadata.offset());
commitRecord(record, recordMetadata);
}
});
futures.add(future);
records.poll();
}
} catch (KafkaException e) {
// Any unsent messages are added to the remaining remainingRecordsMap for re-send
for (SinkRecord record = records.poll(); record != null; record = records.poll()) {
addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap);
} finally { //TODO: may add more exception handling case
for (Future<RecordMetadata> future : futures) {
try {
future.get();
} catch (Exception e) {
SinkRecord record = futureMap.get(future);
// Any record failed to send, add to the remainingRecordsMap
addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap);
}
}
}
if (isTransactional && remainingRecordsMap.size() == 0) {
producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);
commitTransaction(producer);
}
if (remainingRecordsMap.size() > 0) {
// For transaction case, all records should be put into remainingRecords, as the whole transaction should be redone
Collection<SinkRecord> recordsToSend;
if (isTransactional) {
// Transactional: retry all records, the transaction will have cancelled all of our outputs.
recordsToSend = records;
} else {
// Non-transactional: only retry failed records, others already were finished and sent.
recordsToSend = remainingRecordsMap;
}
throw new ResendRecordsException(recordsToThrow);
}
}
/**
 * Controls which offsets the Connect framework commits for this task.
 *
 * @param offsets offsets the framework proposes to commit (ignored here)
 * @return an empty map in transactional mode, {@code offsetsMap} otherwise
 */
@Override
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    // Transactional mode: return an empty map on purpose so commitOffsets() in
    // WorkerSinkTask.java becomes a no-op — the transactional producer commits the
    // consumer offsets itself, inside the transaction.
    if (isTransactional) {
        return new HashMap<TopicPartition, OffsetAndMetadata>();
    }
    // Non-transactional mode: hand our tracked offsets to the framework to commit.
    return offsetsMap;
}
// This commitRecord() follows the same logics as commitRecord() in MirrorSourceTask
/**
 * Records metrics for a successfully-produced record and syncs its upstream/downstream
 * offsets. Called from the producer-send callback in sendBatch().
 *
 * <p>Fix: {@code SinkRecord.timestamp()} returns a {@code Long} that may be null
 * (records without timestamps); the original unconditional unboxing threw an NPE that
 * the broad catch silently swallowed, skipping metrics and offset sync entirely.
 *
 * @param record   the source record that was mirrored
 * @param metadata producer metadata for the record written to the target cluster
 */
public void commitRecord(SinkRecord record, RecordMetadata metadata) {
    try {
        if (stopping) {
            return;
        }
        if (!metadata.hasOffset()) {
            log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
            return;
        }
        TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
        metrics.countRecord(topicPartition);
        // timestamp() may be null when the source record carried no timestamp; skip latency then.
        if (record.timestamp() != null) {
            long latency = System.currentTimeMillis() - record.timestamp();
            metrics.replicationLatency(topicPartition, latency);
        }
        TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
        long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
        long downstreamOffset = metadata.offset();
        maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
    } catch (Throwable e) {
        // Best-effort by design (mirrors MirrorSourceTask): never let metrics/offset-sync
        // failures break the send path.
        log.warn("Failure committing record.", e);
    }
}
/** Opens a producer transaction and marks it as in-flight; no-op when non-transactional. */
private void beginTransaction(KafkaProducer<byte[], byte[]> producer) {
    if (!isTransactional) {
        return;
    }
    producer.beginTransaction();
    transactionStart = true;
}
/** Runs the one-time producer transaction initialization; no-op when non-transactional. */
private void initTransactions(KafkaProducer<byte[], byte[]> producer) {
    if (!isTransactional) {
        return;
    }
    producer.initTransactions();
}
/** Commits the open producer transaction and clears the in-flight flag; no-op when non-transactional. */
private void commitTransaction(KafkaProducer<byte[], byte[]> producer) {
    if (!isTransactional) {
        return;
    }
    producer.commitTransaction();
    transactionStart = false;
}
/** Aborts the producer transaction, but only if one is actually open; otherwise a no-op. */
private void abortTransaction(KafkaProducer<byte[], byte[]> producer) {
    // abortTransaction() outside an open transaction would throw, so check both flags.
    if (!isTransactional || !transactionStart) {
        return;
    }
    producer.abortTransaction();
    transactionStart = false;
}
public static class ResendRecordsException extends Exception {
private Collection<SinkRecord> remainingRecords;
public ResendRecordsException(Collection<SinkRecord> remainingRecords) {
super(cause);
this.remainingRecords = remainingRecords;
}
public Collection<SinkRecord> getRemainingRecords() {
return remainingRecords;
}
} |