...
Code Block | ||
---|---|---|
| ||
// Read some records from the consumer and collect the offsets to commit
ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

// Do some processing and build the records we want to produce
List<ProducerRecord> processed = process(consumed);

// --------- Call after process ---------- //
Set<TopicPartition> outputPartitions = outputPartitions(processed);
producer.beginTransaction(outputPartitions);

// Write the records and commit offsets under a single transaction
for (ProducerRecord record : processed)
    producer.send(record);

// Pass the entire consumer group metadata
producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

producer.commitTransaction(); |
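
The example above calls an `outputPartitions(processed)` helper without defining it. The following is a minimal sketch of what such a helper might look like, not part of the proposal itself. To stay self-contained it uses two hypothetical stand-in types, a local `TopicPartition` record and an `OutRecord` record in place of Kafka's `ProducerRecord`, and it assumes Java 16 or later for `record` syntax. It also assumes every record carries an explicit partition, since a record whose partition is left to the partitioner cannot be pre-registered this way.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class OutputPartitionsSketch {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) {}

    // Hypothetical stand-in for a ProducerRecord; partition is null when
    // the caller leaves the choice to the partitioner.
    record OutRecord(String topic, Integer partition, String value) {}

    // Collect the distinct topic partitions targeted by the processed records.
    static Set<TopicPartition> outputPartitions(List<OutRecord> processed) {
        Set<TopicPartition> partitions = new LinkedHashSet<>();
        for (OutRecord record : processed) {
            if (record.partition() == null) {
                // Assumption of this sketch: only explicitly partitioned records can be registered.
                throw new IllegalArgumentException(
                    "Record for topic " + record.topic() + " has no explicit partition");
            }
            partitions.add(new TopicPartition(record.topic(), record.partition()));
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<OutRecord> processed = List.of(
            new OutRecord("output", 0, "a"),
            new OutRecord("output", 1, "b"),
            new OutRecord("output", 0, "c"));
        // Duplicate targets collapse into a single entry per partition.
        System.out.println(outputPartitions(processed));
    }
}
```

Using a set means each partition is registered once regardless of how many records target it.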
To guard against application mistakes, once a transaction has been started with the new `beginTransaction` API, no record should be produced to a partition outside the pre-registered set. If that happens, producer.send() will throw UNKNOWN_TOPIC_OR_PARTITION, whose definition is augmented accordingly:
Code Block |
---|
UNKNOWN_TOPIC_OR_PARTITION(3, "This server does not host this topic-partition, or this error is thrown by a transactional producer that does not expect to send to this topic-partition.", UnknownTopicOrPartitionException::new), |
Updated JavaDoc for send:
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
*
* @param record The record to send
* @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
* indicates no callback)
*
* @throws AuthenticationException if authentication fails. See the exception for more details
* @throws AuthorizationException fatal error indicating that the producer is not allowed to write
* @throws IllegalStateException if a transactional.id has been configured and no transaction has been started, or
* when send is invoked after producer has been closed.
* @throws InterruptException If the thread is interrupted while blocked
* @throws SerializationException If the key or value are not valid objects given the configured serializers
* @throws UnknownTopicOrPartitionException If the record topic partition is not included in the pre-registered partitions
* @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions.
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
} |
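
To make the new `@throws UnknownTopicOrPartitionException` contract concrete, here is a minimal sketch of the pre-registration check. It is an illustration under stated assumptions, not the actual producer implementation: `SketchProducer`, the local `TopicPartition` record, and the local exception class are all hypothetical stand-ins, and the source does not specify where the real check would run, so this sketch simply performs it inside `send`. Java 16 or later is assumed for `record` syntax.

```java
import java.util.HashSet;
import java.util.Set;

public class PreRegisteredSendSketch {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) {}

    // Hypothetical stand-in for Kafka's UnknownTopicOrPartitionException.
    static class UnknownTopicOrPartitionException extends RuntimeException {
        UnknownTopicOrPartitionException(String message) {
            super(message);
        }
    }

    // Hypothetical producer that only tracks which partitions were pre-registered.
    static class SketchProducer {
        private Set<TopicPartition> registered; // null while no transaction is open
        private int sentCount;

        void beginTransaction(Set<TopicPartition> outputPartitions) {
            registered = new HashSet<>(outputPartitions);
        }

        void send(TopicPartition target, String value) {
            if (registered == null) {
                throw new IllegalStateException("No transaction has been started");
            }
            if (!registered.contains(target)) {
                // The record targets a partition that was not pre-registered.
                throw new UnknownTopicOrPartitionException(
                    "Partition " + target + " was not registered in beginTransaction");
            }
            sentCount++;
        }

        void commitTransaction() {
            registered = null; // the registration lasts for one transaction only
        }

        int sentCount() {
            return sentCount;
        }
    }

    public static void main(String[] args) {
        SketchProducer producer = new SketchProducer();
        producer.beginTransaction(Set.of(new TopicPartition("output", 0)));
        producer.send(new TopicPartition("output", 0), "ok");
        try {
            producer.send(new TopicPartition("output", 1), "unexpected");
        } catch (UnknownTopicOrPartitionException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        producer.commitTransaction();
    }
}
```

In this sketch the registered set is cleared on commit, so each transaction must declare its own output partitions; whether the real API would behave this way is an assumption.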
Compatibility, Deprecation, and Migration Plan
...