Netty-05-异步模型

Netty-05-异步模型

前言

  • 本文注重讲解Netty的异步模型任务的队列

1. 任务队列

mark

  • 任务队列由NioEventLoop 维护并且不断执行,当我们收到请求之后,在当前的 channel 中对应的 pipeline中的各个 Hanlder进行业务的处理和请求的过滤。
  • 当某些业务需要消费大量事件的时候,我们可以将这些任务提交到由 NioEventLoop 维护的 taskQueue 或者 ScheduleTaskQueue中, 让当前的 NioEventLoop 线程在空闲的时候去执行这些任务。
  • 下面将介绍提交任务的三种方式:

1.1 用户程序自定义的普通任务

  • 该方式会将任务提交到taskQueue队列中。提交到该队列中的任务会按照提交顺序依次执行。
1
2
3
4
5
6
channelHandlerContext.channel().eventLoop().execute(new Runnable(){
@Override
public void run() {
//...
}
});

1.2 用户自定的定时任务

  • 该方式会将任务提交到scheduleTaskQueue定时任务队列中。该队列是底层是优先队列PriorityQueue实现的,固该队列中的任务会按照时间的先后顺序定时执行。
1
2
3
4
5
6
channelHandlerContext.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
//...
}
}, 60, TimeUnit.SECONDS);

1.3 为其他的 EventLoop 线程对应的 Channel添加任务

  • 可以在ChannelInitializer中,将刚创建的各个Channel以及对应的标识加入到统一的集合中去
  • 然后可以根据表示获取 Channel 对应的 NioEventLoop,然后就可以调用execute()或者schedule()方法。

2. 异步模型

2.1 异步的概念

  • 异步的概念和同步是相对的,当一个异步过程调用发出后,调用者不能立即的得到结果。实际处理这个调用的组件在完成后,通过状态,通知和回调来通知调用者
  • Netty 中的 I/O 操作是异步的,包括 BindWriteConnect 等操作会简单的返回一个 ChannelFuture
  • 调用者并不能立即获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机构获取IO 的操作结果。

机制描述

  • Netty 的异步模型是建立在 futurecallback 的之上的。callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future去监控方法 fun 的处理过程(即 : Future-Listener 机制)

关于 Future 的说明

  • 表示异步的执行结果, 可以通过它提供的方法来检测执行是否完成,比如检索计算等等.
  • ChannelFuture 是一个接口 : public interface ChannelFuture extends Future<Void>。我们可以添加监听器,当监听的事件发生时,就会通知到监听器

工作原理示意图

mark

mark

  • 在使用 Netty 进行编程时,拦截操作和转换出入站数据只需要您提供 callback 或利用future 即可。这使得链式操作简单、高效, 并有利于编写可重用的、通用的代码。
  • Netty 框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来

2.2 Future-Listener 机制

  • Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作。

常用方法如下:

方法名称 方法作用
isDone() 判断当前操作是否完成
isSuccess() 判断已完成的当前操作是否成功
getCause() 获取已完成当前操作失败的原因
isCancelled() 判断已完成的当前操作是否被取消
addListener() 注册监听器,当前操作(Future)已完成,将会通知指定的监听器

举例说明

  • 绑定端口操作时异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑。
1
2
3
4
5
6
7
serverBootstrap.bind(port).addListener(future -> {
if(future.isSuccess()) {
System.out.println(newDate() + ": 端口["+ port + "]绑定成功!");
} else{
System.err.println("端口["+ port + "]绑定失败!");
}
});

2.3 HTTP 实战入门

目标 : 浏览器访问Netty服务器后,返回HelloWorld

  1. 启动器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TestServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TestServerInitializer());

// 绑定端口号操作
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 关闭线程池
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
  1. 自定义ChannelInitializer

    用于给Channel对应的pipeline添加handler。该ChannelInitializer中的代码在SocketChannel被创建时都会执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;


public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel channel) throws Exception {

// 向管道加入处理器
// 首先得到管道
ChannelPipeline pipeline = channel.pipeline();

// 1. 加入一个netty提供的HttpServerCodec
// netty自带的http编码解码器
pipeline.addLast("MyHttpServerCodec",new HttpServerCodec());

// 2. 增加自定义handler
pipeline.addLast("MyTestHttpServerHandler",new TestHttpServerHandler());
}
}
  1. 自定义Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

import java.net.URI;

public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
/**
* 读取客户端的数据
* @param channelHandlerContext
* @param httpObject 客户端和服务端互相通讯所使用的对象
* @throws Exception
*/
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
// 判断msg 是不是 HttpRequest请求
if(httpObject instanceof HttpRequest){
System.out.println("msg 类型 = " + httpObject.getClass());
System.out.println("客户端地址:" + channelHandlerContext.channel().remoteAddress());

// 获取http请求
HttpRequest request = (HttpRequest) httpObject;

// 从http请求中获取uri
URI uri = new URI(request.uri());

// 过滤http中的请求
if ("/favicon.ico".equals(uri.getPath())){
System.out.println("请求了 favicon.ico,不做响应");
}

// 回复信息给浏览器[http协议]
ByteBuf content = Unpooled.copiedBuffer("helloWorld", CharsetUtil.UTF_8);
// 构造一个http响应回复给浏览器
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH,content.readableBytes());

// 将构建好的response返回给客户端
channelHandlerContext.writeAndFlush(response);
}
}
}
打赏
  • 版权声明: 本博客所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!
  • © 2019-2022 Zhuuu
  • PV: UV:

请我喝杯咖啡吧~

支付宝
微信