THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Caching tables will not support updates.
Code Example- Samza Table API
Partial Update Code Example
A simple code example below for writing updates to a table using Samza table API:
Code Block | ||||
---|---|---|---|---|
| ||||
final RemoteTableDescriptor outputTableDesc = new RemoteTableDescriptor<Integer, EnrichedPageView, EnrichedPageView>("enriched-page-view-table-1");
final Table<KV<Integer, Profile>> joinTable = appDesc.
getTable(outputTableDesc);
appDesc.getInputStream(isd)
.map(pv -> new KV<>(pv.getMemberId(), pv))
.join(joinTable, new PageViewToProfileJoinFunction())
.map(m -> new KV(m.getMemberId(), UpdatePair.of(m, m)))
.sendUpdateTo(outputTable);
|
Test Plan
- Test plan would include unit tests to capture changes to the Table API and to the operator graph
- Add tests for update in different table types: TestBatchTable, TestAsyncRetriableTable, TestRemoteTable, TestAsyncRateLimitedTable
- Update End to end tests to test sendUpdateTo operator: TestRemoteTableEndToEnd, TestRemoteTableWithBatchEndToEnd
- Samza remote store integrations will be tested with unit tests and test flows
...