...
```java
// tombstones are visible in a KStream and must be filtered out
KStream<String, String> subtopologyStream = customerFlightStatus
    .toStream()
    .filter((key, value) -> {
        if (value == null) {
            // tombstone! skip this
            return false;
        }
        // other filtering conditions...
        return true;
    });

// tombstone forwarding is different in KTables. The filter below is not evaluated for a tombstone
KTable<String, String> subtopologyKtable = customerFlightStatus
    .filter((key, value) -> {
        // the tombstone never makes it here. no need to check for null
        // other filtering conditions...
        return true;
    });
```
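For context, a tombstone is simply a record whose value is null on the source topic. A minimal sketch of producing one with a plain producer follows; the topic name, key, and bootstrap address are illustrative, not taken from the example above:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TombstoneProducer {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A null value is a tombstone: it deletes the key from any KTable
            // materialized over this topic (and, if the topic is compacted,
            // eventually from the topic itself).
            producer.send(new ProducerRecord<>("customer-flight-status", "customer-42", null));
            producer.flush();
        }
    }
}
```

Once such a record is consumed, the key disappears from any KTable materialized over the topic, which is why the KTable filter above never sees it.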
How to purge data from KTables based on age
...
I'm not certain I would recommend this in general, but I've been asked for a pattern that effectively implements a TTL for records in a KTable. In principle, this could be done straightforwardly with a custom state store, but that approach raises questions about the integrity of the data provenance and the soundness of the application.
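One practical note: the example below works by writing tombstones back to the source topic, so if the goal is for the broker to also drop the purged records (not just downstream KTables and state stores), that topic should be compacted. A minimal sketch of creating such a topic with the AdminClient; the bootstrap address, partition count, replication factor, and retention value are illustrative:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateCompactedInputTopic {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            final NewTopic topic = new NewTopic("input", 6, (short) 3)
                .configs(Map.of(
                    // compaction lets the emitted tombstones remove the old records on the broker
                    "cleanup.policy", "compact",
                    // how long the broker keeps the tombstone itself after compaction
                    "delete.retention.ms", String.valueOf(Duration.ofDays(1).toMillis())
                ));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```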
...
```java
package io.confluent.example;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

import java.time.Duration;
import java.util.Map;
import java.util.Properties;

/**
 * This is one approach to purging old records from a data set.
 * Instead of implementing a TTL in a state store directly, it
 * consumes from an input topic and builds a table representing
 * the last non-tombstone update for each key.
 *
 * Twice a day, it scans the state store looking for records that are
 * over a day old. When it finds one, it emits a tombstone back to the
 * topic. These tombstones will be consumed later on, cleaning up that
 * record from this state store, as well as from any other application
 * building tables from the same topic.
 *
 * Thus, this application can be run as a co-process alongside other
 * Streams applications (and other Consumers in general), enforcing
 * a data retention policy, etc.
 */
public class PurgeApp {
    private static final String TOPIC = "input";
    private static final Duration SCAN_FREQUENCY = Duration.ofHours(12);
    private static final Duration MAX_AGE = Duration.ofDays(1);
    private static final String STATE_STORE_NAME = "purge-worker-store";

    /**
     * Not technically necessary, but I wanted to document that the
     * transformer only emits null values, so I've made the return type Void.
     * This means that we need a serde for it as well, which is trivial to implement,
     * since Void values can only be null.
     */
    public static class VoidSerde implements Serde<Void>, Serializer<Void>, Deserializer<Void> {
        @Override
        public Void deserialize(final String topic, final byte[] data) {
            if (data != null) {
                throw new IllegalArgumentException();
            } else {
                return null;
            }
        }

        @Override
        public Void deserialize(final String topic, final Headers headers, final byte[] data) {
            return deserialize(topic, data);
        }

        @Override
        public void configure(final Map<String, ?> configs, final boolean isKey) {}

        @Override
        public byte[] serialize(final String topic, final Void data) {
            if (data != null) {
                throw new IllegalArgumentException();
            } else {
                return null;
            }
        }

        @Override
        public byte[] serialize(final String topic, final Headers headers, final Void data) {
            return serialize(topic, data);
        }

        @Override
        public void close() {}

        @Override
        public Serializer<Void> serializer() {
            return this;
        }

        @Override
        public Deserializer<Void> deserializer() {
            return this;
        }
    }

    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // state store for maintaining the latest updated timestamp for the records
        final StoreBuilder<KeyValueStore<Long, Long>> storeBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(STATE_STORE_NAME),
            Serdes.Long() /* keys are long valued in this example */,
            Serdes.Long() /* we store only the record timestamp, since it's all we're basing the purge on */
        );

        builder
            // first, we register the state store
            .addStateStore(storeBuilder)
            // then, we set up to consume from the topic (assuming the keys are long-valued and the values are Strings)
            .stream(TOPIC, Consumed.with(Serdes.Long(), Serdes.String()))
            // then, we add our transformer,
            .transform(
                new TransformerSupplier<Long, String, KeyValue<Long, Void>>() {
                    @Override
                    public Transformer<Long, String, KeyValue<Long, Void>> get() {
                        return new Transformer<Long, String, KeyValue<Long, Void>>() {
                            private ProcessorContext context;
                            private KeyValueStore<Long, Long> stateStore;

                            @Override
                            public void init(final ProcessorContext context) {
                                this.context = context;
                                this.stateStore = (KeyValueStore<Long, Long>) context.getStateStore(STATE_STORE_NAME);
                                // This is where the magic happens. This causes Streams to invoke the Punctuator
                                // on an interval, using stream time. That is, time is only advanced by the record timestamps
                                // that Streams observes. This has several advantages over wall-clock time for this application:
                                // * It'll produce the exact same sequence of updates given the same sequence of data.
                                //   This seems nice; since the purpose is to modify the data stream itself, you want to have
                                //   a clear understanding of when stuff is going to get deleted. For example, if something
                                //   breaks down upstream for this topic, and it stops getting new data for a while, wall-clock
                                //   time would just keep deleting data on schedule, whereas stream time will wait for
                                //   new updates to come in.
                                context.schedule(
                                    SCAN_FREQUENCY,
                                    PunctuationType.STREAM_TIME,
                                    timestamp -> {
                                        final long cutoff = timestamp - MAX_AGE.toMillis();

                                        // scan over all the keys in this partition's store
                                        // this can be optimized, but just keeping it simple.
                                        // this might take a while, so the Streams timeouts should take this into account
                                        try (final KeyValueIterator<Long, Long> all = stateStore.all()) {
                                            while (all.hasNext()) {
                                                final KeyValue<Long, Long> record = all.next();
                                                if (record.value != null && record.value < cutoff) {
                                                    // if a record's last update was older than our cutoff, emit a tombstone.
                                                    context.forward(record.key, null);
                                                }
                                            }
                                        }
                                    }
                                );
                            }

                            @Override
                            public KeyValue<Long, Void> transform(final Long key, final String value) {
                                // this gets invoked for each new record we consume. If it's a tombstone, delete
                                // it from our state store. Otherwise, store the record timestamp.
                                if (value == null) {
                                    stateStore.delete(key);
                                } else {
                                    stateStore.put(key, context.timestamp());
                                }
                                return null; // no need to return anything here. the punctuator will emit the tombstones when necessary
                            }

                            @Override
                            public void close() {} // no need to close anything; Streams already closes the state store.
                        };
                    }
                },
                STATE_STORE_NAME // register that this Transformer needs to be connected to our state store.
            )
            // emit our tombstones back to the same topic.
            .to(TOPIC, Produced.with(Serdes.Long(), new VoidSerde()));

        // Just keeping it simple and testing inline:

        // build the topology
        final Topology build = builder.build();
        System.out.println(
            build.describe().toString()
        );

        // testing a simple scenario with TopologyTestDriver:
        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-host");
        // you'll also want to pay close attention to how long that punctuation takes to run, and set the appropriate timeouts accordingly
        // alternatively, since the punctuation doesn't modify the state store, you could put the scan in a separate thread.

        final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(build, config);

        final ConsumerRecordFactory<Long, String> recordFactory =
            new ConsumerRecordFactory<>(Serdes.Long().serializer(), Serdes.String().serializer());

        // send one record at timestamp 0
        topologyTestDriver.pipeInput(recordFactory.create(TOPIC, 1L, "first", 0L));
        // prints "null" because the app doesn't emit anything
        System.out.println(topologyTestDriver.readOutput(TOPIC, Serdes.Long().deserializer(), Serdes.String().deserializer()));

        // send another record two days later. This should cause the first record to get purged, since it's now 2 days old.
        topologyTestDriver.pipeInput(recordFactory.create(TOPIC, 2L, "second", 1000L * 60 * 60 * 24 * 2));
        // prints ProducerRecord(topic=input, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=1, value=null, timestamp=172800000)
        // because the punctuation has run, scanning over the state store, and determined that key=1 can be purged, since it is now 2 days old
        System.out.println(topologyTestDriver.readOutput(TOPIC, Serdes.Long().deserializer(), Serdes.String().deserializer()));
        // prints "null" because the app doesn't emit anything else
        System.out.println(topologyTestDriver.readOutput(TOPIC, Serdes.Long().deserializer(), Serdes.String().deserializer()));
    }
}
```
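A closing side note: the Transformer API used above was the idiomatic choice when this was written, but it has since been deprecated in favor of the typed Processor API. The sketch below shows roughly what the same logic looks like with org.apache.kafka.streams.processor.api.Processor on a recent Streams release (where KStream#process accepts the typed supplier and returns a KStream); it reuses the store name and cutoffs from the example and should be treated as an untested sketch, not a drop-in replacement.

```java
import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class PurgeProcessor implements Processor<Long, String, Long, Void> {
    private static final Duration SCAN_FREQUENCY = Duration.ofHours(12);
    private static final Duration MAX_AGE = Duration.ofDays(1);
    private static final String STATE_STORE_NAME = "purge-worker-store";

    private ProcessorContext<Long, Void> context;
    private KeyValueStore<Long, Long> stateStore;

    @Override
    public void init(final ProcessorContext<Long, Void> context) {
        this.context = context;
        this.stateStore = context.getStateStore(STATE_STORE_NAME);
        // same stream-time punctuation as above: scan the store and emit tombstones for old keys
        context.schedule(SCAN_FREQUENCY, PunctuationType.STREAM_TIME, timestamp -> {
            final long cutoff = timestamp - MAX_AGE.toMillis();
            try (final KeyValueIterator<Long, Long> all = stateStore.all()) {
                while (all.hasNext()) {
                    final KeyValue<Long, Long> entry = all.next();
                    if (entry.value != null && entry.value < cutoff) {
                        // emit a tombstone for the aged-out key, carrying the punctuation timestamp
                        context.forward(new Record<>(entry.key, null, timestamp));
                    }
                }
            }
        });
    }

    @Override
    public void process(final Record<Long, String> record) {
        // track the latest update timestamp per key; a consumed tombstone clears the entry
        if (record.value() == null) {
            stateStore.delete(record.key());
        } else {
            stateStore.put(record.key(), record.timestamp());
        }
    }
}
```

In the topology, the `.transform(...)` call above would then be replaced by `.process(PurgeProcessor::new, STATE_STORE_NAME)`, with the rest of the wiring unchanged.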