A Short Guide to Reviewing ZOOKEEPER-368:
Below are some notes to help the reviewer make sense of ZOOKEEPER-368, the Observers patch. Only selected diffs are included; the other changes should be reasonably self-explanatory.
Configuration
QuorumPeerConfig.java
Configuration changes are mainly in QuorumPeerConfig.java:
+ protected final HashMap<Long,QuorumServer> observers =
+     new HashMap<Long, QuorumServer>();
We must keep track of which servers are Followers and which are Observers so that quorums can be constructed only from voting members. This HashMap mimics the servers HashMap, and is combined with servers after quorum construction.
+ protected LearnerType peerType = LearnerType.PARTICIPANT;
Keeps track of this server's own type (peerType could be renamed to learnerType).
+ } else if (key.equals("peerType")) {
+     if (value.toLowerCase().equals("observer")) {
+         peerType = LearnerType.OBSERVER;
+     } else if (value.toLowerCase().equals("participant")) {
+         peerType = LearnerType.PARTICIPANT;
+     } else
+     {
+         throw new ConfigException("Unrecognised peertype: " + value);
+     }
Setting peerType=observer is how a node is configured as an Observer; this code parses that setting.
+ if ((parts.length != 2) && (parts.length != 3) && (parts.length !=4)) {
      LOG.error(value
-         + " does not have the form host:port or host:port:port");
+         + " does not have the form host:port or host:port:port "
+         + " or host:port:port:type");
We must also add a new form for the server specification, host:port:port:type (for example server.3=zk3:2888:3888:observer), to identify Observers so that they can be excluded from quorum construction.
+ } else if (parts.length == 4) {
+     InetSocketAddress electionAddr = new InetSocketAddress(
+             parts[0], Integer.parseInt(parts[2]));
+     LearnerType type = LearnerType.PARTICIPANT;
+     if (parts[3].toLowerCase().equals("observer")) {
+         type = LearnerType.OBSERVER;
+         observers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
+                 electionAddr,type));
+     } else if (parts[3].toLowerCase().equals("participant")) {
+         type = LearnerType.PARTICIPANT;
+         servers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
+                 electionAddr,type));
+     } else {
+         throw new ConfigException("Unrecognised peertype: " + value);
+     }
Note that if the type is observer, the QuorumServer goes into the observers HashMap; if it is participant, it goes into servers; any other value raises a ConfigException.
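To make the routing concrete, here is a minimal, self-contained sketch of the four-part case. It is an illustration under stated assumptions, not the patch's code: ServerSpecParser, ServerSpec and Role are hypothetical stand-ins for QuorumPeerConfig, QuorumServer and LearnerType, addresses are kept as plain host/port fields, only the host:port:port:type form is handled, and IllegalArgumentException stands in for ZooKeeper's ConfigException.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: routes "host:port:port:type" specs into separate
// voting (servers) and non-voting (observers) maps, as the patch does.
class ServerSpecParser {
    enum Role { PARTICIPANT, OBSERVER }

    // Simplified stand-in for QuorumPeer.QuorumServer.
    static final class ServerSpec {
        final long sid;
        final String host;
        final int quorumPort;
        final int electionPort;
        final Role role;

        ServerSpec(long sid, String host, int quorumPort, int electionPort, Role role) {
            this.sid = sid;
            this.host = host;
            this.quorumPort = quorumPort;
            this.electionPort = electionPort;
            this.role = role;
        }
    }

    final Map<Long, ServerSpec> servers = new HashMap<>();   // voting members
    final Map<Long, ServerSpec> observers = new HashMap<>(); // non-voting members

    /** Parses the value of a "server.<sid>" line, e.g. "zk3:2888:3888:observer". */
    void parse(long sid, String value) {
        String[] parts = value.split(":");
        if (parts.length != 4) {
            // The real parser also accepts host:port and host:port:port;
            // this sketch handles only the new four-part form.
            throw new IllegalArgumentException(value + " does not have the form host:port:port:type");
        }
        int quorumPort = Integer.parseInt(parts[1]);
        int electionPort = Integer.parseInt(parts[2]);
        String type = parts[3].toLowerCase(Locale.ROOT);
        if (type.equals("observer")) {
            observers.put(sid, new ServerSpec(sid, parts[0], quorumPort, electionPort, Role.OBSERVER));
        } else if (type.equals("participant")) {
            servers.put(sid, new ServerSpec(sid, parts[0], quorumPort, electionPort, Role.PARTICIPANT));
        } else {
            throw new IllegalArgumentException("Unrecognised peertype: " + value);
        }
    }
}
```

Keeping the two maps separate means quorum construction can read servers directly without filtering; the observers map is only combined back in once the quorum verifier has been built.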
+ if (observers.size() > 0 && electionAlg != 0) {
+     throw new IllegalArgumentException("Observers must currently be used with simple leader election"
+             + " (set electionAlg=0)");
+ }
Observers cannot currently be used with any election algorithm other than 0 (simple leader election). This will change fairly shortly.
Operation
QuorumPeer.java
+ public enum LearnerType {
+     PARTICIPANT, OBSERVER;
+ }
+ private LearnerType peerType = LearnerType.PARTICIPANT;
+
+ public LearnerType getPeerType() {
+     return peerType;
+ }
+
+ public void setPeerType(LearnerType p) {
+     peerType = p;
+ }
Although ServerState captures the dynamic state of a node, it cannot tell us whether a node that is LOOKING should move to OBSERVING or FOLLOWING once it has found the Leader. LearnerType records this static role, so that the state transition is obvious.
+ protected Map<Long, QuorumServer> quorumPeers;
quorumPeers is now protected; other code should go through the getView APIs instead of touching it directly.
+
+ protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
+     return new Observer(this, new ObserverZooKeeperServer(logFactory,
+             this, new ZooKeeperServer.BasicDataTreeBuilder()));
+ }
Following the same pattern as Follower and Leader, makeObserver constructs a new Observer.
+ case OBSERVING:
+     // Do nothing, Observers keep themselves to
+     // themselves.
+     break;
Observers explicitly do not respond to leader-election inquiries. This is the simplest way to ensure that their votes aren't counted. (The alternative is for the querying client to know whether the responder is an Observer. In fact, the client does not query Observers, but it's good to be defensive here.)
+ new QuorumMaj(countParticipants(quorumPeers)));
Previously this was quorumPeers.size(). Now only participants count towards the quorum size, so Observers must be excluded from the count.
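Why the denominator matters is easiest to see with the ensemble used by testObserver: two Followers and one Observer. The following is a minimal sketch, assuming (as QuorumMaj does) that a set of voters is a quorum when it is a strict majority of all voters; QuorumArithmetic and isQuorum are hypothetical names, and members are reduced to their LearnerType.

```java
import java.util.List;

// Hypothetical sketch of majority-quorum arithmetic when Observers are
// excluded from the voting count.
class QuorumArithmetic {
    enum LearnerType { PARTICIPANT, OBSERVER }

    /** Counts the members that may vote (the role countParticipants plays in the patch). */
    static int countParticipants(List<LearnerType> members) {
        int count = 0;
        for (LearnerType t : members) {
            if (t == LearnerType.PARTICIPANT) {
                count++;
            }
        }
        return count;
    }

    /** Live voters form a quorum if they are a strict majority of all voters. */
    static boolean isQuorum(int liveVoters, int totalVoters) {
        return liveVoters > totalVoters / 2;
    }
}
```

With two voters, one surviving Follower gives 1 > 1, which is false, so the ensemble correctly loses quorum. Had quorumPeers.size() (3) been used, the surviving Follower plus the Observer would give 2 > 1 and the ensemble would wrongly appear quorate.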
+ if (getPeerType() == LearnerType.PARTICIPANT) {
+     startLeaderElection();
+ }
Observers should not instigate the leader election (LE) process. Instead, they wait for a Leader to be elected and then look for a majority of votes agreeing on that Leader.
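A simplified sketch of what "a majority of votes" means for an Observer, assuming it has collected one vote (the sid each voter proposes as Leader) from the voting members it could reach. VoteTally and majorityLeader are hypothetical names, and the real LeaderElection code considers more than this (for example, the zxids attached to votes).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: decide whether collected votes identify a leader
// backed by a strict majority of the voting view.
class VoteTally {
    /**
     * @param votes      map from voter sid to the sid that voter proposes as leader
     * @param votingSize number of members in the voting view (Observers excluded)
     * @return the agreed leader, if more than half of all voters back it
     */
    static Optional<Long> majorityLeader(Map<Long, Long> votes, int votingSize) {
        Map<Long, Integer> tally = new HashMap<>();
        for (long candidate : votes.values()) {
            tally.merge(candidate, 1, Integer::sum);
        }
        for (Map.Entry<Long, Integer> e : tally.entrySet()) {
            if (e.getValue() > votingSize / 2) {
                return Optional.of(e.getKey());
            }
        }
        return Optional.empty(); // no majority yet: keep polling
    }
}
```

Note that the threshold is computed from the size of the voting view, not from the number of votes received, so an Observer cannot accept a Leader that only a minority of voters has reported.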
+ for (QuorumServer p : getView().values()) {
Accesses to quorumPeers should now be replaced by getView or getVotingView calls.
+ /**
+  * Count the number of nodes in the map that could be followers.
+  * @param peers
+  * @return The number of followers in the map
+  */
+ protected static int countParticipants(Map<Long,QuorumServer> peers) {
+     int count = 0;
+     for (QuorumServer q : peers.values()) {
+         if (q.type == LearnerType.PARTICIPANT) {
+             count++;
+         }
+     }
+     return count;
+ }
This method does not synchronize on peers. It probably should once the dynamic ensembles patch lands, but for the time being all updates to peers happen at start-up.
+ case OBSERVING:
+     try {
+         LOG.info("OBSERVING");
+         setObserver(makeObserver(logFactory));
+         observer.observeLeader();
+     } catch (Exception e) {
+         LOG.warn("Unexpected exception",e );
+     } finally {
+         observer.shutdown();
+         setObserver(null);
+         setPeerState(ServerState.LOOKING);
+     }
+     break;
This is the Observer case of the main driver loop; it is much the same as the FOLLOWING case.
+  * A 'view' is a node's current opinion of the membership of the entire
+  * ensemble.
   */
  public Map<Long,QuorumPeer.QuorumServer> getView() {
-     return this.quorumPeers;
+     return Collections.unmodifiableMap(this.quorumPeers);
+ }
+
+ /**
+  * Observers are not contained in this view, only nodes with
+  * PeerType=PARTICIPANT.
+  */
+ public Map<Long,QuorumPeer.QuorumServer> getVotingView() {
+     Map<Long,QuorumPeer.QuorumServer> ret =
+         new HashMap<Long, QuorumPeer.QuorumServer>();
+     Map<Long,QuorumPeer.QuorumServer> view = getView();
+     for (QuorumServer server : view.values()) {
+         if (server.type == LearnerType.PARTICIPANT) {
+             ret.put(server.id, server);
+         }
+     }
+     return ret;
+ }
+
+ /**
+  * Returns only observers, no followers.
+  */
+ public Map<Long,QuorumPeer.QuorumServer> getObservingView() {
+     Map<Long,QuorumPeer.QuorumServer> ret =
+         new HashMap<Long, QuorumPeer.QuorumServer>();
+     Map<Long,QuorumPeer.QuorumServer> view = getView();
+     for (QuorumServer server : view.values()) {
+         if (server.type == LearnerType.OBSERVER) {
+             ret.put(server.id, server);
+         }
+     }
+     return ret;
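Three methods return different views of the current ensemble: getView (every member), getVotingView (participants only) and getObservingView (Observers only). Note that getView now wraps quorumPeers in Collections.unmodifiableMap, so the map it returns is read-only.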
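getVotingView and getObservingView differ only in the type they keep, so one possible tidy-up is a single filtering helper. This is a hypothetical sketch (ViewSketch, Server and viewOf are illustrative names, not part of the patch) that also shows the read-only behaviour of getView.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one filtering helper backing both the voting and
// observing views, plus a read-only getView.
class ViewSketch {
    enum LearnerType { PARTICIPANT, OBSERVER }

    static final class Server {
        final long id;
        final LearnerType type;

        Server(long id, LearnerType type) {
            this.id = id;
            this.type = type;
        }
    }

    private final Map<Long, Server> quorumPeers = new HashMap<>();

    void add(Server s) {
        quorumPeers.put(s.id, s);
    }

    /** Read-only view backed by quorumPeers; mutators throw UnsupportedOperationException. */
    Map<Long, Server> getView() {
        return Collections.unmodifiableMap(quorumPeers);
    }

    /** Copies the members of the given type into a fresh map. */
    Map<Long, Server> viewOf(LearnerType type) {
        Map<Long, Server> ret = new HashMap<>();
        for (Server s : getView().values()) {
            if (s.type == type) {
                ret.put(s.id, s);
            }
        }
        return ret;
    }

    Map<Long, Server> getVotingView() {
        return viewOf(LearnerType.PARTICIPANT);
    }

    Map<Long, Server> getObservingView() {
        return viewOf(LearnerType.OBSERVER);
    }
}
```

Collections.unmodifiableMap returns a live, read-only view rather than a copy: later changes to quorumPeers remain visible through it, whereas the filtered views are snapshots taken at call time.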
+ /**
+  * Only used by QuorumStats at the moment
+  */
getQuorumPeers is a strangely named method that returns a String representation of the total view; this comment was added to show that it is not really part of the core QuorumPeer API.
Leader.java
+ protected HashSet<LearnerHandler> observingLearners = new HashSet<LearnerHandler>();
Although LearnerHandlers can currently talk to both Followers and Observers, this may not always be the case. Observers must also be kept separate so that they receive only INFORM messages, not PROPOSAL messages.
+  * This tells the leader that the connecting peer is actually an observer
+  */
+ final static int OBSERVERINFO = 16;
+
This is how the Leader distinguishes Observers from Followers.
+ /**
+  * This message type informs observers of a committed proposal.
+  */
+ final static int INFORM = 8;
This new message type is the key to the Observer protocol. An INFORM message is much like a PROPOSAL message, but its semantics are that the attached proposal has already been agreed. The Leader sends an INFORM once a quorum of ACKs has been received for a given PROPOSAL. At that point the PROPOSAL is guaranteed to commit eventually at a quorum of peers, so it is safe for the Observer to receive it.
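The ordering rule can be sketched in isolation. In this hypothetical example (ProposalTracker and Packet are illustrative names, not the patch's classes), the Leader tracks ACKs for a single outstanding proposal from voting members only, and the first time the ACKs form a strict majority of voters it queues a COMMIT for Followers and an INFORM for Observers. The real Leader tracks many proposals at once and uses QuorumPacket.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the leader-side rule: COMMIT to voters and
// INFORM to Observers only once a strict majority of voters has ACKed.
class ProposalTracker {
    /** Minimal stand-in for a queued packet; not the real QuorumPacket. */
    static final class Packet {
        final String type;
        final long zxid;

        Packet(String type, long zxid) {
            this.type = type;
            this.zxid = zxid;
        }
    }

    private final long zxid;                 // the single proposal tracked here
    private final int votingSize;            // participants only, Observers excluded
    private final Set<Long> ackers = new HashSet<>();
    private boolean committed = false;

    final List<Packet> toFollowers = new ArrayList<>();
    final List<Packet> toObservers = new ArrayList<>();

    ProposalTracker(long zxid, int votingSize) {
        this.zxid = zxid;
        this.votingSize = votingSize;
    }

    /** Records an ACK from a voting member; assumes Observers never ACK proposals. */
    void processAck(long sid) {
        ackers.add(sid);
        if (!committed && ackers.size() > votingSize / 2) {
            committed = true;
            toFollowers.add(new Packet("COMMIT", zxid));
            // Observers never saw the PROPOSAL, so the INFORM must carry the txn itself.
            toObservers.add(new Packet("INFORM", zxid));
        }
    }
}
```

With three voters, nothing reaches the Observers after the first ACK; the second ACK triggers exactly one COMMIT and one INFORM, and later ACKs change nothing.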
+ inform(p);
+ /**
+  * Create an inform packet and send it to all observers.
+  * @param zxid
+  * @param proposal
+  */
+ public void inform(Proposal proposal) {
+     QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,
+             proposal.packet.getData(), null);
+     sendObserverPacket(qp);
+ }
Constructs an INFORM packet and sends it to all Observers reliably.
+ /**
+  * send a packet to all observers
+  */
+ void sendObserverPacket(QuorumPacket qp) {
+     synchronized(observingLearners) {
+         for (LearnerHandler f : observingLearners) {
+             f.queuePacket(qp);
+         }
+     }
+ }
+
This is where the separate data structure for Observer LearnerHandlers pays off. This method may be called very frequently (once per committed proposal), so scanning every LearnerHandler and testing its type to find the Observers might take too long.
LeaderElection.java
+ /**
+  * We want to make sure we implement the state machine
+  * correctly. If we are a PARTICIPANT, once a leader
+  * is elected we can move either to LEADING or
+  * FOLLOWING. However if we are an OBSERVER, it is an
+  * error to be elected as a Leader.
+  */
+ if (self.getPeerType() == LearnerType.OBSERVER) {
+     if (current.id == self.getId()) {
+         // This should never happen!
+         LOG.error("OBSERVER elected as leader!");
+         Thread.sleep(100);
+     }
+     else {
+         self.setPeerState(ServerState.OBSERVING);
+         Thread.sleep(100);
+         return current;
+     }
+ } else {
+     self.setPeerState((current.id == self.getId())
+             ? ServerState.LEADING: ServerState.FOLLOWING);
+     if (self.getPeerState() == ServerState.FOLLOWING) {
+         Thread.sleep(100);
+     }
+     return current;
As the comment says, we want to move from LOOKING to OBSERVING correctly. If an Observer is somehow elected as Leader, it logs an error and sleeps instead of returning (in the hope that this encourages the ensemble to vote someone else in).
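Stripped of the sleeps and logging, the decision above is a pure function of the peer's static LearnerType and whether it won the election. A hypothetical sketch follows (StateTransition and next are illustrative names; the enum constants mirror LearnerType and ServerState from QuorumPeer). Returning LOOKING models the patch's behaviour of not returning from the election loop when an Observer wins.

```java
// Hypothetical sketch of the LOOKING-state transition once a leader id is known.
class StateTransition {
    enum LearnerType { PARTICIPANT, OBSERVER }
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    /**
     * Returns the next state for a peer of the given type. An Observer that
     * wins is an invalid outcome, so it stays LOOKING and the election continues.
     */
    static ServerState next(LearnerType type, long myId, long leaderId) {
        if (type == LearnerType.OBSERVER) {
            // An Observer must never lead.
            return (leaderId == myId) ? ServerState.LOOKING : ServerState.OBSERVING;
        }
        return (leaderId == myId) ? ServerState.LEADING : ServerState.FOLLOWING;
    }
}
```

Because LearnerType never changes at run time, the only input that varies between elections is the winning sid, which is what makes the transition easy to reason about.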
AuthFastLeaderElection.java / FastLeaderElection.java
Only changes to use the getView APIs.
QuorumCnxManager.java
Only changes to use the getView APIs.
LearnerHandler.java
+ private LearnerType learnerType = LearnerType.PARTICIPANT;
+ public LearnerType getLearnerType() {
+     return learnerType;
+ }
Used by Leader to distinguish the type of Learner being handled.
+ if (this.learnerType == LearnerType.OBSERVER) {
+     LOG.error("Received ACK from Observer " + this.sid);
+ }
Defensively log an error when we receive an ACK from an Observer. This check could be removed, especially if it hurts throughput. Also, Observers send an ACK during the Leader-sync stage of the join protocol, which causes this error message to be written spuriously; the check should probably be removed in future.
ObserverRequestProcessor.java (new file)
This simple request processor forwards requests to the Observer object. In future it might be customized to change its behaviour, for example to batch requests.
Observer.java (new file)
Observer extends Learner.
+ void observeLeader() throws InterruptedException {
+     zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
+
+     try {
+         InetSocketAddress addr = findLeader();
+         LOG.info("Observing " + addr);
+         try {
+             connectToLeader(addr);
+             long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
+
+             syncWithLeader(newLeaderZxid);
+             QuorumPacket qp = new QuorumPacket();
+             while (self.running) {
+                 readPacket(qp);
+                 processPacket(qp);
+             }
+         } catch (IOException e) {
+             LOG.warn("Exception when observing the leader", e);
+             try {
+                 sock.close();
+             } catch (IOException e1) {
+                 e1.printStackTrace();
+             }
+
+             synchronized (pendingRevalidations) {
+                 // clear pending revalidations
+                 pendingRevalidations.clear();
+                 pendingRevalidations.notifyAll();
+             }
+         }
+     } finally {
+         zk.unregisterJMX(this);
+     }
+ }
A simple driver loop for Observers: read a packet, then process it. If an IOException occurs, the socket is closed and pending session revalidations are cleared (waking any threads waiting on them) in the same way as for Followers.
+ protected void processPacket(QuorumPacket qp) throws IOException{
+     switch (qp.getType()) {
+     case Leader.PING:
+         ping(qp);
+         break;
+     case Leader.PROPOSAL:
+         LOG.warn("Ignoring proposal");
+         break;
+     case Leader.COMMIT:
+         LOG.warn("Ignoring commit");
+         break;
+     case Leader.UPTODATE:
+         zk.takeSnapshot();
+         self.cnxnFactory.setZooKeeperServer(zk);
+         break;
+     case Leader.REVALIDATE:
+         revalidate(qp);
+         break;
+     case Leader.SYNC:
+         ((ObserverZooKeeperServer)zk).sync();
+         break;
+     case Leader.INFORM:
+         TxnHeader hdr = new TxnHeader();
+         BinaryInputArchive ia = BinaryInputArchive
+                 .getArchive(new ByteArrayInputStream(qp.getData()));
+         Record txn = SerializeUtils.deserializeTxn(ia, hdr);
+         Request request = new Request (null, hdr.getClientId(),
+                 hdr.getCxid(),
+                 hdr.getType(), null, null);
+         request.txn = txn;
+         request.hdr = hdr;
+         ObserverZooKeeperServer obs = (ObserverZooKeeperServer)zk;
+         obs.commitRequest(request);
+         break;
+     }
+ }
This code defines how the Observer reacts to each packet type. PROPOSAL and COMMIT are ignored and should never be received (so should this be a LOG.error instead?). The most interesting case is INFORM, which unpacks the committed proposal and tells the ObserverZooKeeperServer to commit it. (Note the downcast to ObserverZooKeeperServer; this could be optimized away.)
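The core rule of the dispatch can be shown without ZooKeeper's serialization classes. In this hypothetical sketch (ObserverDispatch, PacketType and the applied list are illustrative stand-ins), applying an INFORM is reduced to recording its zxid, whereas the patch deserializes a TxnHeader and hands a Request to ObserverZooKeeperServer.commitRequest.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the Observer's packet dispatch, reduced to the
// PROPOSAL/COMMIT/INFORM rule.
class ObserverDispatch {
    enum PacketType { PING, PROPOSAL, COMMIT, UPTODATE, REVALIDATE, SYNC, INFORM }

    static final class Packet {
        final PacketType type;
        final long zxid;

        Packet(PacketType type, long zxid) {
            this.type = type;
            this.zxid = zxid;
        }
    }

    final List<Long> applied = new ArrayList<>();    // zxids committed locally
    final List<String> warnings = new ArrayList<>();

    void processPacket(Packet qp) {
        switch (qp.type) {
            case PROPOSAL:
            case COMMIT:
                // Observers should never see these; the Leader sends INFORM instead.
                warnings.add("Ignoring " + qp.type);
                break;
            case INFORM:
                // Already agreed by a quorum of voters, so apply it directly.
                applied.add(qp.zxid);
                break;
            default:
                // PING, UPTODATE, REVALIDATE and SYNC are omitted in this sketch.
                break;
        }
    }
}
```

Since an INFORM arrives only after the proposal is agreed, the Observer needs no pending-proposal queue: one message per transaction is enough, where a Follower needs a PROPOSAL and a COMMIT.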
ObserverZooKeeperServer.java (new file)
ObserverZooKeeperServer extends LearnerZooKeeperServer.
A lot of code is shared between this class and FollowerZooKeeperServer, but the current class hierarchy offers no good place to put the shared code.
+ private CommitProcessor commitProcessor;
+ private SyncRequestProcessor syncProcessor;
The RequestProcessor pipeline is simpler than a Follower's: CommitProcessor handles commits resulting from INFORM messages, and syncProcessor handles synchronisation requests from the client.
Testing
ObserverTest.java
This contains tests to make sure that Observers are behaving correctly.
testObserver ensures that commands are forwarded correctly through an Observer to the ensemble, and that quorum is lost in a case where a voting Observer would have kept the ensemble quorate (i.e., with two Followers and one Observer, when one Follower fails).
testSingleObserver makes sure that a singleton Observer in an ensemble can't come up on its own.
testLeaderElectionFail ensures that a cluster with an electionType != 0 won't come up if it contains Observers.
QuorumBase.java
Various changes here to add options to start a cluster with two Observers.
AsyncHammerTest.java
+ @Test
+ public void testObserversHammer() throws Exception {
+     qb.tearDown();
+     qb.setUp(true);
+     bang = true;
+     Thread[] hammers = new Thread[100];
+     for (int i = 0; i < hammers.length; i++) {
+         hammers[i] = new HammerThread("HammerThread-" + i);
+         hammers[i].start();
+     }
+     Thread.sleep(5000); // allow the clients to run for max 5sec
+     bang = false;
+     for (int i = 0; i < hammers.length; i++) {
+         hammers[i].interrupt();
+         verifyThreadTerminated(hammers[i], 60000);
+     }
+     // before restart
+     qb.verifyRootOfAllServersMatch(qb.hostPort);
+ }
Repeats the AsyncHammerTest hammer run against a cluster that includes Observers.
Other tests: should there also be an equivalent synchronous hammer test?