MatrixClient.java
5.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package com.example.mina.client.base;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.filterchain.IoFilterChain;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.springframework.util.Assert;
import java.net.InetSocketAddress;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* @author dy
* @date 2021/3/10
*/
@Slf4j
public class MatrixClient {
/**
* 客户端与设备端连接的配置信息
*/
protected MatrixConnectConfig matrixConnectConfig;
/**
* 存储客户端的请求指令的队列
*/
protected LinkedBlockingQueue<MatrixCommand> requestsPool;
protected AbstractMatrixIoHandler clientHandler;
protected IoSession ioSession;
private CommandSender sender;
public MatrixClient(MatrixConnectConfig connectConfig) {
this.matrixConnectConfig = connectConfig;
this.requestsPool = new LinkedBlockingQueue<>();
this.sender = new CommandSender(this, this.requestsPool);
this.sender.start();
}
/**
*获取设备的类型
*/
public String getDeviceType() {
return matrixConnectConfig.getDeviceType();
}
/**
* 获取设备ID
*/
public String getDeviceId() {
return matrixConnectConfig.getDeviceId();
}
/**
* 获取设备名称
*/
public String getDeviceName() {
return matrixConnectConfig.getDeviceName();
}
/**
* 返回客户端对响应的Handler
* @return
*/
public void setClientHandler(AbstractMatrixIoHandler clientHandler) {
this.clientHandler = clientHandler;
}
/**
* 返回该盒子的编解码器,默认为tcp的协议,如果是其它协议,可以在自类中重写改方法
* @return
*/
protected void setFilterChain(DefaultIoFilterChainBuilder filterChainBuilder) {
filterChainBuilder.addLast("codec", new ProtocolCodecFilter(new ByteProtocolFactory()));
}
public IoFilterChain getFilterChain() {
//1、创建客户端IoService
IoConnector connector = new NioSocketConnector();
//客户端链接超时时间
connector.setConnectTimeoutMillis(30000);
//2、客户端过滤器
// setFilterChain(connector.getFilterChain());
//3、客户端IoHandler,发生消息
Assert.notNull(clientHandler, "client handler must not be null");
connector.setHandler(clientHandler);
//连接服务端
ConnectFuture connectFuture = connector.connect(new InetSocketAddress(matrixConnectConfig.getHost(),
matrixConnectConfig.getPort()));
// 等待建立连接
connectFuture.awaitUninterruptibly();
// 获取连接会话
this.ioSession = connectFuture.getSession();
this.ioSession.setAttribute(MatrixConstants.SESSION_CONFIG_NAME, matrixConnectConfig);
return ioSession.getFilterChain();
}
public void start() {
//将数据发送线程启动
this.sender.start();
}
/**
* the belows method can be overrided by its child class
*/
public void resetAttenuations() {
MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_RESET).build();
sendCommand(command);
}
public void setAttenuation(int row, int col, int attn) {
MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_SET_ATTN)
.col(col).row(row).attn(attn).build();
sendCommand(command);
}
public void getAttenuation(Integer row, Integer col) {
MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_GET_ATTN)
.col(col).row(row).build();
sendCommand(command);
}
public void setOffset(int col, int offset) {
MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_SET_OFFSET)
.col(col).offset(offset).build();
sendCommand(command);
}
public void getOffset(int col) {
MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_GET_OFFSET)
.col(col).build();
sendCommand(command);
}
/**
* 执行命令
*/
public boolean sendCommand(MatrixCommand command) {
try{
requestsPool.put(command);
return true;
}catch (Exception e) {
log.error("error ocurred, when put a matrix command!", e);
return false;
}
}
/**
* 客户端启动的时候自动为每个客户端启动一个发送线程,将缓存的指令发送出去
*/
private static class CommandSender extends Thread {
private MatrixClient client;
private LinkedBlockingQueue<MatrixCommand> commands;
public CommandSender(MatrixClient client, LinkedBlockingQueue commands ) {
this.client = client;
this.commands = commands;
this.setName(String.format("Sender-%s-%s", client.getDeviceType(), client.getDeviceName()));
}
@Override
public void run() {
while (true) {
try {
MatrixCommand command = commands.poll();
if(command != null){
log.info("======================");
WriteFuture a =client.ioSession.write(command);
}
}catch (Exception e) {
log.error("error occurred when send the request to device, device: " + client.matrixConnectConfig, e);
}
}
}
}
}