...

  • Could have been implemented with a combiner like query5, but is deliberately implemented using Max(prices) as a side input to illustrate fanout.

  • Fanout is a redistribution using an intermediate implicit combine step to reduce the load in the final step of the Max transform.

    • input: (fixed) windowed collection of bid events

    • ParDo to replace bids by their price

    • Max.withFanout to get the max per window and use it as a side input for the next step. Fanout is useful if there are many events to be computed in a window using the Max transform.

    • ParDo on the bids with side input to output the bid if bid.price equals maxPrice (that comes from side input)
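
The steps above can be sketched in plain Python to show what the fanout buys. This is a minimal model of the semantics only, not the Beam API: `Bid`, `max_with_fanout`, `highest_bids`, and the round-robin bucketing are hypothetical names and choices made for illustration.

```python
from collections import namedtuple

# Hypothetical, simplified bid record (the real event carries more fields).
Bid = namedtuple("Bid", ["auction", "bidder", "price"])


def max_with_fanout(prices, fanout):
    """Two-stage max: combine per-bucket partial maxima, then reduce them."""
    # Stage 1: spread the prices over `fanout` buckets and keep one max per bucket.
    partials = {}
    for i, price in enumerate(prices):
        bucket = i % fanout  # assumption: round-robin redistribution
        partials[bucket] = max(partials.get(bucket, price), price)
    # Stage 2: the final combine only has to look at up to `fanout` values.
    return max(partials.values())


def highest_bids(window_bids, fanout=4):
    """Return the bids of one window whose price equals the window maximum."""
    if not window_bids:
        return []
    # Plays the role of the side input: one max price per window.
    max_price = max_with_fanout([b.price for b in window_bids], fanout)
    return [b for b in window_bids if b.price == max_price]


window = [Bid(1, 10, 100), Bid(2, 11, 250), Bid(1, 12, 250), Bid(3, 13, 90)]
result = highest_bids(window)
```

With this input, `result` holds the two bids priced at 250. The final reduction inspects at most `fanout` partial maxima instead of every price in the window.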


SQL Interpretation
SELECT Rstream(B.auction, B.price, B.bidder)
FROM Bid [RANGE 1 MINUTE SLIDE 1 MINUTE] B
WHERE B.price = (
 SELECT MAX(B1.price)
 FROM Bid [RANGE 1 MINUTE SLIDE 1 MINUTE] B1);

...

Query 8: Who has entered the system and created an auction in the last period?

  • Illustrates simple join

    • Filter + ParDo to extract persons out of events

    • Apply fixed windows to have a period

    • ParDo to key collection by personId

    • Filter + ParDo to extract auctions out of events

    • Apply fixed windows to have a period

    • ParDo to key collection by sellerId

    • CoGroupByKey to group persons and auctions by personId/sellerId + tag persons and auctions

    • ParDo to output IdNameReserve(person.id, person.name, auction.reserve) for each auction



SQL Interpretation
SELECT Rstream(P.id, P.name, A.reserve)
FROM Person [RANGE 12 HOUR] P, Auction [RANGE 12 HOUR] A
WHERE P.id = A.seller;


Query 9 Winning-bids (not part of original NexMark): extract the most recent of the highest bids

  • Illustrates custom window function to reconcile auctions and bids + join them

    • input: collection of events

    • Apply custom windowing function to temporarily reconcile auctions and bids events in the same custom window (AuctionOrBidWindow)

      • assign auctions to window [auction.timestamp, auction.expiring]

      • assign bids to window [bid.timestamp, bid.timestamp + expectedAuctionDuration (generator configuration parameter)]

      • merge all 'bid' windows into their corresponding 'auction' window, provided the auction has not expired.

    • Filter + ParDos to extract auctions out of events and key them by auction id

    • Filter + ParDos to extract bids out of events and key them by auction id

    • CoGroupByKey (groups values of PCollections<KV> that share the same key) to group auctions and bids by auction id + tags to distinguish auctions and bids

    • ParDo to

      • determine best bid price: verification of valid bid, sort prices by price ASC then time DESC and keep the max price

      • and output AuctionBid(auction, bestBid) objects
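
The last ParDo can be approximated with the plain-Python sketch below. The validity rule (price at least the reserve, timestamp inside the auction's lifetime) is an assumption, since the text only says that bids are verified. The record fields and the `winning_bid` helper are likewise hypothetical.

```python
from collections import namedtuple

# Hypothetical, simplified records.
Auction = namedtuple("Auction", ["id", "reserve", "timestamp", "expires"])
Bid = namedtuple("Bid", ["auction", "bidder", "price", "timestamp"])
AuctionBid = namedtuple("AuctionBid", ["auction", "best_bid"])


def winning_bid(auction, bids):
    """Pick the best valid bid for one auction, or None if there is none."""
    # Assumed validity check: meets the reserve and arrives before expiry.
    valid = [
        b for b in bids
        if b.price >= auction.reserve
        and auction.timestamp <= b.timestamp < auction.expires
    ]
    if not valid:
        return None
    # Order by price ASC, then time DESC; the last element is the maximum.
    ordered = sorted(valid, key=lambda b: (b.price, -b.timestamp))
    return AuctionBid(auction, ordered[-1])


auction = Auction(7, 100, 0, 60)
bids = [
    Bid(7, 1, 90, 5),     # below reserve: rejected
    Bid(7, 2, 150, 10),
    Bid(7, 3, 150, 20),   # same price, later time
    Bid(7, 4, 200, 70),   # after expiry: rejected
]
result = winning_bid(auction, bids)
```

Under this ordering a tie on price resolves to the earlier bid (bidder 2 here). Negating the timestamp the other way would favour the most recent one instead.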

...

Query 10 (not part of original NexMark): Log all events to GCS files

  • Illustrates windows with large side effects on firing

    • ParDo to key events by their shardId (number of shards is a config item)

    • Apply fixed windows with composite triggering that fires when each sub-trigger (executed in order) fires

      • Repeatedly

        • after at least maxLogEvents in pane

        • or finally when the watermark passes the end of the window

      • Repeatedly

        • after at least maxLogEvents in pane

        • or when processing time passes the first element in pane + lateDelay

      • With allowedLateness of 1 day (so that any late data will stall the pipeline and be noticeable)

    • GroupByKey to group events by shardId

    • ParDo to construct the outputStreams (fileNames contain shardId) and encode each event to that outputStream + form pairs with key = null key and value = outputFile (represents a fileName with various added information)

    • Apply fixed window with default trigger and lateness of 1 day to clear complex triggering

    • GroupByKey all outputFiles together (they have the same key) to have one file per window

    • ParDo to write all the lines to files in Google Cloud Storage
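
To make the sharding and count-based firing more concrete, here is a small plain-Python model. It ignores event time, processing time, and lateness entirely: each shard's events are simply cut into panes of at most `max_log_events`, with the final shorter pane standing in for the flush at the end of the window. The modulo sharding and the function names are assumptions for illustration, not the pipeline's actual code.

```python
from collections import defaultdict


def shard_of(event_id, num_shards):
    """Assumed sharding rule: event id modulo the number of shards."""
    return event_id % num_shards


def panes_per_shard(event_ids, num_shards, max_log_events):
    """Group events by shard, then split each shard into count-bounded panes."""
    by_shard = defaultdict(list)
    for event_id in event_ids:
        by_shard[shard_of(event_id, num_shards)].append(event_id)

    panes = {}
    for shard, ids in by_shard.items():
        # A pane fires once max_log_events elements have accumulated; the
        # remainder models the final firing when the window closes.
        panes[shard] = [
            ids[i:i + max_log_events]
            for i in range(0, len(ids), max_log_events)
        ]
    return panes


result = panes_per_shard(range(10), num_shards=2, max_log_events=3)
```

Each inner list corresponds to one firing, that is, one batch of lines that would be appended to the shard's output file.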

...

Query 11 (not part of original NexMark): How many bids did a user make in each session he was active?

  • Illustrates session windows + triggering on the bids collection

    • input: collection of bids events

    • ParDo to replace bids with their bidder id

    • Apply session windows with gap duration = windowDuration (configuration item) and trigger repeatedly after at least nbEvents in pane => each window (i.e. session) will contain bid ids received since last windowDuration period of inactivity and materialized every nbEvents bids

    • Count.perElement to count bids per bidder id (number of occurrences of bidder id)

    • output idsPerSession(bidder, bidsCount) objects
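
The session logic can be sketched in plain Python as follows. This models only the final count per session and leaves out the early firing every nbEvents bids. The `(bidder, timestamp)` tuples, the integer timestamps, and the `bids_per_session` helper are assumptions for the example. Two bids are placed in the same session when they are less than `gap` apart, which mirrors merging overlapping windows of the form [timestamp, timestamp + gap).

```python
from collections import defaultdict


def bids_per_session(bids, gap):
    """Count bids per bidder and per session; bids are (bidder, timestamp)."""
    by_bidder = defaultdict(list)
    for bidder, timestamp in bids:
        by_bidder[bidder].append(timestamp)

    results = []
    for bidder in sorted(by_bidder):
        times = sorted(by_bidder[bidder])
        count = 1
        for previous, current in zip(times, times[1:]):
            if current - previous < gap:
                count += 1          # still inside the current session
            else:
                results.append((bidder, count))  # gap reached: close session
                count = 1
        results.append((bidder, count))          # close the last session
    return results


bids = [(1, 0), (1, 5), (1, 30), (2, 2), (1, 8)]
result = bids_per_session(bids, gap=10)
```

Bidder 1 ends up with two sessions (three bids, then one bid after a long pause) and bidder 2 with a single one-bid session.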

...