...
Illustrates sliding windows and combiners (i.e. reducers) to compare the elements of the auctions collection
Input: a collection of bid events in sliding windows (to get a result over a 1-hour period, updated every minute)
ParDo to replace bid elements by their auction id
Count.PerElement to count the occurrences of each auction id
Combine.globally to select only the auctions with the maximum number of bids
BinaryCombineFn to compare the elements of the collection pairwise (auction id occurrences, i.e. number of bids)
Return KV(auction id, max occurrences)
Output: AuctionCount(auction id, max occurrences) objects
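The per-window steps above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the Beam implementation: it processes the bids of a single window, `collections.Counter` stands in for Count.PerElement, `functools.reduce` with a two-argument function stands in for the BinaryCombineFn, and the `Bid` record and the `max_of_pair` / `hottest_auction` helpers are hypothetical names introduced here.

```python
from collections import Counter
from functools import reduce
from typing import Iterable, NamedTuple, Tuple


class Bid(NamedTuple):
    # Hypothetical minimal bid record: only the fields this sketch needs.
    auction: int
    price: int


class AuctionCount(NamedTuple):
    auction: int
    num: int


def max_of_pair(left: Tuple[int, int], right: Tuple[int, int]) -> Tuple[int, int]:
    """Binary combine step: keep the (auction id, count) pair with more bids.

    On a tie the left pair is kept, so only one auction is returned even
    when several share the maximum count.
    """
    return right if right[1] > left[1] else left


def hottest_auction(bids_in_window: Iterable[Bid]) -> AuctionCount:
    """Process the bids of ONE window; raises TypeError if the window is empty."""
    # ParDo step: replace each bid by its auction id.
    auction_ids = (bid.auction for bid in bids_in_window)
    # Count.PerElement step: number of occurrences of each auction id.
    counts = Counter(auction_ids)
    # Combine.globally step: fold the (id, count) pairs two at a time.
    auction, num = reduce(max_of_pair, counts.items())
    return AuctionCount(auction, num)


window = [Bid(1, 10), Bid(2, 5), Bid(1, 12), Bid(3, 7), Bid(2, 6), Bid(1, 15)]
print(hottest_auction(window))  # AuctionCount(auction=1, num=3)
```

Because a binary combine returns a single element, this sketch keeps only one auction on a tie, whereas the CQL condition `num >= ALL (...)` keeps every auction that reaches the maximum count.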
SELECT Rstream(auction)
FROM (SELECT B1.auction, count(*) AS num
      FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B1
      GROUP BY B1.auction)
WHERE num >= ALL (SELECT count(*)
                  FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B2
                  GROUP BY B2.auction);
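The `[RANGE 60 MINUTE SLIDE 1 MINUTE]` clause means every bid belongs to 60 overlapping one-hour windows, one starting at each minute boundary of the hour before it. The sketch below computes that window assignment in plain Python. It assumes windows are aligned to multiples of the period with zero offset and uses millisecond timestamps; `assign_sliding_windows` and `MINUTE_MS` are hypothetical names, not a Beam API.

```python
from typing import List, Tuple

MINUTE_MS = 60_000


def assign_sliding_windows(timestamp_ms: int,
                           size_ms: int = 60 * MINUTE_MS,
                           period_ms: int = MINUTE_MS) -> List[Tuple[int, int]]:
    """Return every [start, end) window that contains timestamp_ms.

    Assumes windows start at multiples of period_ms (zero offset) and last
    size_ms; the newest window comes first.
    """
    start = timestamp_ms - timestamp_ms % period_ms  # latest window start <= timestamp
    windows = []
    while start > timestamp_ms - size_ms:  # window still covers the timestamp
        windows.append((start, start + size_ms))
        start -= period_ms
    return windows


wins = assign_sliding_windows(90 * MINUTE_MS + 30_000)  # a bid at 01:30:30
print(len(wins))  # 60
print(wins[0])    # (5400000, 9000000)
```

Each of these windows is then counted independently, which is why the result covers the last hour but changes every minute.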
...
Query 6: What is the average selling price per seller for their last 10 closed auctions?
Illustrates a specialized combiner
Apply winning-bids to get the winning bid of each closed auction
ParDo to key the winning bids by sellerId
Apply a GlobalWindow with a trigger that fires at each element (to get a continuous flow of updates with each new winning bid)
Combine.perKey to calculate the average of the last 10 winning bids for each seller. A specialized CombineFn is needed because only each seller's last 10 closed auctions are averaged:
create ArrayList accumulators for chunks of data
add all elements of the chunks to the accumulators, sort them by bid timestamp then price, keeping the last 10 elements
iteratively merge the accumulators until there is only one: add all bids of all accumulators to a final accumulator, then sort by timestamp then price, keeping the last 10 elements
extractOutput: sum all the prices of the bids and divide by the accumulator size
ParDo that outputs SellerPrice(sellerId, avgPrice)
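The combiner steps above can be sketched in plain Python, assuming a hypothetical `WinningBid` record with `seller`, `timestamp` and `price` fields. `LastTenAverageFn` mirrors the four CombineFn methods using the Python SDK's snake_case names, but it does not import Apache Beam, and `running_averages` only imitates a global window whose trigger fires after every element in accumulating mode (the accumulation mode is an assumption here). It also folds the final ParDo into the same loop by emitting `SellerPrice` directly.

```python
from typing import Dict, Iterable, List, NamedTuple

MAX_NUM_BIDS = 10  # "last 10 closed auctions" in the query


class WinningBid(NamedTuple):
    # Hypothetical minimal record for one closed auction's winning bid.
    seller: int
    timestamp: int
    price: int


class SellerPrice(NamedTuple):
    seller: int
    price: float


def _keep_last(bids: List[WinningBid]) -> List[WinningBid]:
    # Sort by bid timestamp, then price, and keep only the newest MAX_NUM_BIDS.
    return sorted(bids, key=lambda b: (b.timestamp, b.price))[-MAX_NUM_BIDS:]


class LastTenAverageFn:
    """Plain-Python analogue of a CombineFn (no Apache Beam import)."""

    def create_accumulator(self) -> List[WinningBid]:
        return []

    def add_input(self, acc: List[WinningBid], bid: WinningBid) -> List[WinningBid]:
        return _keep_last(acc + [bid])

    def merge_accumulators(self, accs: Iterable[List[WinningBid]]) -> List[WinningBid]:
        # Concatenate all accumulators, then trim once to the newest bids.
        merged: List[WinningBid] = []
        for acc in accs:
            merged.extend(acc)
        return _keep_last(merged)

    def extract_output(self, acc: List[WinningBid]) -> float:
        if not acc:  # assumption: report 0.0 when a seller has no bids yet
            return 0.0
        return sum(b.price for b in acc) / len(acc)


def running_averages(winning_bids: Iterable[WinningBid]) -> List[SellerPrice]:
    """Emit an updated SellerPrice after every winning bid (per-element trigger)."""
    fn = LastTenAverageFn()
    accs: Dict[int, List[WinningBid]] = {}
    updates: List[SellerPrice] = []
    for bid in winning_bids:
        acc = accs.get(bid.seller, fn.create_accumulator())
        accs[bid.seller] = fn.add_input(acc, bid)
        updates.append(SellerPrice(bid.seller, fn.extract_output(accs[bid.seller])))
    return updates


bids = [WinningBid(seller=1, timestamp=t, price=10 * t) for t in range(1, 13)]
bids.append(WinningBid(seller=2, timestamp=13, price=500))
updates = running_averages(bids)
print(updates[0])   # SellerPrice(seller=1, price=10.0)
print(updates[11])  # SellerPrice(seller=1, price=75.0)
print(updates[12])  # SellerPrice(seller=2, price=500.0)
```

Here `merge_accumulators` receives all partial accumulators at once, as a CombineFn's merge method does, so the repeated pairwise merging described above collapses into a single concatenate-sort-trim pass; trimming after the sort is what keeps the accumulator bounded at 10 bids.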
...