Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-6138

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

StreamsBuilder#addGlobalStore does not provide a good user experience, because users are forced to specify processor names. Processor names are a Processor API detail that should be hidden in the DSL. The current API is the following:

...

We should remove the two parameters sourceName and processorName from both InternalStreamsBuilder#addGlobalStateStore and StreamsBuilder#addGlobalStore. The two names will instead be generated internally by the following code.

final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);
final String processorName = newProcessorName(KTableImpl.SOURCE_NAME);

This is the same implementation used in InternalStreamsBuilder#globalTable.
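
To illustrate how such generated names behave, here is a minimal self-contained sketch of an index-based name generator. The class NameGenerator, the two prefix strings, and the zero-padded number format are assumptions made for illustration; they stand in for the builder internals and are not taken from the Kafka code base.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the builder's internal name generation.
final class NameGenerator {
    // Assumed prefixes, standing in for KStreamImpl.SOURCE_NAME and KTableImpl.SOURCE_NAME.
    static final String STREAM_SOURCE_PREFIX = "KSTREAM-SOURCE-";
    static final String TABLE_SOURCE_PREFIX = "KTABLE-SOURCE-";

    // One counter shared by all names produced by this generator.
    private final AtomicInteger index = new AtomicInteger(0);

    // Appends a zero-padded, monotonically increasing index to the prefix.
    String newProcessorName(final String prefix) {
        return prefix + String.format("%010d", index.getAndIncrement());
    }
}
```

Because the counter is shared and advances on every call, the name a node receives in this sketch depends on how many nodes were added before it.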

The following code is an example of the new API usage.

final String globalTopicName = "testGlobalTopic";
final String globalStoreName = "testAddGlobalStore";
final StreamsBuilder builder = new StreamsBuilder();
// Mock a store builder that only needs to report its name.
final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
EasyMock.expect(globalStoreBuilder.name()).andReturn(globalStoreName).anyTimes();
EasyMock.replay(globalStoreBuilder);
// New API: no sourceName or processorName arguments.
builder.addGlobalStore(globalStoreBuilder, globalTopicName, new ConsumedInternal(), new MockProcessorSupplier());

Compatibility, Deprecation, and Migration Plan

  • To remain backward compatible, the current method must be deprecated and a new method with fewer parameters should be added.
  • Upgraded applications should stay with the old API, since the new API will affect the naming of generated internal topics. New applications, however, can use the new API.
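
The deprecate-and-add approach described above can be sketched as follows. This is a simplified illustration, not the Kafka implementation: SketchBuilder, its String-only parameter lists, and the "SOURCE-" and "PROCESSOR-" prefixes are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical builder with simplified parameters; not the Kafka StreamsBuilder.
final class SketchBuilder {
    private final List<String> nodeNames = new ArrayList<>();
    private int index = 0;

    // Old overload: the caller supplies both names. Kept so existing applications still compile.
    @Deprecated
    SketchBuilder addGlobalStore(final String storeName, final String topic,
                                 final String sourceName, final String processorName) {
        nodeNames.add(sourceName);
        nodeNames.add(processorName);
        return this;
    }

    // New overload: both names are generated, then the call is delegated to the old one.
    SketchBuilder addGlobalStore(final String storeName, final String topic) {
        final String sourceName = newName("SOURCE-");
        final String processorName = newName("PROCESSOR-");
        return addGlobalStore(storeName, topic, sourceName, processorName);
    }

    // Generates a name from a prefix and a running index.
    private String newName(final String prefix) {
        return prefix + index++;
    }

    List<String> nodeNames() {
        return nodeNames;
    }
}
```

In this sketch the new overload consumes two indices from the shared counter, so switching an existing call site to it changes the names of every node registered afterwards. That is the kind of naming shift the second bullet warns about for upgraded applications.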