Netty-02-NIO实战及原理

Netty-02-NIO实战及原理

前言

  • Java NIO 全称 java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 New IO),是同步非阻塞

  • NIO 相关类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写。

  • NIO 有三大核心部分:Channel(通道)Buffer(缓冲区), Selector(选择器)

  • NIO是 面向冲区 ,或者面向 编程的。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络

  • Java NIO的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。

  • 通俗理解:NIO是可以做到用一个线程来处理多个操作的。假设有10000个请求过来,根据实际情况,可以分配50或者100个线程来处理。不像之前的阻塞IO那样,非得分配10000个。

  • HTTP2.0使用了多路复用的技术,做到同一个连接并发处理多个请求,而且并发请求的数量比HTTP1.1大了好几个数量级。

1. NIO三大核心概述

mark

Buffer:

  • 缓存数组,就是一个内存块,底层用数组实现
  • Channel进行数据的读写。
  • 数据的读取写入是通过Buffer, 这个和BIO 一样, 而BIO 中要么是输入流,或者是输出流, 不能双向,但是NIOBuffer 是可以读也可以写, 需要 flip 方法切换。

Channel:

  • 通信通道,每个客户端连接都会建立一个Channel通道
  • 客户端直接与Channel进行通信,当客户端发送消息时,消息就流通到Channel里面,本地程序需要将Channel里面的数据存放在Buffer里面,才可以查看;当本地需要发送消息时,先把消息存在Buffer里面,再将Buffer里面的数据放入Channel,数据就流通到了客户端
  • 总而言之:Buffer就是本地程序与Channel数据交换的一个中间媒介。

Selector

mark

  • NIO之所以是非阻塞的,关键在于它一个线程可以同时处理多个客户端的通信。而Selector就是它一个线程如何处理多个客户端通信的关键,一个Selector就对应一个线程
  • 首先在创建与客户端连接的Channel时,应该调用 Channel.register()方法,将Channel注册到一个Selector上面。调用该方法后,会返回一个SelectionKey对象,该对象与Channel是一一对应的。而Selector则通过管理SelectionKey的集合间接的去管理各个Channel。示例图如下:

mark

Selector具体是如何管理这么多个通信的呢?这就引出了事件

事件、以及NIO的工作流程介绍

  • 事件:当将Channel绑定到Selector上面时,必须同时为该Channel声明一个监听该Channel的事件(由Channel和该Channel的事件一起组成了SelectionKey),并将SelectionKey加入到SelectorSet集合中去
  • 当有客户端建立连接或者进行通信,会在对应的各个Channel中产生不同的事件。
  • Selector会一直监听所有的事件,当他监听到某个SelectionKey中有事件产生时,会将所有产生事件的SelectionKey统一加入到一个集合中去
  • 而我们则需要获取到这个集合,首先对集合中的各个SelectionKey进行判断,判断它产生的是什么事件,再根据不同的事件进行不同的处理。
  • 在操作这个SelectionKey集合的时候,其实我们就是在一个线程里面对几个不同客户端的连接进行操作。具体的关系图如下:

mark

2. Buffer

  • 缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),
  • 该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。
  • Channel提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由Buffer

2.1 Buffer类介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public abstract class Buffer {

/**
* The characteristics of Spliterators that traverse and split elements
* maintained in Buffers.
*/
static final int SPLITERATOR_CHARACTERISTICS =
Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED;

// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;

// Used only by direct buffers
// NOTE: hoisted here for speed in JNI GetDirectBufferAddress
long address;

mark

  • 基类是Buffer抽象类

  • 基类派生出基于基本数据类型的7个xxxBuffer 抽象类,没有boolean相关的buffer类。

  • 除了ByteBuffer外,每个基本数据的抽象类 xxxBuffer 类下面都派生出转向 ByteBuffer 的类 ByteBufferXxxAsBufferLByteBufferAsXxxBufferB实现类;以及 DirectXxxBufferUDirectXxxBufferSHeapXxxBuffer==(具体实例对象类)==这五个类。

  • 就只有抽象类CharBuffer 派生出了第六个类StringCharBuffer

  • ByteBuffer只派生出了 HeapByteBufferMappedByteBufferR 两个类

mark

2.2 Buffer 属性

属性 描述
Capacity 容量,即可以容纳的最大数据量;在缓冲区创建时被设定并且不能改变
Limit 表示缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的
Position 位置,下一个要被读或写的元素的索引,每次读写缓冲区数据时都会改变改值,为下次读写作准备
Mark 标记 ,一般不会主动修改,在flip()被调用后,mark就作废了。

mark <= position <= limit <= capacity

2.3 简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class BasicBuffer {
public static void main(String[] args) {

// 1. 举例说明Buffer的使用
// 创建一个IntBuffer,大小为5,即存放5个int类型
IntBuffer intBuffer = IntBuffer.allocate(5);

// 2. 向Buffer中存储数据
for (int i = 0; i < intBuffer.capacity(); i++) {
intBuffer.put(i * 2);
}


// 3. 从Buffer中读取数据
// 将buffer进行读写切换(重要)
intBuffer.flip();

while (intBuffer.hasRemaining()){
System.out.println(intBuffer.get());
}
}
}
  • Buffer刚创建时,capacity = 5,固定不变。limit指针指向5position指向0mark指向-1

mark

  • 之后调用 intBuffer.put方法,向buffer中添加数据,会不断移动position指针,最后position变量会和limit指向相同。

mark

  • 调用 buffer.flip()实际上是重置了positionlimit两个变量,将limit放在position的位置,position放在0的位置。这里只是最后的positionlimit位置相同,所以fliplimit位置没变。

mark

  • 调用 intBuffer.get()实际上是不断移动position指针,直到它移动到limit的位置

2.4 Buffer 类方法

Buffer基类(抽象类)
  • public final int capacity();
    • 直接返回了此缓冲区的容量,capacity
  • public final int position();
    • 直接返回了此缓冲区指针的当前位置
  • public final Buffer position(int newPosition);
    • 设置此缓冲区的位置,设置position
  • public final int limit();
    • 返回此缓冲区的限制
  • public final Buffer limit(int newLimit);
    • 设置此缓冲区的限制,设置limit
  • public final Buffer clear();
    • 清除此缓冲区,即将各个标记恢复到初识状态, position = 0;limit = capacity; mark = -1,但是并没有删除数据。
  • public final Buffer flip();
    • 反转此缓冲区, limit = position;position = 0;mark = -1
    • 当指定数据存放在缓冲区中后,position所指向的即为此缓冲区数据最后的位置。只有当数据大小和此缓冲区大小相同时,position才和limit的指向相同。
    • flip()方法将limit置向positionposition0,那么从position读取数据到limit即为此缓冲区中所有的数据。
  • public final boolean hasRemaining();
    • 告知当前位置和限制之间是否有元素。return position < limit;
  • public abstract boolean isReadOnly();
    • 此方法为抽象方法,告知此缓冲区是否为只读缓冲区,具体实现在各个实现类中。
  • public abstract boolean hasArray();
    • 告知此缓冲区是否具有可访问的底层实现数组
  • public abstract Object array();
    • 返回此缓冲区的底层实现数组

2.5 ByteBuffer的使用

从前面可以看出来对于Java中的基本数据类型(boolean除外),都有一个Buffer类型与之对应,最常用的自然是ByteBuffer类(二进制数据),该类的主要方法如下:

  • public static ByteBuffer allocateDirect(int capacity);
    • 创建直接缓冲区
  • public static ByteBuffer allocate(int capacity) ;
    • 设置缓冲区的初识容量
  • public abstract byte get();
    • 从当前位置positionget数据,获取之后,position会自动加1
  • public abstract byte get(int index);
    • 通过绝对位置获取数据。
  • public abstract ByteBuffer put(byte b);
    • 从当前位置上添加,put之后,position会自动加1
  • public abstract ByteBuffer put(int index, byte b);
    • 从绝对位置上添加数据
  • public abstract ByteBuffer putXxx(Xxx value [, int index]);
    • position当前位置插入元素。Xxx表示基本数据类型
    • 此方法时类型化的 putgetput放入的是什么数据类型,get就应该使用相应的数据类型来取出,否则可能有 BufferUnderflowException 异常。

demo01 : 当取出的顺序和上面插入的数据类型的顺序不对时,就会抛出BufferUnderflowException异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ByteBuffer buf = ByteBuffer.allocate(64);

//类型化方式放入数据
buf.putInt(100);
buf.putLong(20);
buf.putChar('上');
buf.putShort((short)44);

//取出,当取出的顺序和上面插入的数据类型的顺序不对时,就会抛出BufferUnderflowException异常
buf.flip();
System.out.println(buf.getInt());
System.out.println(buf.getLong());
System.out.println(buf.getChar());
System.out.println(buf.getShort());

demo02 : 可以将一个普通的Buffer转成只读的Buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//创建一个Buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(64);
for (int i = 0; i < 64; i++) {
byteBuffer.put((byte)i);
}
//读取
byteBuffer.flip();
//得到一个只读的Buffer
ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
System.out.println(readOnlyBuffer.getClass());
//读取
while (readOnlyBuffer.hasRemaining()){
System.out.println(readOnlyBuffer.get());
}
readOnlyBuffer.put((byte)100); //会抛出 ReadOnlyBufferException

demo03: MappedByteBuffer可以让文件直接在内存(堆外内存)中进行修改,而如何同步到文件由NIO来完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 1、MappedByteBuffer可以让文件直接在内存中(堆外内存)修改,操作系统不需要拷贝一次
*/
@Test
public void test() throws IOException {
RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt", "rw");
//获取对应的文件通道
FileChannel channel = randomAccessFile.getChannel();
/**
* 参数1: FileChannel.MapMode.READ_WRITE,使用的读写模式
* 参数2: 0,可以直接修改的起始位置
* 参数3: 5,是映射到内存的大小(不是文件中字母的索引位置),即将 1.txt 的多少个字节映射到内存,也就是可以直接修改的范围就是 [0, 5)
* 实际的实例化类型:DirectByteBuffer
*/
MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);

mappedByteBuffer.put(0,(byte)'N');
mappedByteBuffer.put(3, (byte)'M');
mappedByteBuffer.put(5, (byte)'Y'); //会抛出 IndexOutOfBoundsException

randomAccessFile.close();
System.out.println("修改成功~");
}

3. 通道(Channel)

3.1 基本介绍

NIO的通道类似于流,但有些区别

  • 通道可以同时进行读写,而流只能读或者只能写
  • 通道可以实现异步读写数据
  • 通道可以从缓存读数据,也可以写数据到缓存

mark

  • BIO 中的 stream 是单向的,例如:FileInputStream对象只能进行读取数据的操作,而NIO中的通道(Channel)是双向的,可以读操作,也可以写操作。
  • Channel 在 NIO 中是一个接口:public interface Channel extends Closeable{}
  • 常用的Channel类有:FileChannelDatagramChannelServerSocketChannel(类似ServerSocket)、SocketChannel(类似Socket
  • FileChannel 用于文件数据的读写,DatagramChannel用于UDP数据的读写,ServerSocketChannelSocketChannel用于TCP数据读写

类关系图:

mark

3.2 FileChannel

  • public int read(ByteBuffer dst)
    
    1
    2
    3
    4
    5

    - 从通道读取数据并放到缓冲区中
    - 此操作也会移动 `Buffer` 中的`position`指针,不断往`position`中放数据,`read`完成后`position`指向`limit`。

    -
    public int write(ByteBuffer src)
    1
    2
    3
    4
    5

    - 把缓冲区的数据写到通道中
    - 此操作也会不断移动`Buffer`中的`position`位置直到`limit`,读取到的数据就是`position`到`limit`这两个指针之间的数据。

    -
    public long transferFrom(ReadableByteChannel src, long position, long count)
    1
    2
    3
    4

    - 从目标通道中复制数据到当前通道

    -
    public long transferTo(long position, long count, WritableByteChannel target)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29

    - 把数据从当前通道复制给目标通道
    - 该方法拷贝数据使用了**零拷贝**,通常用来在网络`IO`传输中,将`FileChannel`里面的文件数据直接拷贝到与客户端或者服务端连接的`Channel`里面从而达到文件传输。



    **应用实例**

    **实例1:将数据写入到本地文件**

    ```java
    String str = "hello,尚硅谷";
    //创建一个输出流 -> Channel
    FileOutputStream fileOutputStream = new FileOutputStream("d:\\file01.txt");

    //通过 FileOutputStream 获取对应的 FileChannel
    //这个 FileChannel 真实类型是 FileChannelImpl
    FileChannel fileChannel = fileOutputStream.getChannel();

    //创建一个缓冲区 ByteBuffer
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    //将str放入ByteBuffer
    byteBuffer.put(str.getBytes());
    //对ByteBuffer进行反转,开始读取
    byteBuffer.flip();
    //将ByteBuffer数据写入到FileChannel
    //此操作会不断移动 Buffer中的 position到 limit 的位置
    fileChannel.write(byteBuffer);
    fileOutputStream.close();

实例2:从本地文件读取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
//创建文件的输入流
File file = new File("d:\\file01.txt");
FileInputStream fileInputStream = new FileInputStream(file);
//通过fileInputStream 获取对应的FileChannel -> 实际类型 FileChannelImpl
FileChannel fileChannel = fileInputStream.getChannel();
//创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
//将通道的数据读入到buffer
fileChannel.read(byteBuffer);

//将ByteBuffer 的字节数据转成String
System.out.println(new String(byteBuffer.array()));
fileInputStream.close();

实例1、2的示例图:

mark

实例3:使用一个Buffer完成文件的读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
FileInputStream fileInputStream = new FileInputStream("1.txt");
FileChannel fileChannel1 = fileInputStream.getChannel();

FileOutputStream fileOutputStream = new FileOutputStream("2.txt");
FileChannel fileChannel2 = fileOutputStream.getChannel();

ByteBuffer byteBuffer = ByteBuffer.allocate(512);
while (true){
//清空buffer,由于循环的最后执行了 write 操作,会将 position 移动到 limit 的位置
//清空 Buffer的操作才为上一次的循环重置position的位置
// 如果没有重置position,那么上次读取后,position和limit位置一样,读取后read的值永远为0
byteBuffer.clear();
//将数据存入 ByteBuffer,它会基于 Buffer 此刻的 position 和 limit 的值,
// 将数据放入position的位置,然后不断移动position直到其与limit相等;
int read = fileChannel1.read(byteBuffer);
System.out.println("read=" + read);
if (read == -1) { //表示读完
break;
}
//将buffer中的数据写入到 FileChannel02 ---- 2.txt
byteBuffer.flip();
fileChannel2.write(byteBuffer);
}

//关闭相关的流
fileInputStream.close();
fileOutputStream.close();

mark

实例4:拷贝文件 transferFrom 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//创建相关流
FileInputStream fileInputStream = new FileInputStream("d:\\a.gif");
FileOutputStream fileOutputStream = new FileOutputStream("d:\\a2.gif");

//获取各个流对应的FileChannel
FileChannel source = fileInputStream.getChannel();
FileChannel dest = fileOutputStream.getChannel();

//使用 transferForm 完成拷贝
dest.transferFrom(source, 0, source.size());
//关闭相关的通道和流
source.close();
dest.close();
fileInputStream.close();
fileOutputStream.close();
  • 实例4相当于封装了实例3

3.3 ServerSocketChannel 和 SocketChannel 类

ServerSocketChannel:主要用于在服务器监听新的客户端Socket连接

  • public static ServerSocketChannel open()
    • 得到一个 ServerSocketChannel 通道
  • public final ServerSocketChannel bind(SocketAddress local)
    • 设置服务器监听端口
  • public final SelectableChannel configureBlocking(boolean block)
    • 用于设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
    • 此方法位于 ServerSocketChannelSocketChannel的共同父类AbstractSelectableChannel类中
  • public abstract SocketChannel accept()
    • 接受一个连接,返回代表这个连接的通道对象
  • public final SelectionKey register(Selector sel, int ops)
    • Channel注册到选择器并设置监听事件,也可以在绑定的同时注册多个事件,如下所示:
    • channel.register(selector,Selectionkey.OP_READ | Selectionkey.OP_CONNECT)

SocketChannel:网络IO通道,具体负责进行读写操作。NIO把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区

  • public static SocketChannel open()
    • 得到一个SocketChannel通道
  • public final SelectableChannel configureBlocking(boolean block)
    • 设置阻塞或非阻塞模式,取值 false表示采用非阻塞模式
    • 此方法位于 ServerSocketChannelSocketChannel的共同父类AbstractSelectableChannel类中
  • public abstract boolean connect(SocketAddress remote)
    • 连接服务器
  • public boolean finishConnect()
    • 如果上面的方法连接失败,接下来就要通过该方法完成连接操作
  • public int write(ByteBuffer src)
    • 往通道里写数据
    • 这里写入的是buffer里面positionlimit这个之间的数据
  • public int read(ByteBuffer dst)
    • 从通道里读数据
  • public final SelectionKey register(Selector sel, int ops, Object att)
    • 注册Channel到选择器并设置监听事件,最后一个参数可以设置共享数据
  • public final void close()
    • 关闭通道

应用实例

  • 通过Buffer数组来完成读写操作,即ScatteringGathering
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* Scattering:将数据写入到buffer时,可以采用buffer数组,初次写入 【分散】
* Gathering:从buffer读取数据时,也可以采用buffer数组,依次读
*/
@Test
public void test() throws IOException {
//使用 ServerSocketChannel 和 SocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);
//绑定端口到socket,并启动
serverSocketChannel.socket().bind(inetSocketAddress);
//创建一个Buffer数组
ByteBuffer[] byteBuffers = new ByteBuffer[2];
byteBuffers[0] = ByteBuffer.allocate(5);
byteBuffers[1] = ByteBuffer.allocate(3);

//等待客户端的连接(Telnet)
SocketChannel socketChannel = serverSocketChannel.accept();
int msgLength = 8; //假定从客户端接受8个字节
//循环的读取
while (true) {
int byteRead = 0;
while (byteRead < msgLength) {
long l = socketChannel.read(byteBuffers);
byteRead += l; //累计读取的字节数
System.out.println("byteRead= " + byteRead);
//使用流打印,看看当前这个buffer的position和limit
Arrays.stream(byteBuffers)
.map(buffer -> "position=" + buffer.position() + ", limit = " + buffer.limit())
.forEach(System.out::println);
}
//读书数据后需要将所有的buffer进行flip
Arrays.asList(byteBuffers).forEach(Buffer::flip);

//将数据读出显示到客户端
long byteWrite = 0;
while (byteWrite < msgLength) {
long l = socketChannel.write(byteBuffers);
byteWrite += l;
}

//将所有的 buffer 进行clear操作
Arrays.asList(byteBuffers).forEach(Buffer::clear);
System.out.println("byteRead=" + byteRead + ", byteWrite=" + byteWrite
+ ", msgLength=" + msgLength);
}
}

4. Selector(选择器)

4.1 基本介绍

  • Java 的 NIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到Selector(选择器)
  • Selector能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
  • 只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程
  • 避免了多线程之间的上下文切换导致的开销

mark

其中 : NettyIO线程NioEventLoop聚合了Selector(选择器,也叫多路复用器),可以同时并发处理成百上千个客户端连接。

  • 当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。

  • 线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。

  • 由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。

  • 一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。

4.2 SelectionKey 介绍

mark

  • Selector通过管理SelectionKey的集合从而去监听各个Channel
  • Channel注册到Selector上面时,会携带该Channel关注的事件(SelectionKey包含Channel以及与之对应的事件),并会返回一个SelectionKey的对象,Selector将该对象加入到它统一管理的集合中去,从而对Channel进行管理。
  • SelectionKey表示的是Selector和网络通道的注册关系,故FileChannel是没有办法通过SelectionKey注册到Selector上去的。

四大事件

  • public static final int OP_READ = 1 << 0
    • 值为1,表示读操作,
    • 代表本Channel已经接受到其他客户端传过来的消息,需要将Channel中的数据读取到Buffer中去
  • public static final int OP_WRITE = 1 << 2
    • 值为4,表示写操作
    • 一般临时将Channel的事件修改为它,在处理完后又修改回去。
  • public static final int OP_CONNECT = 1 << 3
    • 值为8,代表建立连接。
    • 一般在ServerSocketChannel上绑定该事件,结合 channel.finishConnect()在连接建立异常时进行异常处理
  • public static final int OP_ACCEPT = 1 << 4
    • 值为16,表示由新的网络连接可以accept
    • ServerSocketChannel进行绑定,用于创建新的SocketChannel,并把其注册到Selector上去

相关方法

  • public abstract Selector selector()
    • 得到该SelectionKey具体是属于哪个Selector对象的
  • public abstract SelectableChannel channel()
    • 通过SelectionKey的到对应的Channel
  • public final Object attachment()
    • 得到与之关联的共享数据,一般用于获取buffer
    • 在使用register注册通道时,也可以为该Channel绑定一个Buffer,可以通过本方法获取这个Buffer
    • 通过selectionKey.attach(Object ob)绑定的数据,也是通过该方法获取
  • public abstract SelectionKey interestOps()
    • 获取该SelectionKey下面的事件
  • public abstract SelectionKey interestOps(int ops)
    • 用于设置或改变某个Channel关联的事件
    • 增加事件:key.interestOps(key.interestOps | SelectionKey.OP_WRITE)
    • 减少事件:key.interestOps(key.interestOps & ~SelectionKey.OP_WRITE)
  • public final boolean isAcceptable(),isReadable(),isWritable(),isConnectable()
    • 用于判断这个SelectionKey产生的是什么事件,与上面的事件类型一一对应

4.3 Selector 常见方法

  • public static Selector open();
    • 得到一个选择器对象,实例化出 WindowsSelectorImpl对象。
  • public int select(long timeout)
    • 监控所有注册的通道,当其中有IO操作可以进行时,将对应的SelectionKey加入到内部集合中并返回,返回的结果为Channel响应的事件总和,当结果为0时,表示本Selector监听的所有Channel中没有Channel产生事件。
    • 如果不传入timeout值,就会阻塞线程,传入值则为阻塞多少毫秒,通过它设置超时时间。
    • 之所以需要传入时间,是为了让它等待几秒钟再看有没有Channel会产生事件,从而获取一段时间内产生事件的Channel的总集合再一起处理。
  • selector.selectNow();
    • 不会阻塞,立马返回冒泡的事件数
  • public Set<SelectionKey> selectedKeys()
    • 从内部集合中得到所有的SelectionKey

4.4 Demo 实例

编码步骤

  1. 当客户端连接时,会通过ServerSocketChannel 得到 SocketChannel
  2. Selector 进行监听 select方法, 返回有事件发生的通道的个数.
  3. socketChannel注册到Selector上, register(Selector sel, int ops), 一个selector上可以注册多个SocketChannel
  4. 注册后返回一个 SelectionKey, 会和该Selector 关联(集合)
  5. 进一步得到各个 SelectionKey (有事件发生)
  6. 在通过 SelectionKey 反向获取 SocketChannel , 方法 channel()
  7. 判断该Channel的事件类型,对不同事件进行不同的业务处理

NIO入门案例:实现服务器和客户端的简单通讯

  • 服务器端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public void Server() throws IOException {
//创建ServerSocketChannel -> ServerSocket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//得到一个Selector对象
Selector selector = Selector.open();
//绑定一个端口6666
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
//设置非阻塞
serverSocketChannel.configureBlocking(false);

//把 serverSocketChannel 注册到 selector ,关心事件为:OP_ACCEPT,有新的客户端连接
SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println();
//循环等待客户端连接
while (true) {
//等待1秒,如果没有事件发生,就返回
if (selector.select(1000) == 0) {
System.out.println("服务器等待了1秒,无连接");
continue;
}
//如果返回的 > 0,表示已经获取到关注的事件
// 就获取到相关的 selectionKey 集合,反向获取通道
Set<SelectionKey> selectionKeys = selector.selectedKeys();

//遍历 Set<SelectionKey>,使用迭代器遍历
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
//获取到SelectionKey
SelectionKey key = keyIterator.next();
//根据 key 对应的通道发生的事件,做相应的处理
if (key.isAcceptable()) {//如果是 OP_ACCEPT,有新的客户端连接
//该客户端生成一个 SocketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端连接成功,生成了一个SocketChannel:" + socketChannel.hashCode());
//将SocketChannel设置为非阻塞
socketChannel.configureBlocking(false);
//将socketChannel注册到selector,关注事件为 OP_READ,同时给SocketChannel关联一个Buffer
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
if (key.isReadable()) {
//通过key,反向获取到对应的Channel
SocketChannel channel = (SocketChannel) key.channel();
//获取到该channel关联的Buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
channel.read(buffer);
System.out.println("from 客户端:" + new String(buffer.array()));
}
//手动从集合中移除当前的 selectionKey,防止重复操作
keyIterator.remove();
}

}
}
  • 客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void Client() throws IOException {
//得到一个网络通道
SocketChannel socketChannel = SocketChannel.open();
//设置非阻塞
socketChannel.configureBlocking(false);
//提供服务器端的IP和端口
InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 6666);
//连接服务器
if (!socketChannel.connect(socketAddress)){ //如果不成功
while (!socketChannel.finishConnect()){
System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作。。。");
}
}

//如果连接成功,就发送数据
String str = "hello, 尚硅谷";
ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());
//发送数据,实际上就是将buffer数据写入到channel
socketChannel.write(byteBuffer);
System.in.read();
}

5. NIO 实战(群聊系统demo)

  • 需要实现客户端和服务器端之间的数据通讯,服务端能够将数据转发给其他所有客户端。

5.1 服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package GroupChat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

public class GroupChatServer {
// 属性
private Selector selector;
private ServerSocketChannel listenChannel;
private static final int PORT = 6677;

// 初始化工作
public GroupChatServer(){
try{
// 打开连接通道
selector = Selector.open();
listenChannel = ServerSocketChannel.open();

// 绑定端口
listenChannel.socket().bind(new InetSocketAddress(PORT));
// 设置非阻塞
listenChannel.configureBlocking(false);
// 注册到选择器上
listenChannel.register(selector, SelectionKey.OP_ACCEPT);
}catch (Exception e){
e.printStackTrace();
}
}


// 监听函数
public void listen(){
try {
while (true){
int count = selector.select(2000);
if(count > 0){ // 返回就绪的socket的数量
// 遍历得到SelectionKey集合
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

// 获取key
SelectionKey key = iterator.next();

// 反向获取accept的事件的channel
if (key.isAcceptable()){
SocketChannel socketChannel = listenChannel.accept();
socketChannel.configureBlocking(false);

// 将socketChannel 注册到选择器上
socketChannel.register(selector,SelectionKey.OP_READ);

// 提示客户端上线了
System.out.println(socketChannel.getRemoteAddress().toString().substring(1) + "上线了");
}

if (key.isReadable()){ // 通道数据为可读的状态
// 读取数据
readData(key);
}

// 删除当前的key,防止重复处理
iterator.remove();
}
}
}catch (Exception e2){
e2.printStackTrace();
}
}


// 读取客户端发送的数据
public void readData(SelectionKey key){
// 定义一个socketChannel
SocketChannel channel = null;

try{
// 得到channel
channel = (SocketChannel) key.channel();

// 创建buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = channel.read(byteBuffer);

if(read > 0){
// 缓存区的数据转存到字符串
String msg = new String(byteBuffer.array());
// 输出消息
System.out.println("来自客户端的消息" + msg);

// 将服务器消息转发给其他用户
sendToOthers(msg,channel);
}
}catch (IOException e){ // 表示客户端此时下线了
try {
System.out.println(channel.getRemoteAddress() + "离线了....");
//取消注册
key.cancel();
//关闭通道
channel.close();
}catch (IOException e2){
e2.printStackTrace();
}

e.printStackTrace();
}
}

// 消息转发给其他用户
private void sendToOthers(String msg,SocketChannel self) throws IOException {
System.out.println("服务器消息转发中...");

// 遍历已经注册到selector 的 socketchannel,排除掉自己
for (SelectionKey key : selector.keys()) {
// 通过key取出对应的SocketChannel
Channel channel = key.channel();
// 排除自己
if(channel instanceof SocketChannel & channel != self){
// 转型
SocketChannel dest = (SocketChannel) channel;
// 将msg放入buffer中
ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
// 将buffer数据写入到通道
dest.write(byteBuffer);
}
}
}


// 服务器启动
public static void main(String[] args) {
GroupChatServer groupChatServer = new GroupChatServer();
groupChatServer.listen();
}
}

5.2 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package GroupChat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

public class GroupChatClient {
private SocketChannel socketChannel = null;
private Selector selector = null;
private final static int PORT = 6677;
private final static String HOST = "127.0.0.1";
private String userName;

// 初始化操作
public GroupChatClient() throws IOException {
selector = this.selector.open();
//链接服务器
socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
//设置非阻塞
socketChannel.configureBlocking(false);

// 注册到selector上
socketChannel.register(selector, SelectionKey.OP_READ);

// 获取用户名
userName = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(userName + " is ok....");
}

//向服务器发送消息
public void sendInfo(String info) {
info = userName + "说:" + info;
try {
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}

// 读取从服务器回显的消息
public void readInfo(){
try {
int select = selector.select();
if (select > 0){ // 说明有可用的通道
// 获取key集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 迭代
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
if(key.isReadable()){
// 得到相关通道
SocketChannel channel = (SocketChannel) key.channel();
// 得到一个buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 读取数据
channel.read(byteBuffer);
// 将数据转换成字符串
String s = new String(byteBuffer.array());
System.out.println(s.trim());
}
}
// 读完了删除这个key
iterator.remove();
}else{
// 这里可以干点别的东西
}
}catch (Exception e){
e.printStackTrace();
}
}

public static void main(String[] args) throws IOException {
GroupChatClient groupChatClient = new GroupChatClient();

// 开启一个线程读取数据
new Thread(()->{while (true){groupChatClient.readInfo();}}).start();
// 发送数据
Scanner sc = new Scanner(System.in);
while (sc.hasNextLine()){
String s= sc.nextLine();
groupChatClient.sendInfo(s);
}
}
}

注意事项:

  • 使用int read = channel.read(buffer)

    读取数据时,读取的结果情况:

    • read=-1时,说明客户端的数据发送完毕,并且主动的关闭socket。所以这种情况下,服务器程序需要关闭socketSocket,并且取消key的注册。注意:这个时候继续使用SocketChannel进行读操作的话,就会抛出:==远程主机强迫关闭一个现有的连接==的IO异常
    • read = 0时:
      • 某一时刻SocketChannel中当前没有数据可读。
      • 客户端的数据发送完毕。
      • 详情见此博文
      • 但是对于博文中的这一条,经过本人测试,这种情况下返回的是读取的数据的大小,而不是0ByteBufferposition等于limit,这个时候也会返回0

参考博客 : https://www.cnblogs.com/hyy9527/p/13059248.html

https://blog.csdn.net/qq_35751014/article/details/104411347

打赏
  • 版权声明: 本博客所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!
  • © 2019-2022 Zhuuu
  • PV: UV:

请我喝杯咖啡吧~

支付宝
微信