Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Motivation

Table in Samza is an abstraction for a data source that supports random access by key. A table could be a remote data-store, Couchbase, for example, or a local InMemory or RocksDb backed store. The Samza table API [1] currently supports gets, puts and deletes. Partial updates to existing records is a commonly requested feature in the current Table API and is supported by many stores. This document describes the proposed approach to provide support for partial updates in Table API.

Current State

Let’s first start with discussing the key interfaces of the Table API design-

...

TableWriteFunction implementing classes typically have distinct genetic type parameters K, V specific to the table. V is the type of the record stored in the remote data store. Partial update record type is not always of the same type as the write record. Due to this type constraint, it will not be possible to change putAsync in Table API to support updates as well. 

Proposed Solution

In order to support Partial updates, we will need to add an update API in ReadWriteTable and related interfaces. Update is a variant of write but sometimes works with a different record type when compared to Write record type. AsyncReadWriteTable works with generic KV where K is the key type and V is the value type of data in the Table. We need to add a generic type U to represent an update type. Adding another generic type parameter to denote an update is a backward incompatible change and would result in changes all across the Table API. The type K, V, U for a given table will be fixed.

Samza Table API Changes with Partial Update

The following changes have to be made:

...

Code Block
languagejava
titleMessageStream
collapsetrue
public interface MessageStream<M> {
/**
  * Allows sending update messages in this {@link MessageStream} to a {@link Table} and then propagates this
  * {@link MessageStream} to the next chained operator. The type of input message is expected to be {@link KV},
  * otherwise a {@link ClassCastException} will be thrown. The value is an UpdateMessage- update and default value.
  * Defaults are optional and can be used if the Remote Table integration supports inserting a default through PUT in
  * the event an update fails due to an existing record being absent.
  * <p>
  * Note: The update will be written but may not be flushed to the underlying table before its propagated to the
  * chained operators. Whether the message can be read back from the Table in the chained operator depends on whether
  * it was flushed and whether the Table offers read after write consistency. Messages retain the original partitioning
  * scheme when propagated to next operator.
  *
  * @param table the table to write messages to
  * @param contract Update contract which defines how the update will be performed   
  * @param <K> the type of key in the table
  * @param <V> the type of record value in the table
  * @param <U> the type of update value for the table
  * @return this {@link MessageStream}
  */
 <K, V, U> MessageStream<KV<K, UpdateMessage<U, V>>> sendTo(Table<KV<K, V>> table, UpdateContract contract);
}

Handling First Time Updates

While partial updates are intended to update existing records, there will be certain cases which require support for first-time partial updates i.e update to a record which doesn’t exist. To account for such cases, the design needs to have a provision to optionally provide a default record which can be PUT in the absence of an existing record. The update can then be applied on top of the default record.

...

Code Block
languagejava
titleSendUpdateToTableOperatorImpl
collapsetrue
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.operators.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.UpdateMessage;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.SendUpdateToTableOperatorSpec;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.RecordNotFoundException;
import org.apache.samza.table.batching.CompactBatchProvider;
import org.apache.samza.table.remote.RemoteTable;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Implementation of a send-update-stream-to-table operator that applies updates to existing records
 * in the table. If there is no pre-existing record, based on Table's implementation it might attempt to write a
 * default record and then applies an update.
 *
 * @param <K> the type of the record key
 * @param <V> the type of the record value
 * @param <U> the type of the update applied to this table
 */
public class SendUpdateToTableOperatorImpl<K, V, U>
    extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
  private static final Logger LOG = LoggerFactory.getLogger(SendUpdateToTableOperatorImpl.class);

  private final SendUpdateToTableOperatorSpec<K, V, U> sendUpdateToTableOpSpec;
  private final ReadWriteTable<K, V, U> table;

  public SendUpdateToTableOperatorImpl(SendUpdateToTableOperatorSpec<K, V, U>  sendUpdateToTableOpSpec, Context context) {
    this.sendUpdateToTableOpSpec = sendUpdateToTableOpSpec;
    this.table = context.getTaskContext().getTable(sendUpdateToTableOpSpec.getTableId());
    if (context.getTaskContext().getTable(sendUpdateToTableOpSpec.getTableId()) instanceof RemoteTable) {
      RemoteTable<K, V, U> remoteTable = (RemoteTable<K, V, U>) table;
      if (remoteTable.getBatchProvider() instanceof CompactBatchProvider) {
        throw new SamzaException("Batching is not supported with Compact Batches for partial updates");
      }
    }
  }

  @Override
  protected void handleInit(Context context) {
  }

  @Override
  protected CompletionStage<Collection<KV<K, UpdateMessage<U, V>>>> handleMessageAsync(KV<K, UpdateMessage<U, V>> message,
      MessageCollector collector, TaskCoordinator coordinator) {
    final CompletableFuture<Void> updateFuture = table.updateAsync(message.getKey(), message.getValue().getUpdate(),
        sendUpdateToTableOpSpec.getArgs());

    return updateFuture
        .handle((result, ex) -> {
          if (ex == null) {
            // success, no need to Put a default value
            return false;
          } else if (ex.getCause() instanceof RecordNotFoundException && message.getValue().getDefault() != null) {
            // If update fails for a given key due to a RecordDoesNotExistException exception thrown and a default is
            // provided, then attempt to PUT a default record for the key and then apply the update
            return true;
          } else {
            throw new SamzaException("Update failed with exception: ", ex);
          }
        })
        .thenCompose(shouldPutDefault -> {
          if (shouldPutDefault) {
            final CompletableFuture<Void> putFuture = table.putAsync(message.getKey(), message.getValue().getDefault(),
                sendUpdateToTableOpSpec.getArgs());
            return putFuture
                .exceptionally(ex -> {
                  LOG.warn("PUT default failed due to an exception. Ignoring the exception and proceeding with update. "
                          + "The exception encountered is: ", ex);
                  return null;
                })
                .thenCompose(res -> table.updateAsync(message.getKey(), message.getValue().getUpdate(),
                sendUpdateToTableOpSpec.getArgs()));
          } else {
            return CompletableFuture.completedFuture(null);
          }
        }).thenApply(result -> Collections.singleton(message));
  }

  @Override
  protected void handleClose() {
    table.close();
  }

  @Override
  protected OperatorSpec<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> getOperatorSpec() {
    return sendUpdateToTableOpSpec;
  }
}

Update support in different Table types

Batching Table

Batching tables supports batching of table operations. The key classes/interfaces are:

...

Code Block
languagejava
titleUpdateOperation
collapsetrue
/**
 * Update operation.
 *
 * @param <K> The type of the key.
 * @param <U> The type of the update
 */
public class UpdateOperation<K, V, U> implements Operation<K, V, U> {
  final private K key;
  final private U update;
  final private Object[] args;

  public UpdateOperation(K key, U update, Object ... args) {
    Preconditions.checkNotNull(key);
    Preconditions.checkNotNull(update);
    Preconditions.checkNotNull(args);
    this.key = key;
    this.update = update;
    this.args = args;
  }

  /**
   * @return The key to be updated in the table.
   */
  @Override
  public K getKey() {
    return key;
  }

  /**
   * @return null.
   */
  @Override
  public V getValue() {
    return null;
  }

  /**
   * @return The Update to be applied to the table for the key.
   */
  @Override
  public U getUpdate() {
    return update;
  }

  @Override
  public Object[] getArgs() {
    return args;
  }
}

Retriable Table

AsyncRetriableTable currently uses Failsafe library for handling retries. Reads (get) and Writes (puts, deletes) each currently have a RetryPolicy and metrics associated. The metrics reported are retry count, success count, perm failure count and retry timer. We will reuse the Write RetryPolicy for updates as well and metrics reported would be the same as for writes.

Rate Limited Table

Rate limited table currently uses Guava’s RateLimiter, one each for read and write operations uniquely identified by tags. We will reuse the write rate limiter for write for updates as well.

Local Table

Backed by Samza’s KeyValueStore. It will not support updates as the underlying store doesn’t support updates.

Caching Table

Caching tables will not support updates.

Partial Update Code Example

A simple code example below for writing updates to a table using Samza table API:

Code Block
languagejava
titleCode Example
final RemoteTableDescriptor outputTableDesc = new RemoteTableDescriptor<Integer, EnrichedPageView, EnrichedPageView>("enriched-page-view-table-1");
 
final Table<KV<Integer, Profile>> joinTable = appDesc.
   getTable(outputTableDesc);
 
appDesc.getInputStream(isd)
 .map(pv -> new KV<>(pv.getMemberId(), pv))
 .join(joinTable, new PageViewToProfileJoinFunction())
 .map(m -> new KV(m.getMemberId(), UpdateMessage.of(m, m)))
 .sendTo(outputTable, UpdateContract.UPDATE_WITH_DEFAULTS);


Test Plan

  • Test plan would include unit tests to capture changes to the Table API and to the operator graph
  • Add tests for update in different table types: TestBatchTable, TestAsyncRetriableTable, TestRemoteTable, TestAsyncRateLimitedTable
  • Update End to end tests to test sendUpdateTo operator: TestRemoteTableEndToEnd, TestRemoteTableWithBatchEndToEnd
  • Samza remote store integrations will be tested with unit tests and test flows

Rollout

The plan is to release this feature with Samza 1.7  release. The Table API changes are backward incompatible as AsyncReadWriteTable will now add a new generic type U to indicate an update in the class definition. Table integrations will have to be updated as well.

References

1. Samza Table API: https://samza.apache.org/learn/documentation/1.0.0/api/table-api.html

...