Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleSendUpdateToTableOperatorImplSendToTableWithUpdateOperatorImpl
collapsetrue
/*
public *class LicensedSendToTableWithUpdateOperatorImpl<K, toV, theU>
 Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.operators.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.UpdateMessage;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.SendUpdateToTableOperatorSpec;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.RecordNotFoundException;
import org.apache.samza.table.batching.CompactBatchProvider;
import org.apache.samza.table.remote.RemoteTable;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Implementation of a send-update-stream-to-table operator that applies updates to existing records
 * in the table. If there is no pre-existing record, based on Table's implementation it might attempt to write a
 * default record and then applies an update.
 *
 * @param <K> the type of the record key
 * @param <V> the type of the record value
 * @param <U> the type of the update applied to this table
 */
public class SendUpdateToTableOperatorImpl<K, V, U>
    extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
  private static final Logger LOG = LoggerFactory.getLogger(SendUpdateToTableOperatorImpl.class);

  private final SendUpdateToTableOperatorSpec<K, V, U> sendUpdateToTableOpSpec;
  private final ReadWriteTable<K, V, U> table;

  public SendUpdateToTableOperatorImpl(SendUpdateToTableOperatorSpec<K, V, U>  sendUpdateToTableOpSpec, Context context) {
    this.sendUpdateToTableOpSpec = sendUpdateToTableOpSpec;
    this.table = context.getTaskContext().getTable(sendUpdateToTableOpSpec.getTableId());
    if (context.getTaskContext().getTable(sendUpdateToTableOpSpec.getTableId()) instanceof RemoteTable)extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
  private static final Logger LOG = LoggerFactory.getLogger(SendToTableWithUpdateOperatorImpl.class);

  private final SendToTableWithUpdateOperatorSpec<K, V, U> spec;
  private final ReadWriteTable<K, V, U> table;

  public SendToTableWithUpdateOperatorImpl(SendToTableWithUpdateOperatorSpec<K, V, U> spec, Context context) {
    this.spec = spec;
    this.table = context.getTaskContext().getTable(spec.getTableId());
    if (context.getTaskContext().getTable(spec.getTableId()) instanceof RemoteTable) {
      RemoteTable<K, V, U> remoteTable = (RemoteTable<K, V, U>) table;
      if (remoteTable.getBatchProvider() instanceof CompactBatchProvider) {
        throw new SamzaException("Batching is not supported with Compact Batches for partial updates");
      }
    }
  }

  @Override
  protected void handleInit(Context context) {
  }

  @Override
  protected CompletionStage<Collection<KV<K, UpdateMessage<U, V>>>> handleMessageAsync(KV<K, UpdateMessage<U, V>> message,
      MessageCollector collector, TaskCoordinator coordinator) {
    final UpdateContract contract = spec.getContract();
    final CompletableFuture<Void> updateFuture = table.updateAsync(message.getKey(), message.getValue().getUpdate());

    return updateFuture
        .handle((result, ex) -> {
      RemoteTable<K, V, U> remoteTable =if (RemoteTable<K, V, U>) table;
ex == null) {
         if (remoteTable.getBatchProvider() instanceof CompactBatchProvider) {
        throw new SamzaException("Batching is not supported with Compact Batches for partial updates")// success, no need to Put a default value
            return false;
      }
    }
 else }

  @Override
  protected void handleInit(Context contextif (ex.getCause() instanceof RecordNotFoundException && message.getValue().hasDefault()) {
  }

  @Override
  protected CompletionStage<Collection<KV<K, UpdateMessage<U, V>>>> handleMessageAsync(KV<K, UpdateMessage<U, V>> message,
      MessageCollector collector, TaskCoordinator coordinator) {
    final CompletableFuture<Void> updateFuture = table.updateAsync(message.getKey(), message.getValue().getUpdate(),
// If an update fails for a given key due to a RecordDoesNotExistException exception thrown and
          sendUpdateToTableOpSpec.getArgs());

  // a returndefault updateFuture
is provided and the UpdateContract is set  .handle((result, ex) -> {to UPDATE_WITH_DEFAULTS, then attempt
          if (ex == null) {
            // success, no need to Put a default value
       // to PUT a default record for the key and then apply the update.
            if (contract == UpdateContract.UPDATE_WITH_DEFAULTS) {
     return false;
        return true;
 } else if (ex.getCause() instanceof RecordNotFoundException && message.getValue().getDefault() != null) {
  } else {
        // If update fails for a giventhrow key due to a RecordDoesNotExistException exception thrown and a default is
      new SamzaException("Put default failed for update as the Update contract was set to " + contract +
      // provided, then attempt to PUT a default record for the key and". thenPlease apply the updateuse UpdateContract.UPDATE_WITH_DEFAULTS instead.");
            return true;}
          } else {
            throw new SamzaException("Update failed with exception: ", ex);
          }
        })
        .thenCompose(shouldPutDefault -> {
          if (shouldPutDefault) {
            final CompletableFuture<Void> putFuture = table.putAsync(message.getKey(), message.getValue().getDefault(),
                sendUpdateToTableOpSpec.getArgs());
            return putFuture
                .exceptionally(ex -> {
                  LOG.warn("PUT default failed due to an exception. Ignoring the exception and proceeding with update. "
                          + "The exception encountered is: ", ex);
                  return null;
                })
                .thenCompose(res -> table.updateAsync(message.getKey(), message.getValue().getUpdate(),
                sendUpdateToTableOpSpec.getArgs()));
          } else {
            return CompletableFuture.completedFuture(null);
          }
        }).thenApply(result -> Collections.singleton(message));
  }

  @Override
  protected void handleClose() {
    table.close();
  }

  @Override
  protected OperatorSpec<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> getOperatorSpec() {
    return sendUpdateToTableOpSpecspec;
  }
}

  

Update support in different Table types

...