...

Code Block
languagejava
// tombstones are visible in a KStream and must be filtered out
KStream<String, String> subtopologyStream = customerFlightStatus
        .toStream()
        .filter((key, value) -> {
            if (value == null) {
                // tombstone! skip this
                return false;
            }
            // other filtering conditions...
            return true;
        });

// tombstone forwarding is different in KTables. The filter below is not evaluated for a tombstone
KTable<String, String> subtopologyKtable = customerFlightStatus
        .filter((key, value) -> {
            // the tombstone never makes it here. no need to check for null

            // other filtering conditions...
            return true;
        });
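
For completeness, here is a minimal sketch of how such a tombstone gets onto the topic in the first place: write a record with a null value back to the (compacted) source topic, and every KTable materialized from that topic drops the key. The topic name ("customer-flight-status") and the "LANDED" status check are assumptions for illustration only; they are not part of the example above.

Code Block
languagejava
// a sketch of emitting a tombstone from Streams itself (the topic name and the
// "LANDED" check are assumed for illustration): once a flight has landed, write
// a null value back to the source topic so the key is dropped from every table built from it
customerFlightStatus
        .toStream()
        .filter((key, value) -> value != null && value.contains("LANDED"))
        .mapValues(value -> (String) null) // a null value is the tombstone
        .to("customer-flight-status", Produced.with(Serdes.String(), Serdes.String()));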

 

 

How to purge data from KTables based on age

...

I'm not certain I would recommend this in general, but I've been asked for a pattern to effectively implement a TTL in a KTable. In principle, this could be done straightforwardly with a custom state store, but doing so raises questions about the integrity of the data's provenance and the soundness of the application.
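
One reason to express the purge as tombstones on the topic, rather than as purely local deletes in a custom store, is that every application materializing that topic sees the same deletion. Below is a minimal sketch, assuming the same "input" topic (long keys, String values) used by the purge application further down: this downstream table drops a key as soon as a tombstone for it is consumed, without any purge logic of its own.

Code Block
languagejava
// a downstream application materializing the same topic picks up the purge
// automatically: consuming a tombstone removes the key from this table too
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> downstreamView = builder.table(
        "input",
        Consumed.with(Serdes.Long(), Serdes.String()));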

...

Code Block
languagejava
linenumberstrue
collapsetrue
package io.confluent.example;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

import java.time.Duration;
import java.util.Map;
import java.util.Properties;

/**
 * This is one approach to purging old records from a data set.
 * Instead of implementing a TTL in a state store directly, it
 * consumes from an input topic and builds a table representing
 * the last non-tombstone update for each key.
 *
 * Twice a day, it scans the state store looking for records that are
 * over a day old. When it finds one, it emits a tombstone back to the
 * topic. These tombstones will be consumed later on, cleaning up that
 * record from this state store, as well as from any other application
 * building tables from the same topic.
 *
 * Thus, this application can be run as a co-process alongside other
 * Streams applications (and other Consumers in general), enforcing
 * a data retention policy, etc.
 */
public class PurgeApp {
    private static final String TOPIC = "input";
    private static final Duration SCAN_FREQUENCY = Duration.ofHours(12);
    private static final Duration MAX_AGE = Duration.ofDays(1);
    private static final String STATE_STORE_NAME = "purge-worker-store";

    /**
     * Not technically necessary, but I wanted to document that the
     * transformer only emits null values, so I've made the return type Void.
     * This means that we need a serde for it as well, which is trivial to implement,
     * since Void values can only be null.
     */
    public static class VoidSerde implements Serde<Void>, Serializer<Void>, Deserializer<Void> {

        @Override
        public Void deserialize(final String topic, final byte[] data) {
            if (data != null) {
                throw new IllegalArgumentException();
            } else {
                return null;
            }
        }

        @Override
        public Void deserialize(final String topic, final Headers headers, final byte[] data) {
            return deserialize(topic, data);
        }

        @Override
        public void configure(final Map<String, ?> configs, final boolean isKey) {}

        @Override
        public byte[] serialize(final String topic, final Void data) {
            if (data != null) {
                throw new IllegalArgumentException();
            } else {
                return null;
            }
        }

        @Override
        public byte[] serialize(final String topic, final Headers headers, final Void data) {
            return serialize(topic, data);
        }

        @Override
        public void close() {}

        @Override
        public Serializer<Void> serializer() {
            return this;
        }

        @Override
        public Deserializer<Void> deserializer() {
            return this;
        }
    }

    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // state store for maintaining the latest updated timestamp for the records
        final StoreBuilder<KeyValueStore<Long, Long>> storeBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(STATE_STORE_NAME),
            Serdes.Long()/* keys are long valued in this example */,
            Serdes.Long()/* we store only the record timestamp, since it's all we're basing the purge on */
        );

        builder
            // first, we register the state store
            .addStateStore(storeBuilder)
            // then, we set up to consume from the topic (assuming the keys are long-valued and the values are Strings)
            .stream(TOPIC, Consumed.with(Serdes.Long(), Serdes.String()))
            // Then, we add our transformer,
            .transform(new TransformerSupplier<Long, String, KeyValue<Long, Void>>() {
                           @Override
                           public Transformer<Long, String, KeyValue<Long, Void>> get() {
                               return new Transformer<Long, String, KeyValue<Long, Void>>() {
                                   private ProcessorContext context;
                                   private KeyValueStore<Long, Long> stateStore;

                                   @Override
                                   public void init(final ProcessorContext context) {
                                       this.context = context;
                                       this.stateStore = (KeyValueStore<Long, Long>) context.getStateStore(STATE_STORE_NAME);
                                       // This is where the magic happens. This causes Streams to invoke the Punctuator
                                       // on an interval, using stream time. That is, time is only advanced by the record timestamps
                                       // that Streams observes. This has several advantages over wall-clock time for this application:
                                       // * It'll produce the exact same sequence of updates given the same sequence of data.
                                        //   This seems nice: since the purpose is to modify the data stream itself, you want to have
                                        //   a clear understanding of when stuff is going to get deleted. For example, if something
                                        //   breaks down upstream of this topic and it stops getting new data for a while, wall-clock
                                       //   time would just keep deleting data on schedule, whereas stream time will wait for
                                       //   new updates to come in.
                                       context.schedule(
                                           SCAN_FREQUENCY,
                                           PunctuationType.STREAM_TIME,
                                           timestamp -> {
                                               final long cutoff = timestamp - MAX_AGE.toMillis();

                                               // scan over all the keys in this partition's store
                                               // this can be optimized, but just keeping it simple.
                                               // this might take a while, so the Streams timeouts should take this into account
                                               try (final KeyValueIterator<Long, Long> all = stateStore.all()) {
                                                   while (all.hasNext()) {
                                                       final KeyValue<Long, Long> record = all.next();
                                                       if (record.value != null && record.value < cutoff) {
                                                           // if a record's last update was older than our cutoff, emit a tombstone.
                                                           context.forward(record.key, null);
                                                       }
                                                   }
                                               }
                                           }
                                       );
                                   }

                                   @Override
                                   public KeyValue<Long, Void> transform(final Long key, final String value) {
                                       // this gets invoked for each new record we consume. If it's a tombstone, delete
                                       // it from our state store. Otherwise, store the record timestamp.
                                       if (value == null) {
                                           stateStore.delete(key);
                                       } else {
                                           stateStore.put(key, context.timestamp());
                                       }
                                       return null; // no need to return anything here. the punctuator will emit the tombstones when necessary
                                   }

                                   @Override
                                   public void close() {} // no need to close anything; Streams already closes the state store.
                               };
                           }
                       },
                       STATE_STORE_NAME // register that this Transformer needs to be connected to our state store.
            )
            // emit our tombstones back to the same topic.
            .to(TOPIC, Produced.with(Serdes.Long(), new VoidSerde()));

        
        // Just keeping it simple and testing inline:
        
        // build the topology
        final Topology build = builder.build();
        System.out.println(
            build.describe().toString()
        );

        // testing a simple scenario with TopologyTestDriver:
        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-host");
        // you'll also want to pay close attention to how long that punctuation takes to run, and set the appropriate timeouts accordingly
        // alternatively, since the punctuation doesn't modify the state store, you could put the scan in a separate thread.
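        // (illustrative, not part of the example) one relevant knob is the consumer's poll interval,
        // since punctuation runs on the stream thread between polls, e.g.:
        // config.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 10 * 60 * 1000);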
        final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(build, config);

        final ConsumerRecordFactory<Long, String> recordFactory = new ConsumerRecordFactory<>(Serdes.Long().serializer(), Serdes.String().serializer());

        // send one record at timestamp 0
        topologyTestDriver.pipeInput(recordFactory.create(TOPIC, 1L, "first", 0L));
        // prints "null" because the app doesn't emit anything
        System.out.println(topologyTestDriver.readOutput(TOPIC, Serdes.Long().deserializer(), Serdes.String().deserializer()));

        // send another record two days later. This should cause the first record to get purged, since it's now 2 days old.
        topologyTestDriver.pipeInput(recordFactory.create(TOPIC, 2L, "second", 1000L * 60 * 60 * 24 * 2));
        // prints ProducerRecord(topic=input, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=1, value=null, timestamp=172800000)
        // because the punctuation has run, scanning over the state store, and determined that key=1 can be purged, since it is now 2 days old
        System.out.println(topologyTestDriver.readOutput(TOPIC, Serdes.Long().deserializer(), Serdes.String().deserializer()));
        // prints "null" because the app doesn't emit anything else
        System.out.println(topologyTestDriver.readOutput(TOPIC, Serdes.Long().deserializer(), Serdes.String().deserializer()));
    }
}