Java NIO实例

示例来源(imooc)(代码下载

NioServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

/**
* NIO服务器端
*/
public class NioServer {

/**
* 启动
*/
public void start() throws IOException {
/**
* 1. 创建Selector
*/
Selector selector = Selector.open();

/**
* 2. 通过ServerSocketChannel创建channel通道
*/
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

/**
* 3. 为channel通道绑定监听端口
*/
serverSocketChannel.bind(new InetSocketAddress(8000));

/**
* 4. **设置channel为非阻塞模式**
*/
serverSocketChannel.configureBlocking(false);

/**
* 5. 将channel注册到selector上,监听连接事件
*/
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器启动成功!");

/**
* 6. 循环等待新接入的连接
*/
for (;;) { // while(true) c for;;
/**
* TODO 获取可用channel数量
*/
int readyChannels = selector.select();

/**
* TODO 为什么要这样!!?
*/
if (readyChannels == 0) continue;

/**
* 获取可用channel的集合
*/
Set<SelectionKey> selectionKeys = selector.selectedKeys();

Iterator iterator = selectionKeys.iterator();

while (iterator.hasNext()) {
/**
* selectionKey实例
*/
SelectionKey selectionKey = (SelectionKey) iterator.next();

/**
* **移除Set中的当前selectionKey**
*/
iterator.remove();

/**
* 7. 根据就绪状态,调用对应方法处理业务逻辑
*/
/**
* 如果是 接入事件
*/
if (selectionKey.isAcceptable()) {
acceptHandler(serverSocketChannel, selector);
}

/**
* 如果是 可读事件
*/
if (selectionKey.isReadable()) {
readHandler(selectionKey, selector);
}
}
}
}

/**
* 接入事件处理器
*/
private void acceptHandler(ServerSocketChannel serverSocketChannel,
Selector selector)
throws IOException {
/**
* 如果要是接入事件,创建socketChannel
*/
SocketChannel socketChannel = serverSocketChannel.accept();

/**
* 将socketChannel设置为非阻塞工作模式
*/
socketChannel.configureBlocking(false);

/**
* 将channel注册到selector上,监听 可读事件
*/
socketChannel.register(selector, SelectionKey.OP_READ);

/**
* 回复客户端提示信息
*/
socketChannel.write(Charset.forName("UTF-8")
.encode("你与聊天室里其他人都不是朋友关系,请注意隐私安全"));
}

/**
* 可读事件处理器
*/
private void readHandler(SelectionKey selectionKey, Selector selector)
throws IOException {
/**
* 要从 selectionKey 中获取到已经就绪的channel
*/
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

/**
* 创建buffer
*/
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

/**
* 循环读取客户端请求信息
*/
String request = "";
while (socketChannel.read(byteBuffer) > 0) {
/**
* 切换buffer为读模式
*/
byteBuffer.flip();

/**
* 读取buffer中的内容
*/
request += Charset.forName("UTF-8").decode(byteBuffer);
}

/**
* 将channel再次注册到selector上,监听他的可读事件
*/
//socketChannel.register(selector, SelectionKey.OP_READ);

/**
* 将客户端发送的请求信息 广播给其他客户端
*/
if (request.length() > 0) {
// 广播给其他客户端
broadCast(selector, socketChannel, request);
}
}

/**
* 广播给其他客户端
*/
private void broadCast(Selector selector,
SocketChannel sourceChannel, String request) {
/**
* 获取到所有已接入的客户端channel
*/
Set<SelectionKey> selectionKeySet = selector.keys();

/**
* 循环向所有channel广播信息
*/
for(SelectionKey selectionKey:selectionKeySet){
Channel targetChannel = selectionKey.channel();
// 剔除发消息的客户端
if (targetChannel instanceof SocketChannel
&& targetChannel != sourceChannel) {
try {
// 将信息发送到targetChannel客户端
((SocketChannel) targetChannel).write(
Charset.forName("UTF-8").encode(request));
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

/**
* 主方法
* @param args
*/
public static void main(String[] args) throws IOException {
new NioServer().start();
}
}

NioClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;

/**
* NIO客户端
*/
public class NioClient {

/**
* 启动
*/
public void start(String nickname) throws IOException {
/**
* 连接服务器端
*/
SocketChannel socketChannel = SocketChannel.open(
new InetSocketAddress("127.0.0.1", 8000));

/**
* 接收服务器端响应
*/
// 新开线程,专门负责来接收服务器端的响应数据
// selector , socketChannel , 注册
Selector selector = Selector.open();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
new Thread(new NioClientHandler(selector)).start();

/**
* 向服务器端发送数据
*/
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String request = scanner.nextLine();
if (request != null && request.length() > 0) {
socketChannel.write(
Charset.forName("UTF-8")
.encode(nickname + " : " + request));
}
}

}

public static void main(String[] args) throws IOException {
//new NioClient().start();
}
}

NioClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

/**
* 客户端线程类,专门接收服务器端响应信息
*/
public class NioClientHandler implements Runnable {
private Selector selector;

public NioClientHandler(Selector selector) {
this.selector = selector;
}

@Override
public void run() {

try {
for (;;) {
int readyChannels = selector.select();

if (readyChannels == 0) continue;

/**
* 获取可用channel的集合
*/
Set<SelectionKey> selectionKeys = selector.selectedKeys();

Iterator iterator = selectionKeys.iterator();

while (iterator.hasNext()) {
/**
* selectionKey实例
*/
SelectionKey selectionKey = (SelectionKey) iterator.next();

/**
* **移除Set中的当前selectionKey**
*/
iterator.remove();

/**
* 7. 根据就绪状态,调用对应方法处理业务逻辑
*/

/**
* 如果是 可读事件
*/
if (selectionKey.isReadable()) {
readHandler(selectionKey, selector);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 可读事件处理器
*/
private void readHandler(SelectionKey selectionKey, Selector selector)
throws IOException {
/**
* 要从 selectionKey 中获取到已经就绪的channel
*/
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

/**
* 创建buffer
*/
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

/**
* 循环读取服务器端响应信息
*/
String response = "";
while (socketChannel.read(byteBuffer) > 0) {
/**
* 切换buffer为读模式
*/
byteBuffer.flip();

/**
* 读取buffer中的内容
*/
response += Charset.forName("UTF-8").decode(byteBuffer);
}

/**
* 将channel再次注册到selector上,监听他的可读事件
*/
//socketChannel.register(selector, SelectionKey.OP_READ);

/**
* 将服务器端响应信息打印到本地
*/
if (response.length() > 0) {
System.out.println(response);
}
}
}

AClient

1
2
3
4
5
6
7
8
9
import java.io.IOException;

public class AClient {

public static void main(String[] args)
throws IOException {
new NioClient().start("AClient");
}
}

BClient

1
2
3
4
5
6
7
8
9
import java.io.IOException;

public class BClient {

public static void main(String[] args)
throws IOException {
new NioClient().start("BClient");
}
}

CClient

1
2
3
4
5
6
7
8
9
import java.io.IOException;

public class CClient {

public static void main(String[] args)
throws IOException {
new NioClient().start("CClient");
}
}
0%