...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Should be called before the start of each new transaction. Note that prior to the first invocation * of this method, you must invoke {@link #initTransactions()} exactly one time. * * @param outputPartitions the set of output partitions to produce to. * @param consumerGroupId the consumer group id to send offsets to. * * @throws IllegalStateException if no transactional.id has been configured or if {@link #initTransactions()} * has not yet been invoked * @throws ProducerFencedException if another producer with the same transactional.id is active * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker * does not support transactions (i.e. if its version is lower than 0.11.0.0), or doesn't support pre-registration of * partitions and consumer group. * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured * transactional.id is not authorized. See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error */ public void beginTransaction(Set<TopicPartition> outputPartitions, Optional<String> consumerGroupId) throws ProducerFencedException; |
...
Code Block | ||
---|---|---|
| ||
// Read some records from the consumer and collect the offsets to commit ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time. Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed); // --------- Call before process ---------- // // The application could infer the output partitions beforehand Set<TopicPartition> outputPartitions = outputPartitions(consumed); producer.beginTransaction(outputPartitions, Optional.of(consumer.groupMetadata().groupId())); // Do processing while sending out the built records asynchronously for (ProducerRecord record : process(consumed)) producer.send(record); // Pass the entire consumer group metadata producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata()); producer.commitTransaction(); |
...
Code Block | ||
---|---|---|
| ||
// Read some records from the consumer and collect the offsets to commit ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time. Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed); // Do some processing and build the records we want to produce List<ProducerRecord> processed = process(consumed); // --------- Call after process ---------- // Set<TopicPartition> outputPartitions = outputPartitions(processed); producer.beginTransaction(outputPartitions, Optional.of(consumer.groupMetadata().groupId())); // Write the records and commit offsets under a single transaction for (ProducerRecord record : processed) producer.send(record); // Pass the entire consumer group metadata producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata()); producer.commitTransaction(); |
...
Code Block |
---|
UNKNOWN_TOPIC_OR_PARTITION(3, "This server does not host this topic-partition, or this error is thrown from a transactional producer who doesn't expect this topic partition to send to.", UnknownTopicOrPartitionException::new), INVALID_GROUP_ID(69, "The configured groupId is invalid, or this error is thrown from a transactional producer who doesn't expect to commit offset to this group.", InvalidGroupIdException::new), |
Updated JavaDoc for send and sendOffsetsToTransaction:
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged. * * @param record The record to send * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null * indicates no callback) * * @throws AuthenticationException if authentication fails. See the exception for more details * @throws AuthorizationException fatal error indicating that the producer is not allowed to write * @throws IllegalStateException if a transactional.id has been configured and no transaction has been started, or * when send is invoked after producer has been closed. * @throws InterruptException If the thread is interrupted while blocked * @throws SerializationException If the key or value are not valid objects given the configured serializers * @throws UnknownTopicOrPartitionException If the record topic partition is not included in the pre-registered partitions * @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions. */ @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record); return doSend(interceptedRecord, callback); } /** * Sends a list of specified offsets to the consumer group coordinator, and also marks * those offsets as part of the current transaction. * * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started. * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker * does not support transactions (i.e. if its version is lower than 0.11.0.0) or * the broker doesn't support latest version of transactional API with consumer group metadata (i.e. if its version is * lower than 2.5.0). * @throws org.apache.kafka.common.errors.UnsupportedForMessageFormatException fatal error indicating the message * format used for the offsets topic on the broker does not support transactions * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured * transactional.id is not authorized, or the consumer group id is not authorized. * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried * (e.g. if the consumer has been kicked out of the group). Users should handle this by aborting the transaction. * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this producer instance gets fenced by broker due to a * mis-configured consumer instance id within group metadata. * @throws org.apache.kafka.common.errors.InvalidGroupIdException if the given consumer group id doesn't match the * pre-registered value. * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any * other unexpected error */ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata) throws ProducerFencedException; |
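The two pre-registration failure cases described in the JavaDoc above (a record sent to a partition that was not pre-registered, and offsets committed to a group other than the pre-registered one) can be sketched with a small, Kafka-free model. Everything here is hypothetical and for illustration only: the class `PreRegisteredTxn`, its methods, the use of plain strings for partitions, and the use of `IllegalArgumentException` in place of `UnknownTopicOrPartitionException` and `InvalidGroupIdException` are assumptions, not the proposed implementation. The sketch also assumes that an empty pre-registered group id means no offset commit is expected.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical model of the checks implied by pre-registration; not Kafka code.
final class PreRegisteredTxn {
    private final Set<String> partitions;   // pre-registered output partitions, e.g. "out-0"
    private final Optional<String> groupId; // pre-registered consumer group, if any

    PreRegisteredTxn(Set<String> partitions, Optional<String> groupId) {
        this.partitions = new HashSet<>(partitions); // defensive copy
        this.groupId = groupId;
    }

    // Stands in for the UNKNOWN_TOPIC_OR_PARTITION case on send().
    void checkSend(String topicPartition) {
        if (!partitions.contains(topicPartition)) {
            throw new IllegalArgumentException("UNKNOWN_TOPIC_OR_PARTITION: " + topicPartition);
        }
    }

    // Stands in for the INVALID_GROUP_ID case on sendOffsetsToTransaction().
    // Assumption: with no pre-registered group, any offset commit is rejected.
    void checkSendOffsets(String group) {
        if (!groupId.isPresent() || !groupId.get().equals(group)) {
            throw new IllegalArgumentException("INVALID_GROUP_ID: " + group);
        }
    }
}

final class PreRegistrationDemo {
    public static void main(String[] args) {
        PreRegisteredTxn txn = new PreRegisteredTxn(
                new HashSet<>(Arrays.asList("out-0", "out-1")), Optional.of("group-a"));

        txn.checkSend("out-0");            // pre-registered partition: accepted
        txn.checkSendOffsets("group-a");   // pre-registered group: accepted

        try {
            txn.checkSend("out-2");        // not pre-registered: rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model both checks are pure lookups against the state captured when the transaction begins, which is why the application has to know its output partitions before the first `send`.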
...
The new beginTransaction API and blocking behavior shall only be available for new producers, so there should not be a compatibility issue. To enable blocking behavior and pre-registration, the broker has to be on the latest version, while for partition pre-registration, the broker just needs to be on 0.11 to support the first version of EOS semantics as well; otherwise an UnsupportedVersionException would be thrown.
Rejected Alternatives
We considered using a context session on the broker side to remember the partitions written to in the last transaction, and bumping the EndTxn request to include the partitions actually written so that the session gets updated.
...