序
近期在研究java nio Selector
时发现一件奇怪的事情:使用nio
编写socket
通信时,客户端莫名其妙无限次接收到服务端传过来的数据,导致客户端一直处于等待接收状态,完全影响了整个系统的性能。下面让我们来回顾一下整个事情发生经过:
问题描述
问题代码
服务端代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
/**
* 服务端代码
* @author yinan
* @date 18-9-11
*/
public class NIOServer {
public static void main(String[] args) throws IOException {
//创建一个channel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//绑定端口
serverSocketChannel.socket().bind(
new InetSocketAddress("127.0.0.1", 8881)
);
//设置通道为非阻塞
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
//注册channel到selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
//罗列所有的channel状态
Iterator<SelectionKey> iterable = selector.selectedKeys().iterator();
while (iterable.hasNext()) {
SelectionKey key = iterable.next();
iterable.remove();
NIOServer server = new NIOServer();
if (key.isAcceptable()) {
//获取满足条件的channel 即 OP_ACCEPT
ServerSocketChannel socketChannel = (ServerSocketChannel) key.channel();
//服务端接收一个客户端连接
SocketChannel channel = socketChannel.accept();
channel.configureBlocking(false);
//注册这个客户端连接状态
channel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()){
server.receive(key);
} else if (key.isWritable()) {
server.write(key);
}
}
}
}
/**
* 处理写入就绪事件
* @param key
* @throws IOException
*/
private void write(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
String host = remote.getHostName();
Charset charset = Charset.forName("utf-8");
//获取http请求数据
String request = compositeRequest(host);
//向SocketChannel中写入事件
channel.write(charset.encode(request));
}
private void receive(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
Charset charset = Charset.forName("utf-8");
String receiveData = charset.decode(buffer).toString();
//当没有数据可读时,取消在选择器中关联,并关闭socket连接
if ("".equals(receiveData)) {
key.channel();
channel.close();
return;
}
System.out.println(String.format("客户端发送信息: %s", receiveData));
key.interestOps(SelectionKey.OP_WRITE);
}
private static String compositeRequest(String host){
return "明天,你好";
}
}
|
客户端代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
/**
* @author yinan
* @date 18-9-11
*/
public class NIOClient {
private static Selector selector;
static {
try {
selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
try {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8881));
socketChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
NIOClient client = new NIOClient();
//selector.select() 是同步阻塞的
while (selector.select() > 0) {
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if (key.isConnectable()) {
client.connect(key);
} else if (key.isReadable()) {
client.receive(key);
} else if (key.isWritable()) {
client.write(key);
}
}
}
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 处理连接事件
* @param key
* @throws IOException
*/
private void connect(SelectionKey key) throws IOException {
//获取事件句柄对应的channel
SocketChannel channel = (SocketChannel) key.channel();
//完成真正的socket连接
channel.finishConnect();
//打印连接信息
InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
String host = remote.getHostName();
int port = remote.getPort();
System.out.println(String.format("访问地址: %s:%s 连接成功!", host, port));
}
/**
* 处理写入就绪事件
* @param key
* @throws IOException
*/
private void write(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
String host = remote.getHostName();
Charset charset = Charset.forName("utf-8");
//获取http请求数据
String request = compositeRequest(host);
//向SocketChannel中写入事件
channel.write(charset.encode(request));
//修改SocketChannel所关心的事件
key.interestOps(SelectionKey.OP_READ);
}
private void receive(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
Charset charset = Charset.forName("utf-8");
String receiveData = charset.decode(buffer).toString();
//当没有数据可读时,取消在选择器中关联,并关闭socket连接
if ("".equals(receiveData)) {
key.channel();
channel.close();
return;
}
System.out.println(String.format("服务端返回信息: %s", receiveData));
}
private static String compositeRequest(String host){
return "GET / HTTP/1.1\r\n" +
"Host: " + host + "\r\n" +
"User-Agent: curl/7.43.0\r\n" +
"Accept: */*\r\n\r\n";
}
}
|
结果展示
服务端
客户端
问题
- 为何会出现这种客户端无限接收数据的情况?
- 从代码中,我们可以看到,在服务端初始化的时候
ServerSocketChannel
向Selector
中注册了一个OP_ACCEPT
状态,服务端接收到一个客户端请求之后,SocketChannel
又向Selector
中注册了一个OP_READ
状态,那么这两个状态是怎样保存的?会不会被清理掉?
- 为何服务端每次仅仅发送一句
明天,你好
,客户端每一次却接收到那么多句?
- 正常情况下,服务端向客户端发送一段数据,客户端会不会接收到多条?
问题解决
问题定位
从我们发的代码中可以看到,不论是服务端还是客户端,都是通过检测当前channel
的四种状态来处理不同的逻辑,所以出现无限接收数据的情况肯定是和状态有关联。那我们首先从这几种状态转移入手,看看一个channel
的状态是如何转移和获取的。
服务端在初始化时,会首先在selector
中注册一个自己感兴趣的状态为OP_ACCEPT
1
|
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
|
在selector
进行select
的时候,会将准备就绪的channel
对应的SelectionKey
取出来,通过判断SelectionKey
中记录的状态让当前channel
处理不同的逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
while (selector.select() > 0) {
//罗列所有的channel状态
Iterator<SelectionKey> iterable = selector.selectedKeys().iterator();
while (iterable.hasNext()) {
SelectionKey key = iterable.next();
iterable.remove();
NIOServer server = new NIOServer();
if (key.isAcceptable()) {
//首先会进入这里
} else if (key.isReadable()){
} else if (key.isWritable()) {
}
}
}
|
由于ServerSocketChannel
注册的是OP_ACCEPT
状态,所以会进入第一个判断逻辑中,在这个逻辑片段中主要做的是接受一个客户端的连接,并为该客户端注册一些感兴趣的状态。
1
2
3
4
5
6
7
8
9
|
if (key.isAcceptable()) {
//获取满足条件的channel 即 OP_ACCEPT
ServerSocketChannel socketChannel = (ServerSocketChannel) key.channel();
//服务端接收一个客户端连接
SocketChannel channel = socketChannel.accept();
channel.configureBlocking(false);
//注册这个客户端连接状态
channel.register(selector, SelectionKey.OP_READ);
}
|
此时,在selector
中应该存在两个SelectionKey
,分别用来保存ServerSocketChannel
和SocketChannel
感兴趣的状态,通过debug
,我们也的确看到了两个key
。
补充说明
在selector
中存在着三种选择建:
keys
:保存了所有已注册且没有cancel
的选择键,Set
类型.可以通过selector.keys()
获取
selectedKeys
:已选择键集,即前一次操作期间,已经准备就绪的通道所对应的选择键.此集合为keys
的子集.通过selector.selectedKeys()
获取
canceledKeys
:已取消键.已经被取消但尚未取消注册(deregister
)的选择键.此集合不可被访问.为keys()
的子集
对于新的Selector
实例,上述三个集合均为空.通过channel.register
将导致keys()
集合被添加.如果某个selectionKey.cancel()
被调用,那么此key
将会被添加到canceldKey
集合中,在下一次selector
选择期间,如果canceldKeys
不为空,将会导致触发此key
的deregister
操作(释放资源,并从keys
中移除).无论通过channel.close()
还是通过selectionKey.cancel()
,都会导致key
被加入到cannceldKey
中
此时后续的循环结束,代码再次跳转到selector.select()
,selector
开始检测准备就绪的channel
并将其设置到selector
的selectedKeys
中,便于针对该channel
进行处理。但是,当代码运行到该处时,我发现了一个意料之外的情况,原本两个key
仅仅有一个进入到了selectedKeys
:
经过确认之后,发现是channel.register(selector, SelectionKey.OP_READ)
中注册的READ
状态进入了selectedKeys
,而serverChannel
的accept
状态并没有被selector
放入selectedKeys
。
通过以上的调试,我们可以猜想到当ServerSocketChannel
接收到一个客户端连接时之后,会通过某个配置修改状态,导致selector
在新的一轮执行select()
方法时没有将ServerSocketChannel
从keys
中加载到selectedKeys
中。
我们继续调试,由于上一次我们配置了SocketChannel
的感兴趣状态为READ
,所以会在这一次遍历的时候进入:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
if (key.isReadable()) {
client.receive(key);
}
private void receive(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
Charset charset = Charset.forName("utf-8");
String receiveData = charset.decode(buffer).toString();
//当没有数据可读时,取消在选择器中关联,并关闭socket连接
if ("".equals(receiveData)) {
key.channel();
channel.close();
return;
}
System.out.println(String.format("客户端发送信息: %s", receiveData));
key.interestOps(SelectionKey.OP_WRITE);
}
|
我们可以来看看经过这段代码的执行,当前的SocketChannel
的SelectionKey
是新增了一种状态还是修改了当前的SelectionKey
状态
执行该段代码前
执行该段代码后
我们发现SocketChannel
中的SelectionKey
并没有新增一种状态,而是更新了原本的状态,这样在下一次执行selector.select()
方法时,该SocketChannel
仅仅会去追踪WRITE
状态。
问题解答
因此,到目前为止,我们可以回答之前提出的前两个问题了:
从代码中,我们可以看到,在服务端初始化的时候ServerSocketChannel向Selector中注册了一个OP_ACCEPT状态,服务端接收到一个客户端请求之后,SocketChannel又向Selector中注册了一个OP_READ状态,那么这两个状态是怎样保存的?会不会被清理掉?
首先所有的状态都不会被清理掉,这些状态只有几种结果:被重新配置进selectedKeys
、被新的状态覆盖掉、被主动cancel
进入cancelKeys
以及被放入到keys
中无法被selector
选择。所以新的状态都是保存在selector
的keys
中的,并不会被清理掉,仅仅当SocketChannel
需要监听的状态发生改变时,原状态会被覆盖掉。
为何会出现这种客户端无限接收数据的情况?
从代码中我们可以发现,由于我们在receive()
方法中将SocketChannel
的感兴趣状态修改成了OP_WRITE
之后,一直没有再变化,所以导致selector
在select()
的时候,永远都会触发key.isWritable()
方法,进而出现无限接收数据的情况。避免这种现象发生其实很简单,只需要在write()
方法中重新指定SocketChannel
感兴趣的状态,或者直接cancel
掉当前SocketChannel
的SelectionKey
即可。
接下来我们来讨论后面的两个问题,其实后面两个问题都是和nio
中的一个新的类型ByteBuffer
有关,ByteBuffer
在初始化的时候会定义一个自定义大小的缓存空间,用来缓存指定大小的字段。当该空间未满后者不足时,会通过多次接收来解决,所以下面的两个问题就可以解答了。
为何服务端每次仅仅发送一句明天,你好
,客户端每一次却接收到那么多句?
因为客户端接收数据的ByteBuffer
设置过大,所以它会一直缓存直到该空间满了之后才会输出到控制台,即我们看到的多条语句。其实这里是服务端发送多条数据,客户端一次打印出所有数据。
正常情况下,服务端向客户端发送一段数据,客户端会不会接收到多条?
正常情况下会出现服务端发送一条数据,客户端接收多条,这是因为服务端发送的数据过大,客户端的ByteBuffer
设置过小,导致无法一次性接收完所有的数据,需要多次接收完服务端发送的数据。