...

KafkaProducer.java

```java
import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;

public interface Producer<K, V> extends Closeable {

  /**
   * Needs to be called before any of the other transaction methods. Assumes that
   * the transactional.id is specified in the producer configuration.
   *
   * This method does the following:
   *   1. Ensures any transactions initiated by previous instances of the producer
   *      are completed. If the previous instance had failed with a transaction in
   *      progress, it will be aborted.
   *   2. Gets the internal producer id and epoch, used in all future transactional
   *      messages issued by the producer.
   *
   * @throws IllegalStateException if the transactional.id for the producer is not set
   *         in the configuration.
   */
  void initTransactions() throws IllegalStateException;

  /**
   * Should be called before the start of each new transaction.
   *
   * @throws ProducerFencedException if another producer with the same
   *         transactional.id is active.
   */
  void beginTransaction() throws ProducerFencedException;

  /**
   * Sends a map of consumed offsets to the consumer group coordinator, and also marks
   * those offsets as part of the current transaction. These offsets will be considered
   * consumed only if the transaction is committed successfully.
   *
   * This method should be used when you need to batch consumed and produced messages
   * together, typically in a consume-transform-produce pattern.
   *
   * @throws ProducerFencedException if another producer with the same
   *         transactional.id is active.
   */
  void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                String consumerGroupId) throws ProducerFencedException;

  /**
   * Commits the ongoing transaction.
   *
   * @throws ProducerFencedException if another producer with the same
   *         transactional.id is active.
   */
  void commitTransaction() throws ProducerFencedException;

  /**
   * Aborts the ongoing transaction.
   *
   * @throws ProducerFencedException if another producer with the same
   *         transactional.id is active.
   */
  void abortTransaction() throws ProducerFencedException;

  /**
   * Sends the given record asynchronously and returns a future which will eventually
   * contain the response information.
   *
   * @param record The record to send
   * @return A future which will eventually contain the response information
   */
  Future<RecordMetadata> send(ProducerRecord<K, V> record);

  /**
   * Sends a record and invokes the given callback when the record has been
   * acknowledged by the server.
   *
   * @param record The record to send
   * @param callback Invoked once the server has acknowledged the record
   * @return A future which will eventually contain the response information
   */
  Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}
```
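The Javadoc above implies a strict call order for the transactional methods: initialize once, then repeatedly begin, write, and either commit or abort. The toy model below is a minimal sketch of that order under stated assumptions: it keeps all state in memory, never contacts a broker, and does not model fencing (`ProducerFencedException`). The classes `TransactionContractSketch` and `ToyTransactionalProducer`, and the string encoding of offsets, are hypothetical illustrations, not part of the Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical in-memory model of the transactional call order described in the
 * Producer Javadoc. Not the Kafka API: no broker, no fencing, strings as records.
 */
public class TransactionContractSketch {

    enum State { UNINITIALIZED, READY, IN_TRANSACTION }

    static class ToyTransactionalProducer {
        private State state = State.UNINITIALIZED;
        private final List<String> pending = new ArrayList<>();   // writes in the open transaction
        private final List<String> committed = new ArrayList<>(); // writes visible after commit

        void initTransactions() {
            if (state != State.UNINITIALIZED) throw new IllegalStateException("already initialized");
            state = State.READY;
        }

        void beginTransaction() {
            require(State.READY, "beginTransaction");
            state = State.IN_TRANSACTION;
        }

        void send(String record) {
            require(State.IN_TRANSACTION, "send");
            pending.add(record);
        }

        void sendOffsetsToTransaction(String groupId, long offset) {
            require(State.IN_TRANSACTION, "sendOffsetsToTransaction");
            // Consumed offsets join the transaction like any other write.
            pending.add("offset:" + groupId + "@" + offset);
        }

        void commitTransaction() {
            require(State.IN_TRANSACTION, "commitTransaction");
            committed.addAll(pending); // records and offsets become visible together
            pending.clear();
            state = State.READY;
        }

        void abortTransaction() {
            require(State.IN_TRANSACTION, "abortTransaction");
            pending.clear(); // nothing from the aborted transaction becomes visible
            state = State.READY;
        }

        List<String> committed() { return committed; }

        private void require(State expected, String op) {
            if (state != expected) throw new IllegalStateException(op + " called in state " + state);
        }
    }

    public static void main(String[] args) {
        ToyTransactionalProducer p = new ToyTransactionalProducer();
        p.initTransactions();

        // Transaction 1: an output record and the consumed offset, committed together.
        p.beginTransaction();
        p.send("out-1");
        p.sendOffsetsToTransaction("group-a", 42L);
        p.commitTransaction();

        // Transaction 2: aborted, so neither write becomes visible.
        p.beginTransaction();
        p.send("out-2");
        p.sendOffsetsToTransaction("group-a", 43L);
        p.abortTransaction();

        System.out.println(p.committed()); // [out-1, offset:group-a@42]

        try {
            p.send("out-3"); // no transaction is open
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage()); // rejected: send called in state READY
        }
    }
}
```

Keeping the consumed offset inside the same transaction as the output record is what lets a consume-transform-produce loop advance both or neither: in the aborted transaction above, offset 43 is never recorded as consumed.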

...

As the batch size increases, the overhead of the new format shrinks relative to the old format, because redundant per-message fields are eliminated. The old format has a fixed overhead of 34 bytes per message. For the new format, the message set overhead is 53 bytes, while the per-message overhead ranges from 6 to 25 bytes. This makes it more costly to send individual messages, but the space is quickly recovered with even modest batching. For example, assuming a fixed message size of 1 KB with 100-byte keys and reasonably close timestamps, the overhead increases by only 7 bytes for each additional batched message (2 bytes for the message size, 1 byte for attributes, 2 bytes for the timestamp delta, 1 byte for the offset delta, and 1 byte for the key size):

| Batch Size (messages) | Old Format Overhead (bytes) | New Format Overhead (bytes) |
|---|---|---|
| 1 | 34*1 = 34 | 53 + 1*7 = 60 |
| 3 | 34*3 = 102 | 53 + 3*7 = 74 |
| 10 | 34*10 = 340 | 53 + 10*7 = 123 |
| 50 | 34*50 = 1700 | 53 + 50*7 = 403 |
| 100 | 34*100 = 3400 | 53 + 100*7 = 753 |
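The 7-byte per-message figure and the overhead table can be re-derived mechanically. The sketch below assumes that the size fields are unsigned base-128 varints (the encoding the byte counts in the text imply) and takes the fixed costs, 34 bytes per message in the old format and 53 bytes per batch in the new one, from the paragraph above. The class `BatchOverheadSketch` and its methods are hypothetical names for illustration.

```java
/**
 * Hypothetical sketch re-deriving the overhead table. Assumes unsigned base-128
 * varints for the size fields and the fixed costs quoted in the text.
 */
public class BatchOverheadSketch {

    static final int OLD_PER_MESSAGE = 34; // old format: fixed overhead per message
    static final int NEW_PER_BATCH = 53;   // new format: message set (batch) overhead

    /** Bytes needed to encode a non-negative value as an unsigned base-128 varint. */
    static int varintSize(long value) {
        int bytes = 1;
        while (value >= 0x80) { // each byte carries 7 bits of payload
            value >>>= 7;
            bytes++;
        }
        return bytes;
    }

    /** Per-message overhead for the text's example: ~1 KB messages, 100-byte keys. */
    static int perMessageOverhead() {
        int messageSize = varintSize(1024); // 2 bytes: 1024 needs 11 bits
        int attributes = 1;                 // 1 byte, as stated in the text
        int timestampDelta = 2;             // 2 bytes, assuming "reasonably close" timestamps
        int offsetDelta = 1;                // 1 byte, as stated in the text
        int keySize = varintSize(100);      // 1 byte: 100 < 128
        return messageSize + attributes + timestampDelta + offsetDelta + keySize;
    }

    static int oldOverhead(int batchSize) {
        return OLD_PER_MESSAGE * batchSize;
    }

    static int newOverhead(int batchSize) {
        return NEW_PER_BATCH + perMessageOverhead() * batchSize;
    }

    /** Smallest batch size at which the new format has strictly less overhead. */
    static int breakEven() {
        int n = 1;
        while (newOverhead(n) >= oldOverhead(n)) n++;
        return n;
    }

    public static void main(String[] args) {
        System.out.println("per-message overhead = " + perMessageOverhead()); // 7
        for (int n : new int[] {1, 3, 10, 50, 100}) {
            System.out.printf("%3d  old=%4d  new=%4d%n", n, oldOverhead(n), newOverhead(n));
        }
        System.out.println("new format wins from batch size " + breakEven()); // 2
    }
}
```

Solving 34n > 53 + 7n gives n > 53/27 ≈ 1.96, so under these assumptions the new format already uses less space once two messages share a batch (67 bytes versus 68).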

...