Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Below each module is presented by its interface.

(NOTE: )

 

Code Block
languagescala
titleGroup Membership Protocol
linenumberstrue
collapsetrue
/**
 * A connector for group membership protocol. Supports two modes:
 * 1) "joining" (becoming the member, leaving the group, subscribing to change notifications)
 * 2) "observing" (fetching group state, subscribing to change notifications)
 *
 */
trait GroupMembershipClient {
  /**
   * Each instance of this class is tightly coupled with exactly one group,
   * once set (during initialization) cannot be changed
   * @return unique group identifier among all application groups
   */
  def group: String

  /**
   * Become a member of this group. Throw an exception in case of ID conflict
   * @param id unique member identifier among members of this group
   * @param data supplemental data to be stored along with member ID
   */
  def join(id: String, data: String): Unit

  /**
   * Stop membership in this group
   * @param id unique member identifier among members of this group
   */
  def leave(id: String): Unit

  /**
   * Fetch membership of this group
   * @return IDs of the members of this group
   */
  def membershipList(): Set[String]

  /**
   * Fetch detailed membership of this group
   * @return IDs and corresponding supplemental data of the members of this group
   */
  def membership(): Map[String, String]

  /**
   * Register Apermanent callbackon firedgroup onchange eventlistener.
   */
 There traitis Listenerno {
guarantee listener will  /**
     * Event fired when the group membership has changed (member(s) joined and/or left)
     * @param membership new membership of the group
     */
    def onGroupChange(membership: Set[String])
  }be fired on ALL events (due to session reconnects etc)
   * @param listener see [[GroupChangeListener]]
   */
  def addListener(listener: GroupChangeListener)

  /**
   * Register permanentDeregister on group change listener.
   * There is no guarantee listener will be fired on ALL events (due to session reconnects etc)
   * @param listener @param listener see [[ListenerGroupChangeListener]]
   */
  def addListenerremoveListener(listener: ListenerGroupChangeListener)

  /**
   * Setup Deregistereverything onneeded groupfor changeconcrete listenerimplementation
   * @param listenercontext see [[Listener]]
   */
  def removeListener(listener: Listener)

  /**
   * Setup everything needed for concrete implementation
   * @param context TBD. TBD. Should be abstract enough to be used by different implementations and
   *                at the same time specific because will be uniformly called from the Kafka code -
   *                regardless of the implementation
   */
  def init(context: Any): Unit

  /**
   * Release all acquired resources
   */
  def close(): Unit
}
 
/**
  * A callback fired on group change event
*/
trait GroupChangeListener {
    /**
     * Event fired when the group membership has changed (member(s) joined and/or left)
     * @param membership new membership of the group
     */
    def onGroupChange(membership: Set[String])
}
Code Block
languagescala
titleLeader Election
linenumberstrue
collapsetrue
/**
 * A connector for leadership election protocol. Supports two modes:
 * 1) "running for election" (joining the candidates for leadership, resigning as a leader, subscribing to change notifications)
 * 2) "observing" (getting current leader, subscribing to change notifications)
 *
 */
trait LeaderElectionClient{
  /**
   * Each instance of this class is tightly coupled with leadership over exactly one service (resource),
   * once set (during initialization) cannot be changed
   *
   * @return unique group identifier among all application services (resources)
   */
  def service: String

  /**
   * Get current leader of the resource (if any)
   * @return the leader id if it exists
   */
  def getLeader: Option[String]

  /**
   * Make this candidate eligible for leader election and try to obtain leadership for this service if it's vacant
   *
   * @param candidate ID of the candidate which is eligible for election
   * @return true if given candidate is now a leader
   */
  def nominate(candidate: String): Boolean

  /**
   * Voluntarily resign as a leader and initiate new leader election.
   * It's a client responsibility to stop all leader duties before calling this method to avoid more-than-one-leader cases
   *
   * @param leader current leader ID (will be ignored if not a leader)
   */
  def resign(leader: String): Unit

  /**
   * Register permanent on leader change listener
   * There is no guarantee listener will be fired */
on ALL defevents resign(leader: String): Unit

  /**due to session reconnects etc)
   * A@param callbacklistener fired on leader change eventsee [[LeaderChangeListener]]
   */
  trait Listener {def addListener(listener: LeaderChangeListener)

    /**
     * Event fired* whenDeregister theon leader haschange changedlistener
 (resigned or acquired* a@param leadership)
listener see [[LeaderChangeListener]]
   */
 @param leader new leader for the given service if one has been elected, otherwise None
     */
    def onLeaderChange(leader: Option[String])
  }

  /**def removeListener(listener: LeaderChangeListener)

  /**
   * Setup everything needed for concrete implementation
   * @param context TBD. Should be abstract enough to be used by different implementations and
   * Register permanent on leader change listener
   * There is no guarantee listener will be fired on ALL events (due to session reconnects etc)
   * @param listener see [[Listener]]at the same time specific because will be uniformly called from the Kafka code -
   */
  def addListener(listener: Listener)
  /**
   * Deregister on leader change listener
  regardless * @param listener see [[Listener]]of the implementation
   */
  def removeListenerinit(listenercontext: ListenerAny): Unit

  /**
   * SetupRelease everythingall needed for concrete implementationacquired resources
   */
 @param context TBD. Should be abstract enough to be used by different implementations and
 def close(): Unit
}
 
/**
  * A callback fired on leader change event
*/
trait LeaderChangeListener {
    /**
   at the same* timeEvent specific because will be uniformly called fromfired when the Kafkaleader codehas -
changed (resigned or *       acquired a leadership)
     * @param leader new regardlessleader offor the implementation
given service if */
one has def init(context: Any): Unit
been elected, otherwise None
  /**
   * Release all acquired resources
 /
  */
  def closeonLeaderChange()leader: UnitOption[String])
}
Code Block
languagescala
titleStorage
linenumberstrue
collapsetrue
/**
 * Interface to a (persistent) key value storage
 */
trait Storage {
  /**
   * Get data by its key
   * @param key data ID in this storage
   * @return future result of the value (if exists) associated with the key
   */
  def fetch(key: String): Future[Option[String]]

  /**
   * Persist value with its associated key. The contract is to throw an exception
   * if such key already exists
   *
   * @param key data ID in this storage
   * @param data value associated with the key
   */
  def put(key: String, data: String)

  /**
   * Update value by its associated key. The contract is to throw an exception
   * if such key doesn't exist
   *
   * @param key data ID in this storage
   * @param data value associated with the key
   */
  def update(key: String, data: String)

  /**
   * Setup everything needed for concrete implementation
   * @param context TBD. Should be abstract enough to be used by different implementations and
   *                at the same time specific because will be uniformly called from the Kafka code -
   *                regardless of the implementation
   */
  def init(context: Any): Unit

  /**
   * Release all acquired resources
   */
  def close(): Unit
}

...