THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block:
private boolean isTransactional = config.getTransactionalProducer(); private boolean transactionInProgress = false; protected Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>(); private Set<TopicPartition> taskTopicPartitions; private KafkaProducer<byte[], byte[]> producer; private String connectConsumerGroup; @Override public void start(Map<String, String> props) { config = new MirrorTaskConfig(props); taskTopicPartitions = config.taskTopicPartitions(); isTransactional = config.transactionalProducer(); producer = initProducer(isTransactional); connectConsumerGroup = getSourceConsumerGroupId(); if (isTransactional) { Map<TopicPartition, OffsetAndMetadata> initOffsetsOnTarget = listTargetConsumerGroupOffsets(connectConsumerGroup); // To rewind consumer to correct offset per <topic, partition> assigned to this task, there are more than one case: // (1) offset exists on target, use target offset as the starting point of consumption // (2) offset DOES NOT exist on target, presume a new topic, so set offset to 0 Map<TopicPartition, Long> taskOffsets = taskTopicPartitions.stream().collect(Collectors.toMap( x -> x, x -> initOffsetsOnTarget.containsKey(x) ? 
initOffsetsOnTarget.get(x).offset() + 1 : 0L)); context.offset(taskOffsets); } } protected KafkaProducer<byte[], byte[]> initProducer(boolean isTransactional) { Map<String, Object> producerConfig = config.targetProducerConfig(); if (isTransactional) { String transactionId = getTransactionId(); log.info("use transactional producer with Id: {} ", transactionId); producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, genTransactionId()transactionId); producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "trasactional-producer"transactionId); } else { return producerConfigMirrorUtils.put(ProducerConfig.CLIENT_ID_CONFIG, "non-trasactional-producer"newProducer(producerConfig); } /** } * Per some articles, to avoid ProducerFencedException, transaction id is suggested to set application name + hostname * Each MirrorSinkTask is also assigned with different set of <topic, partition>. 
To get unique transaction id, * one way is to append connector name, hostname and string of each <topic, partition> pair */ protected String getTransactionId() { return connectorName + "-" + getHostName() + "-" + getTopicPartitionStr(); } protected String getTopicPartitionStr() { StringBuffer buf = new StringBuffer(); for (TopicPartition tp : taskTopicPartitions) { buf.append(tp.topic() + "_" + tp.partition() + "-"); } return MirrorUtils.newProducer(producerConfigbuf.toString().substring(0, buf.length() - 1); } @Override public void put(Collection<SinkRecord> records) { log.info("receive {} messages from consumer", records.size()); if (records.size() == 0) { return; } try { sendBatch(records, producer); } catch (RebalanceException e) { producer.close(); producer = initProducer(isTransactional); } catch (ResendRecordsException e) { abortTransaction(producer); //TODO: add limited retry sendBatch(e.getRemainingRecords(), producer); } catch (Throwable e) { log.error(getHostName() + " terminating on exception: {}", e); return; } } private void sendBatch(Collection<SinkRecord> records, KafkaProducer<byte[], byte[]> producer) { try { Map<TopicPartition, List<SinkRecord> remainingRecordsMap = new HashMap<>(); offsetsMap.clear(); beginTransaction(producer); SinkRecord record; for ((record = records.peek()) != null) { ProducerRecord<byte[], byte[]> producerRecord = convertToProducerRecord(record); offsetsMap.compute(new TopicPartition(record.topic(), record.kafkaPartition()), (tp, curOffsetMetadata) -> (curOffsetMetadata == null || record.kafkaOffset() > curOffsetMetadata.offset()) ? 
new OffsetAndMetadata(record.kafkaOffset()) : curOffsetMetadata); Future<RecordMetadata> future = producer.send(producerRecord, (recordMetadata, e) -> { if (e != null) { log.error("{} failed to send record to {}: ", MirrorSinkTask.this, producerRecord.topic(), e); log.debug("{} Failed record: {}", MirrorSinkTask.this, producerRecord); throw new KafkaException(e); } else { log.info("{} Wrote record successfully: topic {} partition {} offset {}", //log.trace MirrorSinkTask.this, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); commitRecord(record, recordMetadata); } }); futures.add(future); records.poll(); } } catch (KafkaException e) { // Any unsent messages are added to the remaining remainingRecordsMap for re-send for (SinkRecord record = records.poll(); record != null; record = records.poll()) { addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap); } finally { //TODO: may add more exception handling case for (Future<RecordMetadata> future : futures) { try { future.get(); } catch (Exception e) { SinkRecord record = futureMap.get(future); // Any record failed to send, add to the remainingRecordsMap addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap); } } } if (isTransactional && remainingRecordsMap.size() == 0) { producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId); commitTransaction(producer); } if (remainingRecordsMap.size() > 0) { // For transaction case, all records should be put into remainingRecords, as the whole transaction should be redone Collection<SinkRecord> recordsToReSend; if (isTransactional) { // transactional: retry all records, as the transaction will cancel all successful and failed records. recordsToReSend = records; } else { // non-transactional: only retry failed records, others were finished and sent. 
recordsToReSend = remainingRecordsMap; } throw new ResendRecordsException(recordsToReSend); } } @Override public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { // if transactional, return empty Map intentionally to disable the offset commit by commitOffsets() in WorkerSinkTask.java // so that the transactional producer is able to commit the consumer offsets to target cluster in a transaction at its pace if (isTransactional) return new HashMap<TopicPartition, OffsetAndMetadata>(); else // otherwise, return offsetsMap to let commitOffsets() in "WorkerSinkTask.java" to commit offsets return offsetsMap; } // This commitRecord() follows the same logics as commitRecord() in MirrorSourceTask, to public void commitRecord(SinkRecord record, RecordMetadata metadata) { try { if (stopping) { return; } if (!metadata.hasOffset()) { log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic()); return; } TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition()); long latency = System.currentTimeMillis() - record.timestamp(); metrics.countRecord(topicPartition); metrics.replicationLatency(topicPartition, latency); TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition()); long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset()); long downstreamOffset = metadata.offset(); maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset); } catch (Throwable e) { log.warn("Failure committing record.", e); } } private void beginTransaction(KafkaProducer<byte[], byte[]> producer) { if (isTransactional) { producer.beginTransaction(); transactionInProgress = true; } } private void initTransactions(KafkaProducer<byte[], byte[]> producer) { if (isTransactional) { producer.initTransactions(); } } private void commitTransaction(KafkaProducer<byte[], byte[]> producer) { if (isTransactional) { producer.commitTransaction(); 
transactionInProgress = false; } } private void abortTransaction(KafkaProducer<byte[], byte[]> producer) { if (isTransactional && transactionInProgress) { producer.abortTransaction(); transactionInProgress = false; } } public static class ResendRecordsException extends Exception { private Collection<SinkRecord> remainingRecords; public ResendRecordsException(Collection<SinkRecord> remainingRecords) { super(cause); this.remainingRecords = remainingRecords; } public Collection<SinkRecord> getRemainingRecords() { return remainingRecords; } } |
...