Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: scala
title: Group Membership Protocol
linenumbers: true
collapse: true
/**
 * A connector for the group membership protocol. Supports two modes:
 * 1) "joining" (becoming a member, leaving the group, subscribing to change notifications)
 * 2) "observing" (fetching group state, subscribing to change notifications)
 */
trait GroupMembershipClient {
  /**
   * Identifier of the group this client is bound to.
   * Each instance of this class is tightly coupled with exactly one group;
   * once set (during initialization) it cannot be changed.
   *
   * @return unique group identifier among all application groups
   */
  def group: String

  /**
   * Become a member of this group. Throws an exception in case of an ID conflict.
   *
   * @param id   unique member identifier among members of this group
   * @param data supplemental data to be stored along with the member ID
   */
  def join(id: String, data: String): Unit

  /**
   * Stop membership in this group.
   *
   * @param id unique member identifier among members of this group
   */
  def leave(id: String): Unit

  /**
   * Fetch the membership of this group.
   *
   * @return IDs of the members of this group
   */
  def membershipList(): Set[String]

  /**
   * Fetch the detailed membership of this group.
   *
   * @return IDs and corresponding supplemental data of the members of this group
   */
  def membership(): Map[String, String]

  /**
   * A callback fired on a group membership change event.
   */
  trait Listener {
    /**
     * Event fired when the group membership has changed (member(s) joined and/or left).
     *
     * @param membership new membership of the group
     */
    def onGroupChange(membership: Set[String]): Unit
  }

  /**
   * Register a permanent on-group-change listener.
   * There is no guarantee the listener will be fired on ALL events (due to session reconnects etc).
   *
   * @param listener see [[Listener]]
   */
  def addListener(listener: Listener): Unit

  /**
   * Deregister an on-group-change listener.
   *
   * @param listener see [[Listener]]
   */
  def removeListener(listener: Listener): Unit

  /**
   * Set up everything needed for the concrete implementation.
   *
   * @param context TBD. Should be abstract enough to be used by different implementations and
   *                at the same time specific because it will be uniformly called from the Kafka
   *                code - regardless of the implementation
   */
  def init(context: Any): Unit

  /**
   * Release all acquired resources.
   */
  def close(): Unit
}
Code Block
language: scala
title: Leader Election
linenumbers: true
collapse: true
/**
 * A connector for the leader election protocol. Supports two modes:
 * 1) "running for election" (joining the candidates for leadership, resigning as a leader,
 *    subscribing to change notifications)
 * 2) "observing" (getting the current leader, subscribing to change notifications)
 */
trait LeaderElectionClient {
  /**
   * Identifier of the service (resource) this client is bound to.
   * Each instance of this class is tightly coupled with leadership over exactly one service
   * (resource); once set (during initialization) it cannot be changed.
   *
   * @return unique service identifier among all application services (resources)
   */
  def service: String

  /**
   * Get the current leader of the resource (if any).
   *
   * @return the leader ID if it exists
   */
  def getLeader: Option[String]

  /**
   * Make this candidate eligible for leader election and try to obtain leadership for this
   * service if it's vacant.
   *
   * @param candidate ID of the candidate which is eligible for election
   * @return true if the given candidate is now a leader
   */
  def nominate(candidate: String): Boolean

  /**
   * Voluntarily resign as a leader and initiate a new leader election.
   * It is the client's responsibility to stop all leader duties before calling this method,
   * to avoid more-than-one-leader cases.
   *
   * @param leader current leader ID (will be ignored if not a leader)
   */
  def resign(leader: String): Unit

  /**
   * A callback fired on a leader change event.
   */
  trait Listener {
    /**
     * Event fired when the leader has changed (resigned or acquired leadership).
     *
     * @param leader new leader for the given service if one has been elected, otherwise None
     */
    def onLeaderChange(leader: Option[String]): Unit
  }

  /**
   * Register a permanent on-leader-change listener.
   * There is no guarantee the listener will be fired on ALL events (due to session reconnects etc).
   *
   * @param listener see [[Listener]]
   */
  def addListener(listener: Listener): Unit

  /**
   * Deregister an on-leader-change listener.
   *
   * @param listener see [[Listener]]
   */
  def removeListener(listener: Listener): Unit

  /**
   * Set up everything needed for the concrete implementation.
   *
   * @param context TBD. Should be abstract enough to be used by different implementations and
   *                at the same time specific because it will be uniformly called from the Kafka
   *                code - regardless of the implementation
   */
  def init(context: Any): Unit

  /**
   * Release all acquired resources.
   */
  def close(): Unit
}

...

Code Block
language: scala
title: Listener Registry
linenumbers: true
collapse: true
/**
 * A registry for async data change notifications
 */
trait ListenerRegistry {
  /**
   * Register permanent callback for data change event
   *
   * @param key           the listenable data identifier
   * @param eventListener see [[ValueChangeListener]]
   */
  def addValueChangeListener(key: String, eventListener: ValueChangeListener): Unit

  /**
   * Deregister permanent callback for data change event
   *
   * @param key           the listenable data identifier
   * @param eventListener see [[ValueChangeListener]]
   */
  def removeValueChangeListener(key: String, eventListener: ValueChangeListener): Unit

  /**
   * Register permanent callback for key-set change event
   *
   * @param namespace     the listenable key-set identifier (e.g. parent path in Zookeeper,
   *                      table name in Database etc)
   * @param eventListener see [[KeySetChangeListener]]
   */
  def addKeySetChangeListener(namespace: String, eventListener: KeySetChangeListener): Unit

  /**
   * Deregister permanent callback for key-set change event
   *
   * @param namespace     the listenable key-set identifier (e.g. parent path in Zookeeper,
   *                      table name in Database etc)
   * @param eventListener see [[KeySetChangeListener]]
   */
  def removeKeySetChangeListener(namespace: String, eventListener: KeySetChangeListener): Unit

  /**
   * Setup everything needed for concrete implementation
   *
   * @param context TBD. Should be abstract enough to be used by different implementations and
   *                at the same time specific because will be uniformly called from the Kafka code,
   *                regardless of the implementation
   */
  def init(context: Any): Unit

  /**
   * Release all acquired resources
   */
  def close(): Unit
}

/**
 * Callback on value change event
 */
trait ValueChangeListener {
  /**
   * Invoked when the value associated with a listened key has changed.
   *
   * @param newValue the new value (NOTE(review): presumably None means the key no longer
   *                 has a value - confirm against the implementations)
   */
  def valueChanged(newValue: Option[String]): Unit
}

/**
 * Callback on key-set change event
 */
trait KeySetChangeListener {
  /**
   * Invoked when the key set of a listened namespace has changed.
   *
   * @param newKeySet the new set of keys in the namespace
   */
  def keySetChanged(newKeySet: Set[String]): Unit
}

 

Compatibility, Deprecation, and Migration Plan

...