Nexmark is a suite of queries (pipelines) used to measure performance and non-regression in Beam (see

Links to the original Nexmark papers:

Some queries can be very complex. To ease their maintenance, here is a presentation of the architecture along with pseudo-code of the queries:

Components of NexMark


  • Generator:

    • generation of timestamped events (bids, persons, auctions) correlated with each other

  • NexmarkLauncher:

    • creates sources that use the generator

    • launches and monitors the query pipelines

  • Output metrics:

    • Each query includes ParDos to update metrics

    • execution time, event processing rate, number of results, but also invalid auctions/bids, …

  • Modes:

    • Batch mode: test data is finite and uses a BoundedSource

    • Streaming mode: test data is finite but uses an UnboundedSource to trigger streaming mode in runners

Queries pseudo code

Query 0 (not part of original NexMark): Pass-through.

  • Allows us to measure the monitoring overhead.

    • serializes and deserializes each event using its coder

    • Uses an Aggregator as a byte-size counter
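A minimal plain-Java sketch of the Query 0 idea, written without any Beam dependency: each event is encoded, its byte size is added to a counter, and it is decoded and emitted unchanged, so only the round-trip overhead is measured. The `Bid` record, the hand-written fixed-width "coder" and the static `bytesCounted` field (standing in for the Aggregator) are all hypothetical illustrations, not the Nexmark model classes or Beam coders.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical plain-Java sketch of Query 0 (not Beam code). */
final class Query0Sketch {
  // Hypothetical stand-in for a Nexmark bid event.
  record Bid(long auction, long bidder, long price) {}

  // Stand-in for the byte-size counter (an Aggregator in the text).
  static long bytesCounted = 0;

  // Hand-written "coder": three fixed-width longs, i.e. 24 bytes per bid.
  static byte[] encode(Bid bid) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buffer)) {
      out.writeLong(bid.auction());
      out.writeLong(bid.bidder());
      out.writeLong(bid.price());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  static Bid decode(byte[] bytes) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
      return new Bid(in.readLong(), in.readLong(), in.readLong());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Pass-through: output equals input; only the encode/decode cost and byte count are added. */
  static Bid passThrough(Bid bid) {
    byte[] bytes = encode(bid);
    bytesCounted += bytes.length;
    return decode(bytes);
  }
}
```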

Query 1: What are the bid values in euros? (Currency Conversion)

  • Simple map

    • Filter + ParDo to extract bids out of events

    • ParDo that outputs Bid objects with price converted

SQL Interpretation
SELECT Istream(auction, DOLTOEUR(price), bidder, datetime)
FROM bid [ROWS UNBOUNDED];
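The Query 1 steps (filter out non-bid events, then map each bid to a copy with a converted price) can be sketched in plain Java with streams standing in for the Filter and ParDo. The `Event`/`Bid` records and the `dolToEur` rate of 89/100 are assumptions made for illustration; the sketch does not use the Beam API.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical plain-Java sketch of Query 1 (not Beam code). */
final class Query1Sketch {
  // Hypothetical event model: kind is "bid", "person" or "auction"; bid is null unless kind is "bid".
  record Bid(long auction, long bidder, long price, long dateTime) {}
  record Event(String kind, Bid bid) {}

  // Assumed DOLTOEUR: integer price scaled by 89/100 (a fixed 0.89 rate, chosen for illustration).
  static long dolToEur(long price) {
    return price * 89 / 100;
  }

  static List<Bid> query1(List<Event> events) {
    return events.stream()
        .filter(e -> "bid".equals(e.kind()))  // Filter: keep bid events only
        .map(Event::bid)                      // ParDo: extract the Bid from the event
        // ParDo: output a Bid with the converted price, other fields unchanged
        .map(b -> new Bid(b.auction(), b.bidder(), dolToEur(b.price()), b.dateTime()))
        .collect(Collectors.toList());
  }
}
```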

Query 2: Find bids with specific auction ids and show their bid price.


  • Illustrates a simple filter

    • ParDo that outputs AuctionPrice(auction, price) objects

SQL Interpretation
SELECT Rstream(auction, price)
FROM Bid [NOW]
WHERE auction = 1007
 OR auction = 1020
 OR auction = 2001
 OR auction = 2019
 OR auction = 2087;
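A plain-Java sketch of the Query 2 filter, assuming the bids have already been extracted from the events: bids whose auction id is in the fixed set from the SQL interpretation are kept and projected to `AuctionPrice`. The record and method names are hypothetical; this is not the Beam implementation.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical plain-Java sketch of Query 2 (not Beam code). */
final class Query2Sketch {
  record Bid(long auction, long bidder, long price) {}
  record AuctionPrice(long auction, long price) {}

  // Auction ids taken from the SQL interpretation of Query 2.
  static final Set<Long> WANTED_AUCTIONS = Set.of(1007L, 1020L, 2001L, 2019L, 2087L);

  static List<AuctionPrice> query2(List<Bid> bids) {
    return bids.stream()
        .filter(b -> WANTED_AUCTIONS.contains(b.auction()))  // filter on auction id
        .map(b -> new AuctionPrice(b.auction(), b.price()))  // ParDo: project to AuctionPrice
        .collect(Collectors.toList());
  }
}
```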
Query 3: Who is selling in particular US states?


  • Illustrates an incremental join of the auctions and the persons collections

    • uses the global window and the per-key state and timer APIs

    • input1: collection of auction events filtered by category and keyed by seller id

    • input2: collection of person events filtered by US state codes and keyed by person id

    • person elements are stored in persistent state in order to match future auctions by that person. A timer is set to clear the person state after a TTL

    • auction elements are stored in persistent state until the corresponding person record has been seen. Then they can be output and cleared

    • output NameCityStateId(person.name, person.city, person.state, auction.id) objects
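The state handling described for Query 3 can be sketched in plain Java, with hash maps standing in for Beam's per-key state and an explicit `advanceTime` call standing in for the TTL timer. This is a hypothetical single-process illustration: the category and US-state filters are assumed to have been applied upstream, and none of the names come from the Nexmark code base.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java sketch of the Query 3 incremental join (not Beam code). */
final class Query3Sketch {
  record Person(long id, String name, String city, String state) {}
  record Auction(long id, long seller, int category) {}
  record NameCityStateId(String name, String city, String state, long auctionId) {}

  // Per-key state, keyed by person id / seller id (stand-ins for Beam state cells).
  private final Map<Long, Person> personState = new HashMap<>();
  private final Map<Long, Long> personExpiry = new HashMap<>();
  private final Map<Long, List<Auction>> pendingAuctions = new HashMap<>();
  private final long ttlMillis;
  final List<NameCityStateId> output = new ArrayList<>();

  Query3Sketch(long ttlMillis) {
    this.ttlMillis = ttlMillis;
  }

  /** Stand-in for the timer firing: clear person state whose TTL has passed. */
  void advanceTime(long now) {
    personExpiry.entrySet().removeIf(e -> {
      if (e.getValue() <= now) {
        personState.remove(e.getKey());
        return true;
      }
      return false;
    });
  }

  void onPerson(Person p, long now) {
    personState.put(p.id(), p);
    personExpiry.put(p.id(), now + ttlMillis);  // "set a timer" to clear this person after the TTL
    // Output and clear any auctions that were waiting for this person.
    List<Auction> waiting = pendingAuctions.remove(p.id());
    if (waiting != null) {
      for (Auction a : waiting) {
        emit(p, a);
      }
    }
  }

  void onAuction(Auction a) {
    Person p = personState.get(a.seller());
    if (p != null) {
      emit(p, a);
    } else {
      // Person not seen yet (or expired): keep the auction in state.
      pendingAuctions.computeIfAbsent(a.seller(), k -> new ArrayList<>()).add(a);
    }
  }

  private void emit(Person p, Auction a) {
    output.add(new NameCityStateId(p.name(), p.city(), p.state(), a.id()));
  }
}
```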


SQL Interpretation
SELECT Istream(P.name, P.city, P.state, A.id)
FROM Auction A [ROWS UNBOUNDED], Person P [ROWS UNBOUNDED]
WHERE A.seller = P.id AND (P.state = `OR' OR P.state = `ID' OR P.state = `CA') AND A.category = 10;

Query 4: What is the average selling price for each auction category?

  • Illustrates sliding windows and aggregation

    • Apply winning-bids

    • ParDo to key winning-bids by category

    • Apply sliding windows to have a period of time

    • Apply Mean.perKey (key = category)

    • ParDo that outputs CategoryPrice(categoryId, avgPrice)

SQL Interpretation
SELECT Istream(AVG(Q.final))
FROM Category C, (SELECT Rstream(MAX(B.price) AS final, A.category)
                  FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
                  WHERE A.id = B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
                  GROUP BY A.id, A.category) Q
WHERE Q.category = C.id
GROUP BY C.id;

Query 5: Which auctions have seen the most bids in the last period?

  • Illustrates sliding windows and combiners (i.e. reducers) to compare the elements in the auctions collection

    • Input: collection of bid events in sliding windows (to have a result over a 1 h period, updated every 1 min)

    • ParDo to replace bid elements by their auction id

    • Count.perElement to count the occurrences of each auction id

    • Combine.globally to select only the auctions with the maximum number of bids

      • BinaryCombineFn to compare the elements of the collection one to one (auction id occurrences, i.e. number of bids)

      • Return KV(auction id, max occurrences)

    • output: AuctionCount(auction id, max occurrences) objects

SQL Interpretation
SELECT Rstream(auction)
FROM (SELECT B1.auction, count(*) AS num
      FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B1
      GROUP BY B1.auction)
WHERE num >= ALL (SELECT count(*)
                  FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B2
                  GROUP BY B2.auction);

Query 6: What is the average selling price per seller for their last 10 closed auctions?

  • Illustrates a specialized combiner

    • Apply winning-bids

    • ParDo to key the winning-bids by sellerId

    • Apply GlobalWindow + triggering at each element (to have a continuous flow of updates at each new winning-bid)

    • Combine.perKey to calculate the average of the last 10 winning bids for each seller. A specialized CombineFn is needed because only the last 10 closed auctions are kept

      • create ArrayList accumulators for chunks of data

      • add all elements of the chunks to the accumulators, sort them by bid timestamp then price, keeping the last 10 elements

      • iteratively merge the accumulators until there is only one: add all bids of all accumulators to a final accumulator and sort by timestamp then price, keeping the last 10 elements

      • extractOutput: sum all the prices of the bids and divide by the accumulator size

    • ParDo that outputs SellerPrice(sellerId, avgPrice)

SQL Interpretation
SELECT Istream(AVG(Q.final), Q.seller)
FROM (SELECT Rstream(MAX(B.price) AS final, A.seller)
      FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
      WHERE A.id = B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
      GROUP BY A.id, A.seller) [PARTITION BY A.seller ROWS 10] Q
GROUP BY Q.seller;

Query 7: What are the highest bids per period?

  • Could have been implemented with a combiner like Query 5, but is deliberately implemented using Max(prices) as a side input to illustrate fanout.

  • Fanout is a redistribution using an intermediate implicit combine step to reduce the load in the final step of the Max transform

    • input: (fixed) windowed collection of bid events

    • ParDo to replace bids by their price

    • Max.withFanout to get the max per window and use it as a side input for the next step. Fanout is useful when many events in a window must be processed by the Max transform.

    • ParDo on the bids with the side input to output the bid if bid.price equals maxPrice (which comes from the side input)

SQL Interpretation
SELECT Rstream(B.auction, B.price, B.bidder)
FROM Bid [RANGE 1 MINUTE SLIDE 1 MINUTE] B
WHERE B.price = (SELECT MAX(B1.price)
                 FROM BID [RANGE 1 MINUTE SLIDE 1 MINUTE] B1);


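The Query 7 steps for one window (a max computed with fanout, then used like a side input to keep only the bids at that price) can be sketched in plain Java. Here "fanout" is modeled by hypothetically splitting the prices round-robin into `fanout` groups, taking a partial max per group (the intermediate combine) and then the max of the partial results (the final step); in Beam the runner performs this redistribution, and nothing below is Beam API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical plain-Java sketch of Query 7 for a single window (not Beam code). */
final class Query7Sketch {
  record Bid(long auction, long price, long bidder) {}

  /** Two-stage max: partial maxima over `fanout` groups, then the max of those partial maxima. */
  static long maxWithFanout(List<Long> prices, int fanout) {
    List<Long> partials = new ArrayList<>();
    for (int g = 0; g < fanout; g++) {
      long partial = Long.MIN_VALUE;
      for (int i = g; i < prices.size(); i += fanout) {
        partial = Math.max(partial, prices.get(i));  // intermediate combine for group g
      }
      partials.add(partial);
    }
    long max = Long.MIN_VALUE;
    for (long p : partials) {
      max = Math.max(max, p);  // final combine step
    }
    return max;
  }

  /** ParDo with "side input": keep the bids whose price equals the window's max price. */
  static List<Bid> highestBids(List<Bid> windowBids, int fanout) {
    List<Long> prices = windowBids.stream().map(Bid::price).collect(Collectors.toList());
    long maxPrice = maxWithFanout(prices, fanout);  // plays the role of the side input
    return windowBids.stream().filter(b -> b.price() == maxPrice).collect(Collectors.toList());
  }
}
```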

Query 8: Who has entered the system and created an auction in the last period?

  • Illustrates simple join

    • Filter + ParDo to extract persons out of events

    • Apply fixed windows to have a period

    • ParDo to key collection by personId

    • Filter + ParDo to extract auctions out of events

    • Apply fixed windows to have a period

    • ParDo to key collection by sellerId

    • CoGroupByKey to group persons and auctions by personId/sellerId + tags to distinguish persons and auctions

    • ParDo to output IdNameReserve(person.id, person.name, auction.reserve) for each auction

SQL Interpretation
SELECT Rstream(P.id, P.name, A.reserve)
FROM Person [RANGE 12 HOUR] P, Auction [RANGE 12 HOUR] A
WHERE P.id = A.seller;
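The Query 8 join can be sketched in plain Java: fixed windows are modeled by rounding each timestamp down to its window start, and CoGroupByKey is modeled by grouping persons and auctions under the same (window start, person id/seller id) key, then emitting one `IdNameReserve` per matching person/auction pair. The `Grouped` record and all other names are hypothetical; this is not the Beam implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java sketch of Query 8 (not Beam code). */
final class Query8Sketch {
  record Person(long id, String name, long time) {}
  record Auction(long id, long seller, long reserve, long time) {}
  record IdNameReserve(long id, String name, long reserve) {}
  // Stand-in for one CoGroupByKey result: the tagged persons and auctions sharing a key.
  record Grouped(List<Person> persons, List<Auction> auctions) {}

  // Start of the fixed window that contains `time`.
  static long windowStart(long time, long size) {
    return time - Math.floorMod(time, size);
  }

  private static Grouped groupFor(Map<List<Long>, Grouped> groups, long time, long id, long size) {
    return groups.computeIfAbsent(List.of(windowStart(time, size), id),
        k -> new Grouped(new ArrayList<>(), new ArrayList<>()));
  }

  static List<IdNameReserve> query8(List<Person> persons, List<Auction> auctions, long windowSize) {
    Map<List<Long>, Grouped> groups = new LinkedHashMap<>();
    for (Person p : persons) groupFor(groups, p.time(), p.id(), windowSize).persons().add(p);
    for (Auction a : auctions) groupFor(groups, a.time(), a.seller(), windowSize).auctions().add(a);
    List<IdNameReserve> out = new ArrayList<>();
    for (Grouped g : groups.values()) {
      for (Person p : g.persons()) {
        for (Auction a : g.auctions()) {
          out.add(new IdNameReserve(p.id(), p.name(), a.reserve()));  // one output per auction
        }
      }
    }
    return out;
  }
}
```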

Query 9 Winning-bids (not part of original NexMark): extract the most recent of the highest bids

  • Illustrates custom window function to reconcile auctions and bids + join them

    • input: collection of events

    • Apply a custom windowing function to temporarily reconcile auction and bid events in the same custom window (AuctionOrBidWindow)

      • assign auctions to window [auction.timestamp, auction.expiring]

      • assign bids to window [bid.timestamp, bid.timestamp + expectedAuctionDuration (generator configuration parameter)]

      • merge all 'bid' windows into their corresponding 'auction' window, provided the auction has not expired.

    • Filter + ParDos to extract auctions out of events and key them by auction id

    • Filter + ParDos to extract bids out of events and key them by auction id

    • CoGroupByKey (groups values of PCollection<KV>s that share the same key) to group auctions and bids by auction id + tags to distinguish auctions and bids

    • ParDo to:

      • determine the best bid price: verification of valid bids, sort by price ASC then time DESC and keep the max price

      • and output AuctionBid(auction, bestBid) objects
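The final ParDo of Query 9 can be sketched in plain Java for one auction and the bids grouped with it: invalid bids are dropped, the rest are ordered by price ascending then time descending, and the maximum is kept. The validity rule used here (price at or above the reserve, placed while the auction is open) is an assumption for illustration, as are all record and method names. Under this ordering, when two valid bids share the highest price, the earlier one is kept.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical plain-Java sketch of the Query 9 best-bid step (not Beam code). */
final class Query9Sketch {
  record Auction(long id, long reserve, long start, long expires) {}
  record Bid(long auction, long price, long time) {}
  record AuctionBid(Auction auction, Bid bestBid) {}

  // Price ascending, then time descending.
  static final Comparator<Bid> PRICE_THEN_DESCENDING_TIME =
      Comparator.comparingLong(Bid::price).thenComparing(Comparator.comparingLong(Bid::time).reversed());

  // Assumed validity rule: at or above the reserve, and placed while the auction is open.
  static boolean isValid(Auction a, Bid b) {
    return b.price() >= a.reserve() && b.time() >= a.start() && b.time() < a.expires();
  }

  static Optional<AuctionBid> winningBid(Auction auction, List<Bid> bids) {
    return bids.stream()
        .filter(b -> isValid(auction, b))
        .max(PRICE_THEN_DESCENDING_TIME)           // keep the max under the ordering above
        .map(best -> new AuctionBid(auction, best));
  }
}
```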


Query 10 (not part of original NexMark): Log all events to GCS files

  • windows with large side effects on firing

    • ParDo to key events by their shardId (number of shards is a config item)

    • Apply fixed windows with composite triggering that fires when each sub-trigger (executed in order) fires

      • Repeatedly

        • after at least maxLogEvents in pane

        • or finally when the watermark passes the end of the window

      • Repeatedly

        • after at least maxLogEvents in pane

        • or when processing time passes the first element in pane + lateDelay

      • With allowedLateness of 1 day (so that any late data will stall the pipeline and be noticeable)

    • GroupByKey to group events by shardId

    • ParDo to construct the outputStreams (fileNames contain shardId) and encode each event to that outputStream + form pairs with key = null key and value = outputFile (represents a fileName with various added information)

    • Apply a fixed window with the default trigger and lateness of 1 day to clear the complex triggering

    • GroupByKey all outputFiles together (they have the same key) to have one file per window

    • ParDo to write all the lines to files in Google Cloud Storage


Query 11 (not part of original NexMark): How many bids did a user make in each session in which they were active?

  • Illustrates session windows + triggering on the bids collection

    • input: collection of bid events

    • ParDo to replace bids with their bidder id

    • Apply session windows with gap duration = windowDuration (configuration item) and trigger repeatedly after at least nbEvents in pane => each window (i.e. session) contains the bidder ids received until a windowDuration period of inactivity, and is materialized every nbEvents bids

    • Count.perElement to count bids per bidder id (number of occurrences of bidder id)

    • output BidsPerSession(bidder, bidsCount) objects
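The session-window part of Query 11 can be sketched in plain Java: each bid is given the window [time, time + gap), overlapping windows of the same bidder are merged into one session, and the bids in each session are counted. The early firing every nbEvents is not modeled; only completed sessions are counted. All names are hypothetical and the sketch does not use the Beam API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical plain-Java sketch of Query 11 session counting (not Beam code). */
final class Query11Sketch {
  record Bid(long bidder, long time) {}
  record BidsPerSession(long bidder, long bidsCount) {}

  static List<BidsPerSession> query11(List<Bid> bids, long gap) {
    // "ParDo to replace bids with their bidder id": keep only bidder id and timestamp.
    Map<Long, List<Long>> timesByBidder = new TreeMap<>();
    for (Bid b : bids) {
      timesByBidder.computeIfAbsent(b.bidder(), k -> new ArrayList<>()).add(b.time());
    }
    List<BidsPerSession> out = new ArrayList<>();
    for (Map.Entry<Long, List<Long>> e : timesByBidder.entrySet()) {
      List<Long> times = e.getValue();
      Collections.sort(times);
      long count = 0;
      long sessionEnd = Long.MIN_VALUE;
      for (long t : times) {
        // The window [t, t + gap) does not overlap the current session: close it.
        if (count > 0 && t >= sessionEnd) {
          out.add(new BidsPerSession(e.getKey(), count));
          count = 0;
        }
        count++;
        sessionEnd = t + gap;  // merged session now ends one gap after the latest bid
      }
      if (count > 0) {
        out.add(new BidsPerSession(e.getKey(), count));
      }
    }
    return out;
  }
}
```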
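
Query 12 (not part of original NexMark): How many bids does a user make within a fixed processing time limit?

  • Illustrates working in processing time in the global window to count occurrences of bidder

    • input: collection of bid events

    • output BidsPerWindow(bidder, bidsCount) objects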