Add

Reactor模式和NIO


转至 www.jdon.com

注: 理解ACE中的Reactor模式,  高效的响应IO等事件.

当前分布式计算  Services盛行天下,这些网络服务的底层都离不开对socket的操作。他们都有一个共同的结构:
1. Read request
2. Decode request
3. Process service
4. Encode reply
5. Send reply

经典的网络服务的设计如下图,在每个线程中完成对数据的处理:

但这种模式在用户负载增加时,性能将下降非常的快。我们需要重新寻找一个新的方案,保持数据处理的流畅,很显然,事件触发机制是最好的解决办法,当有事件发生时,会触动handler,然后开始数据的处理。

Reactor模式类似于AWT中的Event处理:

Reactor模式参与者

1.Reactor 负责响应IO事件,一旦发生,广播发送给相应的Handler去处理,这类似于AWT的thread
2.Handler 是负责非堵塞行为,类似于AWT ActionListeners;同时负责将handlers与event事件绑定,类似于AWT addActionListener

如图:

Java的NIO为reactor模式提供了实现的基础机制,它的Selector当发现某个channel有数据时,会通过SlectorKey来告知我们,在此我们实现事件和handler的绑定。

我们来看看Reactor模式代码:

public Reactor implements Runnable{

  final Selector selector;
final ServerSocketChannel serverSocket;

  Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),port);
serverSocket.socket().bind(address);

serverSocket.configureBlocking(false);
//向selector注册该channel
SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT);

logger.("–>Start serverSocket.register!");

//利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptor
sk.attach(new Acceptor());
logger.debug("–>attach(new Acceptor()!");
}

public void run() { // normally in a new Thread
try {
while (!Thread.interrupted())
{
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
//Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
while (it.hasNext())
//来一个事件 第一次触发一个accepter线程
//以后触发SocketReadHandler
dispatch((SelectionKey)(it.next()));
selected.clear();
}
}catch (IOException ex) {
logger.debug("reactor stop!"+ex);
}
}

  //运行Acceptor或SocketReadHandler
void dispatch(SelectionKey k) {
Runnable r = (Runnable)(k.attachment());
if (r != null){
// r.run();

    }
}

  class Acceptor implements Runnable { // inner
public void run() {
try {
logger.debug("–>ready for accept!");
SocketChannel c = serverSocket.accept();
if (c != null)
//调用Handler来处理channel
new SocketReadHandler(selector, c);
}
catch(IOException ex) {
logger.debug("accept stop!"+ex);
}
}
}
}

以上代码中巧妙使用了SocketChannel的attach功能,将Hanlder和可能会发生事件的channel链接在一起,当发生事件时,可以立即触发相应链接的Handler。

再看看Handler代码:

public class SocketReadHandler implements Runnable {

  public static Logger logger = Logger.getLogger(SocketReadHandler.class);

  private Test test=new Test();

  final SocketChannel socket;
final SelectionKey sk;

static final int READING = 0, SENDING = 1;
int state = READING;

  public SocketReadHandler(Selector sel, SocketChannel c)
throws IOException {

    socket = c;

    socket.configureBlocking(false);
sk = socket.register(sel, 0);

    //将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。
//参看dispatch(SelectionKey k)
sk.attach(this);

//同时将SelectionKey标记为可读,以便读取。
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}

  public void run() {
try{
// test.read(socket,input);
readRequest() ;
}catch(Exception ex){
logger.debug("readRequest error"+ex);
}
}

/**
* 处理读取data
* @param key
* @throws Exception
*/
private void readRequest() throws Exception {

  ByteBuffer input = ByteBuffer.allocate(1024);
input.clear();
try{

    int bytesRead = socket.read(input);

    ……

    //激活线程池 处理这些request
requestHandle(new Request(socket,btt));

    …..

}catch(Exception e) {
}

}

注意在Handler里面又执行了一次attach,这样,覆盖前面的Acceptor,下次该Handler又有READ事件发生时,将直接触发Handler.从而开始了数据的读 处理 写 发出等流程处理。

将数据读出后,可以将这些数据处理线程做成一个线程池,这样,数据读出后,立即扔到线程池中,这样加速处理速度:

更进一步,我们可以使用多个Selector分别处理连接和读事件。

一个高性能的Java网络服务机制就要形成,激动人心的集群并行计算即将实现。

Random Posts Recent Comments

  • 女友糖尿病害我蛀牙 Says:

    汗一个…...

  • Htj06 Says:

    zhenyouchuangyi...

  • 电商圈 Says:

    试图该怎么建立啊,,怎在程序中是吸纳...

  • edward Says:

    看得人心旷神怡,好文,情不自禁的顶一下...

  • Daniel Says:

    我也在处理这个问题,没有找到好的方法。我用了楼上兄弟的方法,还是可以的。不知道您找到好的方法了吗、我暂时楼上兄弟的方法。...

  • 卡,卡 Says:

    弱弱问一句:博主,你博客的模板这样设计pv高吗?...

  • 站长工具 Says:

    博主,兔年快乐!...

  • health Says:

    great post!!I hope I can read more in your website....

  • pdu Says:

    好博文,支持分享...

  • 站长工具 Says:

    博主的文章很不错,我是站长工具-站长精灵的作者,一款专业的SEO工具软件(可以帮您提高博客的流量),想跟您交换个链接,不知可否...

Tag Cloud

arm audio blog brew cache class debug flash google html j2me java javascript Joke linux lua mobile mtk php python ror ruby server shell stream unix web windows 优化 动态加载 女人 女生 平台 开发 手机 技术 流媒体 测试 漫画 生活 男人 男生 缓存 芯片