...
Illustrates sliding windows and combiners (i.e. reducers) to compare the elements in auctions Collection
Input: (sliding) window (to have a result over 1h period updated every 1 min) collection of bids events
ParDo to replace bid elements by their auction id
Count.perElement to count the occurrences of each auction id
Combine.globally to select only the auctions with the maximum number of bids
BinaryCombineFn to compare one to one the elements of the collection (auction id occurrences, i.e. number of bids)
Return KV(auction id, max occurrences)
output: AuctionCount(auction id, max occurrences) objects
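The Query 5 steps listed above (count per auction id, then a pairwise comparison that keeps the largest count) can be sketched in plain Java without the Beam SDK. The class and method names are hypothetical, a single window is represented by a list of auction ids, and the tie-break on the smaller auction id is an assumption of this sketch rather than something the pipeline guarantees.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class HotAuctionSketch {
  // Count.perElement analogue: number of bids per auction id within one window.
  static Map<Long, Long> countPerAuction(List<Long> auctionIds) {
    Map<Long, Long> counts = new HashMap<>();
    for (long id : auctionIds) {
      counts.merge(id, 1L, Long::sum);
    }
    return counts;
  }

  // Binary combine analogue: compare entries pairwise and keep the larger count.
  // Ties keep the smaller auction id (assumption made for determinism).
  static long[] hottest(Map<Long, Long> counts) {
    long[] best = null; // {auctionId, count}
    for (Map.Entry<Long, Long> e : counts.entrySet()) {
      long[] candidate = {e.getKey(), e.getValue()};
      if (best == null
          || candidate[1] > best[1]
          || (candidate[1] == best[1] && candidate[0] < best[0])) {
        best = candidate;
      }
    }
    return best; // null when the window is empty
  }
}
```

In the pipeline itself the window would be a sliding window, so the same computation is repeated for every one-minute slide.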
Query 12 (not part of original NexMark): How many bids does a user make within a fixed processing time limit?
...
Illustrates working in processing time in the Global window to count occurrences of bidder
...
input: collection of bid events
...
output BidsPerWindow(bidder, bidsCount) objects
Nexmark is a suite of queries (pipelines) used to measure performance and non-regression in Beam (see https://beam.apache.org/documentation/sdks/java/nexmark/).
Links to the original Nexmark papers:
Some queries can be very complex. To ease their maintenance, here is a presentation of the architecture along with pseudo-code of the queries:
Components of NexMark
Generator:
generates timestamped events (bids, persons, auctions) that are correlated with each other
NexmarkLauncher:
creates sources that use the generator
launches and monitors the query pipelines
Output metrics:
Each query includes ParDos to update metrics
execution time, processing event rate, number of results, but also invalid auctions/bids, …
Modes:
Batch mode: test data is finite and uses a BoundedSource
Streaming mode: test data is finite but uses an UnboundedSource to trigger streaming mode in runners
Queries pseudo code
Query 0 (not part of original NexMark): Pass-through.
Allows us to measure the monitoring overhead.
serializes and deserializes using coder
Uses Aggregator for byte size counter
Query 1: What are the bid values in euros? (Currency Conversion)
Simple map
Filter + ParDo to extract bids out of events
ParDo that outputs Bid objects with price converted
SQL interpretation:
SELECT Istream(auction, DOLTOEUR(price), bidder, datetime)
FROM bid [ROWS UNBOUNDED];
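As an illustration of the Query 1 map step, here is a minimal Java sketch that does not use Beam. The Bid class is a stand-in whose fields mirror the SQL columns, and the conversion rate of 0.89 euro per dollar is an assumed value chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;

final class CurrencyConversionSketch {
  /** Minimal stand-in for a bid; the fields mirror the columns of the SQL statement. */
  static final class Bid {
    final long auction;
    final long price;
    final long bidder;
    final long dateTime;

    Bid(long auction, long price, long bidder, long dateTime) {
      this.auction = auction;
      this.price = price;
      this.bidder = bidder;
      this.dateTime = dateTime;
    }
  }

  // Assumed rate of 0.89 euro per dollar, kept in integer arithmetic (truncates).
  static long dolToEur(long dollars) {
    return dollars * 89 / 100;
  }

  // Map step: one output bid per input bid, with only the price changed.
  static List<Bid> convert(List<Bid> bids) {
    List<Bid> out = new ArrayList<>();
    for (Bid b : bids) {
      out.add(new Bid(b.auction, dolToEur(b.price), b.bidder, b.dateTime));
    }
    return out;
  }
}
```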
Query 2: Find bids with specific auction ids and show their bid price.
...
Illustrates simple filter
...
ParDo that outputs AuctionPrice(auction, price) objects
SQL interpretation:
SELECT Rstream(auction, price)
FROM Bid [NOW]
WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087;
Query 3: Who is selling in particular US states?
...
Illustrates incremental join of the auctions and the persons collections
...
uses the global window with the per-key state and timer APIs
...
input1: collection of auctions events filtered by category and keyed by seller id
...
input2: collection of persons events filtered by US state codes and keyed by person id
...
person element stored in persistent state in order to match future auctions by that person. Set a timer to clear the person state after a TTL
auction elements stored in persistent state until we have seen the corresponding person record. Then, it can be output and cleared
...
output NameCityStateId(person.name, person.city, person.state, auction.id) objects
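The state-based join described above can be sketched without Beam as a small class holding two maps that play the role of per-key state. This is only an illustration: the class and method names are hypothetical, the output is reduced to a "name:auctionId" string, and the TTL timer is modelled as an explicit method call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class IncrementalJoinSketch {
  // Per-key state: the person seen for an id, and auctions still waiting for their seller.
  private final Map<Long, String> personById = new HashMap<>();
  private final Map<Long, List<Long>> pendingAuctionsBySeller = new HashMap<>();

  // A person arrives: remember it and flush the auctions that were waiting for it.
  List<String> onPerson(long personId, String name) {
    personById.put(personId, name);
    List<String> out = new ArrayList<>();
    List<Long> pending = pendingAuctionsBySeller.remove(personId);
    if (pending != null) {
      for (long auctionId : pending) {
        out.add(name + ":" + auctionId);
      }
    }
    return out;
  }

  // An auction arrives: output it at once if the seller is known, otherwise buffer it.
  List<String> onAuction(long sellerId, long auctionId) {
    List<String> out = new ArrayList<>();
    String name = personById.get(sellerId);
    if (name != null) {
      out.add(name + ":" + auctionId);
    } else {
      pendingAuctionsBySeller.computeIfAbsent(sellerId, k -> new ArrayList<>()).add(auctionId);
    }
    return out;
  }

  // Timer analogue: drop the person state once its TTL has elapsed.
  void onPersonTtlExpired(long personId) {
    personById.remove(personId);
  }
}
```

Auctions that arrive after the person state has been cleared are buffered again, which is why the TTL has to be long enough for the expected delay between a person and their auctions.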
...
SQL interpretation (this statement corresponds to Query 5: bids counted per auction over a 60-minute window sliding every minute):
SELECT Rstream(auction)
FROM (SELECT B1.auction, count(*) AS num
      FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B1
      GROUP BY B1.auction)
WHERE num >= ALL (SELECT count(*)
                  FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B2
                  GROUP BY B2.auction);
...
Query 6: What is the average selling price per seller for their last 10 closed auctions?
Illustrates specialized combiner
Apply winning-bids
ParDo to key the winning-bids by sellerId
apply GlobalWindow + triggering at each element (to have a continuous flow of updates at each new winning-bid)
Combine.perKey to calculate the average of the last 10 winning bids for each seller. A specialized CombineFn is needed because only the last 10 closed auctions count
create ArrayList accumulators for chunks of data
add all elements of the chunks to the accumulators, sort them by bid timestamp then price, keeping the last 10 elements
iteratively merge the accumulators until there is only one: just add all bids of all accumulators to a final accumulator and sort by timestamp then price, keeping the last 10 elements
extractOutput: sum all the prices of the bids and divide by accumulator size
ParDo that outputs SellerPrice(sellerId, avgPrice)
SQL interpretation:
SELECT Istream(AVG(Q.final), Q.seller)
FROM (SELECT Rstream(MAX(B.price) AS final, A.seller)
      FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
      WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
      GROUP BY A.id, A.seller) [PARTITION BY A.seller ROWS 10] Q
GROUP BY Q.seller;
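The four phases of the specialized combiner for Query 6 (create, add, merge, extract) can be sketched in plain Java as follows. This is not the Beam CombineFn itself: the class name is hypothetical, a bid is reduced to a {timestamp, price} pair, and returning 0.0 for an empty accumulator is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class LastTenAverageSketch {
  static final int MAX_BIDS = 10;

  // Order by timestamp, then price, so the most recent bids sit at the end of the list.
  private static final Comparator<long[]> BY_TIME_THEN_PRICE =
      Comparator.<long[]>comparingLong(b -> b[0]).thenComparingLong(b -> b[1]);

  static List<long[]> createAccumulator() {
    return new ArrayList<>();
  }

  // Add one winning bid {timestamp, price} and keep only the last MAX_BIDS.
  static List<long[]> addInput(List<long[]> acc, long timestamp, long price) {
    acc.add(new long[] {timestamp, price});
    return trim(acc);
  }

  // Pour every accumulator into a single one, then keep only the last MAX_BIDS.
  static List<long[]> mergeAccumulators(List<List<long[]>> accumulators) {
    List<long[]> merged = new ArrayList<>();
    for (List<long[]> acc : accumulators) {
      merged.addAll(acc);
    }
    return trim(merged);
  }

  // Sum the prices and divide by the accumulator size.
  static double extractOutput(List<long[]> acc) {
    if (acc.isEmpty()) {
      return 0.0;
    }
    long sum = 0;
    for (long[] bid : acc) {
      sum += bid[1];
    }
    return (double) sum / acc.size();
  }

  private static List<long[]> trim(List<long[]> acc) {
    acc.sort(BY_TIME_THEN_PRICE);
    int excess = acc.size() - MAX_BIDS;
    if (excess > 0) {
      acc.subList(0, excess).clear(); // drop the oldest bids
    }
    return acc;
  }
}
```

Trimming in both addInput and mergeAccumulators keeps every accumulator bounded to 10 entries, whatever the number of bids per seller.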
...
Query 7: What are the highest bids per period?
Could have been implemented with a combiner like Query 5 but is deliberately implemented using Max(prices) as a side input, to illustrate fanout.
Fanout is a redistribution using an intermediate implicit combine step to reduce the load in the final step of the Max transform
input: (fixed) windowed collection of bid events
ParDo to replace bids by their price
Max.withFanout to get the max per window and use it as a side input for next step. Fanout is useful if there are many events to be computed in a window using the Max transform.
ParDo on the bids with side input to output the bid if bid.price equals maxPrice (that comes from side input)
SQL interpretation:
SELECT Rstream(B.auction, B.price, B.bidder)
FROM Bid [RANGE 1 MINUTE SLIDE 1 MINUTE] B
WHERE B.price = (SELECT MAX(B1.price)
                 FROM BID [RANGE 1 MINUTE SLIDE 1 MINUTE] B1);
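To make the fanout and side-input ideas of Query 7 concrete, here is a plain Java sketch for one window. It is an analogy, not the Beam transform: the class name is hypothetical, a bid is a {auction, price, bidder} triple, and the buckets are filled round-robin, which is just one possible way to spread the load.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class HighestBidSketch {
  // Fanout analogue: compute partial maxima over `fanout` buckets (fanout >= 1),
  // then combine the partial results in a cheap final step.
  static long maxWithFanout(List<long[]> bids, int fanout) {
    long[] partial = new long[fanout];
    Arrays.fill(partial, Long.MIN_VALUE);
    for (int i = 0; i < bids.size(); i++) {
      int bucket = i % fanout;
      partial[bucket] = Math.max(partial[bucket], bids.get(i)[1]);
    }
    long max = Long.MIN_VALUE;
    for (long p : partial) {
      max = Math.max(max, p);
    }
    return max;
  }

  // Side-input analogue: the window maximum is computed once,
  // then every bid is compared against it.
  static List<long[]> highestBids(List<long[]> bids, int fanout) {
    List<long[]> out = new ArrayList<>();
    if (bids.isEmpty()) {
      return out;
    }
    long maxPrice = maxWithFanout(bids, fanout);
    for (long[] bid : bids) {
      if (bid[1] == maxPrice) {
        out.add(bid);
      }
    }
    return out;
  }
}
```

Several bids can share the maximum price, so the output for a window may contain more than one bid.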
...
SQL interpretation (this statement corresponds to Query 3):
SELECT Istream(P.name, P.city, P.state, A.id)
FROM Auction A [ROWS UNBOUNDED], Person P [ROWS UNBOUNDED]
WHERE A.seller = P.id AND (P.state = 'OR' OR P.state = 'ID' OR P.state = 'CA') AND A.category = 10;
Query 4: What is the average selling price for each auction category?
Illustrates sliding windows and aggregation
Apply winning-bids
ParDo to key winning-bids by category
apply sliding windows to have a period of time
apply Mean.perKey (key = category)
ParDo that outputs CategoryPrice(categoryId, avgPrice)
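The per-category mean of Query 4 can be illustrated for a single window with the plain Java sketch below. The class name is hypothetical, a winning bid is reduced to a {categoryId, price} pair, and the sliding windows are left out: the method would simply be applied to the content of each window.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CategoryMeanSketch {
  // Mean.perKey analogue: keep a running {sum, count} per category, then divide.
  static Map<Long, Double> meanPerCategory(List<long[]> winningBids) {
    Map<Long, long[]> sumAndCount = new TreeMap<>();
    for (long[] bid : winningBids) {
      long[] sc = sumAndCount.computeIfAbsent(bid[0], k -> new long[2]);
      sc[0] += bid[1]; // sum of prices
      sc[1]++;         // number of winning bids
    }
    Map<Long, Double> means = new TreeMap<>();
    for (Map.Entry<Long, long[]> e : sumAndCount.entrySet()) {
      means.put(e.getKey(), (double) e.getValue()[0] / e.getValue()[1]);
    }
    return means;
  }
}
```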
Query 5: Which auctions have seen the most bids in the last period?
...
Illustrates sliding windows and combiners (i.e. reducers) to compare the elements in auctions Collection
...
Input: (sliding) window (to have a result over 1h period updated every 1 min) collection of bids events
...
Count.perElement to count the occurrences of each auction id
...
Return KV(auction id, max occurrences)
...
Query 8: Who has entered the system and created an auction in the last period?
Illustrates simple join
Filter + ParDo to extract persons out of events
Apply fixed windows to have a period
ParDo to key collection by personId
Filter + ParDo to extract auctions out of events
Apply fixed windows to have a period
ParDo to key collection by sellerId
CoGroupByKey to group persons and auctions by personId/sellerId + tags to distinguish persons and auctions
ParDo to output IdNameReserve(person.id, person.name, auction.reserve) for each auction
SQL interpretation:
SELECT Rstream(P.id, P.name, A.reserve)
FROM Person [RANGE 12 HOUR] P, Auction [RANGE 12 HOUR] A
WHERE P.id = A.seller;
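The join of Query 8 can be sketched for one fixed window in plain Java. The names are hypothetical, persons are given as a personId-to-name map, auctions as {sellerId, reserve} pairs, and the output row is reduced to an "id,name,reserve" string instead of an IdNameReserve object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class NewSellerJoinSketch {
  // Both inputs are assumed to hold only the events of the same fixed window.
  static List<String> join(Map<Long, String> persons, List<long[]> auctions) {
    // CoGroupByKey analogue: collect the auctions under their seller id.
    Map<Long, List<Long>> reservesBySeller = new TreeMap<>();
    for (long[] auction : auctions) {
      reservesBySeller.computeIfAbsent(auction[0], k -> new ArrayList<>()).add(auction[1]);
    }
    // One output row per auction whose seller also appears as a person in the window.
    List<String> rows = new ArrayList<>();
    for (Map.Entry<Long, List<Long>> entry : reservesBySeller.entrySet()) {
      String name = persons.get(entry.getKey());
      if (name == null) {
        continue; // the seller did not enter the system in this window
      }
      for (long reserve : entry.getValue()) {
        rows.add(entry.getKey() + "," + name + "," + reserve);
      }
    }
    return rows;
  }
}
```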
Query 9 Winning-bids (not part of original NexMark): extract the most recent of the highest bids
Illustrates custom window function to reconcile auctions and bids + join them
input: collection of events
Apply custom windowing function to temporarily reconcile auction and bid events in the same custom window (AuctionOrBidWindow)
assign auctions to window [auction.timestamp, auction.expiring]
assign bids to window [bid.timestamp, bid.timestamp + expectedAuctionDuration (generator configuration parameter)]
merge all 'bid' windows into their corresponding 'auction' window, provided the auction has not expired.
Filter + ParDos to extract auctions out of events and key them by auction id
Filter + ParDos to extract bids out of events and key them by auction id
CoGroupByKey (groups values of PCollections<KV> that share the same key) to group auctions and bids by auction id + tags to distinguish auctions and bids
ParDo to
determine the best bid price: verification of valid bid, sort bids by price ASC then time DESC and keep the max price
and output AuctionBid(auction, bestBid) objects
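The last ParDo of Query 9 (choose the best bid of an auction) can be sketched in plain Java as below. The names are hypothetical and a bid is a {price, timestamp} pair. The validity rule used here (bid placed before the auction expires and price at least equal to the reserve) is an assumption of the sketch; under the ordering described above, ties on price go to the bid with the earliest timestamp.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

final class WinningBidSketch {
  // Price ascending, then timestamp descending: the maximum under this order is
  // the highest price and, among equal prices, the earliest bid.
  static final Comparator<long[]> PRICE_THEN_DESC_TIME =
      Comparator.<long[]>comparingLong(b -> b[0])
          .thenComparing(Comparator.<long[]>comparingLong(b -> b[1]).reversed());

  // Returns the best valid bid, or null when no bid qualifies.
  static long[] bestBid(long auctionStart, long auctionExpires, long reserve, List<long[]> bids) {
    List<long[]> valid = new ArrayList<>();
    for (long[] bid : bids) {
      boolean inTime = bid[1] >= auctionStart && bid[1] < auctionExpires;
      boolean meetsReserve = bid[0] >= reserve;
      if (inTime && meetsReserve) {
        valid.add(bid);
      }
    }
    if (valid.isEmpty()) {
      return null;
    }
    return Collections.max(valid, PRICE_THEN_DESC_TIME);
  }
}
```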
...
Query 10 (not part of original NexMark): Log all events to GCS files
windows with large side effects on firing
ParDo to key events by their shardId (number of shards is a config item)
Apply fixed windows with composite triggering that fires when each sub-trigger (executed in order) fires
repeatedly
after at least maxLogEvents in pane
or finally when the watermark passes the end of the window
Repeatedly
after at least maxLogEvents in pane
or processing time passes the first element in pane + lateDelay
With allowedLateness of 1 day (so that any late data will stall the pipeline and be noticeable)
GroupByKey to group events by shardId
ParDo to construct the outputStreams (fileNames contain shardId) and encode each event to that outputStream + form pairs with key = null key and value = outputFile (represents a fileName with various added information)
apply fixed window with default trigger and lateness of 1 day to clear complex triggering
GroupByKey all outputFiles together (they have the same key) to have one file per window
ParDo to write all the lines to files in Google Cloud Storage
...
Query 11 (not part of original NexMark): How many bids did a user make in each session he was active?
Illustrates session windows + triggering on the bids collection
input: collection of bids events
ParDo to replace bids with their bidder id
Apply session windows with gap duration = windowDuration (configuration item) and trigger repeatedly after at least nbEvents in pane => each window (i.e. session) will contain bid ids received since last windowDuration period of inactivity and materialized every nbEvents bids
Count.perElement to count bids per bidder id (number of occurrences of bidder id)
output idsPerSession(bidder, bidsCount) objects
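To show what the session windows of Query 11 compute, here is a plain Java sketch for a single bidder. The names are hypothetical, and the trigger on nbEvents is left out, so only the final count per session is produced. Two consecutive bids are placed in the same session when they are less than the gap duration apart, which is how this sketch models the merging of session windows.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class SessionCountSketch {
  // Returns the number of bids in each session of one bidder, in time order.
  static List<Integer> bidsPerSession(List<Long> bidTimestamps, long gap) {
    List<Long> sorted = new ArrayList<>(bidTimestamps);
    Collections.sort(sorted);
    List<Integer> counts = new ArrayList<>();
    long last = 0;
    int current = 0;
    for (long t : sorted) {
      // A period of inactivity of at least `gap` closes the current session.
      if (current > 0 && t - last >= gap) {
        counts.add(current);
        current = 0;
      }
      current++;
      last = t;
    }
    if (current > 0) {
      counts.add(current);
    }
    return counts;
  }
}
```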
...