...
This query could have been implemented with a combiner, like Query 5, but it is deliberately implemented using Max(prices) as a side input in order to illustrate fanout.
Fanout is a redistribution that uses an intermediate implicit combine step to reduce the load in the final step of the Max transform.
input: (fixed) windowed collection of bid events
ParDo to replace bids by their price
Max.withFanout to get the max per window and use it as a side input for next step. Fanout is useful if there are many events to be computed in a window using the Max transform.
ParDo on the bids with side input to output the bid if bid.price equals maxPrice (that comes from side input)
SELECT Rstream(B.auction, B.price, B.bidder) FROM Bid [RANGE 1 MINUTE SLIDE 1 MINUTE] B WHERE B.price = ( SELECT MAX(B1.price) FROM BID [RANGE 1 MINUTE SLIDE 1 MINUTE] B1);
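The steps above can be sketched in plain Python to make the fanout idea concrete. This is only an illustration of the semantics, not Beam code: the tuple layout (auction, price, bidder, timestamp), the function names and the round-robin bucketing are assumptions made for the example.

```python
from collections import defaultdict

def window_start(timestamp, size):
    """Start of the fixed window that contains timestamp."""
    return timestamp - timestamp % size

def max_with_fanout(prices, fanout):
    """Two-stage max: partial maxima over `fanout` buckets, then a final max."""
    buckets = defaultdict(list)
    for i, price in enumerate(prices):
        buckets[i % fanout].append(price)         # spread the load over buckets
    partial = [max(b) for b in buckets.values()]  # intermediate combine step
    return max(partial)                           # final step sees at most `fanout` values

def highest_bids(bids, window_size, fanout=4):
    """bids: iterable of (auction, price, bidder, timestamp) tuples (assumed layout)."""
    windows = defaultdict(list)
    for bid in bids:
        windows[window_start(bid[3], window_size)].append(bid)
    result = {}
    for start, window_bids in windows.items():
        # max_price plays the role of the side input for the filtering step
        max_price = max_with_fanout([b[1] for b in window_bids], fanout)
        result[start] = [b for b in window_bids if b[1] == max_price]
    return result
```

The final `max` only has to look at one partial result per bucket, which is the load reduction that fanout is meant to provide when a window holds many bids.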
...
Query 8: Who has entered the system and created an auction in the last period?
Illustrates simple join
Filter + ParDo to extract persons out of events
Apply fixed windows to have a period
ParDo to key collection by personId
Filter + ParDo to extract auctions out of events
Apply fixed windows to have a period
ParDo to key collection by sellerId
CoGroupByKey to group persons and auctions by personId/sellerId + tag persons and auctions
ParDo to output IdNameReserve(person.id, person.name, auction.reserve) for each auction
SELECT Rstream(P.id, P.name, A.reserve) FROM Person [RANGE 12 HOUR] P, Auction [RANGE 12 HOUR] A WHERE P.id = A.seller;
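As a rough illustration of the join, the following plain-Python sketch groups persons and auctions by (window, id) and emits one output per auction whose seller appeared in the same window. It is not Beam code: the tuple layouts and the `new_sellers` name are assumptions for the example, and the "persons"/"auctions" dictionary entries stand in for the CoGroupByKey tags.

```python
from collections import defaultdict

def window_start(timestamp, size):
    """Start of the fixed window that contains timestamp."""
    return timestamp - timestamp % size

def new_sellers(persons, auctions, window_size):
    """persons: (id, name, timestamp); auctions: (id, seller, reserve, timestamp).

    Both layouts are assumed for this sketch.
    """
    grouped = defaultdict(lambda: {"persons": [], "auctions": []})
    for person in persons:
        key = (window_start(person[2], window_size), person[0])   # key by personId
        grouped[key]["persons"].append(person)
    for auction in auctions:
        key = (window_start(auction[3], window_size), auction[1])  # key by sellerId
        grouped[key]["auctions"].append(auction)

    output = []
    for key in sorted(grouped):
        tagged = grouped[key]
        for person in tagged["persons"]:
            for auction in tagged["auctions"]:
                # IdNameReserve(person.id, person.name, auction.reserve)
                output.append((person[0], person[1], auction[2]))
    return output
```

An auction whose seller registered in an earlier window produces no output, because the two events never share a key.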
Query 9 Winning-bids (not part of original NexMark): extract the most recent of the highest bids
Illustrates custom window function to reconcile auctions and bids + join them
input: collection of events
Apply custom windowing function to temporarily reconcile auction and bid events in the same custom window (AuctionOrBidWindow)
assign auctions to window [auction.timestamp, auction.expiring]
assign bids to window [bid.timestamp, bid.timestamp + expectedAuctionDuration (generator configuration parameter)]
merge all 'bid' windows into their corresponding 'auction' window, provided the auction has not expired.
Filter + ParDos to extract auctions out of events and key them by auction id
Filter + ParDos to extract bids out of events and key them by auction id
CoGroupByKey (groups values of PCollections<KV> that share the same key) to group auctions and bids by auction id + tags to distinguish auctions and bids
ParDo to
determine best bid price: verification of valid bid, sort prices by price ASC then time DESC and keep the max price
and output AuctionBid(auction, bestBid) objects
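The final ParDo can be sketched in plain Python as follows. This is not the Beam implementation: the tuple layouts are hypothetical, and the validity rule used here (price at least the reserve, bid time inside the auction's lifetime) is an assumption, since the steps above do not spell it out. The custom window merging is left out; bids are simply grouped by auction id.

```python
from collections import defaultdict

def winning_bids(auctions, bids):
    """auctions: (id, reserve, start, expires); bids: (auction, price, bidder, timestamp).

    Both layouts are assumed for this sketch.
    """
    bids_by_auction = defaultdict(list)
    for bid in bids:
        bids_by_auction[bid[0]].append(bid)      # key bids by auction id

    results = []
    for auction in auctions:
        auction_id, reserve, start, expires = auction
        # Assumed validity check: reserve met and bid placed while the auction is open
        valid = [b for b in bids_by_auction[auction_id]
                 if b[1] >= reserve and start <= b[3] < expires]
        if valid:
            # Max under "price ASC, then time DESC" ordering
            best = max(valid, key=lambda b: (b[1], -b[3]))
            results.append((auction, best))      # AuctionBid(auction, bestBid)
    return results
```

With this ordering, two bids at the same top price are resolved in favor of the one with the smaller timestamp, which is what taking the maximum of a price-ascending, time-descending ordering yields.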
...
Query 10 (not part of original NexMark): Log all events to GCS files
Illustrates windows with large side effects on firing
ParDo to key events by their shardId (number of shards is a config item)
Apply fixed windows with composite triggering that fires when each sub-trigger (executed in order) fires
Repeatedly
after at least maxLogEvents in pane
or finally when the watermark passes the end of the window
Repeatedly
after at least maxLogEvents in pane
or when processing time passes the first element in pane + lateDelay
With allowedLateness of 1 day (so that any late data will stall the pipeline and be noticeable)
GroupByKey to group events by shardId
ParDo to construct the output streams (file names contain the shardId), encode each event to that output stream, and form pairs with key = null and value = outputFile (represents a file name with various added information)
Apply fixed windows with default trigger and lateness of 1 day to clear the complex triggering
GroupByKey all outputFiles together (they have the same key) to have one file per window
ParDo to write all the lines to files in Google Cloud Storage
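To give a feel for how the first sub-trigger splits a window into panes, here is a plain-Python simulation. It is a simplified sketch, not Beam code: it assumes events arrive on time and in order, it ignores the late-data sub-trigger and the file writing, and the modulo sharding rule and the `log_panes` name are hypothetical.

```python
from collections import defaultdict

def log_panes(events, num_shards, window_size, max_log_events):
    """events: (event_id, timestamp) tuples, assumed on time and in order."""
    grouped = defaultdict(list)
    for event in events:
        event_id, timestamp = event
        shard = event_id % num_shards                   # hypothetical sharding rule
        start = timestamp - timestamp % window_size     # fixed window start
        grouped[(shard, start)].append(event)           # group by (shardId, window)

    panes = {}
    for key, window_events in grouped.items():
        fired, current = [], []
        for event in window_events:
            current.append(event)
            if len(current) >= max_log_events:          # "after at least maxLogEvents in pane"
                fired.append(current)
                current = []
        if current:
            fired.append(current)                       # watermark passes the end of the window
        panes[key] = fired
    return panes
```

Each fired pane corresponds to one burst of output for a shard, which is why a small maxLogEvents leads to many side effects per window.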
...
...
Query 11 (not part of original NexMark): How many bids did a user make in each session he was active?
Illustrates session windows + triggering on the bids collection
input: collection of bids events
ParDo to replace bids with their bidder id
Apply session windows with gap duration = windowDuration (configuration item) and trigger repeatedly after at least nbEvents in pane => each window (i.e. session) will contain bid ids received since last windowDuration period of inactivity and materialized every nbEvents bids
Count.perElement to count bids per bidder id (number of occurrences of bidder id)
output idsPerSession(bidder, bidsCount) objects
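The session logic can be sketched in plain Python as below. This is an illustration only, not Beam code: the (bidder, timestamp) layout and the `bids_per_session` name are assumptions, the repeated triggering is ignored so each session is counted once at its end, and the sketch treats two bids as belonging to the same session when they are strictly less than `gap` apart.

```python
from collections import defaultdict

def bids_per_session(bids, gap):
    """bids: (bidder, timestamp) tuples. Returns (bidder, session_start, count) tuples."""
    times = defaultdict(list)
    for bidder, timestamp in bids:
        times[bidder].append(timestamp)          # keep only the bidder id per bid

    output = []
    for bidder in sorted(times):
        stamps = sorted(times[bidder])
        start, last, count = stamps[0], stamps[0], 1
        for timestamp in stamps[1:]:
            if timestamp - last < gap:           # still inside the current session
                count += 1
            else:                                # inactivity gap reached: close the session
                output.append((bidder, start, count))
                start, count = timestamp, 1
            last = timestamp
        output.append((bidder, start, count))    # close the last open session
    return output
```

For instance, with a gap of 10, a bidder who bids at times 0, 5, 30 and 33 gets two sessions of two bids each.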
...