Apache Mina使用问题

最近有使用到mina框架做TCP连接,此处记录下其中的一些问题。

1.mina的基本使用
作为一个框架,它解决了很多基础的问题,类似很多web框架一样,可以让你把精力集中在业务逻辑上。mina已经完成了底层对于连接的建立和管理,在创建客户端或服务端的时候绑定对应的handler,即可在连接建立后的各种事件触发下执行相应操作。
服务端建立示例:

服务端建立
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static final int PORT = 9321;
NioSocketAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
acceptor.getFilterChain().addLast(
"codec",
new ProtocolCodecFilter(new TextLineCodecFactory(Charset
.forName("UTF-8"))));
// 心跳
KeepAliveMessageFactory kamf = new ServerKeepAliveMessageFactoryImpl();
KeepAliveFilter kaf = new KeepAliveFilter(kamf, IdleStatus.BOTH_IDLE);
kaf.setForwardEvent(true);
kaf.setRequestInterval(22);
acceptor.getFilterChain().addLast("heartbeat", kaf);
acceptor.setHandler(new ServerHandler());
acceptor.addListener(new ServerIOListener());//
acceptor.getSessionConfig().setReadBufferSize(2048);
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);

try {
acceptor.bind(new InetSocketAddress(PORT));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

以上就建立了一个server端,并监听端口号为PORT的端口。其中,getFilterChain()往其中添加过滤器,若传输的内容非普通文本,而是对象,则编码的地方需要使用 new ObjectSerializationCodecFactory() 才能够在两端读写对象,同时传输的对象需要支持序列化。Mina自己带有心跳的部分,通过添加心跳的过滤器,可在心跳工厂中实现对应方法,并按照设定时间发起心跳以检测对方是否在线。

心跳工厂方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;

/*import cn.com.softvan.checkisalive.bean.MessageBean;
import cn.com.softvan.checkisalive.common.CommonMsgAction;
*/


/**
* 服务器心跳工厂实现
* @author Administrator
*
*/
public class ServerKeepAliveFactoryImpl implements KeepAliveMessageFactory{

/** 心跳包内容 */
private static final String REQUEST = "x1";
private static final String RESPONSE = "x2";

@Override
public Object getRequest(IoSession arg0) {
return REQUEST;
}

@Override
public Object getResponse(IoSession arg0, Object arg1) {
return RESPONSE;
}

@Override
public boolean isRequest(IoSession arg0, Object arg1) {
if (REQUEST.equals(arg1))
return true;
return false;
}

@Override
public boolean isResponse(IoSession arg0, Object arg1) {

return false;
}

}

心跳工厂主要实现 KeepAliveMessageFactory 接口,其中包含四个方法,在客户端和服务端的实现方式稍有不同,主要取决于是由谁主动发起心跳请求。在上述例子中,服务端被动接受客户端发来的心跳,因此它需要获取response信息,因为是接受方,因此需要判断发来的信息是否属于心跳请求,同时因为它不会收到回复,所以 isResponse 返回false。其中的请求和回复消息由自己约定。在客户端和服务端应当相同。客户端的方法则和服务端相反,需要判断信息是否为回复,isRequest 则返回false。

客户端的基本实现:

客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
      NioSocketConnector connector = new NioSocketConnector();
connector.setConnectTimeoutMillis(3*1000);
connector.getFilterChain().addLast(
"codec",
new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));

connector.getFilterChain().addLast("logger", new LoggingFilter());

// 心跳
KeepAliveMessageFactory kamf = new ClientKeepAliveFactory();
KeepAliveFilter kaf = new KeepAliveFilter(kamf, IdleStatus.BOTH_IDLE);
kaf.setForwardEvent(true);
kaf.setRequestInterval(20));
kaf.setRequestTimeout(5));

connector.getFilterChain().addLast("heartbeat", kaf);
connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30));
connector.setHandler(new ClientHandler());
//监听器
connector.addListener(new ClientServiceListener());

for (;;) {
try {
ConnectFuture future = connector.connect(new InetSocketAddress(
"127.0.0.1", 9321));
future.awaitUninterruptibly();
IoSession session = future.getSession();
if(session.isConnected()){
log.info("连接已建立");
}
break;
} catch (RuntimeIoException e) {
log.error("连接发生异常,将在5秒后重试");
e.printStackTrace();
Thread.sleep(5000);
}
}

客户端和服务端基本类似,主要将 NioSocketAcceptor 替换为 NioSocketConnector 其中同样绑定各种过滤器和事件监听等。所必须的为 setHandler 中的handler,可通过继承 IoHandlerAdapter 来实现对session建立销毁,消息接受和发送等事件的触发处理。

2.一些问题
在上述的使用当中遇到一些问题,在此总结一下:

1) 空闲时间的问题
mina可设置空闲时间,即上述的IdleTime, BOTH_IDLE表示读写空闲,当读写都未发生超过30秒时,会出发idle事件,该事件将在handler中被捕捉到。但此处的设置与心跳的请求间隔 setRequestInterval 会产生冲突。网上说此处的时间要比idleTime小,这样保证在空闲事件发生之前发送心跳,则正常情况将不会有空闲时间,以此来保持长连接。但实际使用过程中发现当设置了心跳请求间隔时间后,mina似乎会忽略空闲时间。当心跳请求的间隔时间大于一分钟时,若服务端未设置心跳间隔时间,mina会默认一分钟就进入IDLE的状态,同时发现服务端的空闲时间也变成了心跳请求时间。在此之前看到有说服务端的心跳间隔时间和超时时间不必设置,因为时被动接受心跳的。但是如果心跳大于一份间隔时,不设置会出现意外的情况。但超时间确实没有用。

另外一点则是,根据打印的日志表明,每次发送心跳前都会出发idle事件,大概有了 RequestInterval 之后,setIdleTime 确实被忽略了。
2)重连的问题
测试过在连接断开,session被关闭的时候,会同时触发handler中的sessionClosed事件和Listener(继承IoServiceListener)中的sessionDestroyed事件,可在这两个地方进行连接的重新建立。但是,在上述客户端中的future.awaitUninterruptibly();方法无法正常使用。由于mina时异步操作。因此发起连接后连接不一定建立完成,该方法用于阻塞线程,直到连接建立完成。如果在handler或listener中调用了此方法,如果连接超时时间大于3秒左右,将会抛出死锁的异常。超时时间需要比较大并且要在这两个地方进行重连的话,不可使用此方法阻塞线程,可通过定义对应超时时间,并在连接后通过Thread.sleep()睡眠相同的时间来阻塞线程(但不知这样做有没有其他问题)

3)环境差异问题
在使用的过程中发现在windows和linux下运行的情况有些不同。若在windows下,客户端和服务端建立连接后,手动停止某一端,另一端会马上报错抛异常,而在linux下则不会,linxu下将等到空闲时间到达,或心跳发送时才会发现连接已断开。如果服务端和客户端处于不同机器,手动停止某一端表现一样,如果断开网络连接,则windows和linux都时在心跳检测或空闲时发现异常(空闲要发现异常除非在空闲时进行了某些操作,若默认无操作则依旧不会抛异常)。