Wednesday, September 17, 2014

Multi-threaded UDP server with Netty on Linux

This article applies only when Netty is embedded in an application that runs on a Linux distribution supporting the native transport (e.g. CentOS >= 6.5).

Bootstrapping your UDP server is as simple as follows:

 Bootstrap bootstrap = new Bootstrap()  
      .group(new EpollEventLoopGroup(workerThreads))  
      .channel(EpollDatagramChannel.class)  
      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)  
      .handler(channelInitializer);

 // Block until the bind attempt completes, then fail fast if it did not succeed.
 ChannelFuture future = bootstrap.bind(host, port).await();
 if (!future.isSuccess()) {
       throw new Exception(String.format("Failed to bind on [host = %s, port = %d].", host, port), future.cause());
 }

However, no matter what the value of workerThreads is, only one Channel is created, and therefore only one thread of the EventLoopGroup is used. This is generally the right architecture for a UDP server, since UDP is connectionless. Processing incoming datagrams may nevertheless involve relatively long operations (e.g. accessing a database), in which case you would like to take advantage of multi-threading.

As of Netty 4.0.23.Final, there is no "Netty" way to do this that would, for example, use an ExecutorService to dispatch the processing of incoming datagrams to multiple threads. You could add an inbound handler at the beginning of the pipeline that is responsible for dispatching incoming datagrams, but this may get you into trouble depending on your pipeline.
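The dispatching idea can be sketched with plain JDK classes, independently of Netty. This is only a minimal sketch under stated assumptions: DatagramDispatcher, its dispatch method and the String-returning processor are hypothetical names invented for illustration, not part of the Netty API. The one step worth noticing is that the payload is copied before it leaves the receiving thread, because the receive buffer is typically reused for the next datagram.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical helper (not a Netty class): hands each datagram payload to a worker pool.
final class DatagramDispatcher {
    private final ExecutorService workers;
    private final Function<byte[], String> processor;

    DatagramDispatcher(int workerThreads, Function<byte[], String> processor) {
        this.workers = Executors.newFixedThreadPool(workerThreads);
        this.processor = processor;
    }

    // Copies the payload out of the reusable receive buffer on the calling thread,
    // then runs the (possibly slow) processing on one of the worker threads.
    CompletableFuture<String> dispatch(byte[] buffer, int offset, int length) {
        byte[] payload = Arrays.copyOfRange(buffer, offset, offset + length);
        return CompletableFuture.supplyAsync(() -> processor.apply(payload), workers);
    }

    // Lets already submitted tasks finish and then stops the worker threads.
    void shutdown() {
        workers.shutdown();
    }

    public static void main(String[] args) {
        DatagramDispatcher dispatcher = new DatagramDispatcher(4,
                payload -> "processed " + payload.length + " bytes");
        byte[] receiveBuffer = new byte[] {1, 2, 3, 4};
        CompletableFuture<String> result =
                dispatcher.dispatch(receiveBuffer, 0, receiveBuffer.length);
        System.out.println(result.join());
        dispatcher.shutdown();
    }
}
```

With such a design, datagrams may complete out of order, and any handler placed after the dispatching step no longer runs on the Channel's own thread, which is one way a pipeline can get into trouble.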

A possible solution using the native transport is to create multiple Channels that listen on the same port (note the SO_REUSEPORT option enabled when the Bootstrap is initialized).

 Bootstrap bootstrap = new Bootstrap()  
      .group(new EpollEventLoopGroup(workerThreads))  
      .channel(EpollDatagramChannel.class)  
      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)  
      .option(EpollChannelOption.SO_REUSEPORT, true)  
      .handler(channelInitializer);

 // Bind one Channel per worker thread; SO_REUSEPORT lets them all share the same port.
 for (int i = 0; i < workerThreads; ++i) {
       ChannelFuture future = bootstrap.bind(host, port).await();
       if (!future.isSuccess()) {
            throw new Exception(String.format("Failed to bind on [host = %s, port = %d].", host, port), future.cause());
       }
 }

It is also important to note that this solution works well when datagrams are received from different remote transport addresses. The SO_REUSEPORT option is implemented by the kernel, which dispatches received datagrams to the created sockets based on their source transport address. Datagrams received from the same transport address are therefore dispatched to the same socket and processed serially by the same Channel.
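The consequence of this dispatching rule can be illustrated with a toy model. The sketch below is not the kernel's algorithm: ReusePortModel and socketIndexFor are hypothetical names, and Java's hashCode() merely stands in for whatever hash the kernel actually computes. It only shows the property described above: a given source address always maps to the same socket, so parallelism comes from having many distinct senders.

```java
import java.net.InetSocketAddress;

// Illustrative model only: the kernel uses its own hash, not Java's hashCode().
final class ReusePortModel {
    // Maps a source transport address to one of socketCount sockets, deterministically.
    static int socketIndexFor(InetSocketAddress source, int socketCount) {
        return Math.floorMod(source.hashCode(), socketCount);
    }

    public static void main(String[] args) {
        int sockets = 4;
        // Documentation-range addresses, created unresolved so no DNS lookup happens.
        InetSocketAddress clientA = InetSocketAddress.createUnresolved("192.0.2.10", 40000);
        InetSocketAddress clientB = InetSocketAddress.createUnresolved("192.0.2.11", 40001);
        // The same source always lands on the same socket index.
        System.out.println("client A -> socket " + socketIndexFor(clientA, sockets));
        System.out.println("client A -> socket " + socketIndexFor(clientA, sockets));
        // A different source may or may not land on a different socket.
        System.out.println("client B -> socket " + socketIndexFor(clientB, sockets));
    }
}
```

If most of the traffic comes from a single remote address, this model suggests that one Channel would still do nearly all the work, whatever the value of workerThreads.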
