Status

Current state: Accepted

Discussion thread: here

Vote thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejs
titleSink offsets request
{
  "offsets": [
    {
      "partition": {
        "kafka_topic": // Kafka topic
        "kafka_partition": // Kafka partition
      },
      "offset": {
        "kafka_offset": // Kafka offset
      }
    }
  ]
}

For either kind of connector, the offset field may be null, which will reset the offset for that specific partition. For example, to reset the offset for partition 3 of a topic T read by a sink connector:

Code Block
languagejs
titleSink offsets request (reset)
{
  "offsets": [
    {
      "partition": {
        "kafka_topic": "T",
        "kafka_partition": 3
      },
      "offset": null
    }
  ]
}
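A client that needs to produce these request bodies programmatically could do so along the lines of the following sketch. It is a hypothetical helper, not part of Kafka Connect: the class and method names are made up, it only covers the sink format shown above, and it builds the JSON by hand because the JDK ships no JSON library.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical client-side helper for sink offsets request bodies; not part of Kafka Connect.
final class SinkOffsetsRequestBody {

    // One element of the "offsets" array; a null offset asks for that partition to be reset.
    static final class Entry {
        final String topic;
        final int partition;
        final Long offset;

        Entry(String topic, int partition, Long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    static String toJson(List<Entry> entries) {
        List<String> items = new ArrayList<>();
        for (Entry e : entries) {
            // Kafka topic names may only contain [a-zA-Z0-9._-], so no JSON string escaping is needed.
            String partition = "{\"kafka_topic\":\"" + e.topic + "\",\"kafka_partition\":" + e.partition + "}";
            String offset = e.offset == null ? "null" : "{\"kafka_offset\":" + e.offset + "}";
            items.add("{\"partition\":" + partition + ",\"offset\":" + offset + "}");
        }
        return "{\"offsets\":[" + String.join(",", items) + "]}";
    }
}
```

Mixing entries with and without offsets in one body would alter some partitions and reset others in a single request.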

Altering/resetting offsets (response)

We will want to accurately communicate to users whether they can be certain that offsets have been reset for their connector, which is not possible unless the connector has communicated this information to the Connect runtime by implementing the alterOffsets  method (described below).

As a result, there are three possible cases we will want to address with the responses for requests to alter or reset offsets.

If the connector has implemented alterOffsets  and everything has succeeded, the HTTP status will be 200 and the response body will be:

Code Block
languagejs
titleOffset alter/reset response (definite success)
{
  "message": "The offsets for this connector have been <reset|altered> successfully"
}

If the connector has not implemented alterOffsets  but everything else has succeeded, the HTTP status will be 200 and the response body will be:

Code Block
languagejs
titleOffset alter/reset response (possible success)
{
  "message": "The framework-managed offsets for this connector have been <reset|altered> successfully. However, if this connector manages offsets externally, they will need to be manually <reset|altered> in the system that the connector uses."
}

If anything fails (including consumer group deletion for sink connectors, zombie fencing for exactly-once source connectors, invoking alterOffsets on a connector, etc.), the standard 500 response will be returned:

Code Block
languagejs
titleOffset alter/reset response (failure)
{
  "error_code": 500,
  "message": // Exception message here
}
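The three outcomes above amount to a mapping from the result of the connector's alterOffsets hook to an HTTP status and message. The following is a hypothetical sketch, not the Connect runtime's code. The AlterOffsetsCall interface stands in for invoking the hook, and the sketch assumes that every other failure (consumer group deletion, zombie fencing, and so on) surfaces through the same 500 path.

```java
// Hypothetical sketch of how the documented outcomes map to a response; not Connect runtime code.
final class OffsetsResponse {
    final int status;
    final String message;

    private OffsetsResponse(int status, String message) {
        this.status = status;
        this.message = message;
    }

    // Stand-in for invoking the connector's alterOffsets hook.
    interface AlterOffsetsCall {
        boolean invoke() throws Exception;
    }

    static OffsetsResponse forRequest(boolean reset, AlterOffsetsCall call) {
        String done = reset ? "reset" : "altered";
        boolean overridden;
        try {
            overridden = call.invoke();
        } catch (Exception e) {
            // Any failure, including UnsupportedOperationException from the hook, yields the standard 500.
            return new OffsetsResponse(500, e.getMessage());
        }
        if (overridden) {
            // The connector implemented the hook, so the runtime can report definite success.
            return new OffsetsResponse(200, "The offsets for this connector have been " + done + " successfully");
        }
        // The default implementation returned false: only framework-managed offsets are known to be updated.
        return new OffsetsResponse(200, "The framework-managed offsets for this connector have been " + done
                + " successfully. However, if this connector manages offsets externally, they will need to be manually "
                + done + " in the system that the connector uses.");
    }
}
```

Keying the "possible success" message off the hook's return value is why the hook returns a boolean at all. The runtime cannot otherwise tell whether a connector has opted in.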

Endpoints behavior

Reading offsets

The GET /connectors/{connector}/offsets  endpoint will be useful for examining the offsets of a currently-running connector. It will automatically deduce whether the connector is a source or sink based on the connector's configuration, and then return the offsets for the connector. The request will fail with a 404 response if the connector does not exist on the cluster.
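A client might issue the read request with the JDK's java.net.http client roughly as follows. The helper's name and the worker URL used in the usage note (http://localhost:8083) are assumptions. The sketch only builds the request, so it needs no running cluster.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper for the read endpoint; not part of Kafka Connect.
final class ConnectOffsetsRequests {

    // Assumption: the connector name contains only URL-safe characters; otherwise it must be percent-encoded.
    static URI offsetsUri(String workerUrl, String connector) {
        return URI.create(workerUrl + "/connectors/" + connector + "/offsets");
    }

    static HttpRequest readOffsets(String workerUrl, String connector) {
        return HttpRequest.newBuilder(offsetsUri(workerUrl, connector))
                .header("Accept", "application/json")
                .GET()
                .build();
    }
}
```

Sending the request with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and checking for a 404 status would distinguish a missing connector from a successful read.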

Altering offsets

The PATCH /connectors/{connector}/offsets endpoint will be useful for altering the offsets of stopped connectors. Requests will be rejected if the connector does not exist on the cluster (based on the config topic), if a rebalance is pending, if the connector is not in the STOPPED state (described below), or if it does not have an empty set of tasks in the config topic. If successful, the request will be met with a 200 response and the appropriate body. The request will fail with a 404 response if the connector does not exist on the cluster, and will fail with a 400 response if the connector does exist but is not in the correct state.
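A client could send an alter request with the same java.net.http client. HttpRequest.Builder has no dedicated PATCH() method, so the generic method(...) call is used instead. The helper name and worker URL are assumptions. The JSON body is whatever the request formats in this KIP call for.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper for the alter endpoint; not part of Kafka Connect.
final class AlterOffsetsRequest {

    static HttpRequest build(String workerUrl, String connector, String jsonBody) {
        // Assumption: the connector name is already URL-safe.
        URI uri = URI.create(workerUrl + "/connectors/" + connector + "/offsets");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                // HttpRequest.Builder only has GET/POST/PUT/DELETE shortcuts, so PATCH goes through method(...).
                .method("PATCH", HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }
}
```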

The connector's alterOffsets method (described below) will be invoked before its consumer group offsets or source offsets are modified.

If exactly-once source support is enabled on a worker that receives a request to alter offsets for a source connector, it will fence out all previously-running tasks for the connector (if any exist, based on the presence of a task count record in the config topic) before invoking alterOffsets and altering the offsets for the connector. Offsets will be altered transactionally for the connector's primary offsets topic (i.e., the one that is written to transactionally by source tasks when the connector is running). The transactional ID used for this operation will be ${groupId}-${connector}, where ${groupId} is the group ID of the Connect cluster and ${connector} is the name of the connector.
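The ordering in the paragraph above (fence, then invoke the hook, then write transactionally) can be modeled as a list of steps. This is a hypothetical sketch that only records step descriptions and performs no Kafka I/O. The groupId and connector values used in the usage note are made up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the documented ordering; each step is only recorded as a string, never executed.
final class ExactlyOnceAlterPlan {

    static List<String> steps(String groupId, String connector, boolean taskCountRecordPresent) {
        List<String> steps = new ArrayList<>();
        if (taskCountRecordPresent) {
            // Tasks may have run before, so they are fenced out first to block zombie writes.
            steps.add("fence out previous tasks of " + connector);
        }
        steps.add("invoke alterOffsets on " + connector);
        // The transactional ID follows the ${groupId}-${connector} pattern described above.
        steps.add("write offsets to the primary offsets topic with transactional ID " + groupId + "-" + connector);
        return steps;
    }
}
```

For example, steps("connect-cluster", "my-source", true) yields three steps ending with the transactional ID connect-cluster-my-source. Without a task count record, the fencing step is skipped.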

Requests to this endpoint will only augment any existing offsets for the connector; they will not implicitly remove any offsets for the connector that are not included in the request body.
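The "augment, never implicitly remove" rule can be expressed as a map merge. This hypothetical model represents resetting a partition (a null offset in the request) as removing its entry. Partitions that are absent from the request keep their existing offsets. The generic partition and offset types are placeholders rather than Connect's actual offset representation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of alter semantics; not Connect runtime code.
final class OffsetMerge {

    static <P, O> Map<P, O> apply(Map<P, O> existing, Map<P, O> requested) {
        // Start from the existing offsets so partitions not named in the request are left untouched.
        Map<P, O> result = new HashMap<>(existing);
        for (Map.Entry<P, O> e : requested.entrySet()) {
            if (e.getValue() == null) {
                result.remove(e.getKey()); // a null offset resets that partition
            } else {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }
}
```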

All offset alter requests will be forwarded to the leader of the cluster.

Resetting offsets

The DELETE /connectors/{connector}/offsets endpoint will be useful for resetting the offsets of stopped connectors. Requests will be rejected if the connector does not exist on the cluster (based on the config topic), if a rebalance is pending, if the connector is not in the STOPPED state (described below), or if it does not have an empty set of tasks in the config topic. If successful, the request will be met with a 200 response and the appropriate body. The request will fail with a 404 response if the connector does not exist on the cluster, and will fail with a 400 response if the connector does exist but is not in the correct state. This endpoint will be idempotent; multiple consecutive requests to reset offsets for the same connector with no new offsets produced in between those requests will all result in a 200 response (if they are successful).
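The rejection rules shared by the PATCH and DELETE endpoints can be condensed into a small check. This is a hypothetical sketch. The TargetState enum is a local stand-in. The pending-rebalance rejection is left out because no status code is specified for it here. Mapping a non-empty task set to 400 is an assumption.

```java
import java.util.OptionalInt;

// Hypothetical sketch of the documented request checks; rebalance detection is omitted.
final class OffsetsRequestPreconditions {

    // Local stand-in for the connector's target state as recorded in the config topic.
    enum TargetState { STARTED, PAUSED, STOPPED }

    // Returns the rejection status, or empty if the request may proceed.
    static OptionalInt rejection(boolean existsInConfigTopic, TargetState state, int taskConfigCount) {
        if (!existsInConfigTopic) {
            return OptionalInt.of(404);
        }
        // Assumption: a non-empty task set is reported the same way as a wrong target state.
        if (state != TargetState.STOPPED || taskConfigCount > 0) {
            return OptionalInt.of(400);
        }
        return OptionalInt.empty();
    }
}
```

The check never inspects stored offsets, so a second DELETE against a connector that is still stopped passes it again. This fits the idempotency described above.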

...

If exactly-once source support is enabled on a worker that receives a request to reset offsets for a source connector, it will fence out all previously-running tasks for the connector (if any exist, based on the presence of a task count record in the config topic) before invoking alterOffsets and resetting the offsets for the connector. Offsets will be reset transactionally for the connector's primary offsets topic (i.e., the one that is written to transactionally by source tasks when the connector is running). The transactional ID for this operation will be ${groupId}-${connector}, where ${groupId} is the group ID of the Connect cluster and ${connector} is the name of the connector. If this topic is different from the worker's global offsets topic, the connector's offsets will be removed from that topic as well, but without the use of a transaction.
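Which topics receive tombstones during an exactly-once reset, and whether each write is transactional, can be summarized as follows. This is a hypothetical model, not runtime code. The topic names in the usage note are examples only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the tombstone writes for an exactly-once source offset reset.
final class SourceOffsetResetPlan {

    static final class Write {
        final String topic;
        final boolean transactional;

        Write(String topic, boolean transactional) {
            this.topic = topic;
            this.transactional = transactional;
        }
    }

    static List<Write> plan(String primaryOffsetsTopic, String globalOffsetsTopic) {
        List<Write> writes = new ArrayList<>();
        // Tombstones for the primary offsets topic are written inside a transaction.
        writes.add(new Write(primaryOffsetsTopic, true));
        if (!primaryOffsetsTopic.equals(globalOffsetsTopic)) {
            // The worker's global offsets topic is cleaned up too, but without a transaction.
            writes.add(new Write(globalOffsetsTopic, false));
        }
        return writes;
    }
}
```

A connector with a dedicated topic such as my-source-offsets would get two writes: a transactional one to its own topic and a non-transactional one to the worker's global topic. A connector that uses the global topic directly would get only the transactional write.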

Requests to reset sink connector offsets will be satisfied by deleting the consumer group for that sink connector (as opposed to deleting all known offsets for that consumer group). Requests to reset sink connector offsets will fail if there are any active members of the sink connector's consumer group.
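The sink reset path can be sketched against a hypothetical ConsumerGroupOps interface. That interface is not the Kafka Admin API; a real implementation would likely rely on Admin.describeConsumerGroups and Admin.deleteConsumerGroups instead. The group ID passed in is whatever group the connector uses, including an overridden one.

```java
// Hypothetical stand-in for the consumer group admin operations; not the Kafka Admin API.
interface ConsumerGroupOps {
    int activeMemberCount(String groupId);

    void deleteGroup(String groupId);
}

final class SinkOffsetReset {

    static void reset(ConsumerGroupOps ops, String groupId) {
        int members = ops.activeMemberCount(groupId);
        if (members > 0) {
            // Resetting is refused while any consumer is still a member of the group.
            throw new IllegalStateException("Consumer group " + groupId + " has " + members + " active member(s)");
        }
        // Deleting the whole group drops every committed offset, not just those for known partitions.
        ops.deleteGroup(groupId);
    }
}
```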

...

Code Block
languagejava
titleSourceConnector offset hook
public abstract class SourceConnector extends Connector {

    /**
     * Invoked when users request to manually alter/reset the offsets for this connector via the REST
     * API. Connectors that manage offsets externally can propagate offset changes to their external
     * system in this method. Connectors may also validate these offsets if, for example, the source
     * partition is in a format that will not be recognized by them or their tasks.
     * <p/>
     * Connectors that do not manage offsets externally or require custom offset validation need not
     * implement this method beyond simply returning {@code true}.
     * <p/>
     * User requests to alter/reset offsets will be handled by the Connect runtime and will be reflected
     * in the offsets returned by any
     * {@link org.apache.kafka.connect.storage.OffsetStorageReader OffsetStorageReader instances}
     * provided to this connector and its tasks.
     * <p/>
     * Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
     * {@link #start(Map) start} method is invoked.
     *
     * @param connectorConfig the configuration of the connector
     * @param offsets a map from source partition to source offset, containing the offsets that the
     *                user has requested to alter/reset. For any source partitions that are being reset instead
     *                of altered, their corresponding value in the map will be {@code null}.
     * @return whether this method has been overridden by the connector; the default implementation returns
     * {@code false}, and all other implementations (that do not unconditionally throw exceptions) should return
     * {@code true}
     * @throws UnsupportedOperationException if it is impossible to alter/reset the offsets for this connector
     * @throws org.apache.kafka.connect.errors.ConnectException if the offsets for this connector cannot be
     * reset for any other reason (for example, they have failed custom validation logic specific to this connector)
     */
    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
        return false;
    }

}
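A connector that validates offsets but does not manage them externally might override the hook along these lines. This is a hypothetical example. The "filename" partition key and "position" offset key are assumptions. The class does not extend SourceConnector, so it compiles without connect-api. It throws IllegalArgumentException where a real connector would throw ConnectException.

```java
import java.util.Map;

// Hypothetical example with the same alterOffsets signature as SourceConnector; not a real connector.
final class ExampleFileSourceOffsets {

    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
        for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : offsets.entrySet()) {
            if (!(entry.getKey().get("filename") instanceof String)) {
                // A real connector would throw org.apache.kafka.connect.errors.ConnectException here.
                throw new IllegalArgumentException("Unrecognized source partition: " + entry.getKey());
            }
            Map<String, ?> offset = entry.getValue();
            if (offset == null) {
                continue; // null means this partition is being reset, which is always allowed here
            }
            Object position = offset.get("position");
            if (!(position instanceof Number) || ((Number) position).longValue() < 0) {
                throw new IllegalArgumentException("Invalid offset for " + entry.getKey() + ": " + offset);
            }
        }
        // Offsets live only in Kafka, so there is nothing external to update; returning true signals an override.
        return true;
    }
}
```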

...

Code Block
languagejava
titleSinkConnector offset hook
public abstract class SinkConnector extends Connector {

    /**
     * Invoked when users request to manually alter/reset the offsets for this connector via the REST
     * API. Connectors that manage offsets externally can propagate offset changes to their external
     * system in this method. Connectors may also validate these offsets if, for example, an offset
     * is out of range for what can be feasibly written to the external system.
     * <p/>
     * Connectors that do not manage offsets externally or require custom offset validation need not
     * implement this method beyond simply returning {@code true}.
     * <p/>
     * User requests to alter/reset offsets will be handled by the Connect runtime and will be reflected
     * in the offsets for this connector's consumer group.
     * <p/>
     * Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
     * {@link #start(Map) start} method is invoked.
     *
     * @param connectorConfig the configuration of the connector
     * @param offsets a map from topic partition to offset, containing the offsets that the
     *                user has requested to alter/reset. For any topic partitions that are being reset instead
     *                of altered, their corresponding value in the map will be {@code null}.
     * @return whether this method has been overridden by the connector; the default implementation returns
     * {@code false}, and all other implementations (that do not unconditionally throw exceptions) should return
     * {@code true}
     * @throws UnsupportedOperationException if it is impossible to alter/reset the offsets for this connector
     * @throws org.apache.kafka.connect.errors.ConnectException if the offsets for this connector cannot be
     * reset for any other reason (for example, they have failed custom validation logic specific to this connector)
     */
    public boolean alterOffsets(Map<String, String> connectorConfig, Map<TopicPartition, Long> offsets) {
        return false;
    }

}
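A sink connector might use the hook to reject offsets for topics it does not consume. This is a hypothetical example. It assumes the connector is configured with an explicit comma-separated topics list rather than topics.regex. It defines a minimal local TopicPartition stand-in so it compiles without kafka-clients. It throws IllegalArgumentException where a real connector would throw ConnectException.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Minimal stand-in for org.apache.kafka.common.TopicPartition so this sketch compiles on its own.
final class TopicPartition {
    private final String topic;
    private final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    String topic() {
        return topic;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) {
            return false;
        }
        TopicPartition other = (TopicPartition) o;
        return topic.equals(other.topic) && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Hypothetical example with the same alterOffsets signature as SinkConnector; not a real connector.
final class ExampleSinkOffsets {

    public boolean alterOffsets(Map<String, String> connectorConfig, Map<TopicPartition, Long> offsets) {
        // Assumption: an explicit "topics" list is configured.
        String configured = connectorConfig.getOrDefault("topics", "").trim();
        Set<String> topics = new HashSet<>(Arrays.asList(configured.split("\\s*,\\s*")));
        for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
            if (!topics.contains(entry.getKey().topic())) {
                throw new IllegalArgumentException("Connector does not consume from " + entry.getKey());
            }
            Long offset = entry.getValue();
            // A null offset is a reset request and needs no range check.
            if (offset != null && offset < 0) {
                throw new IllegalArgumentException("Negative offset for " + entry.getKey() + ": " + offset);
            }
        }
        return true;
    }
}
```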

...

This feature is fully backwards-compatible with existing Kafka Connect releases. Migration should occur automatically whenever a Kafka Connect cluster is upgraded to a version that supports this feature.

Additional connector ACLs

If exactly-once source support is enabled on a worker, in order to handle requests to alter or reset offsets, the connector's principal must be able to use a transactional ID of ${groupId}-${connector}, where ${groupId} is the group ID of the Connect cluster and ${connector} is the name of the connector.

STOPPED target state

Diligent readers will note that the addition of a new target state creates problems with cluster downgrades and rolling upgrades. If a connector is stopped, and a worker running an older version of Kafka Connect either joins or currently exists in the cluster, that worker may not know how to handle the new record in the config topic that includes the request to stop the connector.

...

Scope: The new PATCH /connectors/{connector}/offsets  endpoint

Dependencies: The STOPPED state and Resetting offsets (internal logic) features

Resetting offsets (internal logic)

...

Dependencies: The STOPPED state and Resetting offsets (internal logic) features

Test Plan

Unit, integration, and/or system tests will be added for these cases:

...

  • Cannot view the offsets of a nonexistent connector
  • Can view the offsets of a running sink and source connector, and verify that those offsets reflect an expected level of progress for each connector (i.e., they are greater than or equal to a certain value depending on how the connectors are configured and how long they have been running)
  • Send the appropriate REST response when altering a connector's offsets depending on whether its alterOffsets  method is not implemented, throws an UnsupportedOperationException, throws a different exception, or is implemented and returns successfully
  • Cannot alter the offsets of a nonexistent, paused, or running connector
  • Can alter the offsets of a stopped sink and source connector, verify that those offsets are returned in subsequent requests to view the offsets of the connector, and verify that those offsets are used by the connector when it is resumed
  • Send the appropriate REST response when resetting a connector's offsets depending on whether its alterOffsets  method is not implemented, throws an UnsupportedOperationException, throws a different exception, or is implemented and returns successfully
  • Cannot reset the offsets of a nonexistent, paused, or running connector
  • Can reset the offsets of a stopped sink and source connector
  • Follow-up requests to reset offsets for those connectors after their offsets have already been reset once are also successful
  • Can view/alter/reset the offsets for a sink connector that uses an overridden consumer group ID
  • Can view/alter/reset the offsets for a source connector that uses a custom offsets topic
  • Can view/alter/reset the offsets for a sink and source connector that targets a different Kafka cluster than the one used for the internal topics of the Kafka Connect cluster

...

  • Can stop a running connector
  • Can stop a paused connector
  • Can stop a stopped connector (i.e., stopping an already-stopped connector is idempotent)
  • Can delete a stopped connector
  • Cannot see task configuration or status information in the REST API for a stopped connector
  • Can resume a stopped connector
  • Can pause a stopped connector
  • Stopping a failed connector updates its state to STOPPED  in the REST API
  • Stopping a connector that fails during shutdown after receiving a stop request updates its state to STOPPED  in the REST API
  • Can resume a connector after its Connector  has failed both before and during shutdown after receiving a stop request

...