
...

Code Block
languagejava
// tombstones are visible in a KStream and must be filtered out
KStream<String, String> subtopologyStream = customerFlightStatus
        .toStream()
        .filter((key, value) -> {
            if (value == null) {
                // tombstone! skip this
                return false;
            }
            // other filtering conditions...
            return true;
        });

// tombstone forwarding is different in KTables. The filter below is not evaluated for a tombstone
KTable<String, String> subtopologyKtable = customerFlightStatus
        .filter((key, value) -> {
            // the tombstone never makes it here. no need to check for null

            // other filtering conditions...
            return true;
        });
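
For completeness, here's a small sketch of what that means downstream: even though the predicate never runs for a tombstone, the delete is still forwarded, so any materialization of the filtered KTable drops the key too. (The predicate and the store name "flight-status-filtered" below are illustrative, not part of the example above.)

Code Block
languagejava
// a sketch: customerFlightStatus is assumed to be the KTable<String, String> from above
KTable<String, String> filteredStatus = customerFlightStatus
        .filter((key, value) -> value.contains("DELAYED"),   // safe: never invoked with a null value
                Materialized.as("flight-status-filtered"));
// when a tombstone arrives on the source topic, the key is removed from
// "flight-status-filtered" as well, even though the predicate above never saw it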

 

 

How to purge data from KTables based on age (or some other criteria)

I'm not certain I would recommend this in general, but I've been asked for a pattern that effectively implements a TTL in a KTable. In principle, this could be done straightforwardly with a custom state store, but that raises questions about the provenance of the data and the soundness of the application.

If the data in question is truly no longer needed after 24 hours (or by whatever other criterion you choose), I think a better approach is to emit tombstones into the topic that populates the KTable in question. This circumvents a lot of tricky distributed-systems questions.
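
To make that concrete, a tombstone is just a record with a null value. Here's a minimal sketch with a plain producer (the topic name "input", the Long key type, and the bootstrap address are assumptions; any producer that writes a null value for the key has the same effect):

Code Block
languagejava
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TombstoneEmitter {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: point this at your cluster
        props.put("key.serializer", LongSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<Long, String> producer = new KafkaProducer<>(props)) {
            // a null value is a tombstone: every KTable built from this topic drops key 42,
            // and (if the topic is compacted) the broker eventually removes the record too
            producer.send(new ProducerRecord<>("input", 42L, null));
        }
    }
}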

Even better than this would be to purge the data from the source system and let those deletes naturally propagate into the topic and then into the KTable, but I've been asked to design a solution that considers only Kafka and Streams.

So, here you go; the comments should explain everything:

Code Block
languagejava
linenumberstrue
collapsetrue
package io.confluent.example;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

import java.time.Duration;
import java.util.Map;
import java.util.Properties;

/**
 * This is one approach to purging old records from a data set.
 * Instead of implementing a TTL in a state store directly, it
 * consumes from an input topic and builds a table representing
 * the last non-tombstone update for each key.
 *
 * Twice a day, it scans the state store looking for records that are
 * over a day old. When it finds one, it emits a tombstone back to the
 * topic. These tombstones will be consumed later on, cleaning up that
 * record from this state store, as well as from any other application
 * building tables from the same topic.
 *
 * Thus, this application can be run as a co-process alongside other
 * Streams applications (and other Consumers in general), enforcing
 * a data retention policy, etc.
 */
public class PurgeApp {
    private static final String TOPIC = "input";
    private static final Duration SCAN_FREQUENCY = Duration.ofHours(12);
    private static final Duration MAX_AGE = Duration.ofDays(1);
    private static final String STATE_STORE_NAME = "purge-worker-store";

    /**
     * Not technically necessary, but I wanted to document that the
     * transformer only emits null values, so I've made the return type Void.
     * This means that we need a serde for it as well, which is trivial to implement,
     * since Void values can only be null.
     */
    public static class VoidSerde implements Serde<Void>, Serializer<Void>, Deserializer<Void> {

        @Override
        public Void deserialize(final String topic, final byte[] data) {
            if (data != null) {
                throw new IllegalArgumentException();
            } else {
                return null;
            }
        }

        @Override
        public Void deserialize(final String topic, final Headers headers, final byte[] data) {
            return deserialize(topic, data);
        }

        @Override
        public void configure(final Map<String, ?> configs, final boolean isKey) {}

        @Override
        public byte[] serialize(final String topic, final Void data) {
            if (data != null) {
                throw new IllegalArgumentException();
            } else {
                return null;
            }
        }

        @Override
        public byte[] serialize(final String topic, final Headers headers, final Void data) {
            return serialize(topic, data);
        }

        @Override
        public void close() {}

        @Override
        public Serializer<Void> serializer() {
            return this;
        }

        @Override
        public Deserializer<Void> deserializer() {
            return this;
        }
    }

    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // state store for maintaining the latest updated timestamp for the records
        final StoreBuilder<KeyValueStore<Long, Long>> storeBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(STATE_STORE_NAME),
            Serdes.Long()/* keys are long valued in this example */,
            Serdes.Long()/* we store only the record timestamp, since it's all we're basing the purge on */
        );

        builder
            // first, we register the state store
            .addStateStore(storeBuilder)
            // then, we set up to consume from the topic (assuming the keys are long-valued and the values are Strings)
            .stream(TOPIC, Consumed.with(Serdes.Long(), Serdes.String()))
            // Then, we add our transformer,
            .transform(new TransformerSupplier<Long, String, KeyValue<Long, Void>>() {
                           @Override
                           public Transformer<Long, String, KeyValue<Long, Void>> get() {
                               return new Transformer<Long, String, KeyValue<Long, Void>>() {
                                   private ProcessorContext context;
                                   private KeyValueStore<Long, Long> stateStore;

                                   @Override
                                   public void init(final ProcessorContext context) {
                                       this.context = context;
                                       this.stateStore = (KeyValueStore<Long, Long>) context.getStateStore(STATE_STORE_NAME);
                                       // This is where the magic happens. This causes Streams to invoke the Punctuator
                                       // on an interval, using stream time. That is, time is only advanced by the record timestamps
                                       // that Streams observes. This has a big advantage over wall-clock time for this application:
                                       // * It'll produce the exact same sequence of updates given the same sequence of data.
                                       //   Since the purpose is to modify the data stream itself, it's nice to have a clear
                                       //   understanding of when records are going to get deleted. For example, if something
                                       //   breaks down upstream of this topic and it stops getting new data for a while, wall-clock
                                       //   time would just keep deleting data on schedule, whereas stream time will wait for
                                       //   new updates to come in.
                                       context.schedule(
                                           SCAN_FREQUENCY,
                                           PunctuationType.STREAM_TIME,
                                           timestamp -> {
                                               final long cutoff = timestamp - MAX_AGE.toMillis();

                                               // scan over all the keys in this partition's store
                                               // this can be optimized, but just keeping it simple.
                                               // this might take a while, so the Streams timeouts should take this into account
                                               try (final KeyValueIterator<Long, Long> all = stateStore.all()) {
                                                   while (all.hasNext()) {
                                                       final KeyValue<Long, Long> record = all.next();
                                                       if (record.value != null && record.value < cutoff) {
                                                           // if a record's last update was older than our cutoff, emit a tombstone.
                                                           context.forward(record.key, null);
                                                       }
                                                   }
                                               }
                                           }
                                       );
                                   }

                                   @Override
                                   public KeyValue<Long, Void> transform(final Long key, final String value) {
                                       // this gets invoked for each new record we consume. If it's a tombstone, delete
                                       // it from our state store. Otherwise, store the record timestamp.
                                       if (value == null) {
                                           stateStore.delete(key);
                                       } else {
                                           stateStore.put(key, context.timestamp());
                                       }
                                       return null; // no need to return anything here. the punctuator will emit the tombstones when necessary
                                   }

                                   @Override
                                   public void close() {} // no need to close anything; Streams already closes the state store.
                               };
                           }
                       },
                       STATE_STORE_NAME // register that this Transformer needs to be connected to our state store.
            )
            // emit our tombstones back to the same topic.
            .to(TOPIC, Produced.with(Serdes.Long(), new VoidSerde()));

        
        // Just keeping it simple and testing inline:
        
        // build the topology
        final Topology build = builder.build();
        System.out.println(
            build.describe().toString()
        );

        // testing a simple scenario with TopologyTestDriver:
        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-host");
        // you'll also want to pay close attention to how long that punctuation takes to run, and set the appropriate timeouts accordingly
        // alternatively, since the punctuation doesn't modify the state store, you could put the scan in a separate thread.
        final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(build, config);

        final ConsumerRecordFactory<Long, String> recordFactory = new ConsumerRecordFactory<>(Serdes.Long().serializer(), Serdes.String().serializer());

        // send one record at timestamp 0
        topologyTestDriver.pipeInput(recordFactory.create(TOPIC, 1L, "first", 0L));
        // prints "null" because the app doesn't emit anything
        System.out.println(topologyTestDriver.readOutput(TOPIC, Serdes.Long().deserializer(), Serdes.String().deserializer()));

        // send another record two days later. This should cause the first record to get purged, since it's now 2 days old.
        topologyTestDriver.pipeInput(recordFactory.create(TOPIC, 2L, "second", 1000L * 60 * 60 * 24 * 2));
        // prints ProducerRecord(topic=input, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=1, value=null, timestamp=172800000)
        // because the punctuation has run, scanning over the state store, and determined that key=1 can be purged, since it is now 2 days old
        System.out.println(topologyTestDriver.readOutput(TOPIC, Serdes.Long().deserializer(), Serdes.String().deserializer()));
        // prints "null" because the app doesn't emit anything else
        System.out.println(topologyTestDriver.readOutput(TOPIC, Serdes.Long().deserializer(), Serdes.String().deserializer()));
    }
}