Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Below each module is presented by its interface.

(NOTE: Initial version of the interfaces is in Scala to make it cleaner and shorter. The final version (actual Kafka interfaces) is planned to be written in Java).

 

Code Block
languagescala
titleGroup Membership Protocol
linenumberstrue
collapsetrue
/**
 * A connector for group membership protocol. Supports two modes:
 * 1) "joining" (becoming the member, leaving the group, subscribing to change notifications)
 * 2) "observing" (fetching group state, subscribing to change notifications)
 *
 */
trait GroupMembershipClient {
  /**
   * Each instance of this class is tightly coupled with exactly one group,
   * once set (during initialization) cannot be changed
   * @return unique group identifier among all application groups
   */
  def group: String

  /**
   * Become a member of this group. Throw an exception in case of ID conflict
   * @param id unique member identifier among members of this group
   * @param data supplemental data to be stored along with member ID
   */
  def join(id: String, data: String): Unit

  /**
   * Stop membership in this group
   * @param id unique member identifier among members of this group
   */
  def leave(id: String): Unit

  /**
   * Fetch membership of this group
   * @return IDs of the members of this group
   */
  def membershipList(): Set[String]

  /**
   * Fetch detailed membership of this group
   * @return IDs and corresponding supplemental data of the members of this group
   */
  def membership(): Map[String, String]

  /**
   * Register permanent on group change listener.
   * There is no guarantee listener will be fired on ALL events (due to session reconnects etc)
   * @param listener see [[GroupChangeListener]]
   */
  def addListener(listener: GroupChangeListener)

  /**
   * Deregister on group change listener
   * @param listener see [[GroupChangeListener]]
   */
  def removeListener(listener: GroupChangeListener)

  /**
   * Setup everything needed for concrete implementation
   * @param context TBD. Should be abstract enough to be used by different implementations and
   *                at the same time specific because will be uniformly called from the Kafka code -
   *                regardless of the implementation
   */
  def init(context: Any): Unit

  /**
   * Release all acquired resources
   */
  def close(): Unit
}
 
/**
  * A callback fired on group change event
*/
trait GroupChangeListener {
    /**
     * Event fired when the group membership has changed (member(s) joined and/or left)
     * @param membership new membership of the group
     */
    def onGroupChange(membership: Set[String])
}

...