Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleAccessing record metadata via ProcessorContext
linenumberstrue
public class MyProcessor implements Processor<KEY_TYPE, VALUE_TYPE> {
  private ProcessorContext context;

  @Override
  public void init(final ProcessorContext context) {
    this.context = context;
  }

  @Override
  void process(final KEY_TYPE key, final VALUE_TYPE value) {
      this.context.offset(); // returns the current record's offset
      // you can also access #partition(), #timestamp(), and #topic()
  }
 
  // other methods omitted for brevity
}
Why do I get an IllegalStateException when accessing record metadata?

If you attach a new Processor/Transformer/ValueTransformer to your topology using a corresponding supplier, you need to make sure that the supplier returns a new instance each time get() is called. If you return the same object, a single Processor/Transformer/ValueTransformer would be shared over multiple tasks resulting in an IllegalStateException with error message "This should not happen as topic() should only be called while a record is processed" (depending on the method you are calling it could also be partition()offset(), or timestamp() instead of topic()).

Producers

How should I set metadata.broker.list?

...