...
Illustrates a specialized combiner
Apply winning-bids to get the winning bid of each closed auction
ParDo to key the winning bids by sellerId
Apply a GlobalWindow with a trigger that fires at each element (to get a continuous flow of updates with each new winning bid)
Combine.perKey to compute the average of the last 10 winning bids for each seller. A specialized CombineFn is needed because only the 10 most recent closed auctions of each seller count:
createAccumulator: create ArrayList accumulators for chunks of the data
addInput: add all elements of a chunk to its accumulator, sort them by bid timestamp then price, and keep the last 10 elements
mergeAccumulators: iteratively merge the accumulators until only one remains, by adding all bids of all accumulators to a final accumulator, sorting by timestamp then price, and keeping the last 10 elements
extractOutput: sum the prices of the bids and divide by the accumulator size
ParDo that outputs SellerPrice(sellerId, avgPrice)
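The CombineFn steps above can be sketched in plain Java. This is a minimal sketch, not the Nexmark code: it mirrors the four CombineFn methods (createAccumulator, addInput, mergeAccumulators, extractOutput) without depending on Beam, and WinningBid and LastTenAverageFn are hypothetical names standing in for the real bid type and combiner. It assumes the bid timestamp and price are both longs.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for a winning bid: only the fields the combiner needs.
final class WinningBid {
  final long price;
  final long timestamp;

  WinningBid(long price, long timestamp) {
    this.price = price;
    this.timestamp = timestamp;
  }
}

// Plain-Java sketch of the four CombineFn steps (hypothetical; not the Beam class).
final class LastTenAverageFn {
  static final int MAX_SIZE = 10;

  // Sort by timestamp, then price, so "last 10" means the 10 most recent bids.
  private static final Comparator<WinningBid> ORDER =
      Comparator.comparingLong((WinningBid b) -> b.timestamp)
          .thenComparingLong(b -> b.price);

  List<WinningBid> createAccumulator() {
    return new ArrayList<>();
  }

  List<WinningBid> addInput(List<WinningBid> acc, WinningBid bid) {
    acc.add(bid);
    return keepLastTen(acc);
  }

  List<WinningBid> mergeAccumulators(Iterable<List<WinningBid>> accs) {
    // Pour every accumulator into one final accumulator, then trim it.
    List<WinningBid> merged = createAccumulator();
    for (List<WinningBid> acc : accs) {
      merged.addAll(acc);
    }
    return keepLastTen(merged);
  }

  double extractOutput(List<WinningBid> acc) {
    if (acc.isEmpty()) {
      return 0.0; // assumption: a seller with no bids averages to 0
    }
    long sum = 0;
    for (WinningBid bid : acc) {
      sum += bid.price;
    }
    return (double) sum / acc.size();
  }

  private static List<WinningBid> keepLastTen(List<WinningBid> acc) {
    acc.sort(ORDER);
    // Drop the oldest entries until at most MAX_SIZE remain.
    while (acc.size() > MAX_SIZE) {
      acc.remove(0);
    }
    return acc;
  }
}
```

Trimming on every addInput keeps each accumulator at no more than 10 bids, so the merge step only ever sorts a handful of bids per accumulator, however large the chunks are.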
SELECT Istream(AVG(Q.final), Q.seller)
FROM (SELECT Rstream(MAX(B.price) AS final, A.seller)
      FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
      WHERE A.id = B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
      GROUP BY A.id, A.seller) [PARTITION BY A.seller ROWS 10] Q
GROUP BY Q.seller;
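The effect of the GlobalWindow with a per-element trigger (one updated SellerPrice per incoming winning bid) can be simulated outside Beam. In the Beam Java SDK such a trigger is commonly written as Repeatedly.forever(AfterPane.elementCountAtLeast(1)); the sketch below does not use Beam at all. It is a simplification that assumes winning bids arrive in timestamp order, so no sorting is needed and the oldest bid is always at the head of a deque. SellerPrice and PerElementAverages are hypothetical stand-ins, not the Nexmark model classes.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical output type mirroring SellerPrice(sellerId, avgPrice).
final class SellerPrice {
  final long sellerId;
  final double avgPrice;

  SellerPrice(long sellerId, double avgPrice) {
    this.sellerId = sellerId;
    this.avgPrice = avgPrice;
  }
}

// Simulates a global window whose trigger fires on every element:
// each winning bid updates its seller's state and emits a fresh average.
// Assumption: bids arrive in timestamp order, so the deque head is the oldest.
final class PerElementAverages {
  private final Map<Long, Deque<Long>> lastPrices = new HashMap<>();

  SellerPrice onWinningBid(long sellerId, long price) {
    // Key by sellerId, like the keying ParDo.
    Deque<Long> prices = lastPrices.computeIfAbsent(sellerId, k -> new ArrayDeque<>());
    prices.addLast(price);
    if (prices.size() > 10) {
      prices.removeFirst(); // evict this seller's oldest winning bid
    }
    long sum = 0;
    for (long p : prices) {
      sum += p;
    }
    return new SellerPrice(sellerId, (double) sum / prices.size());
  }
}
```

Each call corresponds to one trigger firing: the output for a seller always reflects at most its 10 most recent winning bids.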
...
Query 7: What are the highest bids per period?
It could have been implemented with a combiner like Query 5, but is deliberately implemented with Max(prices) as a side input, which also illustrates fanout.
Fanout redistributes the data through an intermediate, implicit combine step that reduces the load on the final step of the Max transform.
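The idea behind fanout can be shown as a two-stage max. In the Beam Java SDK fanout is set on a global combine, e.g. Max.longsGlobally().withFanout(n), and Beam handles the intermediate keys itself. The sketch below is only a conceptual illustration, not Beam's implementation: FanoutMax is a hypothetical name, and the round-robin assignment of inputs to intermediate keys is an assumption made for simplicity.

```java
import java.util.Arrays;
import java.util.List;

// Conceptual two-stage max illustrating fanout (hypothetical; not Beam's code).
final class FanoutMax {
  static long max(List<Long> prices, int fanout) {
    if (prices.isEmpty() || fanout < 1) {
      throw new IllegalArgumentException("need at least one price and fanout >= 1");
    }
    // Stage 1: spread the inputs over `fanout` intermediate keys
    // (round robin here) and take a partial max per key.
    long[] partialMax = new long[fanout];
    Arrays.fill(partialMax, Long.MIN_VALUE);
    for (int i = 0; i < prices.size(); i++) {
      int key = i % fanout;
      partialMax[key] = Math.max(partialMax[key], prices.get(i));
    }
    // Stage 2: the final step only combines `fanout` partial results.
    long result = Long.MIN_VALUE;
    for (long partial : partialMax) {
      result = Math.max(result, partial);
    }
    return result;
  }
}
```

The answer is the same for any fanout because max is associative and commutative; what changes is that the final step sees `fanout` values instead of every element, which is where the load reduction comes from.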
Input: a fixed-windowed collection of bid events
ParDo to replace each bid with its price
Max.withFanout to get the max price per window and use it as a side input for the next step. Fanout is useful when a window holds many events for the Max transform to process.
ParDo over the bids, with the side input, that outputs a bid if bid.price equals maxPrice (the value from the side input)
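The Query 7 steps above can be sketched for a batch of bids held in memory. Fixed windows are modeled by grouping bids on their window start time, the per-window max plays the role of the side input, and the second pass over each window is the filtering ParDo. Bid and HighestBidsPerWindow are hypothetical names, and ties are kept, so a window can report several highest bids.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical minimal bid: auction id, price, and event time in milliseconds.
final class Bid {
  final long auction;
  final long price;
  final long timestampMillis;

  Bid(long auction, long price, long timestampMillis) {
    this.auction = auction;
    this.price = price;
    this.timestampMillis = timestampMillis;
  }
}

final class HighestBidsPerWindow {
  // Returns window start time -> bids whose price equals that window's max.
  static Map<Long, List<Bid>> compute(List<Bid> bids, long windowSizeMillis) {
    // Assign each bid to a fixed window, keyed by the window's start time.
    Map<Long, List<Bid>> windows = new TreeMap<>();
    for (Bid b : bids) {
      long start = b.timestampMillis - Math.floorMod(b.timestampMillis, windowSizeMillis);
      windows.computeIfAbsent(start, k -> new ArrayList<>()).add(b);
    }
    Map<Long, List<Bid>> result = new TreeMap<>();
    for (Map.Entry<Long, List<Bid>> e : windows.entrySet()) {
      // Step 1: bids -> prices -> max (the side input in the Beam pipeline).
      long maxPrice = Long.MIN_VALUE;
      for (Bid b : e.getValue()) {
        maxPrice = Math.max(maxPrice, b.price);
      }
      // Step 2: re-scan the window and keep bids priced at the max.
      List<Bid> winners = new ArrayList<>();
      for (Bid b : e.getValue()) {
        if (b.price == maxPrice) {
          winners.add(b);
        }
      }
      result.put(e.getKey(), winners);
    }
    return result;
  }
}
```

Computing the max first and filtering afterwards is what makes the side-input shape necessary: the filter cannot decide on a bid until the whole window's max is known.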
...