ZooKeeper Series, Part 3: ZooKeeper Source Code Analysis of Session Management and Request Processing

Learning Objectives

  1. Understand how ZooKeeper creates, refreshes, and expires sessions
  2. Identify ZooKeeper's core request-processing call chain

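Chapter 1 Session Creation

The previous article covered how to use ZooKeeper. ZooKeeper actually consists of two modules, a server side and a client side: the server implements all of the ZooKeeper business logic, while the client wraps calls to the server. Since there are two modules, network communication is necessarily involved. ZooKeeper uses ServerCnxnFactory to manage connections with clients. It has two implementations: NIOServerCnxnFactory, built on native Java NIO, and NettyServerCnxnFactory, built on Netty. A ServerCnxn represents one connection between a client and the server. The standalone startup path shows that ZooKeeper's default communication component is NIOServerCnxnFactory.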

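1.1 The Client Sends the Request

As covered in the previous article, a connection is established by calling new ZooKeeper(...). The ZooKeeper constructor creates a ClientCnxn object and calls its start method, which starts two threads: sendThread and eventThread.

sendThread is the core thread for establishing the connection. Its run method is a while loop that keeps executing: on the first pass it creates the connection, and once the state is CONNECTED it sends a Ping request at intervals of no more than 10 seconds so that the connection stays alive.

The source is fairly long, so the unimportant parts are omitted.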

public void run() {
    // Upper bound on the interval between pings
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    while (state.isAlive()) {
        try {
            // If the socket is not connected yet, start connecting
            if (!clientCnxnSocket.isConnected()) {
                startConnect(serverAddress);
            }

            // Once connected, send a heartbeat at least every 10 seconds
            if (state.isConnected()) {
                // Throttles the heartbeat so that pings are not sent too often
                int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
                    ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                    sendPing();
                    clientCnxnSocket.updateLastSend();
                } else {
                    // "to" is the select timeout, set in the omitted code
                    if (timeToNextPing < to) {
                        to = timeToNextPing;
                    }
                }
            }
            // ... (rest of the loop body omitted)
        } catch (Throwable e) {
            // ... (error handling omitted)
        }
    }
}
void connect(InetSocketAddress addr) throws IOException {
    SocketChannel sock = createSock();
    try {
        // Registers the channel and connects to the ZooKeeper server, which then creates the session
        registerAndConnect(sock, addr);
    } catch (IOException e) {
        // ... (error handling omitted)
    }
}

void registerAndConnect(SocketChannel sock, InetSocketAddress addr) 
    throws IOException {
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
    // Initiate the non-blocking NIO connect
    boolean immediateConnect = sock.connect(addr);
    // ... (handling of an immediate connect omitted)
}

1.2 The Server Accepts the Connection

On the server, NIOServerCnxnFactory starts the threads that receive requests. When it starts, it launches four kinds of threads:

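  • AcceptThread: accepts connections from clients and assigns them to a SelectorThread (a single thread is started).
  • SelectorThread: runs select(). Because select() becomes a performance bottleneck when handling a large number of connections, several SelectorThreads are started. The count is configured with the system property zookeeper.nio.numSelectorThreads and defaults to sqrt(number of cores / 2), with a minimum of 1.
  • WorkerThread: performs the basic socket reads and writes. The count is configured with the system property zookeeper.nio.numWorkerThreads and defaults to 2 × the number of cores. If the count is 0, no worker threads are started and the SelectorThread performs the I/O directly; see the WorkerThread section below.
  • ConnectionExpirationThread: closes a connection once the session on it has expired.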
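
The default thread counts can be sketched as follows. The formulas reflect our reading of NIOServerCnxnFactory.configure() in ZooKeeper 3.5 and later (selector default of sqrt(cores / 2) with a floor of 1, worker default of 2 × cores); treat them as an assumption and check them against the version you run. The class and method names are hypothetical; only the two property names come from the text above.

```java
class NioThreadDefaults {
    // Property names as given in the text above.
    static final String SELECTOR_PROP = "zookeeper.nio.numSelectorThreads";
    static final String WORKER_PROP = "zookeeper.nio.numWorkerThreads";

    /** Assumed selector default: sqrt(cores / 2), but never less than 1. */
    static int defaultSelectorThreads(int cores) {
        return Math.max((int) Math.sqrt((float) cores / 2), 1);
    }

    /** Assumed worker default: twice the number of cores. */
    static int defaultWorkerThreads(int cores) {
        return 2 * cores;
    }

    /** The system property wins when it is set; otherwise the default applies. */
    static int resolve(String property, int defaultValue) {
        return Integer.getInteger(property, defaultValue);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("selector threads = "
                + resolve(SELECTOR_PROP, defaultSelectorThreads(cores)));
        System.out.println("worker threads   = "
                + resolve(WORKER_PROP, defaultWorkerThreads(cores)));
    }
}
```

Running it with -Dzookeeper.nio.numSelectorThreads=4, for example, overrides the computed selector count.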

1.2.1 AcceptThread

This thread accepts incoming client connections.

public void run() {
    while (!stopped && !acceptSocket.socket().isClosed()) {
        select();
    }
}
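private void select() {
    try {
        // Wait for channels that are ready
        selector.select();

        Iterator<SelectionKey> selectedKeys =
            selector.selectedKeys().iterator();
        while (!stopped && selectedKeys.hasNext()) {
            SelectionKey key = selectedKeys.next();
            selectedKeys.remove();

            if (key.isAcceptable()) {
                // 1: Accept the connection on this server.
                // 2: Get the remote client's address.
                // 3: Check whether the connection exceeds the maximum limit.
                // 4: Switch the channel to non-blocking mode.
                // 5: Pick a SelectorThread in round-robin order and assign the connection to it.
                // 6: Add the connection to that SelectorThread's acceptedQueue and wake it up.
                if (!doAccept()) {
                    // Accept failed: pause accepting briefly
                    pauseAccept(10);
                }
            }
        }
    } catch (IOException e) {
        // ... (logging omitted)
    }
}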

Next, step into the doAccept method:

private boolean doAccept() {
    ...
    try {
        // Accept the connection
        sc = acceptSocket.accept();
        accepted = true;
        // Get the remote client's address
        InetAddress ia = sc.socket().getInetAddress();
        int cnxncount = getClientCnxnCount(ia);

        // Check whether the client exceeds the maximum number of connections
        if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
            ...
        }

        LOG.debug("Accepted socket connection from "
                 + sc.socket().getRemoteSocketAddress());
        // Switch the channel to non-blocking mode
        sc.configureBlocking(false);

        // Assign the connection to a SelectorThread in round-robin order
        if (!selectorIterator.hasNext()) {
            selectorIterator = selectorThreads.iterator();
        }
        SelectorThread selectorThread = selectorIterator.next();
        // Add the new connection to that SelectorThread's acceptedQueue and wake it up
        if (!selectorThread.addAcceptedConnection(sc)) {
            ...
        }
        acceptErrorLogger.flush();
    } catch (IOException e) {
        ...
    }
    return accepted;
}
public boolean addAcceptedConnection(SocketChannel accepted) {
    // Add the accepted channel to acceptedQueue
    if (stopped || !acceptedQueue.offer(accepted)) {
        return false;
    }
    // Wake up the SelectorThread
    wakeupSelector();
    return true;
}

addAcceptedConnection wakes up the SelectorThread, so the logic continues in SelectorThread.run.
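
The round-robin hand-off from the accept side to the selector side can be illustrated with a small stand-alone sketch. SelectorStub is a hypothetical stand-in for SelectorThread that keeps only the accepted-connection queue, and connections are represented by plain strings; the real code passes SocketChannel objects and also wakes up the selector.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

class RoundRobinAcceptor {
    /** Hypothetical stand-in for a SelectorThread: only its accepted-connection queue. */
    static class SelectorStub {
        final String name;
        final Queue<String> acceptedQueue = new LinkedBlockingQueue<>();

        SelectorStub(String name) {
            this.name = name;
        }

        boolean addAcceptedConnection(String conn) {
            // The real method also wakes up the selector; omitted in this sketch.
            return acceptedQueue.offer(conn);
        }
    }

    private final List<SelectorStub> selectors; // must not be empty
    private Iterator<SelectorStub> it;

    RoundRobinAcceptor(List<SelectorStub> selectors) {
        this.selectors = selectors;
        this.it = selectors.iterator();
    }

    /** Hands the connection to the next selector, wrapping around at the end of the list. */
    SelectorStub assign(String conn) {
        if (!it.hasNext()) {
            it = selectors.iterator();
        }
        SelectorStub selector = it.next();
        selector.addAcceptedConnection(conn);
        return selector;
    }

    /** Assigns three connections to two selectors and returns the chosen selector names. */
    static List<String> demo() {
        List<SelectorStub> stubs = new ArrayList<>();
        stubs.add(new SelectorStub("selector-0"));
        stubs.add(new SelectorStub("selector-1"));
        RoundRobinAcceptor acceptor = new RoundRobinAcceptor(stubs);
        List<String> order = new ArrayList<>();
        for (String conn : new String[] {"c1", "c2", "c3"}) {
            order.add(acceptor.assign(conn).name);
        }
        return order;
    }
}
```

Resetting the iterator when it is exhausted is what makes the third connection land on the first selector again.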

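1.2.2 SelectorThread

This thread reads data from the socket, wraps it in a workRequest, and hands the workRequest to the workerPool thread pool. It also takes the not-yet-processed connections off acceptedQueue, registers each one for the OP_READ event, and creates the matching context object, NIOServerCnxn. The run method of SelectorThread is as follows: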

public void run() {

    // Pick up the ready I/O events and hand them to a worker thread;
    // the data is processed in ZooKeeperServer.processPacket()
    select();
    // Take the accepted connections off acceptedQueue, register OP_READ for each,
    // and attach a NIOServerCnxn (the per-connection context object) to its key
    processAcceptedConnections();
    // Walk updateQueue and update the interest ops of the connections in it
    processInterestOpsUpdateRequests();
}

First look at processAcceptedConnections. It creates a NIOServerCnxn object for each connection and also triggers the lease-renewal logic (via addCnxn).

private void processAcceptedConnections() {
    SocketChannel accepted;
    while (!stopped && (accepted = acceptedQueue.poll()) != null) {
        SelectionKey key = null;
        key = accepted.register(selector, SelectionKey.OP_READ);
        // Create one NIOServerCnxn per connection
        NIOServerCnxn cnxn = createConnection(accepted, key, this);
        key.attach(cnxn);
        addCnxn(cnxn);
    }
}

This part is not important, so we will not dig deeper. Back to the select method:

private void select() {
    selector.select();
    Set<SelectionKey> selected = selector.selectedKeys();
    ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
    Collections.shuffle(selectedList);
    Iterator<SelectionKey> selectedKeys = selectedList.iterator();
    while (!stopped && selectedKeys.hasNext()) {
        SelectionKey key = selectedKeys.next();
        selected.remove(key);
        if (key.isReadable() || key.isWritable()) {
            // Core logic
            handleIO(key);
        }
    }
}

handleIO() wraps the current SelectorThread and key in an IOWorkRequest and hands it to workerPool for scheduling. Reading the data only begins once workerPool schedules it. The source is as follows:

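private void handleIO(SelectionKey key) {
    // Wrap the SelectorThread and key in a work request
    IOWorkRequest workRequest = new IOWorkRequest(this, key);
    // The connection context attached to the key in processAcceptedConnections
    NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
    // Renew the connection's lease (refresh its expiry time)
    touchCnxn(cnxn);
    // Hand the work request to the thread pool, where the client data is read
    workerPool.schedule(workRequest);
}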

Look at the renewal method first. NIOServerCnxnFactory.touchCnxn(NIOServerCnxn) is called from other places as well, not only here.

public void touchCnxn(NIOServerCnxn cnxn) {
    cnxnExpiryQueue.update(cnxn, cnxn.getSessionTimeout());
}

Stepping into update shows that it is a method of ExpiryQueue. As the name suggests, ExpiryQueue is the queue the server uses to manage session expiration.

// Maps each NIOServerCnxn to its expiry time
private final ConcurrentHashMap<E, Long> elemMap = new ConcurrentHashMap<E, Long>();
// Maps each expiry time to the bucket (set) of NIOServerCnxn objects that expire then
private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<Long, Set<E>>();
private final AtomicLong nextExpirationTime = new AtomicLong();

public Long update(E elem, int timeout) {
    Long prevExpiryTime = elemMap.get(elem); // current expiry time of this NIOServerCnxn
    long now = Time.currentElapsedTime();
    Long newExpiryTime = roundToNextInterval(now + timeout); // next expiry time, rounded up to a bucket boundary
    if (newExpiryTime.equals(prevExpiryTime)) {
        return null; // No change, so nothing to update
    }
    // First add the elem to the new expiry time bucket in expiryMap.
    Set<E> set = expiryMap.get(newExpiryTime); // bucket for the new expiry time
    if (set == null) {
        // Construct a ConcurrentHashSet using a ConcurrentHashMap
        set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
        // Put the new set in the map, but only if another thread hasn't beaten us to it
        Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
        if (existingSet != null) {
            set = existingSet;
        }
    }
    set.add(elem); // add the NIOServerCnxn to the new bucket

    // Map the elem to the new expiry time. If a different previous
    // mapping was present, clean up the previous expiry bucket.
    prevExpiryTime = elemMap.put(elem, newExpiryTime);
    if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
        Set<E> prevSet = expiryMap.get(prevExpiryTime);
        if (prevSet != null) {
            prevSet.remove(elem); // remove the element from its previous bucket
        }
    }
    return newExpiryTime;
}

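OK, with that brief look at how the expiry time is updated, let's return to the flow in which the worker thread pool processes the workRequest and reads the client data.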
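
To make the bucket idea concrete, here is a reduced sketch of the same technique. It assumes that roundToNextInterval rounds a timestamp up to the next multiple of a fixed interval, and it takes the current time as a parameter so that the behavior is deterministic. BucketExpirySketch and its method names are hypothetical; this is not the ExpiryQueue class itself.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class BucketExpirySketch<E> {
    private final Map<E, Long> elemMap = new ConcurrentHashMap<>();
    private final Map<Long, Set<E>> expiryMap = new ConcurrentHashMap<>();
    private final long interval;

    BucketExpirySketch(long intervalMs) {
        this.interval = intervalMs;
    }

    /** Rounds a timestamp up to the next multiple of the interval (assumed rounding rule). */
    long roundToNextInterval(long time) {
        return (time / interval + 1) * interval;
    }

    /** Moves elem to the bucket for now + timeout; returns null when the bucket is unchanged. */
    Long update(E elem, long now, long timeoutMs) {
        Long newExpiry = roundToNextInterval(now + timeoutMs);
        Long prev = elemMap.get(elem);
        if (newExpiry.equals(prev)) {
            return null; // same bucket, nothing to do
        }
        // Add to the new bucket first, then drop the element from the old one.
        expiryMap.computeIfAbsent(newExpiry, k -> ConcurrentHashMap.<E>newKeySet()).add(elem);
        prev = elemMap.put(elem, newExpiry);
        if (prev != null && !prev.equals(newExpiry)) {
            Set<E> old = expiryMap.get(prev);
            if (old != null) {
                old.remove(elem);
            }
        }
        return newExpiry;
    }

    /** Returns the elements currently stored in the bucket for the given expiry time. */
    Set<E> bucket(long expiryTime) {
        return expiryMap.getOrDefault(expiryTime, Collections.emptySet());
    }
}
```

Because expiry times are rounded to interval boundaries, frequent touches within the same interval return early without moving the element, and the expiry thread only has to look at one bucket per tick.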

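1.2.3 WorkerThread

Compared with the threads above, WorkerThread has a fairly complex call chain that involves method calls on several objects. It mainly handles I/O and does not process the data itself; data processing is done by the RequestProcessor chain. The call relationships are shown below: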

[Figure: WorkerThread call relationship diagram]

public void schedule(WorkRequest workRequest) {
    schedule(workRequest, 0);
}
public void schedule(WorkRequest workRequest, long id) {
    ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);
    int size = workers.size();
    int workerNum = ((int) (id % size) + size) % size;
    ExecutorService worker = workers.get(workerNum);
    worker.execute(scheduledWorkRequest);
}
WorkerService.ScheduledWorkRequest

private class ScheduledWorkRequest implements Runnable {
    @Override
    public void run() {
        //IOWorkRequest.doWork
        workRequest.doWork();
    }
}
private class IOWorkRequest extends WorkerService.WorkRequest {
    public void doWork() throws InterruptedException {
        if (key.isReadable() || key.isWritable()) {
            // Perform the I/O
            cnxn.doIO(key);
            // The same method again: renews the connection's lease
            touchCnxn(cnxn);
        }
    }
}
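
The index arithmetic in schedule(workRequest, id) is worth a second look: Java's % operator can return a negative value, so the code adds size and takes the remainder again. A minimal sketch (WorkerIndexSketch is a hypothetical name) isolates that expression:

```java
class WorkerIndexSketch {
    /** Maps any id, including a negative one, to an index in [0, size). */
    static int workerIndex(long id, int size) {
        return ((int) (id % size) + size) % size;
    }

    public static void main(String[] args) {
        System.out.println(workerIndex(5L, 4));  // 5 % 4 = 1
        System.out.println(workerIndex(-1L, 4)); // -1 % 4 = -1, shifted to 3
    }
}
```

Since the index depends only on the id, requests scheduled with the same id always land on the same worker.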

We will not expand on the remaining details. doIO eventually calls readPayload.

private void readPayload() throws IOException, InterruptedException {
    if (incomingBuffer.remaining() != 0) { // have we read length bytes?
        int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
        if (rc < 0) {
            throw new EndOfStreamException(
                "Unable to read additional data from client sessionid 0x"
                + Long.toHexString(sessionId)
                + ", likely client has closed socket");
        }
    }

    if (incomingBuffer.remaining() == 0) { // have we read length bytes?
        packetReceived();
        incomingBuffer.flip();
        // Not initialized yet: the first packet is the connect request
        if (!initialized) {
            readConnectRequest();
        } else {
            readRequest();
        }
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}

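If initialized is false at this point, this is the first request on the connection and a session must be created (createSession). readConnectRequest() is called, and it sets initialized to true. Only after the connection request has been handled is initialized set to true, and only then can other client commands be processed.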
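
readPayload relies on length-prefixed framing: a 4-byte length is read into lenBuffer first, then a payload buffer of exactly that size is filled, and afterwards the reader switches back to lenBuffer. The sketch below shows that buffer-swapping technique on in-memory data instead of a socket. LengthPrefixedDecoder and the MAX_FRAME limit are hypothetical and chosen for illustration only.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class LengthPrefixedDecoder {
    private static final int MAX_FRAME = 1 << 20; // arbitrary cap for this sketch
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incoming = lenBuffer; // points at lenBuffer or at a payload buffer

    /** Consumes the available bytes and returns every frame they complete. */
    List<byte[]> feed(ByteBuffer data) {
        List<byte[]> frames = new ArrayList<>();
        while (true) {
            // Fill the current target buffer as far as the input allows.
            while (incoming.hasRemaining() && data.hasRemaining()) {
                incoming.put(data.get());
            }
            if (incoming.hasRemaining()) {
                return frames; // wait for more bytes
            }
            incoming.flip();
            if (incoming == lenBuffer) {
                // The length prefix is complete: switch to a payload buffer of that size.
                int len = lenBuffer.getInt();
                if (len < 0 || len > MAX_FRAME) {
                    throw new IllegalArgumentException("bad frame length: " + len);
                }
                incoming = ByteBuffer.allocate(len);
            } else {
                // The payload is complete: emit it and go back to reading a length.
                byte[] frame = new byte[incoming.remaining()];
                incoming.get(frame);
                frames.add(frame);
                lenBuffer.clear();
                incoming = lenBuffer;
            }
        }
    }
}
```

The decoder keeps its state between calls, so a frame split across several reads is assembled correctly, which is the situation a non-blocking socket produces all the time.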

private void readConnectRequest() throws IOException, InterruptedException {
    if (!isZKServerRunning()) {
        throw new IOException("ZooKeeperServer not running");
    }
    zkServer.processConnectRequest(this, incomingBuffer);
    // Later packets on this connection skip the connect-request path
    initialized = true;
}

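The method above also calls processConnectRequest to handle the connection request. On the first connection the sessionId read from the request is 0, so session creation is treated as a request of its own and createSession() is called. The key parts of processConnectRequest are as follows: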

public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) {
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    ConnectRequest connReq = new ConnectRequest(); // the connect request
    connReq.deserialize(bia, "connect"); // deserialize the connect request parameters
    long sessionId = connReq.getSessionId(); // session ID sent by the client (0 for a new session)
    int sessionTimeout = connReq.getTimeOut();
    byte[] passwd = connReq.getPasswd();
    cnxn.setSessionTimeout(sessionTimeout);
    if (sessionId == 0) {
        long id = createSession(cnxn, passwd, sessionTimeout); // create the session
    }
}

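Session creation calls createSession(). The method first creates a sessionId, then builds a create-session request with that sessionId and submits it to the processor chain as a regular request. The createSession() source is as follows: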

long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
    if (passwd == null) {
        // Possible since it's just deserialized from a packet on the wire.
        passwd = new byte[0];
    }
    // Let sessionTracker create a sessionId
    long sessionId = sessionTracker.createSession(timeout);
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);
    ByteBuffer to = ByteBuffer.allocate(4);
    to.putInt(timeout);
    cnxn.setSessionId(sessionId);
    // Build an OpCode.createSession request (a create-session request for this sessionId)
    Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
    setLocalSessionFlag(si);
    // Submit the request
    submitRequest(si);
    return sessionId;
}

sessionTracker.createSession(timeout), used above, does two things: it creates the sessionId and sets up tracking for it. The source is as follows:

public long createSession(int sessionTimeout) {
    // Get the next session ID
    long sessionId = nextSessionId.getAndIncrement();
    // Set up tracking for the session
    addSession(sessionId, sessionTimeout);
    return sessionId;
}

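Tracking a session simply means adding the session to a queue so that any code can look it up by session ID. addSession creates the Session, stores it in the session map, and stores it in the session expiry queue. The addSession source is as follows: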

public synchronized boolean addSession(long id, int sessionTimeout) {
    sessionsWithTimeout.put(id, sessionTimeout);

    boolean added = false;
    // Look up the session; if there is none, create one for this session ID
    SessionImpl session = sessionsById.get(id);
    if (session == null){
        session = new SessionImpl(id, sessionTimeout);
    }

    // findbugs2.0.3 complains about get after put.
    // long term strategy would be use computeIfAbsent after JDK 1.8
    // Store the session in sessionsById so that it can be looked up by ID
    SessionImpl existedSession = sessionsById.putIfAbsent(id, session);

    if (existedSession != null) {
        session = existedSession;
    } else {
        added = true;
        LOG.debug("Adding session 0x" + Long.toHexString(id));
    }

    if (LOG.isTraceEnabled()) {
        String actionStr = added ? "Adding" : "Existing";
        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                                 "SessionTrackerImpl --- " + actionStr + " session 0x"
                                 + Long.toHexString(id) + " " + sessionTimeout);
    }

    // Add the session to the expiry queue
    updateSessionExpiry(session, sessionTimeout);
    return added;
}
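
The two steps of createSession (allocate an ID, then register it) and the putIfAbsent behavior of addSession can be condensed into a small stand-alone sketch. SessionTrackerSketch is hypothetical: it uses a caller-supplied starting ID and leaves out the expiry queue, so it only illustrates the bookkeeping.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class SessionTrackerSketch {
    /** Minimal session record: ID and timeout only. */
    static final class Session {
        final long id;
        final int timeout;

        Session(long id, int timeout) {
            this.id = id;
            this.timeout = timeout;
        }
    }

    private final ConcurrentHashMap<Long, Session> sessionsById = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Long, Integer> sessionsWithTimeout = new ConcurrentHashMap<>();
    private final AtomicLong nextSessionId;

    SessionTrackerSketch(long firstId) {
        this.nextSessionId = new AtomicLong(firstId);
    }

    /** Allocates the next ID and starts tracking it. */
    long createSession(int timeout) {
        long id = nextSessionId.getAndIncrement();
        addSession(id, timeout);
        return id;
    }

    /** Returns true if the session was newly added, false if it was already tracked. */
    synchronized boolean addSession(long id, int timeout) {
        sessionsWithTimeout.put(id, timeout);
        Session existing = sessionsById.putIfAbsent(id, new Session(id, timeout));
        // The real tracker would also (re)register the session in its expiry queue here.
        return existing == null;
    }

    boolean isTracked(long id) {
        return sessionsById.containsKey(id);
    }
}
```

Calling addSession for an ID that is already tracked returns false and leaves the stored session object in place.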

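Chapter 2 Session Refresh

This can also be called lease renewal. Besides PING requests, ordinary CRUD requests from the client also renew the session. PING is used as the example here.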

ClientCnxn.SendThread

public void run() {
    clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    while (state.isAlive()) {
        // Once the connection is established, send a PING request periodically
        if (state.isConnected()) {
            //1000(1 second) is to prevent race condition missing to send the second ping
            //also make sure not to send too many pings when readTimeout is small
            int timeToNextPing = readTimeout / 2
                                 - clientCnxnSocket.getIdleSend()
                                 - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
            //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
            if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                // Send the PING request
                sendPing();
                clientCnxnSocket.updateLastSend();
            } else {
                if (timeToNextPing < to) {
                    to = timeToNextPing;
                }
            }
        }
    }
}
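
The ping decision in this loop is pure arithmetic, so it can be pulled out and checked on its own. The sketch below copies the two expressions from the loop into static methods; PingTimingSketch is a hypothetical name, and the timeout values in the usage note are arbitrary examples.

```java
class PingTimingSketch {
    static final int MAX_SEND_PING_INTERVAL = 10_000; // 10 seconds, as in the loop above

    /** Milliseconds until the next ping is due; zero or negative means "ping now". */
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
    }

    /** A ping is sent when it is due or when nothing was sent for more than 10 seconds. */
    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0
                || idleSend > MAX_SEND_PING_INTERVAL;
    }

    public static void main(String[] args) {
        System.out.println(timeToNextPing(20_000, 5_000)); // 10000 - 5000 - 1000
        System.out.println(shouldPing(60_000, 10_001));    // idle for more than 10 s
    }
}
```

With a readTimeout of 60 000 ms the half-timeout rule alone would allow almost 30 seconds of silence, which is why the separate 10-second cap exists.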

Send the PING request to the server:

private void sendPing() {
    lastPingSentNs = System.nanoTime();
    RequestHeader h = new RequestHeader(ClientCnxn.PING_XID, OpCode.ping);
    queuePacket(h, null, null, null, null, null, null, null, null);
}
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
    ...
    // Wakes up the selector that sendThread is blocked on, so the SendThread.run loop continues and sends the queued packet to the server
    sendThread.getClientCnxnSocket().packetAdded();
    return packet;
}
// The SendThread.run loop then continues (excerpt)
public void run() {
    clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
    // Send the request
    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
}
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                 ClientCnxn cnxn)
    throws IOException, InterruptedException {
   
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            if (sc.finishConnect()) {
                updateLastSendAndHeard();
                // Connection established: send the ConnectRequest to set up the session
                sendThread.primeConnection();
            }
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            doIO(pendingQueue, outgoingQueue, cnxn);
        }
    }
    if (sendThread.getZkState().isConnected()) {
        synchronized(outgoingQueue) {
            if (findSendablePacket(outgoingQueue,
                                   cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                enableWrite();
            }
        }
    }
    selected.clear();
}

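The server receives the client's PING request. Because the PING arrives on the connection that is already established, it is picked up by the SelectorThread rather than going through AcceptThread again, and from there the logic is the same as in the session-creation flow: SelectorThread.run -> select() -> handleIO(key) -> touchCnxn(cnxn).

Chapter 3 Session Expiration

As the session-creation source already suggests, session expiration is managed by the SessionTrackerImpl class. It is also a thread class that extends ZooKeeperCriticalThread. Its run method first gets the next session expiration time and sleeps until that time arrives, then fetches the set of expired client sessions and closes them in a loop.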

public void run() {
    try {
        while (running) {
            // Time until the next expiration
            long waitTime = sessionExpiryQueue.getWaitTime();
            if (waitTime > 0) {
                // Sleep until then
                Thread.sleep(waitTime);
                continue;
            }
            // Fetch the set of expired client sessions
            for (SessionImpl s : sessionExpiryQueue.poll()) {
                // Set the session's isClosing flag to true
                setSessionClosing(s.sessionId);
                // Expire the client session
                expirer.expire(s);
            }
        }
    } catch (InterruptedException e) {
        handleException(this.getName(), e);
    }
    LOG.info("SessionTrackerImpl exited loop!");
}

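expirer.expire(s), which expires the client session, is itself a regular request. It calls ZooKeeperServer.expire(), which takes the sessionId, creates an OpCode.closeSession request, and submits it to the processor chain. The ZooKeeperServer.expire() source is as follows: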

public void expire(Session session) {
    long sessionId = session.getSessionId();
    LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
             + ", timeout of " + session.getTimeout() + "ms exceeded");
    close(sessionId);
}
private void close(long sessionId) {
    // Build an OpCode.closeSession request
    Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
    setLocalSessionFlag(si);
    // Submit it to the processor chain
    submitRequest(si);
}

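submitRequest appears here again. It is covered in detail in the request-processing chapter below; for now it is enough to know that calling it closes the session.

Chapter 4 Request Processing

ZooKeeper's request processing works like a workflow; it is essentially a singly linked list. When ZooKeeper starts, each node's role is determined (leader, follower, or observer), and once a role is determined, its chain of responsibility is initialized.

4.1 RequestProcessor Structure

For each client request, and for each kind of transactional operation, ZooKeeper provides a processing pipeline: RequestProcessor.

Let's look at how the RequestProcessor chain is initialized. The ZooKeeperServer.setupRequestProcessors() source is as follows: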

/**
 * Initialize the request-processing chain
 */
protected void setupRequestProcessors() {
    // Create the FinalRequestProcessor
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    // Create the SyncRequestProcessor with finalProcessor as its next processor
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
                                                              finalProcessor);
    // Start syncProcessor
    ((SyncRequestProcessor)syncProcessor).start();
    // Create the PrepRequestProcessor as the first processor in the chain,
    // with syncProcessor as its next processor
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    // Start firstProcessor
    ((PrepRequestProcessor)firstProcessor).start();
}

When syncProcessor is created, finalProcessor is passed in as a parameter. The source is as follows:

/**
 * Create the SyncRequestProcessor; the next processor in the chain is FinalRequestProcessor
 * @param zks
 * @param nextProcessor
 */
public SyncRequestProcessor(ZooKeeperServer zks,
                            RequestProcessor nextProcessor) {
    super("SyncThread:" + zks.getServerId(), zks
          .getZooKeeperServerListener());
    this.zks = zks;
    // The next processor in the chain
    this.nextProcessor = nextProcessor;
    running = true;
}

When firstProcessor is created, syncProcessor is passed in as a parameter. The source is as follows:

public PrepRequestProcessor(ZooKeeperServer zks,
                            RequestProcessor nextProcessor) {
    super("ProcessThread(sid:" + zks.getServerId() + " cport:"
          + zks.getClientPort() + "):", zks.getZooKeeperServerListener());
    this.nextProcessor = nextProcessor;
    this.zks = zks;
}

PrepRequestProcessor/SyncRequestProcessor relationship diagram:

[Figure: PrepRequestProcessor/SyncRequestProcessor relationship diagram]

PrepRequestProcessor and SyncRequestProcessor have the same structure: both are threads (subclasses of Thread), which is why both are started during initialization.
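
The wiring done in setupRequestProcessors() is a chain of responsibility built back to front: the last processor is created first and passed to the constructor of the one before it. A minimal sketch of just that wiring is shown below. It is synchronous on purpose (no threads, no queues), requests are plain strings, and ProcessorChainSketch, Processor, and Stage are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class ProcessorChainSketch {
    /** Hypothetical reduced form of a request processor. */
    interface Processor {
        void processRequest(String request);
    }

    /** A named stage that records its visit and forwards to the next stage, if any. */
    static class Stage implements Processor {
        private final String name;
        private final Processor next;
        private final List<String> trace;

        Stage(String name, Processor next, List<String> trace) {
            this.name = name;
            this.next = next;
            this.trace = trace;
        }

        @Override
        public void processRequest(String request) {
            trace.add(name + ":" + request);
            if (next != null) {
                next.processRequest(request);
            }
        }
    }

    /** Builds prep -> sync -> final (last stage first) and sends one request through. */
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        Processor finalStage = new Stage("final", null, trace);
        Processor syncStage = new Stage("sync", finalStage, trace);
        Processor firstStage = new Stage("prep", syncStage, trace);
        firstStage.processRequest(request);
        return trace;
    }
}
```

Only the reference to the first stage needs to be kept; every other stage is reachable through the next links, which is why the server stores just firstProcessor.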

4.2 PrepRequestProcessor

PrepRequestProcessor is the first processor in the chain. Let's connect it with the request handling seen so far and go through it step by step. The call path is ZooKeeperServer.processPacket() -> submitRequest() -> enqueueRequest() -> RequestThrottler.submitRequest(), and RequestThrottler.submitRequest() adds the current request to its submittedRequests queue.

submitRequest then executes firstProcessor.processRequest, which takes us into PrepRequestProcessor.processRequest(request):

public void processRequest(Request request) {
    submittedRequests.add(request);
}

public void run() {
    while (true) {
        Request request = submittedRequests.take();
        pRequest(request);
    }
}

protected void pRequest(Request request) throws RequestProcessorException {
    request.setHdr(null);
    request.setTxn(null);
    switch (request.type) {
    case OpCode.createSession: // handles the connect (create-session) request
    case OpCode.closeSession:
        if (!request.isLocalSession()) {
            pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
        }
        break;
    }
}

protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) {
    switch (type) {
    case OpCode.createSession:
        int to = request.request.getInt();
        request.setTxn(new CreateSessionTxn(to));
        zks.sessionTracker.trackSession(request.sessionId, to);
        zks.setOwner(request.sessionId, request.getOwner());
        break;
    }
}

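The code shows that pRequest2Txn() mainly deals with permission checks, pending-change records, and transaction records; it does not touch the data itself yet. In other words, PrepRequestProcessor does the preparatory work before an operation: permission checks and recording the transaction information.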
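
The enqueue-then-consume structure of processRequest() and run() is a plain producer/consumer pattern: the caller only adds to a blocking queue and a dedicated thread takes requests off it one by one. The sketch below shows that pattern with string requests and a sentinel object for shutdown. QueueBackedProcessor, the SHUTDOWN sentinel, and the "prepared" marker are hypothetical and only illustrate the threading model.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

class QueueBackedProcessor extends Thread {
    // Sentinel compared by identity, so it cannot collide with a real request.
    private static final String SHUTDOWN = new String("shutdown");

    private final LinkedBlockingQueue<String> submitted = new LinkedBlockingQueue<>();
    final List<String> handled = new CopyOnWriteArrayList<>();

    /** Called by producers: only enqueues and returns immediately. */
    void processRequest(String request) {
        submitted.add(request);
    }

    void shutdown() {
        submitted.add(SHUTDOWN);
    }

    @Override
    public void run() {
        try {
            while (true) {
                String request = submitted.take(); // blocks until a request arrives
                if (request == SHUTDOWN) {
                    break;
                }
                handled.add("prepared " + request);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Submits two requests, shuts the thread down, and returns what it handled. */
    static List<String> demo() {
        QueueBackedProcessor processor = new QueueBackedProcessor();
        processor.start();
        processor.processRequest("createSession");
        processor.processRequest("closeSession");
        processor.shutdown();
        try {
            processor.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processor.handled;
    }
}
```

A single consumer thread preserves the submission order, which matters for a system that assigns transaction IDs sequentially.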

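4.3 SyncRequestProcessor

After PrepRequestProcessor comes SyncRequestProcessor. This processor writes request data to disk efficiently, and a request is not forwarded to the next processor until it has been written to disk.

First, the method that adds a request to the queue: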

public void processRequest(Request request) {
    // request.addRQRec(">sync");
    // Add the request to the queuedRequests queue
    queuedRequests.add(request);
}

SyncRequestProcessor is likewise a thread, and the queued requests are executed in that thread. Its run method is as follows:

public void run() {
    try {
        int logCount = 0;

        // we do this in an attempt to ensure that not all of the servers
        // in the ensemble take a snapshot at the same time
        int randRoll = r.nextInt(snapCount/2);
        while (true) {
            Request si = null;
            if (toFlush.isEmpty()) {
                // Blocking call: wait for a request
                si = queuedRequests.take();
            } else {
                si = queuedRequests.poll();
                if (si == null) {
                    flush(toFlush);
                    continue;
                }
            }
            if (si == requestOfDeath) {
                break;
            }
            if (si != null) {
                // track the number of records written to the log
                if (zks.getZKDatabase().append(si)) {
                    logCount++;
                    if (logCount > (snapCount / 2 + randRoll)) {
                        randRoll = r.nextInt(snapCount/2);
                        // roll the log
                        // rollLog also resets the count of txns written since the last roll
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (snapInProcess != null && snapInProcess.isAlive()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                public void run() {
                                    try {
                                        // Save the snapshot data
                                        zks.takeSnapshot();
                                    } catch(Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    }
                                }
                            };
                            snapInProcess.start();
                        }
                        logCount = 0;
                    }
                } else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read, and there are no pending
                    // flushes (writes), then just pass this to the next
                    // processor
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable)nextProcessor).flush();
                        }
                    }
                    continue;
                }
                // Add the request to toFlush, the list of transactions that have been written and are waiting to be flushed to disk
                toFlush.add(si);
                if (toFlush.size() > 1000) {
                    // Commit the batch
                    flush(toFlush);
                }
            }
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
    } finally{
        running = false;
    }
    LOG.info("SyncRequestProcessor exited!");
}

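run takes a request from the queuedRequests queue; if none is available it blocks until one arrives, and only then continues. The request is appended to the transaction log, and once enough records have been written a Snapshot Thread is started to write a snapshot of the data to disk. Finally flush() is called to commit the data. The flush() source is as follows: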

private void flush(LinkedList<Request> toFlush)
    throws IOException, RequestProcessorException
{
    if (toFlush.isEmpty())
        return;
    // Commit the data
    zks.getZKDatabase().commit();
    while (!toFlush.isEmpty()) {
        Request i = toFlush.remove();
        if (nextProcessor != null) {
            // Call the next processor in the chain
            nextProcessor.processRequest(i);
        }
    }
    if (nextProcessor != null && nextProcessor instanceof Flushable) {
        ((Flushable)nextProcessor).flush();
    }
}

flush() commits the data and passes each request to the next processor in the chain, which is FinalRequestProcessor.
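
The toFlush handling in run() and flush() amounts to group commit: writes are buffered, one commit covers the whole batch, and only then are the requests forwarded. The deterministic sketch below models that idea without threads or disk I/O. GroupCommitSketch, onWrite, and onIdle are hypothetical names, and incrementing a counter stands in for the commit call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

class GroupCommitSketch {
    private final int maxBatch;
    private final ArrayDeque<String> toFlush = new ArrayDeque<>();
    final List<String> forwarded = new ArrayList<>();
    int commits = 0;

    GroupCommitSketch(int maxBatch) {
        this.maxBatch = maxBatch;
    }

    /** A write was appended to the log: buffer it, and flush once the batch is large enough. */
    void onWrite(String request) {
        toFlush.add(request);
        if (toFlush.size() > maxBatch) {
            flush();
        }
    }

    /** The input queue is momentarily empty: flush whatever is pending. */
    void onIdle() {
        flush();
    }

    private void flush() {
        if (toFlush.isEmpty()) {
            return;
        }
        commits++; // stands in for a single commit of the transaction log
        while (!toFlush.isEmpty()) {
            forwarded.add(toFlush.remove()); // forward in arrival order after the commit
        }
    }
}
```

Under load, many requests share one commit, and when traffic is light the idle flush keeps latency low; in both cases nothing is forwarded before its commit.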

4.4 FinalRequestProcessor

After SyncRequestProcessor, we look at the last processor in the chain, FinalRequestProcessor. Its main job is to return the Response.

SyncRequestProcessor persists the txn (the create-session operation), and FinalRequestProcessor then commits the session, which simply means storing the session ID and timeout in sessionsWithTimeout.

public void processRequest(Request request) {
    ProcessTxnResult rc = zks.processTxn(request);
    switch (request.type) {
        case OpCode.createSession: {
            lastOp = "SESS";
            updateStats(request, lastOp, lastZxid);
            zks.finishSessionInit(request.cnxn, true);
            return;
        }
    }
    if (path == null || rsp == null) {
        cnxn.sendResponse(hdr, rsp, "response"); // the server sends the response; the client receives it at this point
    }
}

public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
    // register with JMX
    if (valid) {
        if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) {
            serverCnxnFactory.registerConnection(cnxn);
        } else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) {
            secureServerCnxnFactory.registerConnection(cnxn);
        }
    }
}

Calling sendResponse returns the response to the client.

The client receives the server's response

ClientCnxnSocketNIO.doIO(Queue<Packet>, ClientCnxn)

void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
    if (sockKey.isReadable()) {
        if (!initialized) {
            readConnectResult();
        }
    }
}

void readConnectResult() throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ConnectResponse conRsp = new ConnectResponse();
    conRsp.deserialize(bbia, "connect");
    this.sessionId = conRsp.getSessionId(); // the connection is now established
    sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}

Coming Up Next

  1. Understand how ZooKeeper's watcher mechanism works

Compiled by 极客之音; article link: https://www.bmabk.com/index.php/post/76687.html
