Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Below, each module is presented by its interface.

 

Code Block
language: scala
title: Group Membership Protocol
linenumbers: true
collapse: true
/**
 * A connector for group membership protocol. Supports both parts:
 * 1) "joining" (becoming the member, leaving the group, subscribing to change notifications)
 * 2) "observing" (fetching group state, subscribing to change notifications)
 *
 * Member IDs and their supplemental data are exchanged as plain strings.
 *
 * @tparam K reserved for the type of the member ID; the signatures below use `String`
 *           and do not reference it. Kept so existing `GroupMembershipClient[_, _]`
 *           references keep compiling.
 * @tparam V reserved for the type of the additional data that comes with the ID;
 *           likewise not referenced by the signatures below.
 */
trait GroupMembershipClient[K, V] {
  /**
   * Each instance of this class is tightly coupled with exactly one group,
   * once set (during initialization) cannot be changed
   * @return unique group identifier among all application groups
   */
  def group: String

  /**
   * Become a member of this group. Throw an exception in case of ID conflict
   * @param id unique member identifier among members of this group
   * @param data supplemental data to be stored along with member ID
   */
  def join(id: String, data: String): Unit

  /**
   * Stop membership in this group
   * @param id unique member identifier among members of this group
   */
  def leave(id: String): Unit

  /**
   * Fetch membership of this group
   * @return IDs of the members of this group
   */
  def membershipList(): Set[String]

  /**
   * Fetch detailed membership of this group
   * @return IDs and corresponding supplemental data of the members of this group
   */
  def membership(): Map[String, String]

  /**
   * A callback fired on group change event
   */
  trait Listener {
    /**
     * Event fired when the group membership has changed (member(s) joined and/or left)
     * @param membership new membership of the group
     */
    def onGroupChange(membership: Set[String]): Unit
  }

  /**
   * Register permanent on group change listener.
   * There is no guarantee listener will be fired on ALL events (due to session reconnects etc)
   * @param listener see [[Listener]]
   */
  def addListener(listener: Listener): Unit

  /**
   * Deregister on group change listener
   * @param listener see [[Listener]]
   */
  def removeListener(listener: Listener): Unit

  /**
   * Setup everything needed for concrete implementation
   * @param context TBD. Should be abstract enough to be used by different implementations and
   *                at the same time specific because will be uniformly called from the Kafka code -
   *                regardless of the implementation
   */
  def init(context: Any): Unit

  /**
   * Release all acquired resources
   */
  def close(): Unit
}
Code Block
language: scala
title: Leader Election
linenumbers: true
collapse: true
/**
 * A connector for leadership election protocol. Supports both parts:
 * 1) "running for election" (joining the candidates for leadership, resigning as a leader, subscribing to change notifications)
 * 2) "observing" (getting current leader, subscribing to change notifications)
 *
 * Candidate and leader IDs are exchanged as plain strings.
 *
 * @tparam K reserved for the type of the candidate ID; the signatures below use `String`
 *           and do not reference it. Kept so existing `LeaderElectionClient[_]`
 *           references keep compiling.
 */
trait LeaderElectionClient[K] {
  /**
   * Each instance of this class is tightly coupled with leadership over exactly one service (resource),
   * once set (during initialization) cannot be changed
   *
   * @return unique service (resource) identifier among all application services (resources)
   */
  def service: String

  /**
   * Fetch current leader of the resource (if any)
   * @return the leader id if it exists
   */
  def getLeader: Option[String]

  /**
   * Make this candidate eligible for leader election and try to obtain leadership for this service if it's vacant
   *
   * @param candidate ID of the candidate which is eligible for election
   * @return true if given candidate is now a leader
   */
  def nominate(candidate: String): Boolean

  /**
   * Voluntarily resign as a leader and initiate new leader election.
   * It's a client responsibility to stop all leader duties before calling this method to avoid more-than-one-leader cases
   *
   * @param leader current leader ID (will be ignored if not a leader)
   */
  def resign(leader: String): Unit

  /**
   * A callback fired on leader change event
   */
  trait Listener {
    /**
     * Event fired when the leader has changed (resigned or acquired a leadership)
     * @param leader new leader for the given service if one has been elected, otherwise None
     */
    def onLeaderChange(leader: Option[String]): Unit
  }

  /**
   * Register permanent on leader change listener
   * There is no guarantee listener will be fired on ALL events (due to session reconnects etc)
   * @param listener see [[Listener]]
   */
  def addListener(listener: Listener): Unit

  /**
   * Deregister on leader change listener
   * @param listener see [[Listener]]
   */
  def removeListener(listener: Listener): Unit

  /**
   * Setup everything needed for concrete implementation
   * @param context TBD. Should be abstract enough to be used by different implementations and
   *                at the same time specific because will be uniformly called from the Kafka code -
   *                regardless of the implementation
   */
  def init(context: Any): Unit

  /**
   * Release all acquired resources
   */
  def close(): Unit
}
Code Block
language: scala
title: Storage
linenumbers: true
collapse: true
/**
 * Interface to a (persistent) key value storage.
 *
 * Keys and values are exchanged as plain strings.
 *
 * @tparam K reserved for the type of data key; the signatures below use `String`
 *           and do not reference it. Kept so existing `Storage[_, _]` references
 *           keep compiling.
 * @tparam V reserved for the type of the fetched data; likewise not referenced
 *           by the signatures below.
 */
trait Storage[K, V] {
  /**
   * Get data by its key
   * @param key data ID in this storage
   * @return future result of the value (if exists) associated with the key
   */
  def fetch(key: String): Future[Option[String]]

  /**
   * Persist value with its associated key. The contract is to throw an exception
   * if such key already exists
   *
   * @param key data ID in this storage
   * @param data value associated with the key
   */
  def put(key: String, data: String): Unit

  /**
   * Update value by its associated key. The contract is to throw an exception
   * if such key doesn't exist
   *
   * @param key data ID in this storage
   * @param data value associated with the key
   */
  def update(key: String, data: String): Unit

  /**
   * Setup everything needed for concrete implementation
   * @param context TBD. Should be abstract enough to be used by different implementations and
   *                at the same time specific because will be uniformly called from the Kafka code -
   *                regardless of the implementation
   */
  def init(context: Any): Unit

  /**
   * Release all acquired resources
   */
  def close(): Unit
}

...