...

The PATCH /connectors/{connector}/offsets endpoint will be useful for altering the offsets of stopped connectors. Requests will be rejected if the connector does not exist on the cluster (based on the config topic), if a rebalance is pending, if the connector is not in the STOPPED state (described below), or if it does not have an empty set of tasks in the config topic. If successful, the request will be met with a 204 ("no content") response and an empty response body. The request will fail with a 404 response if the connector does not exist on the cluster, and will fail with a 400 response if the connector does exist but is not in the correct state.
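As an illustration of how a client might address this endpoint, here is a minimal sketch that builds (but does not send) the PATCH request with the JDK's built-in HTTP client. The class name `AlterOffsetsRequest`, the `workerUrl` parameter, and the JSON content type are assumptions made for the example. The request body is passed through untouched, since its format is not reproduced in this section, and the connector name is assumed to be URL-safe.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper, not part of Kafka Connect.
final class AlterOffsetsRequest {

    // Builds a PATCH request for /connectors/{connector}/offsets on the given worker.
    // jsonBody is supplied by the caller; this sketch does not assume its structure.
    static HttpRequest build(String workerUrl, String connector, String jsonBody) {
        URI uri = URI.create(workerUrl + "/connectors/" + connector + "/offsets");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json") // assumed content type
                .method("PATCH", HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }
}
```

The returned request could then be sent with `java.net.http.HttpClient#send`, after which a 204 status would indicate that the offsets were altered.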

...

The DELETE /connectors/{connector}/offsets endpoint will be useful for resetting the offsets of stopped connectors. Requests will be rejected if the connector does not exist on the cluster (based on the config topic), if a rebalance is pending, if the connector is not in the STOPPED state (described below), or if it does not have an empty set of tasks in the config topic. If successful, the request will be met with a 204 ("no content") response and an empty response body. The request will fail with a 404 response if the connector does not exist on the cluster, and will fail with a 400 response if the connector does exist but is not in the correct state. This endpoint will be idempotent; multiple consecutive requests to reset offsets for the same connector, with no new offsets produced in between those requests, will all result in a 204 response (if they are successful).
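The status codes described for the offsets endpoints could be interpreted on the client side roughly as follows. This is a sketch only: `OffsetResponse` and its `Outcome` enum are hypothetical names, and any status not named in the text (for example, whatever is returned while a rebalance is pending) is deliberately left as `OTHER` rather than guessed at.

```java
// Hypothetical client-side helper, not part of Kafka Connect.
final class OffsetResponse {

    enum Outcome { SUCCESS, CONNECTOR_NOT_FOUND, WRONG_STATE, OTHER }

    // Maps the HTTP status of a PATCH or DELETE offsets request to an outcome.
    static Outcome classify(int statusCode) {
        switch (statusCode) {
            case 204: return Outcome.SUCCESS;             // "no content", empty body
            case 404: return Outcome.CONNECTOR_NOT_FOUND; // connector not on the cluster
            case 400: return Outcome.WRONG_STATE;         // exists, but not in the correct state
            default:  return Outcome.OTHER;               // not specified in this section
        }
    }
}
```

Because the DELETE endpoint is idempotent, a client that retries a reset and sees `SUCCESS` again does not need to treat the repeat as an error.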

...

SourceConnector offset hook (Java)
public abstract class SourceConnector extends Connector {

    /**
     * Invoked when users request to manually alter/reset the offsets for this connector via the REST
     * API. Connectors that manage offsets externally can propagate offset changes to their external
     * system in this method. Connectors may also validate these offsets if, for example, the source
     * partition is in a format that will not be recognized by them or their tasks.
     * <p/>
     * Connectors that do not manage offsets externally or require custom offset validation need not
     * implement this method beyond simply returning {@code true}.
     * <p/>
     * User requests to alter/reset offsets will be handled by the Connect runtime and will be reflected
     * in the offsets returned by any
     * {@link org.apache.kafka.connect.storage.OffsetStorageReader OffsetStorageReader instances}
     * provided to this connector and its tasks.
     * <p/>
     * Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
     * {@link #start(Map) start} method is invoked.
     *
     * @param connectorConfig the configuration of the connector
     * @param offsets a map from source partition to source offset, containing the offsets that the
     *                user has requested to alter/reset. For any source partitions that are being reset instead
     *                of altered, their corresponding value in the map will be {@code null}.
     * @return whether this method has been overridden by the connector; the default implementation returns
     * {@code false}, and all other implementations (that do not unconditionally throw exceptions) should return
     * {@code true}
     * @throws UnsupportedOperationException if it is impossible to alter/reset the offsets for this connector
     * @throws org.apache.kafka.connect.errors.ConnectException if the offsets for this connector cannot be
     * reset for any other reason (for example, they have failed custom validation logic specific to this connector)
     */
    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
        return false;
    }

}
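To make the "custom validation" case concrete, the following sketch shows the kind of check a source connector's alterOffsets implementation might delegate to. It is written against plain JDK types so that it compiles without Kafka on the classpath. `OffsetValidation`, the `filename` partition key, and the use of IllegalArgumentException are all assumptions for illustration; a real connector would throw ConnectException, as the Javadoc above describes.

```java
import java.util.Map;

// Hypothetical validation helper for an imaginary file-based source connector.
final class OffsetValidation {

    // Assumed key that this imaginary connector expects in every source partition.
    static final String PARTITION_KEY = "filename";

    // Rejects any source partition that is not in the format the connector recognizes.
    // A null offset value means the partition is being reset, so only the key is checked.
    static void validate(Map<Map<String, ?>, Map<String, ?>> offsets) {
        for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : offsets.entrySet()) {
            Map<String, ?> partition = entry.getKey();
            if (partition == null || !partition.containsKey(PARTITION_KEY)) {
                // A real connector would throw ConnectException here instead.
                throw new IllegalArgumentException(
                        "Unrecognized source partition: " + partition);
            }
        }
    }
}
```

An overriding alterOffsets method could call such a check and then return true to signal that the hook has been implemented.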

...

SinkConnector offset hook (Java)
public abstract class SinkConnector extends Connector {

    /**
     * Invoked when users request to manually alter/reset the offsets for this connector via the REST
     * API. Connectors that manage offsets externally can propagate offset changes to their external
     * system in this method. Connectors may also validate these offsets if, for example, an offset
     * is out of range for what can be feasibly written to the external system.
     * <p/>
     * Connectors that do not manage offsets externally or require custom offset validation need not
     * implement this method beyond simply returning {@code true}.
     * <p/>
     * User requests to alter/reset offsets will be handled by the Connect runtime and will be reflected
     * in the offsets for this connector's consumer group.
     * <p/>
     * Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
     * {@link #start(Map) start} method is invoked.
     *
     * @param connectorConfig the configuration of the connector
     * @param offsets a map from topic partition to offset, containing the offsets that the
     *                user has requested to alter/reset. For any topic partitions that are being reset instead
     *                of altered, their corresponding value in the map will be {@code null}.
     * @return whether this method has been overridden by the connector; the default implementation returns
     * {@code false}, and all other implementations (that do not unconditionally throw exceptions) should return
     * {@code true}
     * @throws UnsupportedOperationException if it is impossible to alter/reset the offsets for this connector
     * @throws org.apache.kafka.connect.errors.ConnectException if the offsets for this connector cannot be
     * reset for any other reason (for example, they have failed custom validation logic specific to this connector)
     */
    public boolean alterOffsets(Map<String, String> connectorConfig, Map<TopicPartition, Long> offsets) {
        return false;
    }

}
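For the sink side, the Javadoc mentions rejecting offsets that are out of range for the external system. A minimal sketch of such a range check is shown below, again using only JDK types. `SinkOffsetValidation`, the `maxWritableOffset` limit, and the exception type are hypothetical. The type parameter `P` stands in for TopicPartition, and a real connector would throw ConnectException.

```java
import java.util.Map;

// Hypothetical range check for an imaginary sink connector.
final class SinkOffsetValidation {

    // P stands in for TopicPartition so the sketch compiles without Kafka on the classpath.
    // maxWritableOffset is an assumed limit imposed by the external system.
    static <P> void validate(Map<P, Long> offsets, long maxWritableOffset) {
        for (Map.Entry<P, Long> entry : offsets.entrySet()) {
            Long offset = entry.getValue();
            if (offset == null) {
                continue; // null means the partition is being reset, nothing to check
            }
            if (offset < 0 || offset > maxWritableOffset) {
                // A real connector would throw ConnectException here instead.
                throw new IllegalArgumentException(
                        "Offset " + offset + " out of range for " + entry.getKey());
            }
        }
    }
}
```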

...