  • Illustrates working in processing time in the Global window to count occurrences of bidder

    • input: collection of bid events

    • ParDo to replace bids by their bidder id

    • Apply the global window with a trigger that fires repeatedly once processing time passes the first element in the pane + windowDuration (configuration item) => each pane contains the elements processed within windowDuration

    • Count.perElement to count bids per bidder id (occurrences of bidder id)

    • output BidsPerWindow(bidder, bidsCount) objects

Nexmark is a suite of queries (pipelines) used to measure performance and non-regression in Beam (see https://beam.apache.org/documentation/sdks/java/nexmark/).

Links to the original Nexmark papers:

Some queries can be very complex. To ease their maintenance, here is a presentation of the architecture along with pseudo-code of the queries:

Components of NexMark

  • Generator:

    • generation of timestamped events (bids, persons, auctions) correlated between each other

  • NexmarkLauncher:

    • creates sources that use the generator

    • launches and monitors the query pipelines

  • Output metrics:

    • Each query includes ParDos to update metrics

    • execution time, processing event rate, number of results, but also invalid auctions/bids, …

  • Modes:

    • Batch mode: test data is finite and uses a BoundedSource

    • Streaming mode: test data is finite but uses an UnboundedSource to trigger streaming mode in runners

Query pseudo-code

Query 0 (not part of original NexMark): Pass-through.

  • Allows us to measure the monitoring overhead.

    • serializes and deserializes using coder

    • Uses Aggregator for byte size counter

Query 1: What are the bid values in Euros? (Currency Conversion)

  • Simple map

    • Filter + ParDo to extract bids out of events

    • ParDo that outputs Bid objects with price converted

SQL Interpretation
SELECT Istream(auction, DOLTOEUR(price), bidder, datetime)
FROM bid [ROWS UNBOUNDED];
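
The map step of Query 1 can be sketched in plain Java without any Beam dependency. This is only an illustration: the `Bid` class, the method names and the 0.89 conversion rate are assumptions made for the example, not taken from the Nexmark sources.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of the Query 1 map step (illustration only, no Beam dependency). */
final class Query1Sketch {

    /** Hypothetical minimal bid type mirroring the columns of the SQL above. */
    static final class Bid {
        final long auction;
        final long bidder;
        final long price;
        final long dateTime;

        Bid(long auction, long bidder, long price, long dateTime) {
            this.auction = auction;
            this.bidder = bidder;
            this.price = price;
            this.dateTime = dateTime;
        }
    }

    /** Assumed rate of 0.89 EUR per USD, applied with integer arithmetic. */
    static long dolToEur(long price) {
        return price * 89 / 100;
    }

    /** Emits one converted bid per input bid, like a one-to-one ParDo. */
    static List<Bid> convert(List<Bid> bids) {
        List<Bid> out = new ArrayList<>();
        for (Bid b : bids) {
            out.add(new Bid(b.auction, b.bidder, dolToEur(b.price), b.dateTime));
        }
        return out;
    }
}
```

In a real pipeline the same function body would sit inside the ParDo that outputs the converted Bid objects.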

Query 2: Find bids with specific auction ids and show their bid price.

...

Illustrates simple filter

...

ParDo that outputs AuctionPrice(auction, price) objects

SQL Interpretation
SELECT Rstream(auction, price)
FROM Bid [NOW]
WHERE
 auction = 1007
 OR auction = 1020
 OR auction = 2001
 OR auction = 2019
 OR auction = 2087;
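
As a rough illustration of the filter, the following plain-Java sketch keeps only the bids whose auction id appears in the SQL above and projects them to (auction, price) pairs. The class and method names are hypothetical, and the actual pipeline may select auction ids differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java sketch of the Query 2 filter and projection (illustration only). */
final class Query2Sketch {

    /** Auction ids taken from the SQL interpretation above. */
    static final Set<Long> WANTED =
        new HashSet<>(Arrays.asList(1007L, 1020L, 2001L, 2019L, 2087L));

    /** Hypothetical stand-in for the AuctionPrice(auction, price) output type. */
    static final class AuctionPrice {
        final long auction;
        final long price;

        AuctionPrice(long auction, long price) {
            this.auction = auction;
            this.price = price;
        }
    }

    /** Each input row is {auctionId, price}; rows with other auction ids are dropped. */
    static List<AuctionPrice> select(List<long[]> bids) {
        List<AuctionPrice> out = new ArrayList<>();
        for (long[] bid : bids) {
            if (WANTED.contains(bid[0])) {
                out.add(new AuctionPrice(bid[0], bid[1]));
            }
        }
        return out;
    }
}
```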

Query 3: Who is selling in particular US states?

...

Illustrates incremental join of the auctions and the persons collections

...

Uses the global window together with the per-key state and timer APIs

...

input1: collection of auctions events filtered by category and keyed by seller id

...

input2: collection of persons events filtered by US state codes and keyed by person id

...

  • person element stored in persistent state in order to match future auctions by that person. Set a timer to clear the person state after a TTL

  • auction elements stored in persistent state until we have seen the corresponding person record. Then, it can be output and cleared

...

output NameCityStateId(person.name, person.city, person.state, auction.id) objects

...

SQL Interpretation
SELECT Istream(P.name, P.city, P.state, A.id)
FROM Auction A [ROWS UNBOUNDED], Person P [ROWS UNBOUNDED]
WHERE A.seller = P.id
 AND (P.state = 'OR' OR P.state = 'ID' OR P.state = 'CA')
 AND A.category = 10;
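
The incremental join described for Query 3 can be approximated with two in-memory maps standing in for per-key state. This is a simplified sketch under assumptions: the class and method names are hypothetical, the timer is reduced to an explicit `onTimer` call, and the output is a plain string rather than a NameCityStateId object.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of a stateful incremental join (illustration only). */
final class Query3Sketch {

    // Stand-ins for per-key state: the person seen for an id, and auctions still waiting.
    private final Map<Long, String> personState = new HashMap<>();
    private final Map<Long, List<Long>> pendingAuctions = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    /** A person arrives: store it and flush any auctions that were waiting for it. */
    void onPerson(long personId, String name) {
        personState.put(personId, name);
        List<Long> waiting = pendingAuctions.remove(personId);
        if (waiting != null) {
            for (long auctionId : waiting) {
                output.add(name + ":" + auctionId);
            }
        }
    }

    /** An auction arrives: join immediately if the seller is known, otherwise buffer it. */
    void onAuction(long sellerId, long auctionId) {
        String name = personState.get(sellerId);
        if (name != null) {
            output.add(name + ":" + auctionId);
        } else {
            pendingAuctions.computeIfAbsent(sellerId, k -> new ArrayList<>()).add(auctionId);
        }
    }

    /** Simulates the TTL timer firing for a key: the stored person is cleared. */
    void onTimer(long personId) {
        personState.remove(personId);
    }

    List<String> output() {
        return output;
    }
}
```

Feeding an auction before its seller produces no output until the matching person arrives, which is the behaviour the buffered auction state is meant to provide.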

Query 4: What is the average selling price for each auction category?

  • Illustrates sliding windows and aggregation

    • Apply Winning-bids

    • ParDo to key winning-bids by category

    • apply sliding windows to have a period of time

    • apply Mean.perKey (key = category)

    • ParDo that outputs CategoryPrice(categoryId, avgPrice)
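
To make the sliding-window aggregation concrete, here is a plain-Java sketch that assigns each sale to every sliding window containing its timestamp and then averages prices per (window, category). The row layout, the method names and the window parameters are assumptions for illustration; the real query relies on Beam's windowing and Mean.perKey instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of sliding windows plus a per-key mean (illustration only). */
final class Query4Sketch {

    /** Start times of all windows [start, start + size) that contain the timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long period) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, period);
        for (long start = lastStart; start > timestamp - size; start -= period) {
            starts.add(start);
        }
        return starts;
    }

    /**
     * Each sale row is {timestamp, categoryId, price}.
     * Returns the mean price keyed by "windowStart/categoryId".
     */
    static Map<String, Double> meanPerCategory(List<long[]> sales, long size, long period) {
        Map<String, long[]> sumAndCount = new TreeMap<>();
        for (long[] sale : sales) {
            for (long start : windowStarts(sale[0], size, period)) {
                long[] acc = sumAndCount.computeIfAbsent(start + "/" + sale[1], k -> new long[2]);
                acc[0] += sale[2]; // running sum of prices
                acc[1]++;          // running count
            }
        }
        Map<String, Double> means = new TreeMap<>();
        for (Map.Entry<String, long[]> e : sumAndCount.entrySet()) {
            means.put(e.getKey(), (double) e.getValue()[0] / e.getValue()[1]);
        }
        return means;
    }
}
```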

Query 5: Which auctions have seen the most bids in the last period?

...

Illustrates sliding windows and combiners (i.e. reducers) to compare the elements of the auctions collection

...

Input: (sliding) window (to have a result over 1h period updated every 1 min) collection of bids events

...

Count.perElement to count the occurrences of each auction id

...

Return KV(auction id, max occurrences)
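
The count-then-max logic for a single window can be sketched in plain Java as follows. The class and method names are hypothetical, the input is assumed to be the auction ids of the bids in one window, and ties between auctions are resolved arbitrarily in this sketch.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of "count per element, then keep the maximum" (illustration only). */
final class Query5Sketch {

    /** Returns (auction id, number of bids) for the most-bid auction, or null if empty. */
    static Map.Entry<Long, Long> hottestAuction(List<Long> auctionIdsInWindow) {
        // Step 1: count occurrences of each auction id (the Count.perElement step).
        Map<Long, Long> counts = new HashMap<>();
        for (Long id : auctionIdsInWindow) {
            counts.merge(id, 1L, Long::sum);
        }
        // Step 2: combine the counts down to the entry with the largest count.
        Map.Entry<Long, Long> best = null;
        for (Map.Entry<Long, Long> e : counts.entrySet()) {
            if (best == null || e.getValue() > best.getValue()) {
                best = e;
            }
        }
        return best == null
            ? null
            : new AbstractMap.SimpleImmutableEntry<>(best.getKey(), best.getValue());
    }
}
```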

...

Query 6:  What is the average selling price per seller for their last 10 closed auctions?

...

Illustrates specialized combiner

...

Apply winning-bids

...

ParDo to key the winning-bids by sellerId

...

apply GlobalWindow + triggering at each element (to have a continuous flow of updates at each new winning-bid)

...

Combine.perKey to calculate the average of the last 10 winning bids for each seller. A specialized CombineFn is needed because only the last 10 closed auctions are taken into account

...

create ArrayList accumulators for chunks of data

...

extractOutput: sum all the prices of the bids and divide by accumulator size
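
The shape of such a specialized combiner can be sketched in plain Java with the four usual combine steps written as static methods. This is not the Beam CombineFn itself: the class name, the {timestamp, price} row layout and the choice to order bids by timestamp are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java sketch of a "mean of the last 10" combiner (illustration only). */
final class Query6Sketch {

    static final int MAX_BIDS = 10;

    /** Accumulators are lists of {timestamp, price} rows, oldest first. */
    static List<long[]> createAccumulator() {
        return new ArrayList<>();
    }

    /** Adds a bid and keeps only the MAX_BIDS most recent ones. */
    static List<long[]> addInput(List<long[]> acc, long[] bid) {
        acc.add(bid);
        acc.sort(Comparator.comparingLong((long[] b) -> b[0]));
        while (acc.size() > MAX_BIDS) {
            acc.remove(0); // drop the oldest bid
        }
        return acc;
    }

    /** Merges partial accumulators, again keeping only the most recent bids. */
    static List<long[]> mergeAccumulators(List<List<long[]>> accumulators) {
        List<long[]> merged = createAccumulator();
        for (List<long[]> acc : accumulators) {
            for (long[] bid : acc) {
                addInput(merged, bid);
            }
        }
        return merged;
    }

    /** Sums the retained prices and divides by the accumulator size. */
    static double extractOutput(List<long[]> acc) {
        if (acc.isEmpty()) {
            return 0.0;
        }
        long sum = 0;
        for (long[] bid : acc) {
            sum += bid[1];
        }
        return (double) sum / acc.size();
    }
}
```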

...

Query 7: What are the highest bids per period?

...

Could have been implemented with a combiner like Query 5, but is deliberately implemented using Max(prices) as a side input in order to illustrate fanout.

...

Fanout is a redistribution using an intermediate implicit combine step to reduce the load in the final step of the Max transform

...

Max.withFanout to get the max per window and use it as a side input for next step. Fanout is useful if there are many events to be computed in a window using the Max transform.

...

ParDo on the bids with side input to output the bid if bid.price equals maxPrice (that comes from side input)
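
The two stages can be mimicked in plain Java: a maximum computed through intermediate partial maxima (standing in for the fanout), followed by a filter that uses that maximum the way a side input would be used. The class name, the round-robin sharding and the method names are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of max-with-fanout used as a side input (illustration only). */
final class Query7Sketch {

    /** Computes the max in two steps: per-shard partial maxima, then a final max. */
    static long maxWithFanout(List<Long> prices, int fanout) {
        long[] partial = new long[fanout];
        Arrays.fill(partial, Long.MIN_VALUE);
        for (int i = 0; i < prices.size(); i++) {
            int shard = i % fanout; // assumed round-robin spread over intermediate combiners
            partial[shard] = Math.max(partial[shard], prices.get(i));
        }
        long max = Long.MIN_VALUE;
        for (long p : partial) {
            max = Math.max(max, p);
        }
        return max;
    }

    /** Keeps the prices of one window that equal the window maximum. */
    static List<Long> highestBids(List<Long> pricesInWindow, int fanout) {
        long maxPrice = maxWithFanout(pricesInWindow, fanout); // plays the side-input role
        List<Long> out = new ArrayList<>();
        for (long price : pricesInWindow) {
            if (price == maxPrice) {
                out.add(price);
            }
        }
        return out;
    }
}
```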

Query 8: Who has entered the system and created an auction in the last period?

...

Illustrates simple join

...

Apply fixed windows to have a period

...

ParDo to key collection by personId

...

Apply fixed windows to have a period

...

ParDo to key collection by sellerId

...

ParDo to output IdNameReserve(person.id, person.name, auction.reserve) for each auction
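
A simplified view of this windowed join, in plain Java: both inputs are bucketed into fixed windows, and an auction is emitted only when its seller appeared as a new person in the same window. The row layouts and names are hypothetical, and person.name is left out of the output for brevity.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java sketch of a join inside fixed windows (illustration only). */
final class Query8Sketch {

    /** Start of the fixed window of the given size that contains the timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /**
     * persons rows are {timestamp, personId}; auctions rows are {timestamp, sellerId, reserve}.
     * Returns {personId, reserve} for each auction whose seller joined in the same window.
     */
    static List<long[]> join(List<long[]> persons, List<long[]> auctions, long size) {
        // Key each person by (window, personId).
        Set<String> newPersons = new HashSet<>();
        for (long[] p : persons) {
            newPersons.add(windowStart(p[0], size) + "/" + p[1]);
        }
        // Key each auction by (window, sellerId) and look for a matching person.
        List<long[]> out = new ArrayList<>();
        for (long[] a : auctions) {
            if (newPersons.contains(windowStart(a[0], size) + "/" + a[1])) {
                out.add(new long[] {a[1], a[2]});
            }
        }
        return out;
    }
}
```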

Query 9: Winning-bids (not part of original NexMark): extract the most recent of the highest bids

...

Illustrates custom window function to reconcile auctions and bids + join them

...

input: collection of events

...

ParDo to

...

and output AuctionBid(auction, bestBid) objects
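
Leaving the custom windowing aside, the selection rule stated in the query title (highest bid, most recent one on a tie) can be sketched in plain Java. The row layout and names are hypothetical, and the sketch ignores auction expiry and any other validity checks the real transform may apply.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of picking a winning bid per auction (illustration only). */
final class Query9Sketch {

    /**
     * bids rows are {auctionId, price, timestamp}.
     * Returns, per auction id, the highest-priced bid; ties go to the most recent bid.
     */
    static Map<Long, long[]> winningBids(List<long[]> bids) {
        Map<Long, long[]> best = new HashMap<>();
        for (long[] bid : bids) {
            long[] current = best.get(bid[0]);
            boolean higher = current == null || bid[1] > current[1];
            boolean sameButNewer =
                current != null && bid[1] == current[1] && bid[2] > current[2];
            if (higher || sameButNewer) {
                best.put(bid[0], bid);
            }
        }
        return best;
    }
}
```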

Query 10 (not part of original NexMark): Log all events to GCS files

...

windows with large side effects on firing

...

ParDo to key events by their shardId (number of shards is a config item)

...

Apply fixed windows with composite triggering that fires when each sub-trigger (executed in order) fires

  • repeatedly

    • after at least maxLogEvents in pane

    • or finally when the watermark passes the end of the window

  • Repeatedly

    • after at least maxLogEvents in pane

    • or when processing time passes the first element in the pane + lateDelay

  • With allowedLateness of 1 day (so that any late data will stall the pipeline and be noticeable)

...

GroupByKey to group events by shardId

...

ParDo to construct the outputStreams (fileNames contain shardId) and encode each event to that outputStream + form pairs with key = null key and value = outputFile (represents a fileName with various added information)

...

apply fixed window with default trigger and lateness of 1 day to clear the complex triggering

...

ParDo to write all the lines to files in Google Cloud Storage 

Query 11 (not part of original NexMark): How many bids did a user make in each session they were active?

...

Illustrates session windows + triggering on the bids collection

...

input: collection of bids events

...

output BidsPerSession(bidder, bidsCount) objects
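
Session windowing for one bidder can be illustrated in plain Java: bids closer together than a gap belong to the same session, and each session yields one count. The names and the gap parameter are hypothetical, and this sketch ignores the triggering that the real query also exercises.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java sketch of gap-based sessions for a single bidder (illustration only). */
final class Query11Sketch {

    /**
     * Takes the bid timestamps of one bidder and returns the number of bids per session.
     * A new session starts when a bid is at least `gap` after the previous bid.
     */
    static List<Integer> bidsPerSession(List<Long> timestamps, long gap) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<Integer> counts = new ArrayList<>();
        int current = 0;
        long last = 0;
        for (long ts : sorted) {
            if (current > 0 && ts - last >= gap) {
                counts.add(current); // close the previous session
                current = 0;
            }
            current++;
            last = ts;
        }
        if (current > 0) {
            counts.add(current); // close the final session
        }
        return counts;
    }
}
```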

Query 12 (not part of original NexMark): How many bids does a user make within a fixed processing time limit?

...

Illustrates working in processing time in the Global window to count occurrences of bidder

...

input: collection of bid events

...

output BidsPerWindow(bidder, bidsCount) objects
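
Counting bids per bidder in processing-time panes can be simulated in plain Java. In this sketch a pane opens at the processing time of its first element and closes once an element arrives at least windowDuration later. The names and row layout are hypothetical, and panes are assumed to discard their contents after firing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of per-pane counting in processing time (illustration only). */
final class Query12Sketch {

    /**
     * arrivals rows are {processingTime, bidderId}, in arrival order.
     * Returns one bidderId -> bid count map per pane.
     */
    static List<Map<Long, Long>> countPerPane(List<long[]> arrivals, long windowDuration) {
        List<Map<Long, Long>> panes = new ArrayList<>();
        Map<Long, Long> current = null;
        long paneStart = 0;
        for (long[] arrival : arrivals) {
            // Close the pane once processing time passes first element + windowDuration.
            if (current != null && arrival[0] >= paneStart + windowDuration) {
                panes.add(current);
                current = null;
            }
            if (current == null) {
                current = new TreeMap<>();
                paneStart = arrival[0];
            }
            current.merge(arrival[1], 1L, Long::sum);
        }
        if (current != null) {
            panes.add(current);
        }
        return panes;
    }
}
```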