How to manage the size of state stores using tombstones?

An application that makes heavy use of aggregations can make better use of its resources by removing records it no longer needs from its state stores. Kafka Streams makes this possible through tombstone records, which are records that contain a non-null key and a null value. How tombstones are used will become apparent in the airline example below, but it's important to note that any record with a null key is dropped internally and will never be seen by your aggregation. Your aggregation must therefore include the logic for recognizing when a record can be removed from the state store, and it must return null when this condition is met. Once a tombstone is returned from an aggregation, the record is immediately deleted from the state store.
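The semantics described above can be sketched in plain Java, without any Kafka dependency. This is only a toy model and not the Kafka Streams implementation: the class `TombstoneSketch`, its `HashMap`-backed store, and the `process` method are hypothetical names introduced for illustration. It shows the two rules from the paragraph above: a record with a null key never reaches the reducer, and a null result from the reducer deletes the key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Toy model of tombstone handling in a reduce; NOT the Kafka Streams implementation.
public class TombstoneSketch {
    // Hypothetical stand-in for a key-value state store.
    private final Map<String, String> store = new HashMap<>();
    private final BinaryOperator<String> reducer;

    public TombstoneSketch(BinaryOperator<String> reducer) {
        this.reducer = reducer;
    }

    // Applies one input record to the store.
    public void process(String key, String value) {
        if (key == null) {
            // Records with a null key are dropped before the reducer runs.
            return;
        }
        String current = store.get(key);
        // The first value for a key is stored as-is; later values go through the reducer.
        String updated = (current == null) ? value : reducer.apply(current, value);
        if (updated == null) {
            // A null result is a tombstone: delete the key from the store.
            store.remove(key);
        } else {
            store.put(key, updated);
        }
    }

    public String get(String key) {
        return store.get(key);
    }

    public int size() {
        return store.size();
    }

    public static void main(String[] args) {
        TombstoneSketch sketch = new TombstoneSketch(
            (oldValue, newValue) -> "survey".equals(newValue) ? null : newValue);

        sketch.process("customer123_dl90210", "booked");
        sketch.process("customer123_dl90210", "landed");
        sketch.process(null, "booked"); // dropped: null key
        System.out.println(sketch.get("customer123_dl90210")); // prints "landed"

        sketch.process("customer123_dl90210", "survey"); // reducer returns null
        System.out.println(sketch.size()); // prints 0
    }
}
```

The store only shrinks because the reducer itself decides when a key is finished; nothing in this model expires keys automatically.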

Consider the following example. An airline wants to track the various stages of a customer's flight. For this example, a customer can be in one of four stages: booked, boarded, landed, and post-flight survey completed. Once the customer has completed the post-flight survey, the airline no longer needs to track the customer. Until then, the airline would like to know which stage the customer is in and perform various aggregations on the customer's data. This can be accomplished using the following topology.

 

Code Block
languagejava
// customer flight statuses
final String BOOKED = "booked";
final String BOARDED = "boarded";
final String LANDED = "landed";
final String COMPLETED_FLIGHT_SURVEY = "survey";

// topics
final String SOURCE_TOPIC = "someInputTopic";
final String STORE_NAME = "someStateStore";

// topology
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream = builder.stream(SOURCE_TOPIC);

KTable<String, String> customerFlightStatus = stream
    .groupByKey()
    .reduce(
        new Reducer<String>() {
            @Override
            public String apply(String value1, String value2) {
                if (value2.equals(COMPLETED_FLIGHT_SURVEY)) {
                    // we no longer need to keep track of this customer since
                    // they completed the flight survey. Create a tombstone
                    return null;
                }
                // keeping this simple for brevity
                return value2;
            }
        }, STORE_NAME);

Returning null from the Reducer only when the customer has completed their post-flight survey allows us to keep aggregating until we are no longer interested in tracking this key. Returning null creates a tombstone, and the record is deleted from the state store immediately. We can verify this with the following test:

 

Code Block
languagejava
// Simple test
final String customerFlightNumber = "customer123_dl90210";
File stateDir = createStateDir(); // implementation omitted
KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.String());
driver.setTime(0L);

// get a reference to the state store
KeyValueStore<String, String> store = (KeyValueStore<String, String>) driver.context().getStateStore(STORE_NAME);

// the customer has booked their flight
driver.process(SOURCE_TOPIC, customerFlightNumber, BOOKED);
store.flush();
assertEquals(BOOKED, store.get(customerFlightNumber));

// the customer has boarded their flight
driver.process(SOURCE_TOPIC, customerFlightNumber, BOARDED);
store.flush();
assertEquals(BOARDED, store.get(customerFlightNumber));

// the customer's flight has landed
driver.process(SOURCE_TOPIC, customerFlightNumber, LANDED);
store.flush();
assertEquals(LANDED, store.get(customerFlightNumber));

// the customer has filled out the post-flight survey, so we no longer need to track them
// in the state store. make sure the key was deleted
driver.process(SOURCE_TOPIC, customerFlightNumber, COMPLETED_FLIGHT_SURVEY);
store.flush();
assertNull(store.get(customerFlightNumber));

Finally, it's important to note how tombstones are forwarded downstream. Whether or not a tombstone is visible to additional sub-topologies depends on which abstraction (e.g. KTable or KStream) a sub-topology uses for streaming its input data. The following code snippets highlight these differences. Tombstones are visible in record streams (KStream), and it is common to filter them out before performing additional transformations. In changelog streams (KTable), however, tombstones are forwarded directly downstream and are not passed to the functions you supply to operators such as filter and mapValues.

Code Block
languagejava
// tombstones are visible in a KStream and must be filtered out
KStream<String, String> subtopologyStream = customerFlightStatus
        .toStream()
        .filter((key, value) -> {
            if (value == null) {
                // tombstone! skip this
                return false;
            }
            // other filtering conditions...
            return true;
        });

// tombstone forwarding is different in KTables. The filter below is not evaluated for a tombstone
KTable<String, String> subtopologyKtable = customerFlightStatus
        .filter((key, value) -> {
            // the tombstone never makes it here. no need to check for null

            // other filtering conditions...
            return true;
        });
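The difference between the two snippets above can be modelled in plain Java. This is a simplified sketch under stated assumptions, not how Kafka Streams implements `filter`: the class `TombstoneForwardingSketch` and the methods `streamStyleFilter` and `tableStyleFilter` are hypothetical, and the handling of non-tombstone records that fail the predicate is deliberately simplified (they are just dropped here). The point is only that the stream-style predicate is called for tombstones, while the table-style predicate is not.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Toy model of tombstone visibility; NOT the Kafka Streams implementation.
public class TombstoneForwardingSketch {

    // Record-stream style: the predicate is evaluated for every record,
    // including tombstones (records with a null value).
    static List<Map.Entry<String, String>> streamStyleFilter(
            List<Map.Entry<String, String>> records,
            BiPredicate<String, String> predicate) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> record : records) {
            if (predicate.test(record.getKey(), record.getValue())) {
                out.add(record);
            }
        }
        return out;
    }

    // Changelog style: tombstones bypass the predicate and are forwarded as-is.
    // Simplification: other records that fail the predicate are just dropped.
    static List<Map.Entry<String, String>> tableStyleFilter(
            List<Map.Entry<String, String>> records,
            BiPredicate<String, String> predicate) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> record : records) {
            if (record.getValue() == null) {
                out.add(record); // forwarded without calling the predicate
                continue;
            }
            if (predicate.test(record.getKey(), record.getValue())) {
                out.add(record);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> updates = new ArrayList<>();
        updates.add(new SimpleEntry<String, String>("customer123_dl90210", "landed"));
        updates.add(new SimpleEntry<String, String>("customer123_dl90210", null)); // tombstone

        // Stream style: the predicate must handle a null value itself.
        System.out.println(
            streamStyleFilter(updates, (k, v) -> v != null).size()); // prints 1

        // Table style: calling a method on the value is safe, since the
        // predicate never sees the tombstone, which is still forwarded.
        System.out.println(
            tableStyleFilter(updates, (k, v) -> !v.isEmpty()).size()); // prints 2
    }
}
```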