...
Below each module is presented by its interfacesinterface.
Code Block |
---|
language | scala |
---|
title | Group Membership Protocol |
---|
linenumbers | true |
---|
collapse | true |
---|
|
/**
* A connector for group membership protocol. Supports both parts:
* 1) "joining" (becoming the member, leaving the group, subscribing to change notifications)
* 2) "observing" (fetching group state, subscribing to change notifications)
*
* @tparam K type of the member ID - unique identifier among members of this group
* @tparam V type of the additional data that comes with ID
*/
trait GroupMembershipClient[K, V] {
/**
* Each instance of this class is tightly coupled with exactly one group,
* once set (during initialization) cannot be changed
* @return unique group identifier among all application groups
*/
def group: String
/**
* Become a member of this group. Throw an exception in case of ID conflict
* @param id unique member identifier among members of this group
* @param data supplemental data to be stored along with member ID
*/
def join(id: KString, data: VString): Unit
/**
* Stop membership in this group
* @param id unique member identifier among members of this group
*/
def leave(id: KString): Unit
/**
* Fetch membership of this group
* @return IDs of the members of this group
*/
def membershipList(): Set[KString]
/**
* Fetch detailed membership of this group
* @return IDs and corresponding supplemental data of the members of this group
*/
def membership(): Map[KString, VString]
/**
* A callback fired on event
*/
trait Listener {
/**
* Event fired when the group membership has changed (member(s) joined and/or left)
* @param membership new membership of the group
*/
def onGroupChange(membership: Set[KString])
}
/**
* Register permanent on group change listener.
* There is no guarantee listener will be fired on ALL events (due to session reconnects etc)
* @param listener see [[Listener]]
*/
def addListener(listener: Listener)
/**
* Deregister on group change listener
* @param listener see [[Listener]]
*/
def removeListener(listener: Listener)
/**
* Setup everything needed for concrete implementation
* @param context TBD. Should be abstract enough to be used by different implementations and
* at the same time specific because will be uniformly called from the Kafka code -
* regardless of the implementation
*/
def init(context: Any): Unit
/**
* Release all acquired resources
*/
def close(): Unit
} |
Code Block |
---|
language | scala |
---|
title | Leader Election |
---|
linenumbers | true |
---|
collapse | true |
---|
|
/**
* A connector for leadership election protocol. Supports both parts:
* 1) "running for election" (joining the candidates for leadership, resigning as a leader, subscribing to change notifications)
* 2) "observing" (getting current leader, subscribing to change notifications)
*
* @tparam K type of the candidate ID - unique identifier among candidates participating in leader election *
*/
trait LeaderElectionClient[K]{
/**
* Each instance of this class is tightly coupled with leadership over exactly one service (resource),
* once set (during initialization) cannot be changed
*
* @return unique group identifier among all application services (resources)
*/
def service: String
/**
* Get Fetchcurrent leader of thisthe serviceresource until one is elected(if any)
* @return futurethe resultleader ofid theif leaderit IDexists
*/
def getLeader: FutureOption[KString]
/**
* Make this candidate eligible for leader election and try to obtain leadership for this service if it's vacant
*
* @param candidate ID of the candidate which is eligible for election
* @return future result of the leader election true if given candidate is now a leader
*/
def nominate(candidate: KString): Future[K]Boolean
/**
* Voluntarily resign as a leader and initiate new leader election.
* It's a client responsibility to stop all leader duties before calling this method to avoid more-than-one-leader cases
*
* @param leader current leader ID (will be ignored if not a leader)
* @return future result of the leader election
*/
def resign(leader: KString): Future[K]Unit
/**
* A callback fired on leader change event
*/
trait Listener {
/**
* Event fired when the leader has changed and(resigned theor newacquired one has been electeda leadership)
* @param leader new leader for the given servicefor the given service if one has been elected, otherwise None
*/
def onLeaderChange(leader: KOption[String])
}
/**
* Register permanent on leader change listener
* There is no guarantee listener will be fired on ALL events (due to session reconnects etc)
* @param listener see [[Listener]]
*/
def addListener(listener: Listener)
/**
* Deregister on leader change listener
* @param listener see [[Listener]]
*/
def removeListener(listener: Listener)
/**
* Setup everything needed for concrete implementation
* @param context TBD. Should be abstract enough to be used by different implementations and
* at the same time specific because will be uniformly called from the Kafka code -
* regardless of the implementation
*/
def init(context: Any): Unit
/**
* Release all acquired resources
*/
def close(): Unit
} |
Code Block |
---|
language | scala |
---|
title | Storage |
---|
linenumbers | true |
---|
collapse | true |
---|
|
/**
* Interface to a (persistent) key value storage
* @tparam K type of data key
* @tparam V type of the fetched data
*/
trait Storage[K, V] {
/**
* Get data by its key
* @param key data ID in this storage
* @return future result of the value (if exists) associated with the key
*/
def fetch(key: KString): Future[Option[VString]]
/**
* Persist value with its associated key. The contract is to throw an exception
* if such key already exists
*
* @param key data ID in this storage
* @param data value associated with the key
*/
def put(key: KString, data: VString)
/**
* Update value by its associated key. The contract is to throw an exception
* if such key doesn't exist
*
* @param key data ID in this storage
* @param data value associated with the key
*/
def update(key: KString, data: VString)
/**
* Setup everything needed for concrete implementation
* @param context TBD. Should be abstract enough to be used by different implementations and
* at the same time specific because will be uniformly called from the Kafka code -
* regardless of the implementation
*/
def init(context: Any): Unit
/**
* Release all acquired resources
*/
def close(): Unit
}
|
...