Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Note that recordMetadata() returns an Optional to which account for the fact that there might not be any recordMetadata available if no records have been processed yet.

The following code snippet illustrates how the above interface can be used within the state store to keep track of the latest offset a store has processed:

Code Block
public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingStore {
     private Map<String, Map<Integer, Long>> seenOffsets = new HashMap<>();
	 ...

     public synchronized void put(final Bytes key,
        final byte[] value) {
        Objects.requireNonNull(key, "key cannot be null");
        validateStoreOpen();
        dbAccessor.put(key.get(), value);

        if (context != null && context.recordMetadata().isPresent()) {
            final RecordMetadata meta = context.recordMetadata().get();
            //update seen offsets
            seenOffsets.computeIfAbsent(meta.topic(), t -> new HashMap<>())
                .put(meta.partition(), meta.offset());
        }*/
    }

	...
}


Compatibility, Deprecation, and Migration Plan

...