Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Presently, Kafka Streams provides users with two options for handling a DeserializationException via the DeserializationExceptionHandler interface:

...

This KIP introduces a third option: PAUSE.

Public Interfaces

Modified Interfaces

org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse (Java)

        /* pause processing the current Task, but continue other Tasks */
        PAUSE(2, "PAUSE");

New Interfaces

org.apache.kafka.streams.errors.LogAndPauseExceptionHandler (Java)

package org.apache.kafka.streams.errors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class LogAndPauseExceptionHandler implements DeserializationExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(LogAndPauseExceptionHandler.class);

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {

        // The trailing exception argument is logged with its stack trace.
        log.error("Exception caught during Deserialization, " +
                        "taskId: {}, topic: {}, partition: {}, offset: {}",
                context.taskId(), record.topic(), record.partition(), record.offset(),
                exception);

        return DeserializationHandlerResponse.PAUSE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // ignore
    }
}
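
A handler like this would presumably be enabled the same way as the existing built-in handlers, through the `default.deserialization.exception.handler` Streams config. The following is a minimal sketch under that assumption. The handler class is only proposed by this KIP, the application id and bootstrap server are placeholder values, and plain string keys are used so that the snippet compiles without the Kafka libraries on the classpath.

```java
import java.util.Properties;

final class PauseHandlerConfigSketch {

    // Proposed by this KIP; this class does not exist in released Kafka versions.
    static final String PROPOSED_HANDLER =
            "org.apache.kafka.streams.errors.LogAndPauseExceptionHandler";

    /** Builds Streams properties that select the proposed handler. */
    static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put("application.id", "example-app");        // placeholder value
        props.put("bootstrap.servers", "localhost:9092");  // placeholder value
        // Config key used to select a DeserializationExceptionHandler implementation.
        props.put("default.deserialization.exception.handler", PROPOSED_HANDLER);
        return props;
    }

    public static void main(final String[] args) {
        System.out.println(
                streamsProperties().getProperty("default.deserialization.exception.handler"));
    }
}
```

In a real application these properties would be passed to the `KafkaStreams` constructor together with the topology.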

...

org.apache.kafka.streams.KafkaStreams (Java)

/**
 * Resume processing the {@link Task} specified by its {@link TaskId id}.
 * <p>
 * This method resumes a {@link Task} that was {@link Task.State#PAUSED} due to an {@link
 * DeserializationExceptionHandler.DeserializationHandlerResponse#PAUSE error}.
 * <p>
 * If the given {@link Task} is not {@link Task.State#PAUSED}, no action will be taken and
 * {@code false} will be returned.
 * <p>
 * Otherwise, this method will attempt to transition the {@link Task} to {@link Task.State#RUNNING},
 * and return {@code true}, if successful.
 *
 * @param task the {@link TaskId id} of the {@link Task} to resume.
 * @return {@code true} if the {@link Task} was {@link Task.State#PAUSED} and was successfully
 *         transitioned to {@link Task.State#RUNNING}, otherwise {@code false}.
 */
public boolean resume(final TaskId task);

/**
 * Gets all of the currently paused Tasks assigned to threads of this Kafka Streams instance.
 *
 * @return A {@link Set} containing the metadata for Tasks that are assigned to this Kafka
 *         Streams instance, but are currently paused.
 */
public Set<TaskMetadata> pausedLocalTasks() {
    return metadataForLocalThreads().stream()
        .flatMap(thread -> Stream.concat(thread.activeTasks().stream(), thread.standbyTasks().stream()))
        .filter(task -> !task.isRunning())
        .collect(Collectors.toSet());
}


org.apache.kafka.streams.processor.TaskMetadata (Java)

/**
 * Determines if this Task is running or paused.
 *
 * @return {@code true} if this Task is running; {@code false} if it is paused.
 */
public boolean isRunning();


Proposed Changes

DeserializationHandlerResponse.PAUSE pauses the Task that encountered the error, but continues to process other Tasks normally. When a Task is PAUSED, it is still assigned as an active Task to the instance, but it will not consume or process any records.

...

  1. If the record should be valid: fixing the bug in the application that causes the record to fail to deserialize. Once the bug has been fixed, the user would shut down the application, deploy a fixed build and restart it. Once restarted, any PAUSED Tasks would automatically start running again from the record that originally produced the error.
  2. If the record is invalid (e.g. corrupt data): advancing the consumer offsets, either via an external tool, or by a user-supplied application API. Once the offsets have been advanced, the user could either restart their application instance, or use the KafkaStreams#resume(TaskId) API to resume the PAUSED Task, if they wish to minimize downtime.
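
The second option could be automated by resuming every paused Task once the offsets have been advanced. The sketch below illustrates that loop only. `PausableStreams` and `InMemoryStreams` are hypothetical stand-ins that model the proposed `KafkaStreams#resume(TaskId)` and `pausedLocalTasks()` methods with plain string task ids, so the example runs without a Kafka Streams build that contains this KIP.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class ResumePausedTasksSketch {

    /** Hypothetical stand-in for the two KafkaStreams methods proposed in this KIP. */
    interface PausableStreams {
        Set<String> pausedLocalTaskIds();  // models pausedLocalTasks(), reduced to task ids
        boolean resume(String taskId);     // models KafkaStreams#resume(TaskId)
    }

    /** In-memory fake used only to make the sketch runnable. */
    static final class InMemoryStreams implements PausableStreams {
        private final Set<String> paused = new LinkedHashSet<>();

        InMemoryStreams(final String... pausedTaskIds) {
            paused.addAll(Arrays.asList(pausedTaskIds));
        }

        @Override
        public Set<String> pausedLocalTaskIds() {
            return new LinkedHashSet<>(paused);
        }

        @Override
        public boolean resume(final String taskId) {
            // true only if the task was paused, mirroring the proposed contract
            return paused.remove(taskId);
        }
    }

    /** Attempts to resume every paused task; returns the ids that could not be resumed. */
    static List<String> resumeAll(final PausableStreams streams) {
        final List<String> stillPaused = new ArrayList<>();
        // Iterate over a copy, since resuming may change the underlying set.
        for (final String taskId : new ArrayList<>(streams.pausedLocalTaskIds())) {
            if (!streams.resume(taskId)) {
                stillPaused.add(taskId);
            }
        }
        return stillPaused;
    }

    public static void main(final String[] args) {
        final InMemoryStreams streams = new InMemoryStreams("0_1", "1_0");
        System.out.println(resumeAll(streams));            // prints []
        System.out.println(streams.pausedLocalTaskIds());  // prints []
    }
}
```

Returning the ids that failed to resume lets the caller decide whether to retry or fall back to a restart.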

Implementation details

When a DeserializationExceptionHandler returns PAUSE, the current Task will be paused via InternalProcessorContext. However, if the Task is a TaskType.GLOBAL Task, the response will automatically be upgraded to FAIL, as the global Task cannot be PAUSED; pausing the global Task without also pausing all other Tasks on the instance would cause them to work with stale data if they read from or join against any global tables.
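
The upgrade rule for global Tasks can be sketched as a small pure function. This is an illustration of the rule as described, not the actual implementation: the two enums are simplified stand-ins for DeserializationHandlerResponse and the internal TaskType, and `effectiveResponse` is a hypothetical helper name.

```java
final class PauseResponseSketch {

    // Simplified stand-ins; PAUSE is the value proposed by this KIP.
    enum Response { CONTINUE, FAIL, PAUSE }
    enum TaskType { ACTIVE, STANDBY, GLOBAL }

    /** Returns the response that is actually applied for the given task type. */
    static Response effectiveResponse(final Response requested, final TaskType taskType) {
        // A global Task cannot be paused, so PAUSE is upgraded to FAIL.
        if (requested == Response.PAUSE && taskType == TaskType.GLOBAL) {
            return Response.FAIL;
        }
        return requested;
    }

    public static void main(final String[] args) {
        System.out.println(effectiveResponse(Response.PAUSE, TaskType.ACTIVE));    // prints PAUSE
        System.out.println(effectiveResponse(Response.PAUSE, TaskType.GLOBAL));    // prints FAIL
        System.out.println(effectiveResponse(Response.CONTINUE, TaskType.GLOBAL)); // prints CONTINUE
    }
}
```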

When the Task is PAUSED, we will ensure that the offsets of the last successfully processed record(s) on that Task are committed. This ensures that:

  1. If the user fixes a bug and restarts the application, it will continue from the record that failed, and will not re-process a previously successfully processed record.
  2. If the user wants to advance the consumer offsets past the "bad" record, they can simply use: kafka-consumer-groups --reset-offsets --topic <topic>:<partition> --shift-by 1 to skip the bad message before resuming the Task.
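
A small worked example of the offset arithmetic behind both cases follows. It assumes the usual Kafka convention that a committed offset names the next record to consume, so committing after the last successfully processed record leaves the committed offset pointing at the failed record. The helper names are hypothetical.

```java
final class PausedOffsetSketch {

    /** Committed offset after pausing: the record following the last processed one. */
    static long committedOffsetAfterPause(final long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    /** Effect of shifting a committed offset by n records. */
    static long shiftBy(final long committedOffset, final long n) {
        return committedOffset + n;
    }

    public static void main(final String[] args) {
        // Offsets 0..41 were processed; the record at offset 42 failed to deserialize.
        final long committed = committedOffsetAfterPause(41L);
        System.out.println(committed);               // prints 42: a restart retries the failed record
        System.out.println(shiftBy(committed, 1L));  // prints 43: the bad record is skipped
    }
}
```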

Compatibility, Deprecation, and Migration Plan

  • Since this is new functionality, it should not modify the behaviour of the system unless the new PAUSE response is used in a DeserializationExceptionHandler.
  • No APIs are deprecated or need migration.

Test Plan

  • An integration test will verify that, when pausing a failed Task, the consumer offsets of the last successfully processed record(s) are committed.
  • A unit test suite will verify that the LogAndPauseExceptionHandler properly pauses the StreamTask.
    • A unit test will also verify that Global Tasks are never PAUSED.

Rejected Alternatives

No alternatives have been considered.