Status

Current state: Accepted (vote thread)

Discussion thread: here

JIRA: KAFKA-4794

Motivation

Currently, the offset storage is accessible only from SourceTask, so that tasks can be initialized properly after a restart, a crash, or a reconfiguration request.
To implement more complex connectors that need to track the progress of each task, it would be helpful to have access to an OffsetStorageReader instance from the SourceConnector.

...

A connector that needs to stream files can use a background thread to periodically scan directories. When new input files are detected, a task reconfiguration is requested. The connector assigns a subset of the files to each task.
Each task stores source offsets for the last record sent. The source offset data consists of the file size, the byte offset, and the byte size.

A task becomes idle as soon as all of its assigned files are completed.
The connector should be able to track offsets for each assigned file. When all tasks have finished, the connector can stop them or assign new files by requesting a task reconfiguration.
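The tracking described above might look roughly like the following sketch. It is not part of the proposal: `OffsetLookup` is a hypothetical stand-in for the single-partition lookup that an OffsetStorageReader provides (so the example compiles without the Kafka Connect jars), and the key names `path`, `fileSize`, `byteOffset`, and `byteSize` are assumptions. It also assumes the byte offset and byte size describe the last record sent, so a file is treated as completed once that record ends at or past the file size.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the single-partition lookup that OffsetStorageReader
// offers, so that this sketch compiles without the Kafka Connect jars.
interface OffsetLookup {
    Map<String, Object> offset(Map<String, String> partition);
}

final class FileProgressTracker {
    // Hypothetical key names; the proposal does not define them.
    static final String PATH = "path";
    static final String FILE_SIZE = "fileSize";
    static final String BYTE_OFFSET = "byteOffset";
    static final String BYTE_SIZE = "byteSize";

    private final OffsetLookup offsets;

    FileProgressTracker(OffsetLookup offsets) {
        this.offsets = offsets;
    }

    // A file counts as completed once the last sent record ends at or past the file size.
    boolean isCompleted(String path) {
        Map<String, Object> offset = offsets.offset(Collections.singletonMap(PATH, path));
        if (offset == null) {
            return false; // no offset stored yet for this file
        }
        Object size = offset.get(FILE_SIZE);
        Object start = offset.get(BYTE_OFFSET);
        Object length = offset.get(BYTE_SIZE);
        if (!(size instanceof Number) || !(start instanceof Number) || !(length instanceof Number)) {
            return false; // incomplete offset data is treated as "not finished"
        }
        long end = ((Number) start).longValue() + ((Number) length).longValue();
        return end >= ((Number) size).longValue();
    }

    // True when every assigned file is completed (trivially true for an empty list).
    boolean allCompleted(List<String> paths) {
        for (String path : paths) {
            if (!isCompleted(path)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // In-memory map standing in for the offsets committed by the tasks.
        Map<String, Map<String, Object>> store = new HashMap<>();
        Map<String, Object> finished = new HashMap<>();
        finished.put(FILE_SIZE, 100L);
        finished.put(BYTE_OFFSET, 90L);
        finished.put(BYTE_SIZE, 10L);
        store.put("a.log", finished);

        FileProgressTracker tracker = new FileProgressTracker(p -> store.get(p.get(PATH)));
        System.out.println(tracker.isCompleted("a.log"));                           // true
        System.out.println(tracker.allCompleted(Arrays.asList("a.log", "b.log"))); // false
    }
}
```

In a real connector, the lookup would presumably be backed by the OffsetStorageReader obtained from the connector context, and a result of `true` from `allCompleted` would be the point at which the connector stops its tasks or requests a task reconfiguration.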

...

```java
package org.apache.kafka.connect.source;

import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;

/**
 * SourceConnectorContext is provided to SourceConnectors to allow them to interact with the
 * underlying runtime.
 *
 * This interface can be used to add methods that are specific to SourceConnectors.
 */
public interface SourceConnectorContext extends ConnectorContext {

    /**
     * Get the OffsetStorageReader for this SourceConnectorContext.
     */
    OffsetStorageReader offsetStorageReader();
}
```

...


The SourceConnectorContext interface will provide access to an OffsetStorageReader.

...

```java
public ConnectorContext context() {
    return context;
}
```

This method is then overridden by the SourceConnector and SinkConnector classes to narrow the return type to SourceConnectorContext and SinkConnectorContext, respectively.
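The override described here relies on Java's covariant return types. The following is a minimal sketch of that pattern using hypothetical, simplified stand-ins (`BaseContext`, `SourceContext`, `BaseConnector`, `SourceLikeConnector`) rather than the actual Connect classes. The downcast in the override is an assumption of this sketch: it presumes the runtime always initializes a source connector with a source-specific context.

```java
// Hypothetical, simplified stand-ins; these are not the Kafka Connect types.
interface BaseContext {
    String name();
}

interface SourceContext extends BaseContext {
    String offsetReaderDescription(); // capability that only source connectors get
}

class BaseConnector {
    protected BaseContext context;

    void initialize(BaseContext context) {
        this.context = context;
    }

    BaseContext context() {
        return context;
    }
}

class SourceLikeConnector extends BaseConnector {
    // Covariant override: same method, narrower return type.
    // The cast assumes the runtime always initializes this connector with a SourceContext.
    @Override
    SourceContext context() {
        return (SourceContext) context;
    }
}

class CovariantContextDemo {
    public static void main(String[] args) {
        SourceContext ctx = new SourceContext() {
            @Override public String name() { return "demo"; }
            @Override public String offsetReaderDescription() { return "offset reader"; }
        };
        SourceLikeConnector connector = new SourceLikeConnector();
        connector.initialize(ctx);

        // No cast is needed at the call site to reach the source-only method.
        System.out.println(connector.context().offsetReaderDescription());

        // Callers that hold the base type still use the original method.
        BaseConnector base = connector;
        System.out.println(base.context().name());
    }
}
```

Because the base class keeps both its field and its `context()` method unchanged, code written against the base type continues to work, while subclasses gain typed access to their own context.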

...

This proposal is simple and straightforward, and it reflects patterns used elsewhere in the API. A similar approach was considered initially but evolved into this design after discussion. Moving the context field from Connector into SourceConnector and SinkConnector (with the appropriate type) was also considered, but it was ruled out because it would be less binary-compatible than the current approach.