Vert.x是一个在JVM开发reactive应用的框架,可用于开发异步、可伸缩、高并发的Web应用(虽然不限于web应用)。其目的在于为JVM提供一个Node.js的替代方案。开发者可以通过它使用JavaScript、Ruby、Groovy、Java,甚至是混合语言来编写应用。
使用Vertx.x框架,可以用JavaScript、CoffeeScript、Ruby、Python、Groovy或Java开发应用程序的组件,最终应用程序可以是混合语言构建的。
本文试图揭示Vert.x的线程模型的应用,通过源代码分析Vert.x如何使用线程池处理请求的,以及比较Netty和Vert.x使用线程池的异同。
也许你觉得奇怪,默认启动一个Vert.x Verticle实例,它只用一个线程处理事件,在多核的情况下你需要创建多个Verticle实例以充分利用多个CPU Core的性能。
Vert.x 实例
首先先啰嗦地介绍一些Vert.x概念,熟悉Vert.x开发的朋友可以跳过这一节。
在Vert.x里,如果你不使用Vertx对象,你几乎是寸步难行。
Vertx对象扮演着Vert.x控制中心的角色,同时它也提供了大量的功能,例如:
- 编写TCP客户端和服务器
- 编写HTTP客户端和服务器,包括websocket
- Event bus
- 共享数据
- 定时器
- 发布和卸载Verticle
- UDP
- DNS client
- 文件系统访问
- 高可用
- 集群
如果你将Vert.x嵌入到你的应用程序中,你可以向下面这样获得一个Vertx对象的引用
Vertx vertx = Vertx.vertx();
当你实例化Vertx对象时,如果你感觉默认的参数不符合你的需求,你可以指定实例化时的参数:
Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(40));
VertxOptions对象拥有很多关于Vertx实例设置,例如配置集群,高可用设置,线程池大小以及等等其他参数。下面就介绍一下它的线程池。
1 线程池
1、eventLoopGroup
这个对象是NioEventLoopGroup
的一个实例,它的线程池的大小由options.getEventLoopPoolSize()
决定,如果没有设置,默认为CPU核数 * 2。
eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false); eventLoopGroup = new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory); eventLoopGroup.setIoRatio(NETTY_IO_RATIO);
它的EventLoop
和一个Context对应:
protected ContextImpl(……) { …… EventLoopGroup group = vertx.getEventLoopGroup(); if (group != null) { this.eventLoop = group.next(); } else { this.eventLoop = null; } …… }
它用来执行标准的Verticle。
2、WorkerPool
用来执行worker Verticle。
workerPool = Executors.newFixedThreadPool(options.getWorkerPoolSize(), new VertxThreadFactory("vert.x-worker-thread-", checker, true));
3、Internal Blocking Pool
内部使用的线程池,可以用来将阻塞代码异步化。
internalBlockingPool = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(), new VertxThreadFactory("vert.x-internal-blocking-", checker, true));
不要在event loop中执行阻塞操作, 比如访问数据库或者网络资源,这绝对会影响你的应用的性能。对于这些阻塞操作,你可以将它们异步化:
vertx.executeBlocking(future -> { // 下面这行代码可能花费很长时间 String result = someAPI.blockingMethod("hello"); future.complete(result); }, res -> { System.out.println("The result is: " + res.result()); });
默认情况下executeBlocking会在同一个context中执行(同一个verticle实例),它们会串行化执行。如果不关心这个执行的顺序,可以将ordered参数设为false,它们会在worker pool线程池中并行的执行。
另外一种执行阻塞代码的方式就是使用worker verticle,worker verticle总是在worker pool线程池中执行。
2 Verticle
Verticle有点类似Actor模型,也可以实现并发的,可扩展的,易于发布的模型。
一个vert.x应用可以包含多个verticle实例,实例之间可以通过event bus通讯。
2.1 三种类型
http://vertx.io/docs/vertx-core/java/#_verticle_types
1、Standard Verticle: 最通用的类型,总是在event loop中执行。
2、Worker Verticle:它们使用worker pool线程池运行。一个verticle实例绝对不会在两个或者更多线程中并发执行。
3、Multi-threaded worker verticle:它们使用worker pool线程池运行。 一个verticle实例可以在多个线程中并发执行。
实现一个Verticle很简单:
public class MyVerticle extends AbstractVerticle { // 当发布verticle时调用 public void start() { } // 可以不实现。当 verticle 卸载时调用 public void stop() { } }
2.2 发布方式
1、命令行方式
vertx run SomeJavaSourceFile.java
或者通过maven-shade-plugin
打包成一个fat
包:
<transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <manifestEntries> <Main-Class>io.vertx.core.Starter</Main-Class> <Main-Verticle>com.colobu.vertx.Main</Main-Verticle> </manifestEntries> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/services/io.vertx.core.spi.VerticleFactory</resource> </transformer> </transformers>
然后运行 java -jar xxx-fat.jar
,你还可以传递一些参数。
2、编程方式
你也可以编程的方式,通过vertx.deployVerticle
发布:
public class Main extends AbstractVerticle { public static void main(String[] args) { VertxOptions vo = new VertxOptions(); vo.setEventLoopPoolSize(16); Vertx vertx = Vertx.vertx(vo); DeploymentOptions options = new DeploymentOptions(); options.setInstances(100); vertx.deployVerticle(Main.class.getName(), options, e -> { System.out.println(e.succeeded()); System.out.println(e.failed()); System.out.println(e.cause()); System.out.println(e.result()); }); } @Override public void start() { Handler<HttpServerRequest> handler = e -> { HttpServerResponse response = e.response(); response.putHeader("content-type", "application/json").end("Hello world"); }; vertx.createHttpServer().requestHandler(handler).listen(8080); } }
Verticle发布和Vert.x线程模型
以上比较啰嗦,主要介绍了一些Vert.x的一些概念。下面是我想重点介绍的内容。
本节以实现一个简单的http server为例(编程方式发布Verticle),分析 vert.x 的线程和Verticle的关系。只分析标准的Verticle。代码如上。
1 Verticle发布过程
首先先创建一个Vertx实例,可以你可以通过VertxOptions
设置线程池的大小。上面的例子中设置Event Loop线程池的大小为16:
vo.setEventLoopPoolSize(16);
因此即使你创建几百个Verticle,也只会有16个Event Loop处理它们,你可以通过jstack
查看这些线程。你会看到多个名为vert.x-eventloop-thread-<num>
的线程,一个vertx-blocked-thread-checker
线程,一个vert.x-acceptor-thread-0
。
调用void deployVerticle(String name, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler)
方法发布Verticle
。
DeploymentOptions
对象可以设置发布参数,比如是否是worker verticle,多线程worker verticle, ha, 隔离组等, 重要的是instances,它用来指定分布的Verticle实例的数量,默认是一个。
底层调用DeploymentManager
的doDeployVerticle
来实现,它会根据实例数创建相应多的Verticle
,然后调用doDeploy
发布这些Verticle
:
Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
我将doDeploy
方法简化,让我们看一下关键代码:
private void doDeploy(String identifier, String deploymentID, DeploymentOptions options, ContextImpl parentContext, ContextImpl callingContext, Handler<AsyncResult<String>> completionHandler, ClassLoader tccl, Verticle... verticles) { //准备工作 …… for (Verticle verticle: verticles) { //创建上下文 ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, conf, tccl) : vertx.createEventLoopContext(deploymentID, conf, tccl); deployment.addVerticle(new VerticleHolder(verticle, context)); context.runOnContext(v -> { try { verticle.init(vertx, context); Future<Void> startFuture = Future.future(); verticle.start(startFuture); startFuture.setHandler(……); } catch (Throwable t) {} }); } }
可以看到#11 行创建了一个上下文ContextImpl, 因为本例中我们不用worker模式,所以这个上下文是通过vertx.createEventLoopContext(deploymentID, conf, tccl)
创建的。每个verticle都会创建一个新的上下文,因此verticle和上下文是意义对应的。
#17 行初始化verticle,#19 行启动这个verticle。还记得我们的例子中实现的start
方法吗,它会在这里被调用。
这样,多个verticle实例被发布了。
2 线程模型
首先插播一下Netty的线程模型,不感兴趣的可以略过。
Netty的线程模型
虽然Vert.x底层籍由Netty实现,但是它的处理方式与Netty NIO的线程模型是不同的。
(以下谈论的Netty线程模型是指NIO的情况)
比如下面的Netty代码片段:
EventLoopGroup parentGroup = new NioEventLoopGroup(1); EventLoopGroup childGroup = new NioEventLoopGroup(50); try { ServerBootstrap b = new ServerBootstrap(); b.group(parentGroup, childGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>(){……}); Channel ch = b.bind("0.0.0.0",8080).sync().channel(); ch.closeFuture().sync(); } finally { parentGroup.shutdownGracefully(); childGroup.shutdownGracefully(); }
NioEventLoopGroup
代表一组EventLoop
,每个EventLoop
映射一个线程,每个Channel
注册一个EventLoop
,但是一个EventLoop
可以关联多个Channel
。
parentGroup
用来处理Accept事件,而childGroup
用来处理其余的IO事件。当有并发连接的时候,Handler
会在childGroup
线程池中执行。你可以指定childGroup
的线程数量,如果没有指定,则从系统属性中读取”io.netty.eventLoopThreads”,如果这个属性没有设置,则使用CPU核数 2 (Runtime.getRuntime().availableProcessors() 2))。一般parentGroup
设置为1,我们只需要一个Acceptor处理客户端的连接即可。
当有多个并发连接的时候,每个连接/Channel被分配到一个EventLoop
上。EventLoop
选择是均匀地 (如果线程数是2的n次方,可以用比较快的选择方法PowerOfTwoEventExecutorChooser):
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[childIndex.getAndIncrement() & children.length - 1]; } } private final class GenericEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[Math.abs(childIndex.getAndIncrement() % children.length)]; } }
因此一旦如果某个EventLoop
处理慢了,则这个线程上的event可能出现堆积。
比如下面的代码故意在某个线程上处理慢一些,导致这个EventLoop
上出现堆积,Netty并没有根据压力将时间分配到其它处理快的EventLoop
上。
public class HelloServerHandler extends ChannelInboundHandlerAdapter { …… @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { String name = Thread.currentThread().getName(); System.out.println(name); if (name.endsWith("-5")) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } …… }
输出结果可以看到nioEventLoopGroup-3-5
处理了同样多的请求,而且都堆积在后面了。
…… nioEventLoopGroup-3-19 nioEventLoopGroup-3-18 nioEventLoopGroup-3-19 nioEventLoopGroup-3-18 nioEventLoopGroup-3-20 nioEventLoopGroup-3-20 nioEventLoopGroup-3-5 nioEventLoopGroup-3-5 nioEventLoopGroup-3-5 nioEventLoopGroup-3-5 nioEventLoopGroup-3-5 nioEventLoopGroup-3-5 nioEventLoopGroup-3-5 nioEventLoopGroup-3-5 nioEventLoopGroup-3-5
因此,我们可以了解到,当启动一个NIO方式的Netty实例的时候,它会使用一个线程池来处理http请求。
Netty 4.0的线程模型被很好的重定义,一个ChannelHandler
实例的方法不会被并发的调用,除非它被@Sharable
标记,因此你不应该增加一个ChannelHandler 实例多次。当你增加一个handler到ChannelPipeline中时,你可以指定一个特定的EventExecutorGroup
来执行这个handler。如果没有指定,则使用Channel注册的EventLoop
来执行。如果两个Handler被指定不同的EventExecutorGroup
,则它们会并发执行,因此如果它们会访问共享数据的化,你需要关注并发控制的问题。更多内容可以查看 Netty的文档。
2.2 Vert.x的线程模型
Vert.x如何在线程中处理事件的呢,还是以我们的例子分析。
回顾一下我们实现的Verticle的start方法。
@Override public void start() { Handler<HttpServerRequest> handler = e -> { HttpServerResponse response = e.response(); response.putHeader("content-type", "application/json").end("Hello world"); }; vertx.createHttpServer().requestHandler(handler).listen(8080); }
在这个start方法中,我们创建了一个http server,让它监听 8080端口, http request的处理交给handler执行。 那么监听线程是哪一个?handler又是在哪个线程池中执行的呢?调用多个Verticle实例的方法为什么没有出现”地址/端口被占用”的异常呢?
首先vertx.createHttpServer()会创建一个HttpServerImpl对象,可以通过HttpServerOptions配置更多的参数,每个Verticle实例都会创建一个HttpServerImpl对象。requestHandler(handler)方法设置处理器,你还可以使用Vert.x-Web设置路由的功能。
listen(8080)
启动http 服务器,它实际调用netty实现的。
我将listen
方法简化,去除一些检查代码和回调处理,只保留关键代码如下:
public synchronized HttpServer listen(int port, String host, Handler<AsyncResult<HttpServer>> listenHandler) { listenContext = vertx.getOrCreateContext(); listening = true; synchronized (vertx.sharedHttpServers()) { id = new ServerID(port, host); HttpServerImpl shared = vertx.sharedHttpServers().get(id); if (shared == null) { serverChannelGroup = new DefaultChannelGroup("vertx-acceptor-channels", GlobalEventExecutor.INSTANCE); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(vertx.getAcceptorEventLoopGroup(), availableWorkers); bootstrap.channelFactory(new VertxNioServerChannelFactory()); bootstrap.childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { …… pipeline.addLast("handler", new ServerHandler()); } }); addHandlers(this, listenContext); vertx.sharedHttpServers().put(id, this); actualServer = this; } else { // Server already exists with that host/port - we will use that actualServer = shared; addHandlers(actualServer, listenContext); metrics = vertx.metricsSPI().createMetrics(this, new SocketAddressImpl(port, host), options); } } return this; }
#6 行可以看到它会检查使用这个IP地址和端口的http server是否存在,如果存在的化直接跳到# 27行。因此回答上面的问题,多个Verticle实例不会引起冲突,因为它们会共享同一个http server。
这个http server通过netty ServerBootstrap创建。#10 行可以看到acceptor是一个单线程执行的,acceptorEventLoopGroup在VertxImpl中定义。
acceptorEventLoopGroup = new NioEventLoopGroup(1, acceptorEventLoopThreadFactory);
#10 行还显示,netty的IO worker线程池由availableWorkers
确定,它是一个VertxEventLoopGroup对象。VertxEventLoopGroup类扩展AbstractEventExecutorGroup
,实现了EventLoopGroup
接口:
…… @Override public synchronized EventLoop next() { if (workers.isEmpty()) { throw new IllegalStateException(); } else { EventLoop worker = workers.get(pos).worker; pos++; checkPos(); return worker; } } public synchronized void addWorker(EventLoop worker) { EventLoopHolder holder = findHolder(worker); if (holder == null) { workers.add(new EventLoopHolder(worker)); } else { holder.count++; } } ……
线程的数量由worker
的数量决定,worker的类型是EventLoop,对应一个线程,有多少worker
就会有多少线程。
通过addWorker
可以增加线程的数量,worker不会重复。
回到刚才的listen
方法, #21 行addHandlers方法会配置handler在哪一个event loop中执行:
private void addHandlers(HttpServerImpl server, ContextImpl context) { if (requestStream.handler() != null) { server.reqHandlerManager.addHandler(requestStream.handler(), context); } if (wsStream.handler() != null) { server.wsHandlerManager.addHandler(wsStream.handler(), context); } }
server.reqHandlerManager.addHandler
方法如下:
public synchronized void addHandler(Handler<T> handler, ContextImpl context) { EventLoop worker = context.nettyEventLoop(); availableWorkers.addWorker(worker); Handlers<T> handlers = new Handlers<>(); Handlers<T> prev = handlerMap.putIfAbsent(worker, handlers); if (prev != null) { handlers = prev; } handlers.addHandler(new HandlerHolder<>(context, handler)); hasHandlers = true; }
#2 行得到这个上下文的EventLoop。 还记得上下文的EventLoop怎么创建出来的吗?每个Verticle
实例关联一个上下文,因此一个Verticle
实例只会创建一个worker。
把这个worker加入到availableWorkers,这样就增加了一个事件处理线程。
因此我们可以看出正常情况下Vert.x的每个Verticle
实例只会用一个线程处理请求,在多核情况下一定要配置instance的数量。
如果配置的instance的数量大于eventLoopPoolSize数量,那么就会有一个Event Loop处理多个instance的情况。 线程配置的过多有时不会带来性能的提升,由于线程也有context swicthing,反而会带来性能的降低。