THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
KTable<Windowed<Key>, Value> oneMinuteWindowed = // where Key and Value stand for your actual key and value types yourKStream .groupByKey() .reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000), "store1m"); //where your adder can be as simple as (val, agg) -> agg + val //for primitive types or as complex as you need KTable<Windowed<Key>, Value> fiveMinuteWindowed = oneMinuteWindowed .groupBy( (windowedKey, value) -> new KeyValue<>( new Windowed<>( windowedKey.key(), new Window<>( windowedKey.window().start() /1000/60/5 *1000*60*5, windowedKey.window().start() /1000/60/5 *1000*60*5 + 1000*60*5 // the above rounds time down to a timestamp divisible by 5 minutes ) ), value ), /* your key serde */, /* your value serde */ ) .reduce(/*your adder*/, /*your subtractor*/, "store5m"); KTable<Windowed<Key>, Value> fifteenMinuteWindowed = fiveMinuteWindowed .groupBy( (windowedKey, value) -> new KeyValue<>( new Windowed<>( windowedKey.key(), new Window<>( windowedKey.window().start() /1000/60/15 *1000*60*15, windowedKey.window().start() /1000/60/15 *1000*60*15 + 1000*60*15 // the above rounds time down to a timestamp divisible by 15 minutes ) ), value ), /* your key serde */, /* your value serde */ ) .reduce(/*your adder*/, /*your subtractor*/, "store15m"); KTable<Windowed<Key>, Value> sixtyMinuteWindowed = fifteeenMinuteWindowed .groupBy( (windowedKey, value) -> new KeyValue<>( new Windowed<>( windowedKey.key(), new Window<>( windowedKey.window().start() /1000/60/60 *1000*60*60, windowedKey.window().start() /1000/60/60 *1000*60*60 + 1000*60*60 // the above rounds time down to a timestamp divisible by 560 minutes ) ), value ), /* your key serde */, /* your value serde */ ) .reduce(/*your adder*/, /*your subtractor*/, "store60m"); |
...