Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Motivation

Currently, the offset storage is accessible only from a SourceTask, so that tasks can be initialized properly after a restart, a crash, or a reconfiguration request.
To implement more complex connectors that need to track the progress of each task, it would be helpful to have access to an OffsetStorageReader instance from the SourceConnector.

It would then become possible to implement a background thread that requests a task reconfiguration based on source offsets.

This improvement proposal comes from a project that needs to periodically scan directories on shared storage to detect new files and stream them into Kafka.

For instance, here is a straightforward implementation that this feature would make possible.

A connector that needs to stream files can use a background thread to periodically scan directories. When new input files are detected, a task reconfiguration is requested. The connector assigns a subset of the files to each task.
Each task stores the source offsets for the last record sent. The source offset data are the size of the file, the byte offset, and the byte size.

A task becomes idle as soon as all of its assigned files are completed.
The connector should be able to track offsets for each assigned file. When all tasks have finished, the connector can stop them or assign new files by requesting a task reconfiguration.

Finally, another advantage of monitoring source offsets from the connector is the ability to detect slow or failed tasks and, if necessary, to restart all tasks.
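
To make the file-streaming scenario above more concrete, here is a minimal sketch of how a connector might decide that a file is finished from the offset data a task has stored. It is only an illustration under stated assumptions: the class FileProgress and the keys "file.size", "position" and "record.size" are hypothetical names, and the rule "byte offset plus byte size reaches the file size" is one reading of the offset data described above, not something the proposal specifies.

```java
import java.util.Map;

// Hypothetical helper; neither the class nor the key names come from
// Kafka Connect or from this proposal.
final class FileProgress {
    // Assumed keys of the source offset map written by each task.
    static final String FILE_SIZE = "file.size";     // total size of the file in bytes
    static final String POSITION = "position";       // byte offset of the last record sent
    static final String RECORD_SIZE = "record.size"; // size in bytes of that record

    private FileProgress() { }

    /** True when the last record sent ends at or beyond the end of the file. */
    static boolean isComplete(Map<String, Object> offset) {
        if (offset == null) {
            return false; // no offset stored yet for this file
        }
        // Values are read as Number so that either Integer or Long is accepted.
        long fileSize = ((Number) offset.get(FILE_SIZE)).longValue();
        long position = ((Number) offset.get(POSITION)).longValue();
        long recordSize = ((Number) offset.get(RECORD_SIZE)).longValue();
        return position + recordSize >= fileSize;
    }

    /** True when every assigned file has a stored offset that marks it complete. */
    static boolean allComplete(Map<String, Map<String, Object>> offsetsByFile) {
        for (Map<String, Object> offset : offsetsByFile.values()) {
            if (!isComplete(offset)) {
                return false;
            }
        }
        return true;
    }
}
```

With a check of this kind, the connector could treat allComplete(...) returning true as the signal to stop the tasks or to assign new files.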

Public Interfaces

We propose to add a new class, SourceConnectorContext, which implements ConnectorContext and provides access to an OffsetStorageReader. This class delegates the requestTaskReconfiguration and raiseError methods to the underlying context.

SourceConnectorContext
/**
 * SourceConnectorContext is provided to SourceConnectors to allow them to track source offsets with the OffsetStorageReader.
 */
public class SourceConnectorContext implements ConnectorContext {

    private final OffsetStorageReader reader;

    /**
     * The delegate context can be a standalone or a herder connector context.
     */
    private final ConnectorContext delegateContext;


    public SourceConnectorContext(OffsetStorageReader reader, ConnectorContext delegateContext) {
        this.reader = reader;
        this.delegateContext = delegateContext;
    }

    /**
     * Get the OffsetStorageReader for this SourceConnector.
     */
    public OffsetStorageReader offsetStorageReader() {
        return reader;
    }

    @Override
    public void requestTaskReconfiguration() {
        this.delegateContext.requestTaskReconfiguration();
    }

    @Override
    public void raiseError(Exception e) {
        this.delegateContext.raiseError(e);
    }
}

In addition, we propose to add a new method offsetStorageReader() to the class SourceConnector in order to hide the cast of the ConnectorContext instance:

SourceConnector
/**
 * SourceConnectors implement the connector interface to pull data from another system and send
 * it to Kafka.
 */
public abstract class SourceConnector extends Connector {

    public OffsetStorageReader offsetStorageReader() {
        return ((SourceConnectorContext) context).offsetStorageReader();
    }
}
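
As a rough illustration of how a connector could use the proposed accessor, the sketch below shows one polling pass of a monitor that reads the offsets of its assigned files and requests a task reconfiguration once all of them are done. Everything in it is an assumption made for the example: OffsetReader and Reconfigurer are small stand-ins so that the code compiles without Kafka Connect on the classpath (they mirror only the two calls used here), and OffsetMonitor, the "path" partition key, and the isDone predicate are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Stand-in for the one OffsetStorageReader call used below. The real method
// is generic in the partition value type; String is used here for brevity.
interface OffsetReader {
    Map<Map<String, String>, Map<String, Object>> offsets(Collection<Map<String, String>> partitions);
}

// Stand-in for the one ConnectorContext call used below.
interface Reconfigurer {
    void requestTaskReconfiguration();
}

// Hypothetical monitor that a SourceConnector could run from a background thread.
final class OffsetMonitor {
    private final OffsetReader reader;
    private final Reconfigurer context;
    private final Predicate<Map<String, Object>> isDone;
    private final List<Map<String, String>> partitions = new ArrayList<>();

    OffsetMonitor(OffsetReader reader, Reconfigurer context,
                  Collection<String> files, Predicate<Map<String, Object>> isDone) {
        this.reader = reader;
        this.context = context;
        this.isDone = isDone;
        for (String file : files) {
            // Assumed source partition layout: one partition per file path.
            partitions.add(Collections.singletonMap("path", file));
        }
    }

    /** Runs one polling pass; returns true if a reconfiguration was requested. */
    boolean checkOnce() {
        if (partitions.isEmpty()) {
            return false; // nothing assigned, so nothing to wait for
        }
        Map<Map<String, String>, Map<String, Object>> offsets = reader.offsets(partitions);
        for (Map<String, String> partition : partitions) {
            Map<String, Object> offset = offsets.get(partition);
            if (offset == null || !isDone.test(offset)) {
                return false; // at least one file is still in progress
            }
        }
        context.requestTaskReconfiguration();
        return true;
    }
}
```

In a real connector, the reader argument would presumably be the object returned by offsetStorageReader() and the context argument the connector's own context; checkOnce() could then be scheduled periodically, for example from a ScheduledExecutorService started in start() and shut down in stop().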


Proposed Changes

A straightforward first pass is GitHub PR 4794

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.