f10@t's blog

Java网络编程模型复习

字数统计: 7.2k阅读时长: 29 min
2023/01/04

失踪人口回归...,2022年几乎没怎么好好写博客,一方面原因是因为科研,另一方面...懒了懒了我错了。年底中招奥密克戎,最近也是才缓过劲来,真是坎坷的一年。本科之前其实接触过C语言的网络编程,带课的李金库老师讲的很好,记得当时特别地对I/O多路复用(select)留下了很深的印象。今天复习一下Java中网络编程的相关理论和代码,包括最基础的Sokcet使用以及三种I/O模型。(ps:我gitee的博客由于部分文章没法过审核,因此gitee博客以后可能就不会更新了,我仍然使用github博客为主。

db04ec35da1f9962545649576098426a

什么是Socket?Java中如何使用?

老生常谈的问题了,什么是Socket?也即套接字?

首先我们要了解一下,什么是进程间通信(InterProcess Communication, IPC),操作系统中时时刻刻都有大量的进程在运行,每一个进程实体都由数据、程序代码和进程控制块(PCB)组成,每个进程都在自己的地址范围内运行,操作系统通过PCB控制进程,进程根据程序代码数据进行处理计算。当两个进程之间需要通信的时候,操作系统为我们提供了很多途径,如UNIX中的pipe,下图是一个父子进程使用的普通管道,此外还有对等进程使用的命名管道。

Unix中的普通管道

上述的计算和通信过程都是在一个机器内部发生的,因此我们很容易就会想到,那不同机器上的不同进程如何通信呢?ok,这就是网络编程干的事情。

操作系统为我们提供了Socket来完成这件事情。提供这个干什么呢?具体来说,既然是网络编程,那我们必定是要使用网络协议栈的,也即TCP/IP四层模型:应用层、传输层、网络层和网络接口层,Socket为作为应用层开发者的我们,提供了一种简单的使用网络协议栈的方法。如man手册中就一言以蔽之:

The BSD compatible sockets are the uniform interface between the user process and the network protocol stacks in the kernel.

BSD(Berkeley Software Distribution, 伯克利软件套件) sockets是用户进程和内核中网络协议栈之间的通用接口。ps:这东西最早是伯克利大学开发的

Socket是为了解决网络编程中的什么问题?

我们深入探讨一下,Socket的出发点是为了解决什么问题。同样,我们先看看单机中进程通信的一个最基本的问题。

单机中,不同的进程运行在不同的地址空间内,当发生通信时,如何寻找另一个进程呢?

  • 通过PID,即process identifier,通过这唯一的进程标识符,我们就可以唯一地确定一个进程。可以通过命令ulimit -n和查看/proc/sys/kernel/pid_max来知道pid的实际以及理论上限。

那么自然而然的,问题1:网络编程中,本地的进程如何寻找另一个机器上的进程呢?也是使用PID吗?那万一这俩进程号一样,怎么确定唯一性呢?那用IP?关键这个机器上进程很多啊,你说个127.0.0.1我也不知道是哪个进程啊?

所以TCP/IP协议中才设计了传输层协议端口这个东西,我们使用一个端口号唯一确定一个进程,结合IP地址,我们就可以唯一的确定一台主机上的一个进程了,我们用一个五元组在全局中唯一表示一个网间进程通信:(传输层协议,本地IP,本地端口,远程IP,远程端口)

如下图是TCP和UDP两种不同传输层协议的报文格式,这就是为什么报文中要有端口号。由于端口号是一个16bit的的字段,因此,共有2^16=65535个端口号可供我们使用,其中0-1023属于保留的端口(well-know port),1-255给一些常见服务如如HTTP 80、HTTPS 443、FTP 21、DNS 53、SMTP 25等等,256-1023保留给如路由等协议。而1024-4999可以作为任意客户的端口,5000-65535这个庞大的空间作为用户的服务器端口(毕竟连接服务器的人多嘛)。

TCP、UDP报文格式

除了问题1,我们还面临着如下三个问题:

  1. 如何连接网络剧哦协议栈
  2. 不同网络协议(如IPX/SPX)如何识别
  3. 不同应用的数据传输可靠性、速率等要求不同,如何实现有选择地使用网络协议栈提供的不同服务(即TCP or UDP)

那上述问题,socket都帮我们解决了,socket作为操作系统本身的系统调用,为我们提供了使用网络协议栈的方法;socket创建时int socket(int domain, int type, int protocol);,为我们提供了不同网络协议的选择方法(如AF_IPX字段代表IPX协议、AF_DECnet代表DECet协议等等);socket也可以让我们选择不同的传输层协议(SOCK_STREAM即TCP、SOCK_DGRAM即UDP)。

Java中的Socket

TCP

我们先看看在C语言中,也即UNIX系统中是如何创建并使用一个TCP类型的socket的。具体以C/S架构为例,如图所示:

TCP网络编程流程

根据角色分一下类,我们可以看到,客户端程序和服务端程序需要执行的函数是有区别的:

  • 客户端不需要被动Listen
  • 不需要绑定本地端口(即随机从1024-65535中选取一个),
  • 不需要Accept远程机器的连接。

因此,面向对象嘛,Java对这两种不同职责的Socket进行了划分,分别为java.net.Socketjava.net.ServerSocket,这俩的区别其实就是上面提到的三点。二者关系如下:

Java中TCP网络编程常用类

服务端代码

我们写一个简单的服务端,他将接收客户端的请求ID,并为其返回对应的数据。我们要使用到上面提到的java.net.ServerSocket,他的生命周期如下:

ServerSocket生命周期

下面我们将实现一个服务端程序,他接收客户端发来的个位数,并返回这个个位数的英文:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;

/**
* TCP 服务端 - 同步阻塞模式
*
* @author lzwgiter
* @since 2023/01/03
*/
public class BIOTCPServer {

private static final ArrayList<String> DICT = new ArrayList<>(
Arrays.asList("Zero", "One", "Two", "Three", "Four",
"Five", "Six", "Seven", "Eight", "Nine")
);

public static void main(String[] args) {
// 这里默认完成了bind的操作,你也可以通过无参构造,然后手动bind一个SocketAddress
try (ServerSocket serverSocket = new ServerSocket(6666)) {
// 一直运行
while (true) {
char[] data = new char[1];
// 阻塞等待连接
Socket requestSocket = serverSocket.accept();
// 收到请求
System.out.println("# 客户端连接:" + requestSocket.getRemoteSocketAddress() + " #");
// 接收数据并处理
Reader reader = new InputStreamReader(requestSocket.getInputStream());
reader.read(data);
// 返回数据
Writer out = new OutputStreamWriter(requestSocket.getOutputStream());
Thread.sleep(1000);
out.write(DICT.get(Integer.parseInt(String.valueOf(data))));
out.flush();
// 关闭连接
requestSocket.close();
}
} catch (IOException ex) {
System.err.println(ex);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

客户端代码

对应我们的服务端,客户端代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.io.*;
import java.net.Socket;
import java.net.UnknownHostException;

/**
* TCP 客户端 - 同步阻塞模式
*
* @author lzwgiter
* @since 2023/01/03
*/
public class BIOClient {
public static void main(String[] args) {
// 创建连接(注意,这里会直接connect,你也可以通过无参构造,手动connect一个SocketAddress
try (Socket client = new Socket("127.0.0.1", 6666);) {
System.out.println("本地的Socket信息:" + client.getLocalSocketAddress());
System.out.println("远程的Socket信息:" + client.getRemoteSocketAddress());
Writer writer = new OutputStreamWriter(client.getOutputStream());
char[] wordToInquire = new char[]{'2'};
writer.write(wordToInquire);
writer.flush();

// 读取数据
char[] result = new char[10];
Reader reader = new InputStreamReader(client.getInputStream());
reader.read(result);
System.out.println(String.valueOf(wordToInquire) + " in English is " + String.valueOf(result).trim());
} catch (UnknownHostException ex) {
System.err.println("主机名无法解析" + ex);
} catch (IOException ex) {
System.err.println(ex);
}
}
}

运行结果

最后我们的运行结果如下图所示:

BIO TCP运行结果

在这个过程中我重启了两次客户端程序,可以看到:

  • 每次的本地客户端端口都是随机选取的。
  • 客户端程序结束了,但服务端会一直阻塞accept()等待新的连接。

UDP

相较于TCP需要三次握手建立连接的过程,UDP协议不需要该过程,当然也没提供TCP那些什么滑动窗口、差错控制等等,毕竟你看他报文那么简单嘛。

我们还是看看C语言中是什么样子的,具体如下:

UDP报文格式

仍然是分开看一下Java中客户端和服务器端要做的事情:

服务端:

  • 创建UDP类型套接字并绑定IP、端口
  • 阻塞等待接收数据报文
  • 接收后处理并返回
  • 关闭套接字

客户端:

  • 创建UDP类型套接字
  • 向服务端发送数据报文
  • 接收数据包
  • 关闭套接字

为了实现上述功能,Java中定义了java.net.DatagramPacketjava.net.DatagramSocket来负责上述功能,我们通过DatagramPacket来封装和解封数据,用DatagramSocket来发送DatagramPacket数据包。

Java中UDP网络编程常用类

服务端代码

这里实现的功能与上一章节相同,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;

/**
* UDP 服务端 - 同步阻塞模式
*
* @author lzwgiter
* @since 2023/01/03
*/
public class BIOUDPServer {

private static final ArrayList<String> DICT = new ArrayList<>(
Arrays.asList("Zero", "One", "Two", "Three", "Four",
"Five", "Six", "Seven", "Eight", "Nine")
);

public static void main(String[] args) {
try (DatagramSocket server = new DatagramSocket(6667)) {
while (true) {
DatagramPacket request = new DatagramPacket(new byte[1], 1);
// 接收数据包
server.receive(request);
// 收到请求
System.out.println("# 客户端数据报:" + request.getAddress() + ":" + request.getPort() + "#");
int index = Integer.parseInt(new String(request.getData()));
byte[] data = DICT.get(index).getBytes(StandardCharsets.UTF_8);
SocketAddress clientAddress = new InetSocketAddress(request.getAddress(), request.getPort());
DatagramPacket response = new DatagramPacket(data, data.length, clientAddress);
server.send(response);
}
} catch (IOException e) {
System.err.println(e);
}
}
}

客户端代码

注意,在UDP下创建客户端的Socket时,和TCP是有区别的,TCP中我们使用:new Socket("127.0.0.1", 6666);,这个端口指的是远程的端口。而在UDP客户端socket初始化时,new DatagramSocket(0),这里填写的可不是远程的,而是本地的,0代表随机选择一个。客户端代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.io.IOException;
import java.net.*;
import java.nio.charset.StandardCharsets;

/**
* UDP 客户端 - 同步阻塞模式
*
* @author lzwgiter
* @since 2023/01/03
*/
public class BIOUDPClient {
public static void main(String[] args) {
// 设置绑定的本地端口,0代表随机选择
try (DatagramSocket client = new DatagramSocket(0)) {
// 设置超时(单位:毫秒)
client.setSoTimeout(10000);
// 创建数据报
SocketAddress remote = new InetSocketAddress("127.0.0.1", 6667);
String wordToInquire = "1";
byte[] dataToSend = wordToInquire.getBytes(StandardCharsets.UTF_8);
DatagramPacket requestPacket = new DatagramPacket(dataToSend, dataToSend.length, remote);
// 构造接收的报文
byte[] data = new byte[10];
DatagramPacket recvedPacket = new DatagramPacket(data, data.length);
client.send(requestPacket);
client.receive(recvedPacket);
System.out.println(new String(recvedPacket.getData()).trim());
} catch (IOException e) {
System.err.println(e);
}
}
}

运行结果

BIO UDP运行结果

在这个过程中我重启了两次客户端程序,可以看到:

  • 每次的本地客户端端口都是随机选取的(左边服务器端显示的日志)。
  • 客户端程序结束了,但服务端会一直阻塞receive()等待新的数据包。

Java中三种I/O模型及其实现

知道基本的Socket、TCP/UDP相关java的API使用方式后,我们就要深究一下,有哪些IO模式了,通常上,我们使用两个指标来划分,即:同步or异步阻塞or非阻塞。那同步异步区别在哪呢?它们的区别在消息通信机制上,同步为Synchronous Communication,而异步为Aynchronous Communication,具体来说:

  • 同步模式下,我们调用一个方法,在这个方法完成前,我们需要一直主动等待它(这个主动的理解很关键),没得到结果前我们会一直等着。
  • 异步模式下,同样一个方法,我们调用后将直接返回,在它完成前,我们不需要等待他,而是由它主动通知我们,结果好了,可以使用了。通知方法如信号、回调函数等。

ok,那同步异步的区别实际上就是看是我们主动等结果,还是结果主动通知我们了。那阻塞和非阻塞呢?

  • 阻塞IO下,执行阻塞的系统调用(如前面我们看到的receive)当前线程会被挂起,等待系统调用的完成
  • 非阻塞IO下,与阻塞相反,非阻塞的系统调用会直接返回一个瞬时的结果,无需等待系统调用的完成。然后通过轮询的方式去判断调用是否完成。典型的例子如select系统调用,它可以轮询检测活动的socket而无需等待,有I/O可用时将直接返回。

所以根据同步异步的消息通信机制,以及阻塞非阻塞的IO方式,我们可以排列组合得到如下集中IO模型:

  • 同步阻塞
  • 同步非阻塞
  • 异步阻塞
  • 异步非阻塞(其实没这个说法,直接就叫异步)

哎为什么没有异步阻塞呢?因为前面我们说异步模式下,方法会直接返回无需等待,所以异步一定是非阻塞的。但是反过来可就不对了(如select同步非阻塞)。

所以从大的方面上来说,我们有三种IO模型。实际上,Unix中有5种IO模型,划分更为细致。分别为:

  • Blocking IO(同步阻塞),可以看到流程其实和我们UDP程序逻辑是一样的,也即Java原始的BIO
阻塞模式
  • NonBlocking IO (同步非阻塞,轮询)
非阻塞模式
  • IO Multiplexing(如select、poll)(同步阻塞,但相较于Blocking IO,能监听更多的socket) 。这也是Java的NIO原理,这个有大量的应用,比如你Nginx。
IO多路复用
  • Signal-Driven(同步非阻塞,当IO操作准备好时会通过信号通知)
信号驱动
  • Asynchronous IO(异步,当IO操作完成时会通过信号通知)。这也是Java的AIO原理
异步

注意,前四种都是同步的,这是因为他们都需要调用receive等方法将数据从内核空间复制到用户空间,都会导致当前线程挂起。而只有异步的IO模型,是真正不需要这个操作的(上图中没有出现如recvform的系统阻塞调用)。这五种模型的比较如下图所示:

UNIX中5种IO模型比较

BIO

BIO(Blocking IO),即IO是阻塞的状态,其实上一章节中我们的BIOServer就是阻塞的,即服务器会阻塞在accept/receive,等待客户端的连接,这个时候你的程序是干不了其他事情的。BIO是一种同步阻塞的机制。具体详情前面的代码、示图已经说的很清楚了。Java中主要使用的就是java.net.*下面的类。

NIO

JDK 1.4中java引入了NIO(New IO),主要是使用了Selector实现了IO多路复用。其实在Linux中,该机制是通过select系统调用来实现的。

我写过一个简单的C语言的聊天室项目,感兴趣的可以看看,该项目就是用的select机制来实现的。当然也有更好的epoll可以替换。NIO本质上是一种利用了I/O多路复用技术的、同步非阻塞的机制。但是相较于BIO,可以处理更多的Socket。

使用方法

我们以TCP为例。Java中主要使用java.nio.*下的类来实现,常用的类如:java.nio.channels.Selectorjava.nio..channels.ServerSocketChanneljava.nio.channels.SocketChannel。至于UDP,将上面的ServerSocketChannel和改为使用DatagramChannel即可,他们的关系如下:

NIO常用类关系图

用一张图表示的话,NIO的结构是这样的:

NIO结构

下面我们就使用上述类写一个服务端和客户端,学习一下使用方法。

服务端

对于服务端,我们主要使用java.nio.Selector来监听多个socket,此时我们不需要手动for循环去判断哪一个socket可用,而是由操作系统通知JVM哪个socket可用读入或写入

注意,在select时,Java会调用java.nio.channels.spi.SelectorProvider这个单例类的provide()方法来返回操作系统的具体实现。且当select时,当前线程会阻塞,等待有IO可用时、操作系统的通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.Set;

/**
* Java NIO
*
* @author lzwgiter
* @since 2022/12/30
*/
public class NIOServer {

public static void main(String[] args) {
try (Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
// 设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 绑定端口
serverSocketChannel.bind(new InetSocketAddress(6666));
// 将serverSocketChannel的accept交由selector来处理
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("端口注册完成, 等待连接中......");

// 阻塞在select方法
while (selector.select() > 0) {
// 我们使用SelectionKey来对可用的socket进行遍历
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 处理每一个事件
SocketChannel sc;
Iterator<SelectionKey> iter = selectionKeys.iterator();

// 遍历每一个可用的socket
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
ServerSocketChannel nssc = (ServerSocketChannel) key.channel();
sc = nssc.accept();
// 设为非阻塞
sc.configureBlocking(false);
// 注册并分配缓存区
ByteBuffer echoBuffer = ByteBuffer.allocate(100);
sc.register(selector, SelectionKey.OP_READ, echoBuffer);
System.out.println(LocalDateTime.now() + " - ** 新的连接 ** " + sc);
} else if (key.isReadable()) {
sc = (SocketChannel) key.channel();
ByteBuffer echoBuffer = (ByteBuffer) key.attachment();
// 读取数据
echoBuffer.clear();
int len = sc.read(echoBuffer);
if (len == -1) {
break;
}
if (len > 0) {
echoBuffer.clear();
String raw = new String(echoBuffer.array()).trim();
System.out.println(LocalDateTime.now() + " - ## 接收来自 " + sc + "的数据:" + raw + " ##");
// 处理消息
String response = "MSG: {" + raw + "} is accepted.";
echoBuffer.put(response.getBytes());
echoBuffer.flip();
sc.write(echoBuffer);
System.out.println(LocalDateTime.now() + " - ## 发送给 " + sc + "的数据:" +
new String(echoBuffer.array()).trim() + " ##");
}
// 关闭客户端连接
sc.close();
System.out.println(sc + "连接结束");
System.out.println("===========================");
}
// 从遍历集合中删除
iter.remove();
}
}
System.out.println("this is a simple test.");
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

这里需要澄清一个小概念,我学习时就有疑惑。单线程下,若没有可用的I/O操作,那不就阻塞在select函数吗?啥都干不了啊?

实际上阻塞、非阻塞都是对于I/O操作来说的,由于I/O多路复用机制提供了单线程下操作多个I/O的方法,因此我们当前线程不会阻塞在单个I/O中。比如现在有两个个客户端连接进来了,ok我们处理了1号I/O,由于1没有新的消息了,因此我们会去处理2号I/O,而并没有阻塞在1号I/O上。因此说它是非阻塞的。

客户端

客户端代码要简单一些。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;

/**
* Java NIO客户端
*
* @author lzwgiter
* @since 2022/12/30
*/
public class NIOClient {
public static void main(String[] args) {
try (SocketChannel client = SocketChannel.open(new InetSocketAddress(6666))) {
client.configureBlocking(false);
String msg = "this is a simple test.";
System.out.println(LocalDateTime.now() + " - ##发送数据: " + msg + " ##");
ByteBuffer buffer = ByteBuffer.allocate(100);
buffer.put(msg.getBytes());
buffer.flip();
client.write(buffer);
buffer.clear();

while (true) {
int len = client.read(buffer);
if (len > 0) {
System.out.println(LocalDateTime.now() + " - ##接收数据: " + new String(buffer.array()).trim() + " ##");
client.close();
System.out.println(LocalDateTime.now() + " - ##连接关闭: " + " ##");
break;
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

运行结果

NIO运行结果

AIO

JDK 1.7引入了NIO2.0,也即AIO(Asynchronous IO),他为我们提供了异步的可能性,如异步的文件通道、异步的套接字通道,他的底层在Windows中是通过IOCP(I/O Completion Port)来实现的,Linux中则是epollAIO本质上是一种异步的机制。

那么AIO和BIO的区别在哪里呢?其实很好理解,在上一节NIO代码可以看出,操作系统通知我们的时刻,是该IO就绪的时刻,即可读或可写,在此之前我们是阻塞的。回顾一下select的图:

IO多路复用

而对于真正的异步,我们是不需要主动等待的,而是立即返回,并由操作系统通知我们。对于AIO,操作系统通知我们的时刻,是该IO已经完成的时刻,即读完了,写完了。回顾一下异步的图。

异步

使用方法

我们以TCP为例。Java中主要使用java.nio.*下的类来实现,常用的类如:java.nio.channels.AsynchronousSocketChanneljava.nio.channels.AsynchronousServerSocketChanneljava.nio.channels.CompletionHandler。至于UDP,将上面的AsynchronousSocketChannel和改为使用AsynchronousDatagramChanel即可,他们的关系如下:

AIO常用类关系图

注意到,在AsynchronoutSocketChannel中,acceptconnectreadwrite方法有都提供了使用CompletionHandler<Integer, A>回调机制,这也是体现异步的地方,这四个函数的调用会立即返回,不会阻塞,且将这四个函数的具体执行交给JVM默认线程池的某个线程在后台执行,当操作完成后,该线程会再执行传入的回调函数来通知我们

这样的回调机制和方法声明,在NIO中是没有的,也是最主要的一个区别。我们可以看一下这个关键的java.nio.channels.CompletionHandler是怎么定义的:

CompletionHandler接口定义

可以看到,该接口声明了两个方法分别对应IO操作成功失败两种情况,并传入IO操作的结果或异常至我们的回调函数中进行进一步的处理。

服务端

下面我们就看一下使用AIO如何编写服务端和客户端。大体来说,与BIO编写方式类似,但是我们可以通过回调的方式实现异步操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.time.LocalDateTime;


/**
* @author lzwgiter
* @since 2023/01/06
*/
public class AIOTCPServer {
/**
* 异步监听socket,在静态代码块中初始化
*/
private static AsynchronousServerSocketChannel serverSocketChannel = null;

static {
try {
serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(6666));
} catch (IOException e) {
System.err.println(e);
}
}

/**
* 读取数据的回调函数
*/
private static final CompletionHandler<Integer, ByteBuffer> READ_HANDLER = new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer readResultBuffer) {
// 数据读取完成(这里注意,不需要我们去read操作,操作系统已经帮我们把数据拷贝到用户空间了,即readResultBuffer变量中
readResultBuffer.flip();
String rawData = new String(readResultBuffer.array()).trim();
System.out.println(LocalDateTime.now() + " - " + Thread.currentThread().getName() +
" - ## 接收数据:" + rawData + " ##");
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
};

/**
* 接收请求的回调函数
*/
private static final CompletionHandler<AsynchronousSocketChannel, Void> ACCEPT_HANDLER =
new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel channel, Void obj) {
try {
System.out.println(LocalDateTime.now() + " - " + Thread.currentThread().getName() +
" - ## 客户端连接成功:" + channel.getRemoteAddress() + " ##");
// 异步读取数据,立即返回不阻塞
ByteBuffer buffer = ByteBuffer.allocate(100);
channel.read(buffer, buffer, READ_HANDLER);

// 再次接受其他的客户端链接
serverSocketChannel.accept(null, this);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void failed(Throwable exc, Void obj) {
System.err.println("连接客户端失败!err: " + exc);
}
};

public static void main(String[] args) {
System.out.println("端口绑定完成,等待连接......");
// 等待连接,立即返回,不阻塞
serverSocketChannel.accept(null, ACCEPT_HANDLER);
// 验证异步
while (true) {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName() + "闲的一匹");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
}

在上面的代码中,static静态代码块以及main函数处我们写了简单的逻辑,即初始化socket、绑定端口等等。

需要注意的是,从accept函数开始我们的使用方法就有区别了。在开头我定义了两个常量(ps:实际开发肯定不这么干,肯定是写类)READ_HANDLERACCEPT_HANDLER,一个是接收到连接时的回调函数,另一个是读取数据完成时的回调函数。二者均实现了前面提到的CompletionHandler接口,分别定义了complete和failed函数的内容。

而这两个回调函数也正是我们实现异步操作的核心了,当JVM后台线程池accept、read操作后,就会对应地、执行我们的回调函数。这就对应了我们前面理论部分中提到的:操作完成时(如accept完成、读取完成、写入完成等)由操作系统通知我们,而不是我们主动阻塞等待。

客户端

对应的实现客户端代码如下。同上,我们需要定义一个connect函数的回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

/**
* @author lzwgiter
* @since 2023/01/06
*/
public class AIOTCPClient {
public static void main(String[] args) {
try (AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open()) {
InetSocketAddress remote = new InetSocketAddress("127.0.0.1", 6666);
// 用于计数,避免客户端退出
CountDownLatch end = new CountDownLatch(1);

ByteBuffer buffer = ByteBuffer.allocate(100);

socketChannel.connect(remote, buffer, new CompletionHandler<Void, ByteBuffer>() {
@Override
public void completed(Void result, ByteBuffer channel) {
try {
System.out.println("客户端连接成功");
String msg = "this is a simple test.";
channel.put(msg.getBytes());
channel.flip();
socketChannel.write(channel);
System.out.println("发送数据: " + msg);
socketChannel.close();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
end.countDown();
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
end.countDown();
}
});

// 等待直到连接关闭
end.await();

} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}

运行结果

运行结果有几个意思的点,结合理论,重点强调一下加深印象。

AIO运行结果
  • 可以看到,能打印出闲的一批就已经说明accept不阻塞了,并没有"卡"在accpet那里使得main线程挂起。

  • 观察一下,在acceptread操作的回调函数中,是谁在执行操作?从图中可以看到Thread-x的名字。那就说明,是从属于JVM默认线程池中的线程来做的,这和理论保持一致。

小结

看完基本的三种IO模型后,一个很自然而然的问题就是,我们应当在什么场景下使用呢?

  • BIO:BIO方式适用于连接数量少且固定的场景,这种方式对服务器资源要求比较高, JDK1.4之前唯一的选择,程序直观简单易理解。
  • NIO:适用于连接数目多且业务比较轻。JDK1.4开始支持。
  • AIO:适用于连接数目多且连接比较长(业务重操作),需要操作系统充分参与并发操作。JDK1.7开始支持。

如果说你的服务器资源充足、且客户端数量少,那BIO就可以了,这是足够且最简单的方法。

而如果连接数比较多了,比如10万个连接,就算是多线程+BIO也没法处理(线程切换开销、线程池资源有限)。这时候就可以考虑使用利用selector机制的NIO。值得一提的是,你如果自己用JDK原生的NIO类去写代码,说实话蛮麻烦的,我前面那个都是小儿科了,正儿八经的需要熟悉Reactor模式(单一线程监听连接、多线程处理不同连接),如select后的包装成FutureTask扔给线程池,那你还得懂多线程编程、并发注意事项等等。

Netty是一个封装了Java NIO的API的框架,比如Hadoop的RPC框架就是基于Netty实现的,直接用Netty去实现NIO相较于你自己从头搞要好很多(当然,了解原理是很重要的)。而AIO,当你需要异步需求的时候可以使用。

总结

总结一下,本篇复习了以下内容:

  • 关于Socket
  • Java中如何使用基本的socket实现基本同步阻塞的TCP、UDP编程
  • Java中的三种IO模型(BIO、NIO、AIO)、使用方法及其应用场景。

参考文献

CATALOG
  1. 1. 什么是Socket?Java中如何使用?
    1. 1.1. Socket是为了解决网络编程中的什么问题?
    2. 1.2. Java中的Socket
      1. 1.2.1. TCP
        1. 1.2.1.1. 服务端代码
        2. 1.2.1.2. 客户端代码
        3. 1.2.1.3. 运行结果
      2. 1.2.2. UDP
        1. 1.2.2.1. 服务端代码
        2. 1.2.2.2. 客户端代码
        3. 1.2.2.3. 运行结果
  2. 2. Java中三种I/O模型及其实现
    1. 2.1. BIO
    2. 2.2. NIO
      1. 2.2.1. 使用方法
        1. 2.2.1.1. 服务端
        2. 2.2.1.2. 客户端
      2. 2.2.2. 运行结果
    3. 2.3. AIO
      1. 2.3.1. 使用方法
        1. 2.3.1.1. 服务端
        2. 2.3.1.2. 客户端
      2. 2.3.2. 运行结果
    4. 2.4. 小结
  3. 3. 总结
  4. 4. 参考文献