THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Below each module is presented by its interface.
(NOTE: Initial version of the interfaces is in Scala to make it cleaner and shorter. The final version (actual Kafka interfaces) is planned to be written in Java).
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
/** * A connector for group membership protocol. Supports two modes: * 1) "joining" (becoming the member, leaving the group, subscribing to change notifications) * 2) "observing" (fetching group state, subscribing to change notifications) * */ trait GroupMembershipClient { /** * Each instance of this class is tightly coupled with exactly one group, * once set (during initialization) cannot be changed * @return unique group identifier among all application groups */ def group: String /** * Become a member of this group. Throw an exception in case of ID conflict * @param id unique member identifier among members of this group * @param data supplemental data to be stored along with member ID */ def join(id: String, data: String): Unit /** * Stop membership in this group * @param id unique member identifier among members of this group */ def leave(id: String): Unit /** * Fetch membership of this group * @return IDs of the members of this group */ def membershipList(): Set[String] /** * Fetch detailed membership of this group * @return IDs and corresponding supplemental data of the members of this group */ def membership(): Map[String, String] /** * Register Apermanent callbackon firedgroup onchange eventlistener. */ There traitis Listenerno { guarantee listener will be /** fired on ALL events (due *to Eventsession fired when the group membership has changed (member(s) joined and/or left) reconnects etc) * @param membershiplistener new membership of the group see [[GroupChangeListener]] */ def onGroupChangeaddListener(membershiplistener: Set[String]) }GroupChangeListener) /** * Register permanentDeregister on group change listener. * There is no guarantee listener will be fired on ALL events (due to session reconnects etc) * @param listener see [[ListenerGroupChangeListener]] */ def addListenerremoveListener(listener: ListenerGroupChangeListener) /** * Deregister on group change listener * @param listener see [[Listener]] */ def removeListener(listener: Listener) /** * Setup everythingSetup everything needed for concrete implementation * @param context TBD. Should be abstract enough to be used by different implementations and * at the same time specific because will be uniformly called from the Kafka code - * regardless of the implementation */ def init(context: Any): Unit /** * Release all acquired resources */ def close(): Unit } | ||||||||
Code Block | ||||||||
| ||||||||
/** * A connectorcallback forfired leadershipon electiongroup change event */ trait GroupChangeListener { /** * Event fired when the group membership has changed (member(s) joined and/or left) * @param membership new membership of the group */ def onGroupChange(membership: Set[String]) } |
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
/** * A connector for leadership election protocol. Supports two modes: * 1) "running for election" (joining the candidates for leadership, resigning as a leader, subscribing to change notifications) * 2) "observing" (getting current leader, subscribing to change notifications) * */ trait LeaderElectionClient{protocol. Supports two modes: * 1) "running for election" (joining the candidates for leadership, resigning as a leader, subscribing to change notifications) * 2) "observing" (getting current leader, subscribing to change notifications) * */ trait LeaderElectionClient{ /** * Each instance of this class is tightly coupled with leadership over exactly one service (resource), * once set (during initialization) cannot be changed * * @return unique group identifier among all application services (resources) */ def service: String /** * GetEach currentinstance leaderof ofthis theclass resourceis (if any) * @return the leader id if it exists */ def getLeader: Option[String] /*tightly coupled with leadership over exactly one service (resource), * once set (during initialization) cannot be changed * * Make@return thisunique candidategroup eligibleidentifier foramong leaderall electionapplication and try to obtain leadership for this service if it's vacant services (resources) */ def service: String /** * @paramGet candidatecurrent IDleader of the candidateresource which is eligible for election(if any) * @return truethe ifleader givenid candidateif is now a leaderit exists */ def nominate(candidategetLeader: String): BooleanOption[String] /** * Make Voluntarilythis resigncandidate aseligible afor leader election and andtry initiateto newobtain leaderleadership election. for this service *if Itit's avacant client responsibility to* stop all leader* duties@param beforecandidate callingID thisof methodthe tocandidate avoid more-than-one-leader cases *which is eligible for election * @param leader current leader ID (will be ignored if not @return true if given candidate is now a leader) */ def resignnominate(leadercandidate: String): UnitBoolean /** * AVoluntarily callbackresign firedas ona leader changeand event initiate new */ trait Listener { leader election. /** It's a client responsibility *to Eventstop firedall whenleader theduties leaderbefore hascalling changedthis (resignedmethod orto acquired a leadership) avoid more-than-one-leader cases * * @param leader newcurrent leader forID the(will givenbe serviceignored if onenot has been elected, otherwise None a leader) */ def onLeaderChangeresign(leader: Option[String]) : }Unit /** * Register permanent on leader change listener * There is no guarantee listener will be fired on ALL events (due to session reconnects etc) * @param listener see [[LeaderChangeListener]] */ def addListener(listener: LeaderChangeListener) /** * Deregister on leader change listener * @param listener see [[ListenerLeaderChangeListener]] */ def addListenerremoveListener(listener: Listener) /** * Deregister on leader change listener * @param listener see [[Listener]] */ def removeListener(listener: Listener) /** * Setup everything needed for concrete implementation * @param context TBD. Should be abstract enough to be used by different implementations and * at the same time specific because will be uniformly called from the Kafka code - * regardless of the implementation */ def init(context: Any): Unit /** * Release all acquired resources */ def close(): UnitLeaderChangeListener) /** * Setup everything needed for concrete implementation * @param context TBD. Should be abstract enough to be used by different implementations and * at the same time specific because will be uniformly called from the Kafka code - * regardless of the implementation */ def init(context: Any): Unit /** * Release all acquired resources */ def close(): Unit } /** * A callback fired on leader change event */ trait LeaderChangeListener { /** * Event fired when the leader has changed (resigned or acquired a leadership) * @param leader new leader for the given service if one has been elected, otherwise None */ def onLeaderChange(leader: Option[String]) } |
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
/** * Interface to a (persistent) key value storage */ trait Storage { /** * Get data by its key * @param key data ID in this storage * @return future result of the value (if exists) associated with the key */ def fetch(key: String): Future[Option[String]] /** * Persist value with its associated key. The contract is to throw an exception * if such key already exists * * @param key data ID in this storage * @param data value associated with the key */ def put(key: String, data: String) /** * Update value by its associated key. The contract is to throw an exception * if such key doesn't exist * * @param key data ID in this storage * @param data value associated with the key */ def update(key: String, data: String) /** * Setup everything needed for concrete implementation * @param context TBD. Should be abstract enough to be used by different implementations and * at the same time specific because will be uniformly called from the Kafka code - * regardless of the implementation */ def init(context: Any): Unit /** * Release all acquired resources */ def close(): Unit } |
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
/**
* A registry for async data change notifications
*/
trait ListenerRegistry {
/**
* Register permanent callback for data change event
* @param key the listenable data identifier
* @param eventListener see [[ValueChangeListener]]
*/
def addValueChangeListener(key: String, eventListener: ValueChangeListener): Unit
/**
* Deregister permanent callback for data change event
* @param key the listenable data identifier
* @param eventListener see [[EventListener]]
* @tparam T type of the data ID
*/
def removeValueChangeListener(key: String, eventListener: ValueChangeListener): Unit
/**
* Register permanent callback for key-set change event
* @param namespace the listenable key-set identifier (e.g. parent path in Zookeeper, table name in Database etc)
* @param eventListener see [[ValueChangeListener]]
*/
def addKeySetChangeListener(namespace: String, eventListener: KeySetChangeListener): Unit
/**
* Deregister permanent callback for key-set change event
* @param namespace the listenable key-set identifier (e.g. parent path in Zookeeper, table name in Database etc)
* @param eventListener see [[ValueChangeListener]]
*/
def removeKeySetChangeListener(namespace: String, eventListener: KeySetChangeListener): Unit
/**
* Setup everything needed for concrete implementation
* @param context TBD. Should be abstract enough to be used by different implementations and
* at the same time specific because will be uniformly called from the Kafka code,
* regardless of the implementation
*/
def init(context: Any): Unit
/**
* Release all acquired resources
*/
def close(): Unit
}
/**
* Callback on value change event
*/
trait ValueChangeListener {
def valueChanged(newValue: Option[String])
}
/**
* Callback on key-set change event
*/
trait KeySetChangeListener {
def keySetChanged(newKeySet: Set[String])
} |
...