Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleConsumer.java
ConsumerGroupMetadata getMetadatagroupMetadata();

So that EOS users could get refreshed group metadata as needed.

...

Code Block
TxnOffsetCommitRequest => TransactionalId GroupId ProducerId ProducerEpoch Offsets GenerationId
  TransactionalId     => String
  GroupId             => String
  ProducerId  		  => int64		
  ProducerEpoch       => int16
  Offsets  	          => Map<TopicPartition, CommittedOffset>
  GenerationId        => int32, default -1 // NEW
  MemberId			  => nullable String // NEW
  GroupInstanceId 	  => nullable String // NEW

If one of the field is not matching correctly on server side, the client will be fenced immediately. An edge case is defined as:

...

Code Block
linenumberstrue
  KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);
  KafkaProducer producer = new KafkaProducer();

  producer.initTransactions();	
  while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed)
      producer.send(record);

	// Pass the entire consumer group metadata
    producer.sendOffsetsToTransaction(consumedOffsets, consumer.getMetadatagroupMetadata());

    producer.commitTransaction();
  }

...