Featured image of post NIO Selector 无限触发事件

NIO Selector 无限触发事件

java nio 模型中没有控制好 channel 的状态,容易导致 selector 无限触发某一个事件

近期在研究java nio Selector时发现一件奇怪的事情:使用nio编写socket通信时,客户端莫名其妙无限次接收到服务端传过来的数据,导致客户端一直处于等待接收状态,完全影响了整个系统的性能。下面让我们来回顾一下整个事情发生经过:

问题描述

问题代码

服务端代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/**
 * 服务端代码
 * @author yinan
 * @date 18-9-11
 */
public class NIOServer {

    public static void main(String[] args) throws IOException {
        //创建一个channel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //绑定端口
        serverSocketChannel.socket().bind(
                new InetSocketAddress("127.0.0.1", 8881)
        );
        //设置通道为非阻塞
        serverSocketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        //注册channel到selector
        serverSocketChannel.register(selector,  SelectionKey.OP_ACCEPT);
        while (selector.select() > 0) {
            //罗列所有的channel状态
            Iterator<SelectionKey> iterable = selector.selectedKeys().iterator();
            while (iterable.hasNext()) {
                SelectionKey key = iterable.next();
                iterable.remove();
                NIOServer server = new NIOServer();
                if (key.isAcceptable()) {
                    //获取满足条件的channel 即 OP_ACCEPT
                    ServerSocketChannel socketChannel = (ServerSocketChannel) key.channel();
                    //服务端接收一个客户端连接
                    SocketChannel channel = socketChannel.accept();
                    channel.configureBlocking(false);
                    //注册这个客户端连接状态
                    channel.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()){
                    server.receive(key);
                } else if (key.isWritable()) {
                    server.write(key);
                }
            }


        }

    }

    /**
     * 处理写入就绪事件
     * @param key
     * @throws IOException
     */
    private void write(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
        String host = remote.getHostName();
        Charset charset = Charset.forName("utf-8");
        //获取http请求数据
        String request = compositeRequest(host);
        //向SocketChannel中写入事件
        channel.write(charset.encode(request));
    }

    private void receive(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.read(buffer);
        buffer.flip();
        Charset charset = Charset.forName("utf-8");
        String receiveData = charset.decode(buffer).toString();
        //当没有数据可读时,取消在选择器中关联,并关闭socket连接
        if ("".equals(receiveData)) {
            key.channel();
            channel.close();
            return;
        }
        System.out.println(String.format("客户端发送信息: %s", receiveData));
        key.interestOps(SelectionKey.OP_WRITE);
    }

    private static String compositeRequest(String host){

        return "明天,你好";
    }

}

客户端代码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

/**
 * @author yinan
 * @date 18-9-11
 */
public class NIOClient {
    private static Selector selector;
    static {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        try {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8881));
            socketChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            NIOClient client = new NIOClient();
            //selector.select() 是同步阻塞的
            while (selector.select() > 0) {
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (key.isConnectable()) {
                        client.connect(key);
                    } else if (key.isReadable()) {
                        client.receive(key);
                    } else if (key.isWritable()) {
                        client.write(key);
                    }
                }
            }
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 处理连接事件
     * @param key
     * @throws IOException
     */
    private void connect(SelectionKey key) throws IOException {
        //获取事件句柄对应的channel
        SocketChannel channel = (SocketChannel) key.channel();
        //完成真正的socket连接
        channel.finishConnect();
        //打印连接信息
        InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
        String host = remote.getHostName();
        int port = remote.getPort();
        System.out.println(String.format("访问地址: %s:%s 连接成功!", host, port));

    }

    /**
     * 处理写入就绪事件
     * @param key
     * @throws IOException
     */
    private void write(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
        String host = remote.getHostName();
        Charset charset = Charset.forName("utf-8");
        //获取http请求数据
        String request = compositeRequest(host);
        //向SocketChannel中写入事件
        channel.write(charset.encode(request));
        //修改SocketChannel所关心的事件
        key.interestOps(SelectionKey.OP_READ);

    }

    private void receive(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.read(buffer);
        buffer.flip();
        Charset charset = Charset.forName("utf-8");
        String receiveData = charset.decode(buffer).toString();
        //当没有数据可读时,取消在选择器中关联,并关闭socket连接
        if ("".equals(receiveData)) {
            key.channel();
            channel.close();
            return;
        }
        System.out.println(String.format("服务端返回信息: %s",  receiveData));
    }

    private static String compositeRequest(String host){

        return "GET / HTTP/1.1\r\n" +
                "Host: " + host + "\r\n" +
                "User-Agent: curl/7.43.0\r\n" +
                "Accept: */*\r\n\r\n";
    }


}

结果展示

服务端 1547793849782.gif 客户端 20190118 145252屏幕截图.png

问题

  • 为何会出现这种客户端无限接收数据的情况?
  • 从代码中,我们可以看到,在服务端初始化的时候ServerSocketChannelSelector中注册了一个OP_ACCEPT状态,服务端接收到一个客户端请求之后,SocketChannel又向Selector中注册了一个OP_READ状态,那么这两个状态是怎样保存的?会不会被清理掉?
  • 为何服务端每次仅仅发送一句明天,你好,客户端每一次却接收到那么多句?
  • 正常情况下,服务端向客户端发送一段数据,客户端会不会接收到多条?

问题解决

问题定位

从我们发的代码中可以看到,不论是服务端还是客户端,都是通过检测当前channel的四种状态来处理不同的逻辑,所以出现无限接收数据的情况肯定是和状态有关联。那我们首先从这几种状态转移入手,看看一个channel的状态是如何转移和获取的。

服务端在初始化时,会首先在selector中注册一个自己感兴趣的状态为OP_ACCEPT

1
serverSocketChannel.register(selector,  SelectionKey.OP_ACCEPT);

selector进行select的时候,会将准备就绪的channel对应的SelectionKey取出来,通过判断SelectionKey中记录的状态让当前channel处理不同的逻辑

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
while (selector.select() > 0) {
    //罗列所有的channel状态
    Iterator<SelectionKey> iterable = selector.selectedKeys().iterator();
    while (iterable.hasNext()) {
        SelectionKey key = iterable.next();
        iterable.remove();
        NIOServer server = new NIOServer();
        if (key.isAcceptable()) {
            //首先会进入这里
        } else if (key.isReadable()){
            
        } else if (key.isWritable()) {
            
        }
    }


}

由于ServerSocketChannel注册的是OP_ACCEPT状态,所以会进入第一个判断逻辑中,在这个逻辑片段中主要做的是接受一个客户端的连接,并为该客户端注册一些感兴趣的状态。

1
2
3
4
5
6
7
8
9
if (key.isAcceptable()) {
    //获取满足条件的channel 即 OP_ACCEPT
    ServerSocketChannel socketChannel = (ServerSocketChannel) key.channel();
    //服务端接收一个客户端连接
    SocketChannel channel = socketChannel.accept();
    channel.configureBlocking(false);
    //注册这个客户端连接状态
    channel.register(selector, SelectionKey.OP_READ);
}

此时,在selector中应该存在两个SelectionKey,分别用来保存ServerSocketChannelSocketChannel感兴趣的状态,通过debug,我们也的确看到了两个keywebwxgetmsgimg 1.jpeg

补充说明

selector中存在着三种选择建:

  • keys:保存了所有已注册且没有cancel的选择键,Set类型.可以通过selector.keys()获取
  • selectedKeys:已选择键集,即前一次操作期间,已经准备就绪的通道所对应的选择键.此集合为keys的子集.通过selector.selectedKeys()获取
  • canceledKeys:已取消键.已经被取消但尚未取消注册(deregister)的选择键.此集合不可被访问.为keys()的子集

对于新的Selector实例,上述三个集合均为空.通过channel.register将导致keys()集合被添加.如果某个selectionKey.cancel()被调用,那么此key将会被添加到canceldKey集合中,在下一次selector选择期间,如果canceldKeys不为空,将会导致触发此keyderegister操作(释放资源,并从keys中移除).无论通过channel.close()还是通过selectionKey.cancel(),都会导致key被加入到cannceldKey

此时后续的循环结束,代码再次跳转到selector.select()selector开始检测准备就绪的channel并将其设置到selectorselectedKeys中,便于针对该channel进行处理。但是,当代码运行到该处时,我发现了一个意料之外的情况,原本两个key仅仅有一个进入到了selectedKeyswebwxgetmsgimg 2.jpeg webwxgetmsgimg.jpeg 经过确认之后,发现是channel.register(selector, SelectionKey.OP_READ)中注册的READ状态进入了selectedKeys,而serverChannelaccept状态并没有被selector放入selectedKeys

通过以上的调试,我们可以猜想到当ServerSocketChannel接收到一个客户端连接时之后,会通过某个配置修改状态,导致selector在新的一轮执行select()方法时没有将ServerSocketChannelkeys中加载到selectedKeys中。

我们继续调试,由于上一次我们配置了SocketChannel的感兴趣状态为READ,所以会在这一次遍历的时候进入:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
if (key.isReadable()) {
    client.receive(key);
} 


private void receive(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    channel.read(buffer);
    buffer.flip();
    Charset charset = Charset.forName("utf-8");
    String receiveData = charset.decode(buffer).toString();
    //当没有数据可读时,取消在选择器中关联,并关闭socket连接
    if ("".equals(receiveData)) {
        key.channel();
        channel.close();
        return;
    }
    System.out.println(String.format("客户端发送信息: %s", receiveData));
    key.interestOps(SelectionKey.OP_WRITE);
}

我们可以来看看经过这段代码的执行,当前的SocketChannelSelectionKey是新增了一种状态还是修改了当前的SelectionKey状态

执行该段代码前 webwxgetmsgimg 4.jpeg 执行该段代码后 webwxgetmsgimg 3.jpeg 我们发现SocketChannel中的SelectionKey并没有新增一种状态,而是更新了原本的状态,这样在下一次执行selector.select()方法时,该SocketChannel仅仅会去追踪WRITE状态。

问题解答

因此,到目前为止,我们可以回答之前提出的前两个问题了:

从代码中,我们可以看到,在服务端初始化的时候ServerSocketChannel向Selector中注册了一个OP_ACCEPT状态,服务端接收到一个客户端请求之后,SocketChannel又向Selector中注册了一个OP_READ状态,那么这两个状态是怎样保存的?会不会被清理掉?

首先所有的状态都不会被清理掉,这些状态只有几种结果:被重新配置进selectedKeys、被新的状态覆盖掉、被主动cancel进入cancelKeys以及被放入到keys中无法被selector选择。所以新的状态都是保存在selectorkeys中的,并不会被清理掉,仅仅当SocketChannel需要监听的状态发生改变时,原状态会被覆盖掉。

为何会出现这种客户端无限接收数据的情况?

从代码中我们可以发现,由于我们在receive()方法中将SocketChannel的感兴趣状态修改成了OP_WRITE之后,一直没有再变化,所以导致selectorselect()的时候,永远都会触发key.isWritable()方法,进而出现无限接收数据的情况。避免这种现象发生其实很简单,只需要在write()方法中重新指定SocketChannel感兴趣的状态,或者直接cancel掉当前SocketChannelSelectionKey即可。

接下来我们来讨论后面的两个问题,其实后面两个问题都是和nio中的一个新的类型ByteBuffer有关,ByteBuffer在初始化的时候会定义一个自定义大小的缓存空间,用来缓存指定大小的字段。当该空间未满后者不足时,会通过多次接收来解决,所以下面的两个问题就可以解答了。

为何服务端每次仅仅发送一句明天,你好,客户端每一次却接收到那么多句?

因为客户端接收数据的ByteBuffer设置过大,所以它会一直缓存直到该空间满了之后才会输出到控制台,即我们看到的多条语句。其实这里是服务端发送多条数据,客户端一次打印出所有数据。

正常情况下,服务端向客户端发送一段数据,客户端会不会接收到多条?

正常情况下会出现服务端发送一条数据,客户端接收多条,这是因为服务端发送的数据过大,客户端的ByteBuffer设置过小,导致无法一次性接收完所有的数据,需要多次接收完服务端发送的数据。

Licensed under CC BY-NC-SA 4.0
comments powered by Disqus