Comment: Refined the semantics of the OutOfOrderSequenceException.

...

KafkaProducer.java
public interface Producer<K,V> extends Closeable {
  
  /**
   * Needs to be called before any of the other transaction methods. Assumes that 
   * the transaction.app.id is specified in the producer configuration.
   *
   * This method does the following:
   *   1. Ensures any transactions initiated by previous instances of the producer
   *      are committed or rolled back.
   *   2. Gets the internal producer id and epoch, used in all future transactional
   *      messages issued by the producer.
   *
   * @throws IllegalStateException if the transaction.app.id for the producer is not set
   *         in the configuration.
   */
  void initTransactions() throws IllegalStateException;
  
  /**
   * Should be called before the start of each new transaction. 
   *
   * @throws ProducerFencedException if another producer with the same
   *         transaction.app.id is active.
   */
  void beginTransaction() throws ProducerFencedException;
  
  /** 
   * Sends a list of consumed offsets to the consumer group coordinator, and also marks 
   * those offsets as part of the current transaction. These offsets will be considered 
   * consumed only if the transaction is committed successfully.
   *
   * This method should be used when you need to batch consumed and produced messages 
   * together, typically in a consume-transform-produce pattern.
   * 
   * @throws ProducerFencedException if another producer with the same
   *         transaction.app.id is active.
   */
  void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, 
                                String consumerGroupId) throws ProducerFencedException;
  
  /**
   * Commits the ongoing transaction. 
   *
   * @throws ProducerFencedException if another producer with the same
   *         transaction.app.id is active.
   */
  void commitTransaction() throws ProducerFencedException;
  
  /**
   * Aborts the ongoing transaction.
   *
   * @throws ProducerFencedException if another producer with the same
   *         transaction.app.id is active.
   */
  void abortTransaction() throws ProducerFencedException;


  /**
   * Send the given record asynchronously and return a future which will eventually contain the response information.
   *
   * @param record The record to send
   * @return A future which will eventually contain the response information
   *
   * @throws OutOfOrderSequenceException if the broker receives an out of range sequence number. This could happen
   * due to a buggy client or in clusters configured for high availability rather than durability.
   */
  public Future<RecordMetadata> send(ProducerRecord<K, V> record) throws OutOfOrderSequenceException;


  /**
   * Send a record and invoke the given callback when the record has been acknowledged by the server.
   *
   * @throws OutOfOrderSequenceException if the broker receives an out of range sequence number. This could happen
   * due to a buggy client or in clusters configured for high availability rather than durability.
   */
  public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) throws OutOfOrderSequenceException;
}

The OutOfOrderSequenceException

The Producer raises an OutOfOrderSequenceException if the broker detects data loss, that is, if it receives a sequence number greater than the one it expected. The exception is returned in the Future and passed to the Callback, if any. It is fatal: subsequent invocations of Producer methods such as send, beginTransaction, and commitTransaction raise an IllegalStateException.
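
The semantics described above can be illustrated with a small self-contained model. This is only a sketch and does not use the Kafka client: SequenceCheckingProducer, its send(int) signature, and the local OutOfOrderSequenceException class are hypothetical stand-ins, and the sequence number is passed explicitly so that a gap can be injected. The Callback path is omitted for brevity, and the handling of a lower-than-expected sequence number as a duplicate is an assumption of this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical stand-in for the exception described above; not the Kafka class.
class OutOfOrderSequenceException extends RuntimeException {
    OutOfOrderSequenceException(String message) {
        super(message);
    }
}

// Hypothetical model of a producer and the broker-side check for one partition.
class SequenceCheckingProducer {
    private int expectedSequence = 0;   // next sequence number the "broker" expects
    private boolean fatalError = false; // set once a sequence gap has been detected

    // Models send(): returns a Future that carries either the accepted
    // sequence number or an OutOfOrderSequenceException.
    Future<Integer> send(int sequence) {
        if (fatalError) {
            // After the fatal exception, further calls fail immediately.
            throw new IllegalStateException("producer is in a fatal error state");
        }
        CompletableFuture<Integer> result = new CompletableFuture<>();
        if (sequence > expectedSequence) {
            // A gap means at least one earlier record was lost.
            fatalError = true;
            result.completeExceptionally(new OutOfOrderSequenceException(
                "expected sequence " + expectedSequence + " but received " + sequence));
        } else {
            if (sequence == expectedSequence) {
                expectedSequence++;
            }
            // Assumption: a lower sequence number is acknowledged as a duplicate.
            result.complete(sequence);
        }
        return result;
    }
}

class OutOfOrderSequenceDemo {
    public static void main(String[] args) throws Exception {
        SequenceCheckingProducer producer = new SequenceCheckingProducer();
        producer.send(0).get(); // accepted, the broker now expects sequence 1

        try {
            producer.send(2).get(); // sequence 1 was skipped
        } catch (ExecutionException e) {
            // The exception surfaces through the Future, wrapped by get().
            System.out.println("send failed: " + e.getCause().getMessage());
        }

        try {
            producer.send(1); // the producer is no longer usable
        } catch (IllegalStateException e) {
            System.out.println("producer unusable: " + e.getMessage());
        }
    }
}
```

In an application, the practical consequence is that once this exception has been observed, the producer instance should be closed and replaced rather than retried.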

An Example Application

Here is a simple application which demonstrates the use of the APIs introduced above.

...