Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Public Interfaces

We propose to add a two new class SourceConnectorContext which implements from the ConnectorContext to provide access to an OffsetStorageReader. This class will delegate the implementation of methods requestTaskReconfiguration and raiseErrorinterfaces SourceConnectorContext and SinkConnectorContext (similar to SourceTaskContext and SinkTaskContext). Those methods could be used then to expose specifics methods to SourceConnector and SinkConnector.

Code Block
languagejava
titleSourceConnectorContext
/**
 * SourceConnectorContextSinkConnectorContext is provided to SinkConnector SourceConnectorto allow them to track source offsets with the OffsetStorageReaderinteract with the underlying
 * runtime.
 *
 * This interface can be used to add specifics methods to a SinkConnector.
 */
public classinterface SourceConnectorContextSinkConnectorContext implementsextends ConnectorContext {
}
Code Block
languagejava
/**
 * SourceConnectorContext is provided to SourceConnector privateto finalallow OffsetStorageReader reader;

    /**
     * Delegate contextthem to interact with the underlying
 * runtime.
 *
 * This interface can be used standaloneto add orspecifics anmethods herderto connectora contextSourceConnector.
 */
public interface SourceConnectorContext extends */
ConnectorContext {

    /**
     * private final ConnectorContext delegateContext;


Get the OffsetStorageReader for this SourceConnectorContext.
     */
    OffsetStorageReader offsetStorageReader();
}

 

The SourceConnectorContext class will provide access to an OffsetStorageReader.

In addition, we propose to add a new method context() to the Connector class. 

Code Block
languagejava
public ConnectorContext context(public SourceConnectorContext(OffsetStorageReader reader, ConnectorContext delegateContext) {
    return context;
}

 

This method can be overrided by the SourceConnector and SinkConnector classes in order to change the return type to be a subtype.


Code Block
languagejava
/**
 * SinkConnectors this.readerimplement =the reader;
Connector interface to send Kafka data to another this.delegateContext = delegateContext;
    }system.
 */
public abstract class SinkConnector extends Connector {

    /**
     * Get<p>
     * Configuration key for the OffsetStorageReader list of input topics for this SourceConnectorconnector.
     * </p>
     * <p>
    public * OffsetStorageReaderUsually offsetStorageReader() {
        return reader;
    }

this setting is only relevant to the Kafka Connect framework, but is provided here for
     @Override
* the convenience of publicConnector voiddevelopers requestTaskReconfiguration() {
        this.delegateContext.requestTaskReconfiguration();if they also need to know the set of topics.
    }

    @Override * </p>
     */
    public static final String TOPICS_CONFIG = "topics";

    public voidSinkConnectorContext raiseErrorcontext(Exception e) {
        return this.delegateContext.raiseError(eSinkConnectorContext) context;
    }
}

...


}


Code Block
languagejavatitleSourceConnector
/**
 * SourceConnectors implement the connector interface to pull data from another system and send
 * it to Kafka.
 */
public abstract class SourceConnector extends Connector {

    @Override
    public OffsetStorageReaderSourceConnectorContext offsetStorageReadercontext() {
        return ((SourceConnectorContext) context).offsetStorageReader();
    }
}


Proposed Changes

A straightforward first pass is GitHub PR 4794

...