...

It might also be necessary to delete the state directory manually before starting the application. This will not result in data loss: the state will be recreated from the underlying changelog topic.
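As a rough illustration of the manual deletion, the sketch below removes a directory tree using only the Java standard library. The class name StateDirCleaner and its method are hypothetical and not part of Kafka. The path must be supplied by you, since the location depends on your state.dir setting, and the sketch assumes the application is stopped while it runs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper (not part of Kafka): deletes a local state directory.
public class StateDirCleaner {

    // Deletes the directory tree rooted at dir.
    // Returns false if dir does not exist, true once it has been removed.
    public static boolean deleteRecursively(Path dir) {
        if (!Files.exists(dir)) {
            return false;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order so that children are deleted before their parents.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return true;
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            // No default path on purpose: the caller must name the directory.
            System.err.println("usage: StateDirCleaner <state-directory>");
            return;
        }
        Path stateDir = Paths.get(args[0]);
        boolean deleted = deleteRecursively(stateDir);
        System.out.println(deleted ? "Deleted " + stateDir : "Nothing to delete at " + stateDir);
    }
}
```

Run it only while the application is shut down, then start the application again so that the state is rebuilt from the changelog topic.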

How can I access record metadata?

Record metadata is accessible through the ProcessorContext that is passed to Processor#init(), Transformer#init(), and ValueTransformer#init(). The context object is updated under the hood and, each time process() is called, contains the metadata of the record currently being processed.

Accessing record metadata via ProcessorContext:
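import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class MyProcessor implements Processor<KEY_TYPE, VALUE_TYPE> {
  private ProcessorContext context;

  @Override
  public void init(ProcessorContext context) {
    // keep a reference to the context so that process() can query it later
    this.context = context;
  }

  @Override
  public void process(KEY_TYPE key, VALUE_TYPE value) {
    long offset = this.context.offset(); // returns the current record's offset
    // you can also access #partition(), #timestamp(), and #topic()
  }

  // other methods omitted for brevity
}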

 

Producers

How should I set metadata.broker.list?

...