...

  • Illustrates specialized combiner

    • Apply WinningBids to get the winning bid of each closed auction

    • ParDo to key the winning bids by sellerId

    • Apply a GlobalWindow with triggering at each element (to get a continuous flow of updates at each new winning bid)

    • Combine.perKey to calculate the average of the last 10 winning bids for each seller. A specialized CombineFn is needed because only the last 10 closed auctions of each seller are averaged

      • createAccumulator: create an ArrayList accumulator for each chunk of data

      • addInput: add the elements of each chunk to its accumulator, sort them by bid timestamp then price, and keep the last 10 elements

      • mergeAccumulators: merge the accumulators iteratively until only one remains, by adding all bids of all accumulators to a final accumulator, sorting by timestamp then price, and keeping the last 10 elements

      • extractOutput: sum the prices of the bids and divide by the accumulator size

    • ParDo that outputs SellerPrice(sellerId, avgPrice)
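The four accumulator steps above map onto the hooks of a Beam CombineFn. The following is a minimal plain-Java sketch of that combining logic only, without Beam, so it can run on its own.

It rests on these assumptions:

- `WinningBid` and `LastTenAverage` are hypothetical names.
- Timestamps and prices are modeled as plain `long` values.
- An empty accumulator averages to 0.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical minimal bid type: only the fields the combiner needs.
final class WinningBid {
    final long timestamp;
    final long price;

    WinningBid(long timestamp, long price) {
        this.timestamp = timestamp;
        this.price = price;
    }
}

// Plain-Java stand-in for the specialized CombineFn (no Beam dependency).
class LastTenAverage {
    static final int MAX_NUM_BIDS = 10;

    // Sort by bid timestamp, then by price, so the most recent bids end up last.
    static final Comparator<WinningBid> ORDER =
        Comparator.comparingLong((WinningBid b) -> b.timestamp)
                  .thenComparingLong(b -> b.price);

    static List<WinningBid> createAccumulator() {
        return new ArrayList<>();
    }

    // Add one bid to a chunk's accumulator, then keep only the last 10.
    static List<WinningBid> addInput(List<WinningBid> acc, WinningBid bid) {
        acc.add(bid);
        return keepLast(acc);
    }

    // Fold all partial accumulators into one and keep only the last 10.
    static List<WinningBid> mergeAccumulators(Iterable<List<WinningBid>> accs) {
        List<WinningBid> merged = createAccumulator();
        for (List<WinningBid> acc : accs) {
            merged.addAll(acc);
        }
        return keepLast(merged);
    }

    // Average price of the retained bids; 0 for an empty accumulator (assumption).
    static double extractOutput(List<WinningBid> acc) {
        if (acc.isEmpty()) {
            return 0.0;
        }
        long sum = 0;
        for (WinningBid bid : acc) {
            sum += bid.price;
        }
        return (double) sum / acc.size();
    }

    // Sort, then drop the oldest bids from the front until at most 10 remain.
    private static List<WinningBid> keepLast(List<WinningBid> acc) {
        acc.sort(ORDER);
        while (acc.size() > MAX_NUM_BIDS) {
            acc.remove(0);
        }
        return acc;
    }
}
```

Two properties of this design keep it cheap. Trimming after every addInput bounds each per-chunk accumulator at 10 bids. A merge therefore only ever sorts at most 10 bids per partial accumulator.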

SQL Interpretation
SELECT Istream(AVG(Q.final), Q.seller)
FROM (
 SELECT Rstream(MAX(B.price) AS final, A.seller)
 FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
 WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
 GROUP BY A.id, A.seller) [PARTITION BY A.seller ROWS 10] Q
GROUP BY Q.seller;

...

Query 7: What are the highest bids per period?

  • Could have been implemented with a combiner like Query 5, but is deliberately implemented using Max(prices) as a side input to illustrate fanout.

  • Fanout redistributes the data through an intermediate, implicit combine step, which reduces the load on the final step of the Max transform

    • input: a collection of bid events in fixed windows

    • ParDo to replace the bids by their price

    • Max.withFanout to get the maximum price per window, which is used as a side input for the next step. Fanout is useful when a window contains many events to be combined by the Max transform.

    • ParDo on the bids, with the side input, that outputs a bid if bid.price equals maxPrice (which comes from the side input)
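The per-window logic of these steps can be sketched in plain Java, without Beam. This is a minimal sketch under stated assumptions:

- `PricedBid`, `HighestBidSketch` and the round-robin sharding are hypothetical illustrations.
- In Beam, fanout is requested on the global combine (`withFanout`), and the runner performs the actual redistribution.

The sketch only shows the shape of the idea. The final max step combines at most `fanout` partial results instead of every price. The resulting maximum is then reused to filter the bids, as the side input is.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical bid type: an auction id and a price are enough here.
final class PricedBid {
    final long auction;
    final long price;

    PricedBid(long auction, long price) {
        this.auction = auction;
        this.price = price;
    }
}

class HighestBidSketch {
    // Two-stage max, mimicking a fanout combine: partial maxima per shard,
    // then a final max over at most `fanout` partial results. Requires fanout >= 1.
    static long maxWithFanout(List<Long> prices, int fanout) {
        long[] partial = new long[fanout];
        Arrays.fill(partial, Long.MIN_VALUE);
        for (int i = 0; i < prices.size(); i++) {
            int shard = i % fanout;  // round-robin stands in for the runner's redistribution
            partial[shard] = Math.max(partial[shard], prices.get(i));
        }
        long max = Long.MIN_VALUE;
        for (long p : partial) {
            max = Math.max(max, p);
        }
        return max;
    }

    // One window's worth of work: map bids to prices, compute the max once,
    // then filter the bids against it (the max plays the side-input role).
    static List<PricedBid> highestBids(List<PricedBid> bidsInWindow, int fanout) {
        List<Long> prices = new ArrayList<>();
        for (PricedBid bid : bidsInWindow) {
            prices.add(bid.price);
        }
        long maxPrice = maxWithFanout(prices, fanout);
        List<PricedBid> result = new ArrayList<>();
        for (PricedBid bid : bidsInWindow) {
            if (bid.price == maxPrice) {
                result.add(bid);
            }
        }
        return result;
    }
}
```

The filter compares every bid against the same maximum. As a result, all bids tied at the highest price in a window are emitted.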


...