...

State stores access the processor context via the StateStoreContext interface. Today, the AbstractProcessorContext class, which implements the StateStoreContext interface, already makes record metadata available via recordMetadata(). Unfortunately, that method is not part of the StateStoreContext interface, so state stores cannot access it.

...

Note that recordMetadata() returns an Optional, which accounts for the fact that no record metadata may be available, for example if no records have been processed yet.
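A minimal Java sketch of how a store could consume the proposed method. The types below are hypothetical, simplified stand-ins that only mirror the shape of StateStoreContext and RecordMetadata; OffsetTrackingStore is an invented example store, not part of Kafka Streams. The store reads the metadata through the interface and simply skips the offset update when the Optional is empty.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for Kafka Streams' RecordMetadata (read-only accessors).
interface RecordMetadata {
    String topic();
    int partition();
    long offset();
}

// Hypothetical, simplified stand-in for StateStoreContext with the proposed method.
interface StateStoreContext {
    // Empty if no record metadata is available, e.g. before the first record is processed.
    Optional<RecordMetadata> recordMetadata();
}

// Hypothetical store that tracks the offset of the most recent write it saw metadata for.
class OffsetTrackingStore {
    private final StateStoreContext context;
    private final Map<String, String> data = new HashMap<>();
    private long lastOffset = -1L; // -1 means no metadata has been observed yet

    OffsetTrackingStore(StateStoreContext context) {
        this.context = context;
    }

    void put(String key, String value) {
        data.put(key, value);
        // Advance the tracked offset only when the context actually has metadata.
        context.recordMetadata().ifPresent(m -> lastOffset = m.offset());
    }

    String get(String key) {
        return data.get(key);
    }

    long lastOffset() {
        return lastOffset;
    }
}
```

In this sketch the context is passed to the constructor for brevity; the point is only that the store depends on the interface and never needs to know about AbstractProcessorContext.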

...

Changing the StateStoreContext interface does not require changes to any of its implementations; it merely exposes recordMetadata(), a method that AbstractProcessorContext already implements. Therefore, we do not expect any compatibility issues. Moreover, recordMetadata() returns an object of type RecordMetadata, which is read-only, thus protecting Kafka internals from being tampered with by applications.
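Both points can be illustrated with a small, self-contained Java sketch. All types here are hypothetical stand-ins: BaseContext plays the role of AbstractProcessorContext, and ImmutableRecordMetadata is an invented read-only implementation. Because BaseContext already declares a public recordMetadata() with a matching signature, adding that method to the interface requires no change to the class body. The metadata type exposes only getters backed by final fields, so code that receives it cannot modify it.

```java
import java.util.Optional;

// Hypothetical read-only view: accessors only, no setters.
interface RecordMetadata {
    String topic();
    int partition();
    long offset();
}

// Hypothetical immutable implementation: all fields are final and set once.
final class ImmutableRecordMetadata implements RecordMetadata {
    private final String topic;
    private final int partition;
    private final long offset;

    ImmutableRecordMetadata(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
    @Override public long offset() { return offset; }
}

// Hypothetical interface AFTER the change. Before it, only applicationId() was declared.
interface StateStoreContext {
    String applicationId();
    Optional<RecordMetadata> recordMetadata(); // newly exposed method
}

// Stand-in for AbstractProcessorContext: this body is identical before and after the
// interface change, because recordMetadata() was already implemented here.
abstract class BaseContext implements StateStoreContext {
    private RecordMetadata current; // null until the first record is processed

    // Internal setter, not part of the interface that stores see.
    void setRecordMetadata(RecordMetadata metadata) { this.current = metadata; }

    @Override public String applicationId() { return "app"; }

    // No @Override, so this compiles both with and without the interface method.
    public Optional<RecordMetadata> recordMetadata() { return Optional.ofNullable(current); }
}

class ConcreteContext extends BaseContext { }
```

A store holding only a StateStoreContext reference can read the topic, partition, and offset, but has no way to change them or to reach the internal setter.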

...

Code Block
public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K, V> {
    ...

    // new put including metadata
    void put(K key, V value, RecordMetadata metadata);

    // we keep the existing put for cases where there is no metadata
    void put(K key, V value);

    ...
}

We rejected this approach because of the extra code complexity it causes. Each processor would have to explicitly decide which call to invoke, put(key, value) or put(key, value, metadata), depending on whether it is running in process or in punctuate. In addition, the caching layer would need to select which put variant to invoke on the underlying stores, based on whether cached metadata is present or not.
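To make the extra complexity concrete, here is a hypothetical Java sketch of the rejected design. KeyValueStore is a simplified stand-in carrying both put overloads, CachingStore stands in for the caching layer, and Writer is an invented helper representing processor code. The same metadata-present branch appears once for every writer and again when the cache flushes to the underlying store.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical read-only metadata for the current record.
interface RecordMetadata {
    String topic();
    int partition();
    long offset();
}

// Simplified stand-in for the rejected KeyValueStore with two put variants.
interface KeyValueStore<K, V> {
    void put(K key, V value);                          // no metadata, e.g. in punctuate
    void put(K key, V value, RecordMetadata metadata); // metadata available, e.g. in process
}

// Underlying store that counts how often each variant is called.
class InMemoryStore<K, V> implements KeyValueStore<K, V> {
    final Map<K, V> data = new HashMap<>();
    int plainPuts = 0;
    int metadataPuts = 0;

    @Override public void put(K key, V value) { data.put(key, value); plainPuts++; }
    @Override public void put(K key, V value, RecordMetadata md) { data.put(key, value); metadataPuts++; }
}

// Caching layer: must remember, per entry, which put variant to use when flushing.
class CachingStore<K, V> implements KeyValueStore<K, V> {
    private final KeyValueStore<K, V> inner;
    private final Map<K, V> values = new LinkedHashMap<>();
    private final Map<K, Optional<RecordMetadata>> metadata = new HashMap<>();

    CachingStore(KeyValueStore<K, V> inner) { this.inner = inner; }

    @Override public void put(K key, V value) {
        values.put(key, value);
        metadata.put(key, Optional.empty());
    }

    @Override public void put(K key, V value, RecordMetadata md) {
        values.put(key, value);
        metadata.put(key, Optional.of(md));
    }

    void flush() {
        for (Map.Entry<K, V> e : values.entrySet()) {
            Optional<RecordMetadata> md = metadata.get(e.getKey());
            // The same branch that every processor also has to write.
            if (md.isPresent()) {
                inner.put(e.getKey(), e.getValue(), md.get());
            } else {
                inner.put(e.getKey(), e.getValue());
            }
        }
        values.clear();
        metadata.clear();
    }
}

// Hypothetical processor-side helper: the branch each processor would need.
final class Writer {
    static <K, V> void write(KeyValueStore<K, V> store, K key, V value, Optional<RecordMetadata> md) {
        if (md.isPresent()) {
            store.put(key, value, md.get()); // inside process()
        } else {
            store.put(key, value);           // inside punctuate(): no current record
        }
    }
}
```

When the store can instead ask its context for the metadata, both branches collapse into a single put(key, value) call.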

Add a new updatePosition method to the KeyValueStore interface

...

Code Block
public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K, V> {
    ...

    // new method to record the position (topic, partition, offset) of the latest update
    void updatePosition(String topic, int partition, long offset);

    ...
}

We rejected this approach because it increases the risk of bugs. First, if a processor forgets to update the position after a put, it becomes impossible to reason about the consistency of the state store. Second, decoupling the data update from the metadata update creates a window of inconsistency in which the store already reflects the new data but not the corresponding metadata.
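The inconsistency window can be sketched in Java as follows. PositionedStore and positionOf are hypothetical names; the store keeps its data and a per-partition position in separate structures, as the rejected design would. Between put and updatePosition, any reader (for example an interactive query) sees the new value while the position still reports the old offset, and if the second call is forgotten the position never catches up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical store under the rejected design: data and position are updated by separate calls.
class PositionedStore {
    private final Map<String, String> data = new HashMap<>();
    // Highest applied offset per "topic-partition" key.
    private final Map<String, Long> position = new HashMap<>();

    void put(String key, String value) {
        data.put(key, value); // data changes here...
    }

    void updatePosition(String topic, int partition, long offset) {
        // ...but the position only changes if the caller remembers this second call.
        position.merge(topic + "-" + partition, offset, Long::max);
    }

    String get(String key) {
        return data.get(key);
    }

    // Returns -1 if no offset has been recorded for the partition.
    long positionOf(String topic, int partition) {
        return position.getOrDefault(topic + "-" + partition, -1L);
    }
}
```

Usage: after store.put("k", "v1") alone, store.get("k") already returns "v1" while store.positionOf("input", 0) still returns -1; only after updatePosition("input", 0, 7L) do data and position agree.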