...

To support partial updates, we need to add an update API to ReadWriteTable and related interfaces. An update is a variant of a write, but it may use a different record type than a write does. AsyncReadWriteTable is generic over K and V, where K is the key type and V is the value type of the records in the table. We need to add a generic type parameter U to represent the update type. Adding this parameter is a backward-incompatible change and requires changes across the Table API. For a given table, the types K, V and U are fixed.
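A minimal sketch of what the extra type parameter means, assuming a trimmed-down AsyncReadWriteTable with only getAsync, putAsync and updateAsync. The real Samza interface has more methods, and the exact signatures here are assumptions. InMemoryTable, its merge function and the Stats record are hypothetical. They stand in for a store that knows how to apply an update of type U to a stored value of type V. The point is that K, V and U are fixed per table, and U can differ from V.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Trimmed-down, hypothetical table interface: K = key, V = stored value, U = update.
interface AsyncReadWriteTable<K, V, U> {
  CompletableFuture<V> getAsync(K key);

  CompletableFuture<Void> putAsync(K key, V value);

  // New in this proposal: apply an update of type U to the record stored under key.
  CompletableFuture<Void> updateAsync(K key, U update);
}

// Hypothetical in-memory table; "merge" decides how an update U is applied to a value V.
class InMemoryTable<K, V, U> implements AsyncReadWriteTable<K, V, U> {
  private final Map<K, V> store = new ConcurrentHashMap<>();
  private final BiFunction<V, U, V> merge;

  InMemoryTable(BiFunction<V, U, V> merge) {
    this.merge = merge;
  }

  @Override
  public CompletableFuture<V> getAsync(K key) {
    return CompletableFuture.completedFuture(store.get(key));
  }

  @Override
  public CompletableFuture<Void> putAsync(K key, V value) {
    store.put(key, value);
    return CompletableFuture.completedFuture(null);
  }

  @Override
  public CompletableFuture<Void> updateAsync(K key, U update) {
    // Only existing records are updated; a missing key is left untouched here.
    store.computeIfPresent(key, (k, v) -> merge.apply(v, update));
    return CompletableFuture.completedFuture(null);
  }
}

// Hypothetical full record stored in the table (the V type).
final class Stats {
  final String name;
  final long views;

  Stats(String name, long views) {
    this.name = name;
    this.views = views;
  }
}

class PartialUpdateDemo {
  public static void main(String[] args) {
    // K = Integer (member id), V = Stats (full record), U = Long (views to add)
    AsyncReadWriteTable<Integer, Stats, Long> table =
        new InMemoryTable<>((stats, delta) -> new Stats(stats.name, stats.views + delta));
    table.putAsync(1, new Stats("alice", 10)).join();
    table.updateAsync(1, 5L).join();
    Stats s = table.getAsync(1).join();
    System.out.println(s.name + " " + s.views); // prints "alice 15"
  }
}
```

Here the update (a Long delta) is much smaller than the stored record, which is the typical motivation for giving updates their own type instead of reusing V.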

Samza Table API Changes with Partial Update

...

  • Add new update methods to the Table API interfaces AsyncReadWriteTable and TableWriteFunction
  • Add a sendTo method that takes an UpdateContract to the MessageStream API. This will be used to send updates to a table
  • UpdateContract defines the contract used by the send-to-table-with-update operation
  • Create a new operator spec and implementation for a “send to table with update” operation on a MessageStream
    • SendToTableWithUpdateOperatorSpec
    • SendToTableWithUpdateOperatorImpl: attempts to send updates using the Table’s updateAsync method, similar to SendToTableOperatorImpl, where writes are done using the putAsync method
  • UpdateMessage class to represent an update paired with a default value, instead of using KV (discussed in detail below)
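The UpdateMessage value type could look roughly like the sketch below. Only the class name and the two-argument UpdateMessage.of(update, default) factory come from this proposal. The getUpdate/getDefault accessors, the single-argument of(update) overload and the non-null check on the update are assumptions made for illustration.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch: an update of type U paired with an optional default record of type V.
final class UpdateMessage<U, V> {
  private final U update;
  private final V defaultValue; // null when no default was supplied

  private UpdateMessage(U update, V defaultValue) {
    // Requiring a non-null update is a choice made for this sketch.
    this.update = Objects.requireNonNull(update, "update must not be null");
    this.defaultValue = defaultValue;
  }

  // Update plus a default that may be PUT if no record exists yet.
  static <U, V> UpdateMessage<U, V> of(U update, V defaultValue) {
    return new UpdateMessage<>(update, defaultValue);
  }

  // Update without a default (hypothetical overload).
  static <U, V> UpdateMessage<U, V> of(U update) {
    return new UpdateMessage<>(update, null);
  }

  U getUpdate() {
    return update;
  }

  Optional<V> getDefault() {
    return Optional.ofNullable(defaultValue);
  }
}

class UpdateMessageDemo {
  public static void main(String[] args) {
    UpdateMessage<Long, String> withDefault = UpdateMessage.of(5L, "initial-record");
    UpdateMessage<Long, String> updateOnly = UpdateMessage.of(5L);
    System.out.println(withDefault.getDefault().isPresent()); // prints true
    System.out.println(updateOnly.getDefault().isPresent());  // prints false
  }
}
```

Compared with reusing KV, a dedicated class names both parts and makes it explicit that the default is optional.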

...

MessageStream

```java
public interface MessageStream<M> {
  /**
   * Allows sending update messages in this {@link MessageStream} to a {@link Table} and then propagates this
   * {@link MessageStream} to the next chained operator. The type of input message is expected to be {@link KV},
   * otherwise a {@link ClassCastException} will be thrown. The value of each message is an {@link UpdateMessage},
   * which holds the update and an optional default value. The default can be used if the remote table integration
   * supports inserting it through a PUT when an update fails because no record exists for the key.
   * <p>
   * Note: The update will be written but may not be flushed to the underlying table before it is propagated to the
   * chained operators. Whether the message can be read back from the Table in the chained operator depends on whether
   * it was flushed and whether the Table offers read-after-write consistency. Messages retain the original
   * partitioning scheme when propagated to the next operator.
   *
   * @param table the table to write messages to
   * @param contract the update contract, which defines how the table update will be performed
   * @param <K> the type of key in the table
   * @param <V> the type of record value in the table
   * @param <U> the type of update value for the table
   * @return this {@link MessageStream}
   */
  <K, V, U> MessageStream<KV<K, UpdateMessage<U, V>>> sendTo(Table<KV<K, V>> table, UpdateContract contract);
}
```

Handling First Time Updates

...

This approach introduces an UpdateMessage class that captures the update and an optional default value. The sendTo operator that sends updates to a table takes a key (which uniquely identifies a record) and an UpdateMessage as the value. To enable this, the user must also pass UpdateContract.UPDATE_WITH_DEFAULTS as the contract argument to MessageStream's sendTo.

SendToTableWithUpdateOperatorImpl is the implementation of the send-to-table-with-update operator. Whether this operator supports first-time partial updates depends entirely on the remote store's implementation of TableWriteFunction. This approach introduces RecordNotFoundException, a custom exception to be thrown in the updateAsync method of TableWriteFunction if the update fails because no record exists for the key. If SendToTableWithUpdateOperatorImpl encounters this exception, it attempts to PUT the default (if one is provided) and then applies the update on top of it.
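The fallback described above can be sketched with CompletableFuture. This is an illustration under assumptions, not the actual SendToTableWithUpdateOperatorImpl. WriteFunction is a hypothetical two-method stand-in for TableWriteFunction, InMemoryStore is a hypothetical store that reports RecordNotFoundException through the returned future, and UpdateWithDefaults.sendUpdate is a hypothetical helper. The update is attempted first. Only when it fails with RecordNotFoundException and a default is present does the helper PUT the default and retry the update. Any other failure is propagated unchanged.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Exception named in this proposal: the update targeted a key with no existing record.
class RecordNotFoundException extends RuntimeException {
  RecordNotFoundException(String message) { super(message); }
}

// Hypothetical, trimmed-down write function with only the calls the operator needs.
interface WriteFunction<K, V, U> {
  CompletableFuture<Void> putAsync(K key, V record);
  CompletableFuture<Void> updateAsync(K key, U update);
}

// Hypothetical in-memory stand-in for a remote store that rejects updates to missing records.
class InMemoryStore<K, V, U> implements WriteFunction<K, V, U> {
  final Map<K, V> data = new ConcurrentHashMap<>();
  private final BiFunction<V, U, V> merge;

  InMemoryStore(BiFunction<V, U, V> merge) { this.merge = merge; }

  public CompletableFuture<Void> putAsync(K key, V record) {
    data.put(key, record);
    return CompletableFuture.completedFuture(null);
  }

  public CompletableFuture<Void> updateAsync(K key, U update) {
    if (!data.containsKey(key)) {
      // Report the missing record through the returned future.
      CompletableFuture<Void> failed = new CompletableFuture<>();
      failed.completeExceptionally(new RecordNotFoundException("no record for key " + key));
      return failed;
    }
    data.computeIfPresent(key, (k, v) -> merge.apply(v, update));
    return CompletableFuture.completedFuture(null);
  }
}

final class UpdateWithDefaults {
  // Try the update; if the record is missing and a default exists, PUT it and update again.
  static <K, V, U> CompletableFuture<Void> sendUpdate(
      WriteFunction<K, V, U> fn, K key, U update, Optional<V> defaultValue) {
    return fn.updateAsync(key, update)
        .handle((ok, err) -> err) // turn the outcome into a value: null on success
        .thenCompose(err -> {
          if (err == null) {
            return CompletableFuture.<Void>completedFuture(null);
          }
          // Dependent stages wrap failures in CompletionException; unwrap before checking.
          Throwable cause = err instanceof CompletionException ? err.getCause() : err;
          if (cause instanceof RecordNotFoundException && defaultValue.isPresent()) {
            return fn.putAsync(key, defaultValue.get())
                .thenCompose(v -> fn.updateAsync(key, update));
          }
          CompletableFuture<Void> failed = new CompletableFuture<>();
          failed.completeExceptionally(cause);
          return failed;
        });
  }
}

class FirstTimeUpdateDemo {
  public static void main(String[] args) {
    // V = Long (total views), U = Long (views to add)
    InMemoryStore<String, Long, Long> store = new InMemoryStore<>((a, b) -> a + b);
    // No record for "alice" yet: the default 0 is PUT, then the update +3 is applied.
    UpdateWithDefaults.sendUpdate(store, "alice", 3L, Optional.of(0L)).join();
    System.out.println(store.data.get("alice")); // prints 3
  }
}
```

In this sketch the PUT and the retried update are two separate writes, so another writer could interleave between them; whether that matters depends on the remote store's semantics.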

...

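Code Example

```java
// Table holding member profiles, used for the stream-table join.
// profileTableDesc is assumed to be defined elsewhere in the application.
final Table<KV<Integer, Profile>> joinTable = appDesc.getTable(profileTableDesc);

// Remote output table: key Integer, value EnrichedPageView, update EnrichedPageView.
final RemoteTableDescriptor<Integer, EnrichedPageView, EnrichedPageView> outputTableDesc =
    new RemoteTableDescriptor<>("enriched-page-view-table-1");
final Table<KV<Integer, EnrichedPageView>> outputTable = appDesc.getTable(outputTableDesc);

appDesc.getInputStream(isd)
    .map(pv -> new KV<>(pv.getMemberId(), pv))
    .join(joinTable, new PageViewToProfileJoinFunction())
    // The enriched page view is used both as the update and as the default for a first-time update.
    .map(m -> new KV<>(m.getMemberId(), UpdateMessage.of(m, m)))
    .sendTo(outputTable, UpdateContract.UPDATE_WITH_DEFAULTS);
```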


Test Plan

  • The test plan includes unit tests that cover the changes to the Table API and to the operator graph
  • Add tests for updates in the different table types: TestBatchTable, TestAsyncRetriableTable, TestRemoteTable, TestAsyncRateLimitedTable
  • Update the end-to-end tests to cover the send-to-table-with-update operator: TestRemoteTableEndToEnd, TestRemoteTableWithBatchEndToEnd
  • Samza remote store integrations will be tested with unit tests and test flows

...

The plan is to release this feature with the Samza 1.7 release. The Table API changes are backward incompatible because AsyncReadWriteTable will add a new generic type parameter U to represent the update type. Table integrations will have to be updated as well.

...