Status

Current state: Under Discussion

Discussion thread: TODO

Released: target version 1.1.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Streams topologies can be quite complex, so users need a way to test their code. This is especially important when the Processor API is used.

Public Interfaces

We add a new artifact, streams-test-utils, so that users can easily include it as a dependency in their build. The artifact will contain a TopologyTestDriver class that can be used to test topologies.

package org.apache.kafka.streams.test;

public class TopologyTestDriver {


    public TopologyTestDriver(Topology topology, Properties config);
    public TopologyTestDriver(Topology topology, Properties config, Time time);


    public void close();


    // methods for actual testing

    public void process(String topicName, byte[] key, byte[] value, long timestamp);                                                          // main process method
    public void process(String topicName, byte[] key, byte[] value);                                                                          // uses constructor provided Time object
    public <K, V> void process(String topicName, K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, long timestamp); // use objects+serializers
    public <K, V> void process(String topicName, K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer);


    // for null-key messages
    public void process(String topicName, byte[] value, long timestamp);
    public void process(String topicName, byte[] value);
    public <V> void process(String topicName, V value, Serializer<V> valueSerializer, long timestamp);
    public <V> void process(String topicName, V value, Serializer<V> valueSerializer);


    // single input topic topologies can omit `topicName` (repeats all methods from above without the topicName parameter)
    public void process(byte[] key, byte[] value, long timestamp);
    public void process(byte[] key, byte[] value);
    public <K, V> void process(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, long timestamp);
    public <K, V> void process(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer);
    public void process(byte[] value, long timestamp);
    public void process(byte[] value);
    public <V> void process(V value, Serializer<V> valueSerializer, long timestamp);
    public <V> void process(V value, Serializer<V> valueSerializer);

    // see below; cf. TestRecordFactory
    public void process(TestRecord record);
    public void process(List<TestRecord> records);

    public void punctuateWallClockTime();
    public void punctuateStreamTime();



    // methods for result verification
 
    public ProducerRecord<byte[], byte[]> readOutput(String topic);
    public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer);


    public StateStore getStateStore(String name);
    public <K, V> KeyValueStore<K, V> getKeyValueStore(String name);
    public <K, V> WindowStore<K, V> getWindowStore(String name);
    public <K, V> SessionStore<K, V> getSessionStore(String name);
}
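The comments on the process() overloads suggest that the typed and timestamp-less variants are conveniences over the byte[]-based method marked as the main process method. A minimal sketch of that delegation might look as follows. It assumes a stand-in Serializer interface and a LongSupplier in place of Kafka's Time, both simplifications so the example runs without Kafka on the classpath. The names ProcessOverloadSketch and Captured are hypothetical and not part of the proposal.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical, dependency-free sketch of how the overloads could delegate.
final class ProcessOverloadSketch {

    // Stand-in for Kafka's Serializer, reduced to the one method used here.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Hypothetical holder for what the main method receives.
    static final class Captured {
        final String topic;
        final byte[] key;
        final byte[] value;
        final long timestamp;

        Captured(String topic, byte[] key, byte[] value, long timestamp) {
            this.topic = topic;
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final LongSupplier clock; // stands in for the constructor-provided Time
    final List<Captured> captured = new ArrayList<>();

    ProcessOverloadSketch(LongSupplier clock) {
        this.clock = clock;
    }

    // Main method: every other overload ends up here.
    void process(String topicName, byte[] key, byte[] value, long timestamp) {
        captured.add(new Captured(topicName, key, value, timestamp));
    }

    // Timestamp-less variant: takes the timestamp from the clock.
    void process(String topicName, byte[] key, byte[] value) {
        process(topicName, key, value, clock.getAsLong());
    }

    // Typed variant: serializes key and value, then forwards the bytes.
    <K, V> void process(String topicName, K key, V value,
                        Serializer<K> keySerializer, Serializer<V> valueSerializer, long timestamp) {
        process(topicName,
                keySerializer.serialize(topicName, key),
                valueSerializer.serialize(topicName, value),
                timestamp);
    }

    // Typed and timestamp-less: combines both conveniences.
    <K, V> void process(String topicName, K key, V value,
                        Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        process(topicName, key, value, keySerializer, valueSerializer, clock.getAsLong());
    }

    static Captured demo() {
        ProcessOverloadSketch driver = new ProcessOverloadSketch(() -> 42L);
        Serializer<String> utf8 = (topic, data) -> data.getBytes(StandardCharsets.UTF_8);
        driver.process("input", "k", "v", utf8, utf8);
        return driver.captured.get(0);
    }

    public static void main(String[] args) {
        Captured c = demo();
        System.out.println(c.topic + " @ " + c.timestamp); // prints input @ 42
    }
}
```

Funnelling every overload into one byte[]-based method would keep serialization and timestamp defaults in a single place. Whether the actual driver is structured this way is not stated in this proposal.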
package org.apache.kafka.streams.test;

public class TestRecord<K, V> {

    public TestRecord(String topicName, K key, V value, long timestamp, Serializer<K> keySerializer, Serializer<V> valueSerializer);
    public TestRecord(String topicName, byte[] key, byte[] value, long timestamp);

    public String topicName();
    public byte[] key();
    public byte[] value();
    public long timestamp();
}

package org.apache.kafka.streams.test;

public class TestRecordFactory<K, V> {

    // create records for a single input topic
    public TestRecordFactory(String topicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, Time time);


    public TestRecord<K, V> create(K key, V value, long timestamp);
    public TestRecord<K, V> create(K key, V value);
    // null-key
    public TestRecord<K, V> create(V value, long timestamp);
    public TestRecord<K, V> create(V value);
    public TestRecord<K, V> create(KeyValue<K, V> keyValue, long timestamp);
    public TestRecord<K, V> create(KeyValue<K, V> keyValue);


    // create list of records based on list of k-v-pairs
    public List<TestRecord<K, V>> create(List<KeyValue<K, V>> keyValues); // start timestamp from constructor Time object; timestamp auto increment by 1
    public List<TestRecord<K, V>> create(List<KeyValue<K, V>> keyValues, long startTimestamp); // timestamp auto increment by 1
    public List<TestRecord<K, V>> create(List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs);
}
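The list-based create() methods assign timestamps that begin at a start value and advance per record, by 1 unless advanceMs is given. The sketch below shows only that arithmetic. It assumes the first record receives the start timestamp unchanged, which the listing does not state explicitly. TimestampAssignmentSketch is a hypothetical helper, not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors only the timestamp arithmetic of the list-based create() methods.
final class TimestampAssignmentSketch {

    // Assumption: record i (0-based) gets startTimestamp + i * advanceMs.
    static List<Long> timestamps(int recordCount, long startTimestamp, long advanceMs) {
        List<Long> result = new ArrayList<>(recordCount);
        long timestamp = startTimestamp;
        for (int i = 0; i < recordCount; i++) {
            result.add(timestamp);
            timestamp += advanceMs;
        }
        return result;
    }

    // Default step of 1, matching the "timestamp auto increment by 1" comment.
    static List<Long> timestamps(int recordCount, long startTimestamp) {
        return timestamps(recordCount, startTimestamp, 1L);
    }

    public static void main(String[] args) {
        System.out.println(timestamps(3, 100L, 10L)); // prints [100, 110, 120]
        System.out.println(timestamps(3, 5L));        // prints [5, 6, 7]
    }
}
```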
package org.apache.kafka.streams.test;

public class MockTime implements Time {

    // Listener that is invoked each time after the time was advanced or modified.
    public interface MockTimeListener {
        void tick(long currentTimeMs);
    }

    public MockTime(); // init with System.currentTimeMillis() and System.nanoTime(); no auto-tick
    public MockTime(long autoTickMs); 
    public MockTime(long autoTickMs, long currentTimeMs, long currentHighResTimeNs);

    public void addListener(final MockTimeListener listener);


    public void setAutoTickMs(long autoTickMs);
    public void setCurrentTimeMs(long newMs);


    // calling these methods triggers an auto-tick
    @Override
    public long milliseconds();
    @Override
    public long nanoseconds();
    @Override
    public long hiResClockMs();


    @Override
    public void sleep(final long ms);
}
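To make the auto-tick and listener semantics of MockTime concrete, here is a dependency-free sketch. It makes several assumptions that this proposal does not confirm: the auto-tick is applied before the value is read, sleep() advances the clock without blocking, non-positive advances are ignored, and setCurrentTimeMs() shifts the high-resolution clock by the same delta. MockTimeSketch is a hypothetical name, and the class deliberately does not implement Kafka's Time interface so it can run standalone.

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free sketch; it deliberately does not implement Kafka's Time interface.
final class MockTimeSketch {

    // Mirrors the proposed listener: called after every time change.
    interface MockTimeListener {
        void tick(long currentTimeMs);
    }

    private final List<MockTimeListener> listeners = new ArrayList<>();
    private long autoTickMs;
    private long timeMs;
    private long highResTimeNs;

    MockTimeSketch(long autoTickMs, long currentTimeMs, long currentHighResTimeNs) {
        this.autoTickMs = autoTickMs;
        this.timeMs = currentTimeMs;
        this.highResTimeNs = currentHighResTimeNs;
    }

    void addListener(MockTimeListener listener) {
        listeners.add(listener);
    }

    void setAutoTickMs(long autoTickMs) {
        this.autoTickMs = autoTickMs;
    }

    // Assumption: jumping the wall clock shifts the high-resolution clock by the same delta.
    void setCurrentTimeMs(long newMs) {
        highResTimeNs += (newMs - timeMs) * 1_000_000L;
        timeMs = newMs;
        notifyListeners();
    }

    // Assumption: the auto-tick is applied before the value is read.
    long milliseconds() {
        sleep(autoTickMs);
        return timeMs;
    }

    long nanoseconds() {
        sleep(autoTickMs);
        return highResTimeNs;
    }

    long hiResClockMs() {
        return nanoseconds() / 1_000_000L;
    }

    // Advances both clocks without blocking; non-positive values are ignored (assumption).
    void sleep(long ms) {
        if (ms <= 0) {
            return;
        }
        timeMs += ms;
        highResTimeNs += ms * 1_000_000L;
        notifyListeners();
    }

    private void notifyListeners() {
        for (MockTimeListener listener : listeners) {
            listener.tick(timeMs);
        }
    }

    static List<Long> demo() {
        MockTimeSketch time = new MockTimeSketch(5L, 1_000L, 0L);
        List<Long> observed = new ArrayList<>();
        time.addListener(ms -> observed.add(ms));
        time.milliseconds();           // auto-tick: 1000 -> 1005
        time.sleep(100L);              // 1005 -> 1105
        time.setCurrentTimeMs(2_000L); // jump to 2000
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1005, 1105, 2000]
    }
}
```

A listener of this kind would let a test driver react to every time change, for example to decide whether a wall-clock punctuation is due.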

 

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in a few sentences how the KIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
