为什么要使用NIO?Tomcat是如何解决服务器端高并发的请求

导读:本篇文章讲解 为什么要使用NIO?Tomcat是如何解决服务器端高并发的请求,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

为什么要使用NIO

在Java中使用Socket(套接字)实现了基于TCP/IP协议的网络编程。以HTTP协议为例,在HTTP服务器端的开发中,如果不使用NIO该如何实现呢?

单个线程定义客户端连接

使用ServerSocket绑定某个端口号,监听客户端的请求,如果有客户端向服务端发送请求,就会建立TCP连接,生成Socket客户端,与服务器基于i/o流实现信息交互。示例代码如下:

public static void main(String[] args) {
        ServerSocket serverSocket = null;
        int port = 8080;
        try {
            // 创建ServerSocket,监听8080端口
            serverSocket = new ServerSocket(port, 1, InetAddress.getByName("127.0.0.1"));
        } catch (IOException e) {
        }
        // 循环监听客户端请求
        while (!shutdown) {
            Socket socket = null;
            InputStream input = null;
            OutputStream output = null;
            try {
                socket = serverSocket.accept();
                input = socket.getInputStream();
                output = socket.getOutputStream();
                // 处理客户端请求
                Request request = new Request(input);
                request.parse();
                // 响应客户端
                Response response = new Response(output);
                response.setRequest(request);
                response.sendStaticResource();
                // 关闭socket连接
                socket.close();
            } catch (Exception e) {
                continue;
            }
        }
    }

该种方式,每次监听到客户端请求,就创建一个Socket标识一个TCP连接。需要,等本次连接过程中,所有的业务逻辑处理完毕,关闭该Socket连接后,才可以执行下一个客户端请求。如果多个客户端同时发送请求时,此时也只有一个线程,会排队执行,效率比较低下,流程如下:

为什么要使用NIO?Tomcat是如何解决服务器端高并发的请求

多个线程定义客户端连接

如果每接收到一个用户端的请求,就创建一个线程来执行业务请求,一个线程关联一个Socket(套接字),可以提高服务器端的并发性。流程如下:

为什么要使用NIO?Tomcat是如何解决服务器端高并发的请求

线程池定义客户端连接

如果,随着客户端请求的增加,线程数会成指数的增加,由于线程之间竞争CPU会导致服务器执行速度变得很慢。如果此时,使用一个线程池来接收客户端连接,可以避免以上问题,流程如下:

为什么要使用NIO?Tomcat是如何解决服务器端高并发的请求

NIO定义客户端连接

但是,使用线程池也会存在一个问题,就是,一个Socket会对应的Thread线程。如果,客户端请求比较多,用于Socket连接的线程就会使用很多。如果,在并发性要求比较的服务器端,可以使用NIO的事件监听机制,NIO使用channel(管道)和Selector(选择器)的机制,使用选择器监听注册的Socket的事件,一个线程就可以监听注册的客户端连接。现在,一般CPU都是多核,可以在服务器中启动与CPU核心相等个数的线程,并行的监听客户端链接。NIO监听客户端链接的流程如下:

为什么要使用NIO?Tomcat是如何解决服务器端高并发的请求

Tomcat如何实现HTTP服务器

以Tomcat源码中HTTP服务器为例,在tomcat中,使用了基于线程池定义客户端连接的Http11Protocol,和基于NIO定义的客户端连接Http11NioProtocol。

基于线程组的Http11Protocol

Http11Protocol,预先定义指定个数的线程数组,用于监听客户端的链接。并且使用基于AQS的LimitLatch同步锁,来实现指定客户端请求的限流,防止过多的客户端请求压垮服务器,默认拦截数为200。其大致流程如下:

为什么要使用NIO?Tomcat是如何解决服务器端高并发的请求

分析源码得知,在Http11Protocol中,JIoEndpoint基于Acceptor的线程数组监听客户端连接请求,其内部类Acceptor定义了具体的监听。Acceptor源码如下:

// JIoEndpoint.class
protected class Acceptor extends AbstractEndpoint.Acceptor {

        // 线程池执行的任务
        @Override
        public void run() {

             while (running) {
                while (paused && running) {
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                if (!running) {
                    break;
                }
                state = AcceptorState.RUNNING;

                try {
                    // 基于AQS的LimitLatch 进行客户端请求限流
                    countUpOrAwaitConnection();
                    Socket socket = null;
                    try {
                        // 创建客户端的连接Socket
                        socket = serverSocketFactory.acceptSocket(serverSocket);
                    } catch (IOException ioe) {
                        countDownConnection();
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // Configure the socket
                    if (running && !paused && setSocketOptions(socket)) {
                        // 处理客户端业务请求
                        if (!processSocket(socket)) {
                            countDownConnection();
                            // Close socket right away
                            closeSocket(socket);
                        }
                    } else {
                        countDownConnection();
                        // Close socket right away
                        closeSocket(socket);
                    }
                } catch (IOException x) {
                    if (running) {
                        log.error(sm.getString("endpoint.accept.fail"), x);
                    }
                } catch (NullPointerException npe) {
                    if (running) {
                        log.error(sm.getString("endpoint.accept.fail"), npe);
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            state = AcceptorState.ENDED;
        }
    }

    protected void countUpOrAwaitConnection() throws InterruptedException {
        if (maxConnections==-1) return;
        LimitLatch latch = connectionLimitLatch;
        // 使用LimitLatch 增加建立的客户端个数,如果达到限制,则阻塞
        if (latch!=null) latch.countUpOrAwait();
    }

在JIoEndpoint初始化时,就会创建执行个数的Acceptor任务组,默认值线程数为1,一个线程对应一个Acceptor任务,一个Acceptor任务对应一个Socket客户端连接,JIoEndpoint初始化代码如下:

// JIoEndpoint.class
@Override
public void startInternal() throws Exception {

        if (!running) {
            running = true;
            paused = false;

            // Create worker collection
            if (getExecutor() == null) {
                createExecutor();
            }
            // 初始化AQS的LimitLatch 同步锁
            initializeConnectionLatch();
            // 初始化Acceptor 线程组
            startAcceptorThreads();

            // Start async timeout thread
            Thread timeoutThread = new Thread(new AsyncTimeout(),
                    getName() + "-AsyncTimeout");
            timeoutThread.setPriority(threadPriority);
            timeoutThread.setDaemon(true);
            timeoutThread.start();
        }
    }

protected final void startAcceptorThreads() {
        int count = getAcceptorThreadCount();
        // 创建Acceptor任务数组
        acceptors = new Acceptor[count];

        for (int i = 0; i < count; i++) {
            acceptors[i] = createAcceptor();
            String threadName = getName() + "-Acceptor-" + i;
            acceptors[i].setThreadName(threadName);
            // 使用线程并行监听客户端链接的任务
            Thread t = new Thread(acceptors[i], threadName);
            t.setPriority(getAcceptorThreadPriority());
            t.setDaemon(getDaemon());
            t.start();
        }
    }

基于NIO的Http11NioProtocol

在使用Http11Protocol的过程中,每个客户端Socket链接对应一个线程,Http11Protocol中默认的客户端拦截个数设置是200。在默认配置下,如果请求量比较大的情况下,光是用于建立客户端连接的线程就会使用至少200个。如果,使用NIO的话,则只需要少数线程(Math.min(2,Runtime.getRuntime().availableProcessores())个),既可以监听大量的客户端链接。在Tomcat7中,Http11NioProtocol基于NIO的模式,默认最大的客户端链接数限制是1000,远远大于Http11Protocol的限制数。

Http11NioProtocol的NioEndpoint中,Acceptor只是用于客户端请求的SocketChannel(Socket管道)与Poller中的selector(选择器)的绑定,而具体客户端建立链接的具体逻辑,在Poller中处理。在Poller中,通过selector(选择器)获取触发的客户端连接请求,其流程大致如下:

为什么要使用NIO?Tomcat是如何解决服务器端高并发的请求

在Http11NioProtocol中,NioEndpoint中的Acceptor 是获取SocketChanel(Socket管道)绑定Poller中selector选择器的具体实现。Acceptor 的源码如下:

// NioEndpoint.class
 protected class Acceptor extends AbstractEndpoint.Acceptor {

        @Override
        public void run() {

            int errorDelay = 0;
            while (running) {

                // Loop if endpoint is paused
                while (paused && running) {
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                if (!running) {
                    break;
                }
                state = AcceptorState.RUNNING;

                try {
                    // 使用LimitLatch记录客户端连接数,超过限制个数,则线程阻塞
                    countUpOrAwaitConnection();
                    SocketChannel socket = null;
                    try {
                        // 使用Channel管道监听客户端请求,并将Channel注册到Poller的选择器
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                        //we didn't get a socket
                        countDownConnection();
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    }
                    errorDelay = 0;
                    if (running && !paused) {
                        // 设置Socket管道到Poller
                        if (!setSocketOptions(socket)) {
                            countDownConnection();
                            closeSocket(socket);
                        }
                    } else {
                        countDownConnection();
                        closeSocket(socket);
                    }
                } 
                ........
            }
            state = AcceptorState.ENDED;
        }
    }

 /**
   * 设置SocketChannel 属性,并注册到Poller的Selector选择器
   */
protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            // 设置SocketChannel管道为非阻塞模式
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);
            // 包装NioChannel 
            NioChannel channel = nioChannels.poll();
            // 省略NioChannel 的属性设置
            .......

            // 注册SocketChannel 到Poller对应的Selector选择器
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error("",t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }

// NioEndpoint.class
 public void register(final NioChannel socket) {
            // 关联 Poller
            socket.setPoller(this);
            KeyAttachment key = keyCache.poll();
            final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);
            ka.reset(this,socket,getSocketProperties().getSoTimeout());
            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            ka.setSecure(isSSLEnabled());
            PollerEvent r = eventCache.poll();
            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            // PollerEvent任务异步注册SocketChannel 到Poller 的Selector选择器
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);
            addEvent(r);
        }

// PollerEvent.class
@Override
public void run() {
            if ( interestOps == OP_REGISTER ) {
                try {
                    // SocketChannel注册到Poller的Selector选择器
                    socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
                } catch (Exception x) {
                    log.error("", x);
                }
            } else {
                final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
                try {
                    if (key == null) {
                        socket.getPoller().getEndpoint().countDownConnection();
                    } else {
                        final KeyAttachment att = (KeyAttachment) key.attachment();
                        if ( att!=null ) {
                            //handle callback flag
                            if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {
                                att.setCometNotify(true);
                            } else {
                                att.setCometNotify(false);
                            }
                            interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag
                            att.access();
                            int ops = key.interestOps() | interestOps;
                            att.interestOps(ops);
                            key.interestOps(ops);
                        } else {
                            socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
                        }
                    }
                } catch (CancelledKeyException ckx) {
                    try {
                        socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, true);
                    } catch (Exception ignore) {}
                }
            }
        }

Poller用于监听SocketChannel管道中触发的客户端Socket的链接。在NioEndpoint的初始化过程中,会根据配置个数初始化Poller数组,其个数设置的算法为Math.min(2,Runtime.getRuntime().availableProcessores())个,提高获取客户端链接的效率。Poller初始化,以及监听客户端连接的源码如下:

// NioEndpoint.class
@Override
public void startInternal() throws Exception {

        if (!running) {
            running = true;
            paused = false;

            // Create worker collection
            if ( getExecutor() == null ) {
                createExecutor();
            }
            // 初始化AQS LimitLatch同步器
            initializeConnectionLatch();

            // 初始化Poller数组
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }

            startAcceptorThreads();
        }
    }

// 基于selector监听客户端链接
@Override
        public void run() {
            // 循环监听
            while (true) {
                try {
                    // Loop if endpoint is paused

                        省略......

                    // Time to terminate?
                    省略......
                    
                    // 获取触发监听事件的Socket链接
                    Iterator<SelectionKey> iterator =
                        keyCount > 0 ? selector.selectedKeys().iterator() : null;
                    // 遍历所有Socket链接
                    while (iterator != null && iterator.hasNext()) {
                        SelectionKey sk = iterator.next();
                        KeyAttachment attachment = (KeyAttachment)sk.attachment();
                        // Attachment may be null if another thread has called
                        // cancelledKey()
                        if (attachment == null) {
                            iterator.remove();
                        } else {
                            attachment.access();
                            iterator.remove();
                            // 处理具体的客户端请求业务
                            processKey(sk, attachment);
                        }
                    }//while

                    //process timeouts
                    timeout(keyCount,hasEvents);
                    if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();
                } catch (OutOfMemoryError oom) {
                    try {
                        oomParachuteData = null;
                        releaseCaches();
                        log.error("", oom);
                    }catch ( Throwable oomt ) {
                        try {
                            System.err.println(oomParachuteMsg);
                            oomt.printStackTrace();
                        }catch (Throwable letsHopeWeDontGetHere){
                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                        }
                    }
                }
            }
            // 设置并发控制客户端个数
            stopLatch.countDown();
        }}

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/13617.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!