...
```java
// Imports used by this excerpt of MirrorSinkTask:
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;

private boolean isTransactional = config.getTransactionalProducer();
private boolean transactionStart = false;
protected Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();

protected KafkaProducer<byte[], byte[]> initProducer(boolean isTransactional) {
    Map<String, Object> producerConfig = config.targetProducerConfig();
    if (isTransactional) {
        log.info("use transactional producer");
        // Transactions require acks=all and idempotence, plus a stable transactional id.
        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
        producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, genTransactionId());
        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer");
    } else {
        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "non-transactional-producer");
    }
    return MirrorUtils.newProducer(producerConfig);
}

@Override
public void put(Collection<SinkRecord> records) {
    log.info("receive {} messages from consumer", records.size());
    if (records.isEmpty()) {
        return;
    }
    try {
        // Copy into a queue so sendBatch() can peek/poll as records are sent.
        sendBatch(new ArrayDeque<>(records), producer);
    } catch (RebalanceException e) {
        producer.close();
        producer = initProducer(isTransactional);
    } catch (ResendRecordsException e) {
        // Abort any open transaction before re-sending, so beginTransaction() can be called again.
        abortTransaction(producer, "resend remaining records");
        try {
            // TODO: add limited retry
            sendBatch(new ArrayDeque<>(e.getRemainingRecords()), producer);
        } catch (ResendRecordsException retryFailure) {
            log.error("records still failed after retry", retryFailure);
        }
    } catch (Throwable e) {
        log.error("{} terminating on exception", getHostName(), e);
    }
    // TODO: may add more exception handling cases
}

private void sendBatch(Queue<SinkRecord> records, KafkaProducer<byte[], byte[]> producer)
        throws ResendRecordsException {
    Map<TopicPartition, List<SinkRecord>> remainingRecordsMap = new HashMap<>();
    List<Future<RecordMetadata>> futures = new ArrayList<>();
    Map<Future<RecordMetadata>, SinkRecord> futureMap = new HashMap<>();
    // Snapshot the full batch up front: if the transaction aborts, every record must be retried.
    List<SinkRecord> allRecords = new ArrayList<>(records);
    try {
        offsetsMap.clear();
        beginTransaction(producer);
        while (!records.isEmpty()) {
            final SinkRecord record = records.peek();
            ProducerRecord<byte[], byte[]> producerRecord = convertToProducerRecord(record);
            // Track the highest offset per source partition for the transactional offset commit.
            offsetsMap.compute(new TopicPartition(record.topic(), record.kafkaPartition()),
                    (tp, curOffsetMetadata) ->
                            (curOffsetMetadata == null || record.kafkaOffset() > curOffsetMetadata.offset())
                                    ? new OffsetAndMetadata(record.kafkaOffset())
                                    : curOffsetMetadata);
            Future<RecordMetadata> future = producer.send(producerRecord, (recordMetadata, e) -> {
                if (e != null) {
                    log.error("{} failed to send record to {}: ", MirrorSinkTask.this, producerRecord.topic(), e);
                    log.debug("{} Failed record: {}", MirrorSinkTask.this, producerRecord);
                    throw new KafkaException(e);
                } else {
                    log.info("{} Wrote record successfully: topic {} partition {} offset {}", // log.trace
                            MirrorSinkTask.this, recordMetadata.topic(), recordMetadata.partition(),
                            recordMetadata.offset());
                    commitRecord(record, recordMetadata);
                }
            });
            futures.add(future);
            futureMap.put(future, record);
            records.poll();
        }
    } catch (KafkaException e) {
        // Any unsent messages are added to remainingRecordsMap for re-send
        for (SinkRecord record = records.poll(); record != null; record = records.poll()) {
            addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap);
        }
    } finally {
        // TODO: may add more exception handling cases
        for (Future<RecordMetadata> future : futures) {
            try {
                future.get();
            } catch (Exception e) {
                SinkRecord record = futureMap.get(future);
                // Any record that failed to send is added to remainingRecordsMap
                addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap);
            }
        }
    }

    if (isTransactional && remainingRecordsMap.isEmpty()) {
        // Commit the consumer offsets within the same transaction as the produced records.
        producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);
        commitTransaction(producer);
    }

    if (!remainingRecordsMap.isEmpty()) {
        Collection<SinkRecord> recordsToSend;
        if (isTransactional) {
            // Transactional: retry the whole batch, since aborting the transaction
            // cancels all of its outputs.
            recordsToSend = allRecords;
        } else {
            // Non-transactional: only retry the failed records; the others were already sent.
            recordsToSend = remainingRecordsMap.values().stream()
                    .flatMap(List::stream)
                    .collect(Collectors.toList());
        }
        throw new ResendRecordsException(recordsToSend);
    }
}

@Override
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    // If transactional, intentionally return an empty map so the framework skips its own offset
    // commit and the producer commits the consumer offsets inside the transaction instead.
    if (isTransactional) {
        return new HashMap<>();
    } else {
        return offsetsMap;
    }
}

// This commitRecord() follows the same logic as commitRecord() in MirrorSourceTask
public void commitRecord(SinkRecord record, RecordMetadata metadata) {
    try {
        if (stopping) {
            return;
        }
        if (!metadata.hasOffset()) {
            log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
            return;
        }
        TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
        long latency = System.currentTimeMillis() - record.timestamp();
        metrics.countRecord(topicPartition);
        metrics.replicationLatency(topicPartition, latency);
        TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
        long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
        long downstreamOffset = metadata.offset();
        maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
    } catch (Throwable e) {
        log.warn("Failure committing record.", e);
    }
}

private void beginTransaction(KafkaProducer<byte[], byte[]> producer) {
    if (isTransactional) {
        producer.beginTransaction();
        transactionStart = true;
    }
}

private void initTransactions(KafkaProducer<byte[], byte[]> producer) {
    if (isTransactional) {
        producer.initTransactions();
    }
}

private void commitTransaction(KafkaProducer<byte[], byte[]> producer) {
    if (isTransactional) {
        producer.commitTransaction();
        transactionStart = false;
    }
}

private void abortTransaction(KafkaProducer<byte[], byte[]> producer, String reason) {
    if (isTransactional && transactionStart) {
        log.info("abort transaction: {}", reason);
        producer.abortTransaction();
        transactionStart = false;
    }
}

public static class ResendRecordsException extends Exception {
    private final Collection<SinkRecord> remainingRecords;

    public ResendRecordsException(Collection<SinkRecord> remainingRecords) {
        super(remainingRecords.size() + " records failed to send and need to be re-sent");
        this.remainingRecords = remainingRecords;
    }

    public Collection<SinkRecord> getRemainingRecords() {
        return remainingRecords;
    }
}
```
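
For readers unfamiliar with the producer-side transaction API, here is a minimal, self-contained sketch of the bare flow that `initProducer()`, `beginTransaction()`, `sendOffsetsToTransaction()`, `commitTransaction()`, and `abortTransaction()` above compose. The error split follows the standard `KafkaProducer` javadoc pattern; the bootstrap address, topic names, transactional id, and consumer group id are illustrative placeholders, not values defined by this proposal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class TransactionalFlowSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // The same three settings initProducer() applies on its transactional branch:
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-txn-id");    // placeholder

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();   // once per producer, before any transaction
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("target-topic", "value".getBytes()));
                // Consumer offsets ride in the same transaction, as in
                // producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId) above.
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                offsets.put(new TopicPartition("source-topic", 0), new OffsetAndMetadata(42L));
                producer.sendOffsetsToTransaction(offsets, "example-group");    // placeholder group id
                producer.commitTransaction();  // records and offsets become visible atomically
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                // Fatal errors: the transaction cannot be aborted; close the producer.
                producer.close();
            } catch (KafkaException e) {
                // Abortable error: consumers reading with isolation.level=read_committed
                // never see records from the aborted transaction.
                producer.abortTransaction();
            }
        }
    }
}
```

Note how this pairs with `preCommit()` above: once offsets are committed through `sendOffsetsToTransaction()`, the framework's own offset commit must be suppressed (by returning an empty map), otherwise offsets could be committed for records whose transaction is later aborted.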
...