Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state:  Under DiscussionAccepted

Discussion thread: here 

JIRA: https://issues.apache.org/jira/browse/KAFKA-10629

...

For many use cases of TopologyTestDriver, we don't need to specify properties parameter. As of https://github.com/apache/kafka/pull/9477, many TopologyTestDriver usages will have no configurations at all to specify, so we should provide a constructor that doesn't take a Properties argument. Right now, such configuration-free usages have to provide an empty Properties object.

There can be other use cases where we would like to specify initial clock time without specifying properties. So we can have another constructor which takes in parameter of initial clock time along with topology parameter.

Application-id set by default should be randomized so that no two tests can interfere with each other. 

...

Code Block
languagejava
themeEclipse
titlePublic Constructor
linenumberstrue
	/**
     * Create a new test diver instance.
     * Default test properties are used to initialize the driver instance
     *
     * @param topology the topology to be tested
     */
    public TopologyTestDriver(final Topology topology) {
        this(topology, DEFAULT_TEST_PROPSnew Properties());
    }

	private static final Properties DEFAULT_TEST_PROPS;
/**
     * Create a new test diver instance.
     *
    static {
        DEFAULT_TEST_PROPS = new Properties();
 * @param topology the topology to be tested
     * @param initialWallClockTimeMs the initial value of internally mocked wall-clock time
     */
	 public TopologyTestDriver(final Topology topology,
                       DEFAULT_TEST_PROPS.put(APPLICATION_ID_CONFIG, "dummy-app-id");    final Instant initialWallClockTimeMs) {
     	this(topology,   DEFAULT_TEST_PROPS.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"new Properties(), initialWallClockTimeMs);
   	 }


Proposed Changes

We propose to add new constructor to TopologyTestDriver class. This constructor will have only Topology as parameter. Kafka streams config has APPLICATION_ID_CONFIG and BOOTSTRAP_SERVERS_CONFIG as required parameters. Those values will be provided by DEFAULT_TEST_PROPS variable which will initialize those two parameters.in the private constructor of TopologyTestDriver. We want to set randomized application id to avoid conflicts with tests running in parallel. Proposing to change private constructor of TopologyTestDriver in following way.

Code Block
languagejava
linenumberstrue
 private TopologyTestDriver(final InternalTopologyBuilder builder,
                               final Properties config,
                               final long initialWallClockTimeMs) {
        final Properties configCopy = new Properties();
        configCopy.putAll(config);
        if (!configCopy.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)) {
            configCopy.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-bootstrap-host:0");
        }
        if (!configCopy.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) {
            // provide randomized dummy app-id if it's not specified
            configCopy.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,
                    "dummy-topology-test-driver-app-id-" + ThreadLocalRandom.current().nextInt());
        }
		.
		.
		.
		.
}



Classes which will have usage for this constructor are 

...