AbstractClient.java
3.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package com.example.mina.client.base;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import java.net.InetSocketAddress;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* @author dy
* @date 2021/3/10
*/
@Slf4j
public abstract class AbstractClient {
/**
* 客户端与设备端连接的配置信息
*/
protected BoxConnectConfig boxConnectConfig;
/**
* 存储客户端的请求指令的队列
*/
protected LinkedBlockingQueue<BaseRequest> requestsPool;
protected IoSession ioSession;
private CommandSender sender;
public AbstractClient(BoxConnectConfig connectConfig) {
this.boxConnectConfig = connectConfig;
this.requestsPool = new LinkedBlockingQueue<>();
this.sender = new CommandSender(this, this.requestsPool);
}
/**
*获取设备的类型
*/
public String getDeviceType() {
return boxConnectConfig.getDeviceType();
}
/**
* 获取设备ID
*/
public String getDeviceId() {
return boxConnectConfig.getDeviceId();
}
/**
* 获取设备名称
*/
public String getDeviceName() {
return boxConnectConfig.getDeviceName();
}
/**
* 返回客户端对响应的Handler
* @return
*/
abstract protected AbstractVirtualBoxClientHandler getIoHandler();
/**
* 返回该盒子的编解码器,默认为tcp的协议,如果是其它协议,可以在自类中重写改方法
* @return
*/
protected void setFilterChain(DefaultIoFilterChainBuilder filterChainBuilder) {
filterChainBuilder.addLast("codec", new ProtocolCodecFilter(new ByteProtocolFactory()));
}
public void start() {
//1、创建客户端IoService
IoConnector connector = new NioSocketConnector();
//客户端链接超时时间
connector.setConnectTimeoutMillis(30000);
//2、客户端过滤器
setFilterChain(connector.getFilterChain());
//3、客户端IoHandler,发生消息
connector.setHandler(getIoHandler());
//连接服务端
ConnectFuture connectFuture = connector.connect(new InetSocketAddress(boxConnectConfig.getHost(),
boxConnectConfig.getPort()));
// 等待建立连接
connectFuture.awaitUninterruptibly();
// 获取连接会话
this.ioSession = connectFuture.getSession();
this.ioSession.setAttribute(BoxConstants.SESSION_CONFIG_NAME, boxConnectConfig);
//将数据发送线程启动
this.sender.start();
}
/**
* 执行指令
* @param request
*/
public <Q extends BaseRequest<R>, R extends BaseResponse> R execute(Q request) {
requestsPool.offer(request);
ioSession.write(request);
return request.execute();
}
/**
* 客户端启动的时候自动为每个客户端启动一个发送线程,将缓存的指令发送出去
*/
private static class CommandSender extends Thread {
private AbstractClient client;
private LinkedBlockingQueue<BaseRequest> commands;
public CommandSender(AbstractClient client, LinkedBlockingQueue commands ) {
this.client = client;
this.commands = commands;
}
@Override
public void run() {
while (true) {
try {
BaseRequest command = commands.poll(5, TimeUnit.SECONDS);
client.ioSession.write(command);
}catch (Exception e) {
log.error("error occurred when send the request to device, device: " + client.boxConnectConfig, e);
}
}
}
}
}