Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Discussion threadhere

JIRAhere

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Below each module is presented by its interface.

(NOTE: Initial version of the interfaces is in Scala to make it cleaner and shorter. The final version (actual Kafka interfaces) is planned to be written in Java).

 

Code Block
languagescala
titleGroup Membership Protocol
linenumberstrue
collapsetrue
/**
 * A connector for group membership protocol. Supports two modes:
 * 1) "joining" (becoming the member, leaving the group, subscribing to change notifications)
 * 2) "observing" (fetching group state, subscribing to change notifications)
 *
 */
trait GroupMembershipClient {
  /**
   * Each instance of this class is tightly coupled with exactly one group,
   * once set (during initialization) cannot be changed
   * @return unique group identifier among all application groups
   */
  def group: String

  /**
   * Become a member of this group. Throw an exception in case of ID conflict
   * @param id unique member identifier among members of this group
   * @param data supplemental data to be stored along with member ID
   */
  def join(id: String, data: String): Unit

  /**
   * Stop membership in this group
   * @param id unique member identifier among members of this group
   */
  def leave(id: String): Unit

  /**
   * Fetch membership of this group
   * @return IDs of the members of this group
   */
  def membershipList(): Set[String]

  /**
   * Fetch detailed membership of this group
   * @return IDs and corresponding supplemental data of the members of this group
   */
  def membership(): Map[String, String]

  /**
   * Register Apermanent callbackon firedgroup onchange eventlistener.
   */
 There traitis Listenerno {
guarantee listener will be /**
fired on ALL events (due *to Eventsession fired when the group membership has changed (member(s) joined and/or left)
  reconnects etc)
   * @param membershiplistener new membership of the group
  see [[GroupChangeListener]]
   */
    def onGroupChangeaddListener(membershiplistener: Set[String])
  }GroupChangeListener)

  /**
   * Register permanentDeregister on group change listener.
   * There is no guarantee listener will be fired on ALL events (due to session reconnects etc)
   * @param listener see [[ListenerGroupChangeListener]]
   */
  def addListenerremoveListener(listener: ListenerGroupChangeListener)

  /**
   * Deregister on group change listener
   * @param listener see [[Listener]]
   */
  def removeListener(listener: Listener)

  /**
   * Setup everythingSetup everything needed for concrete implementation
   * @param context TBD. Should be abstract enough to be used by different implementations and
   *                at the same time specific because will be uniformly called from the Kafka code -
   *                regardless of the implementation
   */
  def init(context: Any): Unit

  /**
   * Release all acquired resources
   */
  def close(): Unit
}
Code Block
languagescala
titleLeader Election
linenumberstrue
collapsetrue

 
/**
  * A connectorcallback forfired leadershipon electiongroup change event
*/
trait GroupChangeListener {
    /**
     * Event fired when the group membership has changed (member(s) joined and/or left)
     * @param membership new membership of the group
     */
    def onGroupChange(membership: Set[String])
}
Code Block
languagescala
titleLeader Election
linenumberstrue
collapsetrue
/**
 * A connector for leadership election protocol. Supports two modes:
 * 1) "running for election" (joining the candidates for leadership, resigning as a leader, subscribing to change notifications)
 * 2) "observing" (getting current leader, subscribing to change notifications)
 *
 */
trait LeaderElectionClient{protocol. Supports two modes:
 * 1) "running for election" (joining the candidates for leadership, resigning as a leader, subscribing to change notifications)
 * 2) "observing" (getting current leader, subscribing to change notifications)
 *
 */
trait LeaderElectionClient{
  /**
   * Each instance of this class is tightly coupled with leadership over exactly one service (resource),
   * once set (during initialization) cannot be changed
   *
   * @return unique group identifier among all application services (resources)
   */
  def service: String

  /**
   * GetEach currentinstance leaderof ofthis theclass resourceis (if any)
   * @return the leader id if it exists
   */
  def getLeader: Option[String]

  /*tightly coupled with leadership over exactly one service (resource),
   * once set (during initialization) cannot be changed
   *
   * Make@return thisunique candidategroup eligibleidentifier foramong leaderall electionapplication and try to obtain leadership for this service if it's vacant
   services (resources)
   */
  def service: String

  /**
   * @paramGet candidatecurrent IDleader of the candidateresource which is eligible for election(if any)
   * @return truethe ifleader givenid candidateif is now a leaderit exists
   */
  def nominate(candidategetLeader: String): BooleanOption[String]

  /**
   * Make Voluntarilythis resigncandidate aseligible afor leader election and try initiateto newobtain leaderleadership election.
for this service *if Itit's avacant
 client responsibility to*
 stop all leader* duties@param beforecandidate callingID thisof methodthe tocandidate avoid more-than-one-leader cases
   *which is eligible for election
   * @param leader current leader ID (will be ignored if not@return true if given candidate is now a leader)
   */
  def resignnominate(leadercandidate: String): UnitBoolean

  /**
   * AVoluntarily callbackresign firedas ona leader changeand event
initiate new  */
  trait Listener {
 leader election.
   /**
 It's a client responsibility *to Eventstop firedall whenleader theduties leaderbefore hascalling changedthis (resignedmethod orto acquired a leadership)
  avoid more-than-one-leader cases
   *
   * @param leader newcurrent leader forID the(will givenbe serviceignored if onenot has been elected, otherwise None
  a leader)
   */
    def onLeaderChangeresign(leader: Option[String])
:  }Unit

  /**
   * Register permanent on leader change listener
   * There is no guarantee listener will be fired on ALL events (due to session reconnects etc) reconnects etc)
   * @param listener see [[LeaderChangeListener]]
   */
  def addListener(listener: LeaderChangeListener)

  /**
   * Deregister on leader change listener
   * @param listener see [[ListenerLeaderChangeListener]]
   */
  def addListenerremoveListener(listener: Listener)
  /**LeaderChangeListener)

  /**
   * Setup everything needed for concrete implementation
   * @param context TBD. Should be abstract enough to be used by different implementations and
   * Deregister on leader change listener
   * @param listener see [[Listener]]
   */
 at def removeListener(listener: Listener)

  /**
   * Setup everything needed for concrete implementationthe same time specific because will be uniformly called from the Kafka code -
   *      @param context TBD. Should be abstract enough to be used byregardless differentof implementationsthe andimplementation
   */
  def init(context: Any): Unit

  /**
   * Release all acquired resources
   */
 at the same time specific because will be uniformly called from the Kafka code -
   *def close(): Unit
}
 
/**
  * A callback fired on leader change event
*/
trait LeaderChangeListener {
    /**
     * Event fired when the leader has changed (resigned or acquired regardlessa ofleadership)
 the implementation
   */
 @param leader def init(context: Any): Unit

  /**
   * Release all acquired resources
new leader for the given service if one has been elected, otherwise None
     */
    def closeonLeaderChange()leader: UnitOption[String])
}
Code Block
languagescala
titleStorage
linenumberstrue
collapsetrue
/**
 * Interface to a (persistent) key value storage
 */
trait Storage {
  /**
   * Get data by its key
   * @param key data ID in this storage
   * @return future result of the value (if exists) associated with the key
   */
  def fetch(key: String): Future[Option[String]]

  /**
   * Persist value with its associated key. The contract is to throw an exception
   * if such key already exists
   *
   * @param key data ID in this storage
   * @param data value associated with the key
   */
  def put(key: String, data: String)

  /**
   * Update value by its associated key. The contract is to throw an exception
   * if such key doesn't exist
   *
   * @param key data ID in this storage
   * @param data value associated with the key
   */
  def update(key: String, data: String)

  /**
   * Setup everything needed for concrete implementation
   * @param context TBD. Should be abstract enough to be used by different implementations and
   *                at the same time specific because will be uniformly called from the Kafka code -
   *                regardless of the implementation
   */
  def init(context: Any): Unit

  /**
   * Release all acquired resources
   */
  def close(): Unit
}
Code Block
languagescala
titleListener Registry
linenumberstrue
collapsetrue
/**
 * A registry for async data change notifications
 */
trait ListenerRegistry {
  /**
   * Register permanent callback for data change event
   * @param key the listenable data identifier
   * @param eventListener see [[ValueChangeListener]]
   */
  def addValueChangeListener(key: String, eventListener: ValueChangeListener): Unit

  /**
   * Deregister permanent callback for data change event
   * @param key the listenable data identifier
   * @param eventListener see [[EventListener]]
   * @tparam T type of the data ID
   */
  def removeValueChangeListener(key: String, eventListener: ValueChangeListener): Unit
 
  /**
   * Register permanent callback for key-set change event
   * @param namespace the listenable key-set identifier (e.g. parent path in Zookeeper, table name in Database etc)
   * @param eventListener see [[ValueChangeListener]]
   */
  def addKeySetChangeListener(namespace: String, eventListener: KeySetChangeListener): Unit

  /**
   * Deregister permanent callback for key-set change event
   * @param namespace the listenable key-set identifier (e.g. parent path in Zookeeper, table name in Database etc)
   * @param eventListener see [[ValueChangeListener]]
   */
  def removeKeySetChangeListener(namespace: String, eventListener: KeySetChangeListener): Unit

  /**
   * Setup everything needed for concrete implementation
   * @param context TBD. Should be abstract enough to be used by different implementations and
   *                at the same time specific because will be uniformly called from the Kafka code,
   *                regardless of the implementation
   */
  def init(context: Any): Unit

  /**
   * Release all acquired resources
   */
  def close(): Unit
}

/**
 * Callback on value change event
 */
trait ValueChangeListener {
  def valueChanged(newValue: Option[String])
}

/**
 * Callback on key-set change event
 */
trait KeySetChangeListener {
  def keySetChanged(newKeySet: Set[String])
}

...

As part of this KIP it will be required to rework some system and replication tools. It will not be possible anymore to rely on Zookeeper as a default metadata storage system, also it will also not be possible to use it to trigger particular administrative commands. Most of the tools are related to topic management (create topics, reassign partitions etc) and consumer group management (offset checker etc).

...

The consumer group tools should be covered separately. Having New Java Consumer in 0.9 release with server-side coordinator may let us deprecate old consumer and thus all tools related with to it. Consumer group tools should work as usual if brokers are run with Zookeeper based implementation of the shared interface.

...