...

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Zookeeper has advanced low-level primitives for coordinating distributed systems: ephemeral nodes, key-value storage, watchers. Such concepts may not be available in other consensus frameworks. At the same time, such low-level primitives (especially ephemeral nodes) are error-prone and are often a cause of subtle bugs in Kafka coordination code.

...

Each module is presented below by its interface.

(NOTE: The initial version of the interfaces is in Scala to keep it cleaner and shorter. The final version, i.e. the actual Kafka interfaces, is planned to be written in Java.)

 

Group Membership Protocol

```scala
/**
 * A connector for group membership protocol. Supports two modes:
 * 1) "joining" (becoming the member, leaving the group, subscribing to change notifications)
 * 2) "observing" (fetching group state, subscribing to change notifications)
 */
trait GroupMembershipClient {
  /**
   * Each instance of this class is tightly coupled with exactly one group,
   * once set (during initialization) cannot be changed
   * @return unique group identifier among all application groups
   */
  def group: String

  /**
   * Become a member of this group. Throw an exception in case of ID conflict
   * @param id unique member identifier among members of this group
   * @param data supplemental data to be stored along with member ID
   */
  def join(id: String, data: String): Unit

  /**
   * Stop membership in this group
   * @param id unique member identifier among members of this group
   */
  def leave(id: String): Unit

  /**
   * Fetch membership of this group
   * @return IDs of the members of this group
   */
  def membershipList(): Set[String]

  /**
   * Fetch detailed membership of this group
   * @return IDs and corresponding supplemental data of the members of this group
   */
  def membership(): Map[String, String]

  /**
   * Register a permanent group change listener.
   * There is no guarantee the listener will be fired on ALL events (due to session reconnects etc)
   * @param listener see [[GroupChangeListener]]
   */
  def addListener(listener: GroupChangeListener): Unit

  /**
   * Deregister a group change listener.
   * @param listener see [[GroupChangeListener]]
   */
  def removeListener(listener: GroupChangeListener): Unit

  /**
   * Setup everything needed for concrete implementation
   * @param context TBD. Should be abstract enough to be used by different implementations and
   *                at the same time specific, because it will be called uniformly from the Kafka code
   *                regardless of the implementation
   */
  def init(context: Any): Unit

  /**
   * Release all acquired resources
   */
  def close(): Unit
}

/**
 * A callback fired on group change event
 */
trait GroupChangeListener {
  /**
   * Event fired when the group membership has changed (member(s) joined and/or left)
   * @param membership new membership of the group
   */
  def onGroupChange(membership: Set[String]): Unit
}
```
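To make the contract above more concrete, here is a minimal single-process sketch of how the group membership interface might be implemented. The class name `InMemoryGroupMembershipClient` is hypothetical, the traits are trimmed copies of the ones in this proposal so that the block compiles on its own, and the choice of `IllegalStateException` for an ID conflict is an assumption (the proposal only says "throw an exception"). There are no sessions or failure detection here, so it only illustrates the join/leave/notify flow, not a real consensus backend.

```scala
import scala.collection.mutable

// Trimmed copies of the proposed interfaces, repeated so the sketch is self-contained.
trait GroupChangeListener {
  def onGroupChange(membership: Set[String]): Unit
}

trait GroupMembershipClient {
  def group: String
  def join(id: String, data: String): Unit
  def leave(id: String): Unit
  def membershipList(): Set[String]
  def membership(): Map[String, String]
  def addListener(listener: GroupChangeListener): Unit
  def removeListener(listener: GroupChangeListener): Unit
  def init(context: Any): Unit
  def close(): Unit
}

// Hypothetical in-memory implementation (assumption: one JVM, no sessions, no failures).
class InMemoryGroupMembershipClient(val group: String) extends GroupMembershipClient {
  private val members = mutable.LinkedHashMap.empty[String, String]
  private val listeners = mutable.LinkedHashSet.empty[GroupChangeListener]

  def join(id: String, data: String): Unit = synchronized {
    // The interface asks for an exception on ID conflict; the exception type is our choice.
    if (members.contains(id))
      throw new IllegalStateException(s"Member '$id' already joined group '$group'")
    members.put(id, data)
    notifyListeners()
  }

  def leave(id: String): Unit = synchronized {
    // Notify only when the membership actually changed.
    if (members.remove(id).isDefined) notifyListeners()
  }

  def membershipList(): Set[String] = synchronized { members.keySet.toSet }

  def membership(): Map[String, String] = synchronized { members.toMap }

  def addListener(listener: GroupChangeListener): Unit = synchronized { listeners += listener }

  def removeListener(listener: GroupChangeListener): Unit = synchronized { listeners -= listener }

  def init(context: Any): Unit = () // nothing to set up for the in-memory case

  def close(): Unit = synchronized { members.clear(); listeners.clear() }

  // Listeners receive a snapshot of member IDs, matching onGroupChange(membership: Set[String]).
  private def notifyListeners(): Unit = {
    val snapshot: Set[String] = members.keySet.toSet
    listeners.foreach(_.onGroupChange(snapshot))
  }
}
```

A caller would register a `GroupChangeListener`, then `join` and `leave`; each membership change delivers the full new member set rather than a delta, which keeps listeners correct even if some notifications are missed.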
Leader Election

```scala
/**
 * A connector for leadership election protocol. Supports two modes:
 * 1) "running for election" (joining the candidates for leadership, resigning as a leader, subscribing to change notifications)
 * 2) "observing" (getting current leader, subscribing to change notifications)
 */
trait LeaderElectionClient {
  /**
   * Each instance of this class is tightly coupled with leadership over exactly one service (resource),
   * once set (during initialization) cannot be changed
   *
   * @return unique service identifier among all application services (resources)
   */
  def service: String

  /**
   * Get current leader of the resource (if any)
   * @return the leader id if it exists
   */
  def getLeader: Option[String]

  /**
   * Make this candidate eligible for leader election and try to obtain leadership for this service if it's vacant
   *
   * @param candidate ID of the candidate which is eligible for election
   * @return true if given candidate is now a leader
   */
  def nominate(candidate: String): Boolean

  /**
   * Voluntarily resign as a leader and initiate new leader election.
   * It's a client responsibility to stop all leader duties before calling this method to avoid more-than-one-leader cases
   *
   * @param leader current leader ID (will be ignored if not a leader)
   */
  def resign(leader: String): Unit

  /**
   * Register a permanent leader change listener
   * There is no guarantee the listener will be fired on ALL events (due to session reconnects etc)
   * @param listener see [[LeaderChangeListener]]
   */
  def addListener(listener: LeaderChangeListener): Unit

  /**
   * Deregister a leader change listener
   * @param listener see [[LeaderChangeListener]]
   */
  def removeListener(listener: LeaderChangeListener): Unit

  /**
   * Setup everything needed for concrete implementation
   * @param context TBD. Should be abstract enough to be used by different implementations and
   *                at the same time specific, because it will be called uniformly from the Kafka code
   *                regardless of the implementation
   */
  def init(context: Any): Unit

  /**
   * Release all acquired resources
   */
  def close(): Unit
}

/**
 * A callback fired on leader change event
 */
trait LeaderChangeListener {
  /**
   * Event fired when the leader has changed (resigned or acquired a leadership)
   * @param leader new leader for the given service if one has been elected, otherwise None
   */
  def onLeaderChange(leader: Option[String]): Unit
}
```
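The nominate/resign semantics described above can be sketched with a small in-memory election. Everything beyond the proposed method names is an assumption made for illustration: the class name `InMemoryLeaderElection`, the "first nominated candidate wins" rule, the succession order (insertion order of candidates), and the decision that a resigning leader also leaves the candidate pool. A real implementation would delegate these decisions to the underlying consensus system.

```scala
import scala.collection.mutable

// Trimmed copies of the proposed interfaces, repeated so the sketch is self-contained.
trait LeaderChangeListener {
  def onLeaderChange(leader: Option[String]): Unit
}

trait LeaderElectionClient {
  def service: String
  def getLeader: Option[String]
  def nominate(candidate: String): Boolean
  def resign(leader: String): Unit
  def addListener(listener: LeaderChangeListener): Unit
  def removeListener(listener: LeaderChangeListener): Unit
  def init(context: Any): Unit
  def close(): Unit
}

// Hypothetical in-memory election (assumption: one JVM, candidates never crash).
class InMemoryLeaderElection(val service: String) extends LeaderElectionClient {
  // Insertion order doubles as succession order; this policy is an assumption of the sketch.
  private val candidates = mutable.LinkedHashSet.empty[String]
  private val listeners = mutable.LinkedHashSet.empty[LeaderChangeListener]
  private var current: Option[String] = None

  def getLeader: Option[String] = synchronized { current }

  def nominate(candidate: String): Boolean = synchronized {
    candidates += candidate
    // Leadership is taken only if the position is vacant.
    if (current.isEmpty) setLeader(Some(candidate))
    current.contains(candidate)
  }

  def resign(leader: String): Unit = synchronized {
    // Per the interface, the call is ignored when the caller is not the current leader.
    if (current.contains(leader)) {
      candidates -= leader // assumption: a resigning leader also stops being a candidate
      setLeader(candidates.headOption)
    }
  }

  def addListener(listener: LeaderChangeListener): Unit = synchronized { listeners += listener }

  def removeListener(listener: LeaderChangeListener): Unit = synchronized { listeners -= listener }

  def init(context: Any): Unit = ()

  def close(): Unit = synchronized { candidates.clear(); listeners.clear(); current = None }

  private def setLeader(next: Option[String]): Unit = {
    current = next
    listeners.foreach(_.onLeaderChange(next))
  }
}
```

Note that `resign` relies on the caller having already stopped its leader duties, as the interface comment requires; the sketch does nothing to enforce that.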
Storage

```scala
import scala.concurrent.Future

/**
 * Interface to a (persistent) key value storage
 */
trait Storage {
  /**
   * Get data by its key
   * @param key data ID in this storage
   * @return future result of the value (if exists) associated with the key
   */
  def fetch(key: String): Future[Option[String]]

  /**
   * Persist value with its associated key. The contract is to throw an exception
   * if such key already exists
   *
   * @param key data ID in this storage
   * @param data value associated with the key
   */
  def put(key: String, data: String): Unit

  /**
   * Update value by its associated key. The contract is to throw an exception
   * if such key doesn't exist
   *
   * @param key data ID in this storage
   * @param data value associated with the key
   */
  def update(key: String, data: String): Unit

  /**
   * Setup everything needed for concrete implementation
   * @param context TBD. Should be abstract enough to be used by different implementations and
   *                at the same time specific, because it will be called uniformly from the Kafka code
   *                regardless of the implementation
   */
  def init(context: Any): Unit

  /**
   * Release all acquired resources
   */
  def close(): Unit
}
```
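As an illustration of the put/update contract (put fails on an existing key, update fails on a missing one), here is a minimal non-persistent sketch. The class name `InMemoryStorage` and the exception types are assumptions; the proposal only states that an exception is thrown. The sketch uses `TrieMap` from the Scala standard library so that the existence check and the write happen atomically.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.Future

// Trimmed copy of the proposed interface, repeated so the sketch is self-contained.
trait Storage {
  def fetch(key: String): Future[Option[String]]
  def put(key: String, data: String): Unit
  def update(key: String, data: String): Unit
  def init(context: Any): Unit
  def close(): Unit
}

// Hypothetical in-memory storage (assumption: nothing is persisted across restarts).
class InMemoryStorage extends Storage {
  private val entries = TrieMap.empty[String, String]

  // The value is already in memory, so the future is completed immediately.
  def fetch(key: String): Future[Option[String]] =
    Future.successful(entries.get(key))

  // putIfAbsent returns the existing value when the key is present and leaves it untouched.
  def put(key: String, data: String): Unit =
    if (entries.putIfAbsent(key, data).isDefined)
      throw new IllegalStateException(s"Key '$key' already exists")

  // replace returns None when the key is absent and writes nothing in that case.
  def update(key: String, data: String): Unit =
    if (entries.replace(key, data).isEmpty)
      throw new NoSuchElementException(s"Key '$key' does not exist")

  def init(context: Any): Unit = ()

  def close(): Unit = entries.clear()
}
```

Splitting writes into `put` and `update` lets callers express "create only" and "modify only" intent without a separate existence check, which would otherwise race with concurrent writers.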
Listener Registry

```scala
/**
 * A registry for async data change notifications
 */
trait ListenerRegistry {
  /**
   * Register permanent callback for data change event
   * @param key the listenable data identifier
   * @param eventListener see [[ValueChangeListener]]
   */
  def addValueChangeListener(key: String, eventListener: ValueChangeListener): Unit

  /**
   * Deregister permanent callback for data change event
   * @param key the listenable data identifier
   * @param eventListener see [[ValueChangeListener]]
   */
  def removeValueChangeListener(key: String, eventListener: ValueChangeListener): Unit

  /**
   * Register permanent callback for key-set change event
   * @param namespace the listenable key-set identifier (e.g. parent path in Zookeeper, table name in Database etc)
   * @param eventListener see [[KeySetChangeListener]]
   */
  def addKeySetChangeListener(namespace: String, eventListener: KeySetChangeListener): Unit

  /**
   * Deregister permanent callback for key-set change event
   * @param namespace the listenable key-set identifier (e.g. parent path in Zookeeper, table name in Database etc)
   * @param eventListener see [[KeySetChangeListener]]
   */
  def removeKeySetChangeListener(namespace: String, eventListener: KeySetChangeListener): Unit

  /**
   * Setup everything needed for concrete implementation
   * @param context TBD. Should be abstract enough to be used by different implementations and
   *                at the same time specific, because it will be called uniformly from the Kafka code
   *                regardless of the implementation
   */
  def init(context: Any): Unit

  /**
   * Release all acquired resources
   */
  def close(): Unit
}

/**
 * Callback on value change event
 */
trait ValueChangeListener {
  def valueChanged(newValue: Option[String]): Unit
}

/**
 * Callback on key-set change event
 */
trait KeySetChangeListener {
  def keySetChanged(newKeySet: Set[String]): Unit
}
```
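The registry interface above says how listeners are registered but not how events reach them. The sketch below shows one possible shape, under stated assumptions: `InMemoryListenerRegistry` and the two `fire...` methods are hypothetical names that do not appear in the proposal; they stand in for whatever hook a concrete backend (e.g. a Zookeeper watcher) would use to deliver changes. The `init` and `close` methods are omitted for brevity.

```scala
import scala.collection.mutable

// Trimmed copies of the proposed callbacks, repeated so the sketch is self-contained.
trait ValueChangeListener {
  def valueChanged(newValue: Option[String]): Unit
}

trait KeySetChangeListener {
  def keySetChanged(newKeySet: Set[String]): Unit
}

// Hypothetical in-memory registry; the fire* methods are illustration-only hooks.
class InMemoryListenerRegistry {
  private val valueListeners =
    mutable.Map.empty[String, mutable.LinkedHashSet[ValueChangeListener]]
  private val keySetListeners =
    mutable.Map.empty[String, mutable.LinkedHashSet[KeySetChangeListener]]

  def addValueChangeListener(key: String, eventListener: ValueChangeListener): Unit = synchronized {
    val registered = valueListeners.getOrElseUpdate(key, mutable.LinkedHashSet.empty[ValueChangeListener])
    registered += eventListener
  }

  def removeValueChangeListener(key: String, eventListener: ValueChangeListener): Unit = synchronized {
    valueListeners.get(key).foreach(registered => registered -= eventListener)
  }

  def addKeySetChangeListener(namespace: String, eventListener: KeySetChangeListener): Unit = synchronized {
    val registered = keySetListeners.getOrElseUpdate(namespace, mutable.LinkedHashSet.empty[KeySetChangeListener])
    registered += eventListener
  }

  def removeKeySetChangeListener(namespace: String, eventListener: KeySetChangeListener): Unit = synchronized {
    keySetListeners.get(namespace).foreach(registered => registered -= eventListener)
  }

  // Assumed backend hook: the value under `key` changed (None means the value no longer exists).
  def fireValueChanged(key: String, newValue: Option[String]): Unit = synchronized {
    valueListeners.get(key).foreach { registered =>
      registered.foreach(listener => listener.valueChanged(newValue))
    }
  }

  // Assumed backend hook: the set of keys under `namespace` changed.
  def fireKeySetChanged(namespace: String, newKeySet: Set[String]): Unit = synchronized {
    keySetListeners.get(namespace).foreach { registered =>
      registered.foreach(listener => listener.keySetChanged(newKeySet))
    }
  }
}
```

Keeping value listeners and key-set listeners in separate maps mirrors the two distinct registration methods in the interface: a change to one key's value does not notify listeners on the enclosing namespace, and vice versa.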

 

Compatibility, Deprecation, and Migration Plan

The shared interface for pluggable consensus and metadata storage systems should be compatible with a Zookeeper-based implementation, which will likely also be the default one.

As part of this KIP, some system and replication tools will have to be reworked. It will no longer be possible to rely on Zookeeper as the default metadata storage system, nor to use it to trigger particular administrative commands. Most of the affected tools relate to topic management (create topics, reassign partitions etc.) and consumer group management (offset checker etc.).

The approach to topic tools is covered in KIP-4: all administrative logic will move to the brokers. KIP-4 is currently under development, and its Wire Protocol changes have been agreed.

The consumer group tools should be covered separately. Having the new Java consumer in the 0.9 release with a server-side coordinator may let us deprecate the old consumer and thus all tools related to it. Consumer group tools should work as usual if brokers are run with the Zookeeper-based implementation of the shared interface. The use of zookeeper.connect in server.properties will still be honored. In the code we should have the default value of the new configuration.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

...