MatrixClient.java
5.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package com.example.mina.client.base;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.filterchain.IoFilterChain;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.springframework.util.Assert;
import java.net.InetSocketAddress;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author dy
* @date 2021/3/10
*/
@Slf4j
public class MatrixClient {
/**
* 客户端与设备端连接的配置信息
*/
protected MatrixConnectConfig matrixConnectConfig;
/**
* 存储客户端的请求指令的队列
*/
protected LinkedBlockingQueue<MatrixCommand> requestsPool;
/**
* the data proxy
*/
private MatrixDataProxy matrixDataProxy;
protected AbstractMatrixIoHandler clientHandler;
protected IoSession ioSession;
private CommandSender sender;
public MatrixClient(MatrixConnectConfig connectConfig) {
this.matrixConnectConfig = connectConfig;
this.requestsPool = new LinkedBlockingQueue<>();
this.sender = new CommandSender(this, this.requestsPool);
}
/**
*获取设备的类型
*/
public String getDeviceType() {
return matrixConnectConfig.getDeviceType();
}
/**
* 获取设备ID
*/
public String getDeviceId() {
return matrixConnectConfig.getDeviceId();
}
/**
* 获取设备名称
*/
public String getDeviceName() {
return matrixConnectConfig.getDeviceName();
}
/**
* 返回客户端对响应的Handler
* @return
*/
public void setClientHandler(AbstractMatrixIoHandler clientHandler) {
this.clientHandler = clientHandler;
}
/**
* get the io filter chain
*/
public IoFilterChain getFilterChain() {
return ioSession.getFilterChain();
}
/**
* Set the data proxy
* @return
*/
public void setMatrixDataProxy(MatrixDataProxy matrixDataProxy) {
this.matrixDataProxy = matrixDataProxy;
}
public void connect() {
//1、创建客户端IoService
IoConnector connector = new NioSocketConnector();
//客户端链接超时时间
connector.setConnectTimeoutMillis(30000);
//2、客户端过滤器
// setFilterChain(connector.getFilterChain());
//3、客户端IoHandler,发生消息
Assert.notNull(clientHandler, "client handler must not be null");
connector.setHandler(clientHandler);
//连接服务端
ConnectFuture connectFuture = connector.connect(new InetSocketAddress(matrixConnectConfig.getHost(),
matrixConnectConfig.getPort()));
// 等待建立连接
connectFuture.awaitUninterruptibly();
// 获取连接会话
this.ioSession = connectFuture.getSession();
this.ioSession.setAttribute(MatrixConstants.SESSION_CONFIG_NAME, matrixConnectConfig);
this.ioSession.setAttribute(MatrixConstants.SESSION_DATA_PROXY_NAME, matrixDataProxy);
}
public void start() {
//将数据发送线程启动
this.sender.start();
}
/**
* the belows method can be overrided by its child class
*/
public void resetAttenuations() {
MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_RESET).build();
sendCommand(command);
}
public void setAttenuation(int row, int col, int attn) {
MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_SET_ATTN)
.col(col).row(row).attn(attn).build();
sendCommand(command);
}
public void getAttenuation(Integer row, Integer col) {
MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_GET_ATTN)
.col(col).row(row).build();
sendCommand(command);
}
public void setOffset(int col, int offset) {
MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_SET_OFFSET)
.col(col).offset(offset).build();
sendCommand(command);
}
public void getOffset(int col) {
MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_GET_OFFSET)
.col(col).build();
sendCommand(command);
}
/**
* 执行命令
*/
public boolean sendCommand(MatrixCommand command) {
try{
requestsPool.put(command);
return true;
}catch (Exception e) {
log.error("error ocurred, when put a matrix command!", e);
return false;
}
}
/**
* 客户端启动的时候自动为每个客户端启动一个发送线程,将缓存的指令发送出去
*/
private static class CommandSender extends Thread {
private MatrixClient client;
private LinkedBlockingQueue<MatrixCommand> commands;
public CommandSender(MatrixClient client, LinkedBlockingQueue commands ) {
this.client = client;
this.commands = commands;
this.setName(String.format("Sender-%s-%s", client.getDeviceType(), client.getDeviceName()));
}
@Override
public void run() {
while (true) {
try {
MatrixCommand command = commands.poll();
if(command != null){
log.info("======================");
WriteFuture a =client.ioSession.write(command);
}
}catch (Exception e) {
log.error("error occurred when send the request to device, device: " + client.matrixConnectConfig, e);
}
}
}
}
}