THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Commits the ongoing transaction. This method will flush any unsent records before actually committing the transaction. * * Further, if any of the {@link #send(ProducerRecord)} calls which were part of the transaction hit irrecoverable * errors, this method will throw the last received exception immediately and the transaction will not be committed. * So all {@link #send(ProducerRecord)} calls in a transaction must succeed in order for this method to succeed. * * Note that this method will raise {@link TimeoutException} if the transaction cannot be committed before expiration * of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} if interrupted. * It is safe to retry in either case, but it is not possible to attempt a different operation (such as abortTransaction) * since the commit may already be in the progress of completing. If not retrying, the only option is to close the producer. * * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker * does not support transactions (i.e. if its version is lower than 0.11.0.0) * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured * transactional.id is not authorized. See the exception for more details * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch * to the partition leader. See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any * other unexpected error * @throws TimeoutException if the time taken for committing the transaction has surpassed <code>max.block.ms</code>. * @throws InterruptException if the thread is interrupted while blocked */ public TransactionResultTransactionCommitResult commitTransaction() throws ProducerFencedException |
...
Code Block | ||||
---|---|---|---|---|
| ||||
KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig); producer.initTransactions(); volatile boolean isRunning = true; while (isRunning) { ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT); final boolean shouldCommit = try { producer.beginTransaction(); producer.send(new ProducerRecord<>("topic", new byte[1], new byte[1])// Do some processing and build the records we want to produce. List<ProducerRecord> processed = process(consumed); for (ProducerRecord record : processed) producer.send(record, (metadata, exception) -> { // not required to capture the exception here. }); producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata()); return true; } catch (KafkaException e) { // Catch any exception thrown from the data transmission phase. return false; } try { if (shouldCommit) { TransactionResultTransactionCommitResult result = producer.commitTransaction(); if (result.failed()) { // Cleanup state, rewind processing offsetsresetToLastCommittedPositions(consumer); producer.abortTransaction(); } } else { // Cleanup state, rewind processing offsetsresetToLastCommittedPositions(consumer); producer.abortTransaction(); } } catch (KafkaException e) { // log the error throw e; } } } producer.close(); |
In the above example, we separate the transactional processing into two phases: the data transmission phase, and the commit phase. In data transmission phase,
Callback Exception Improvement
...