Commit da3133945cb73e243431b4524621feaad4dc0951

Authored by wu
1 parent 10bd6938
Exists in develop

feat: 重构客户端

1,增加experiment的模块;
2,为client、experiment增加工厂,用于读取系统已有的模块;
3,统一命名为matrix;
4,为client增加client manager,由manager统一管理和创建client;
Showing 31 changed files with 828 additions and 506 deletions   Show diff stats
src/main/java/com/example/mina/client/base/AbstractClient.java
... ... @@ -1,135 +0,0 @@
1   -package com.example.mina.client.base;
2   -
3   -import lombok.extern.slf4j.Slf4j;
4   -import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
5   -import org.apache.mina.core.future.ConnectFuture;
6   -import org.apache.mina.core.service.IoConnector;
7   -import org.apache.mina.core.session.IoSession;
8   -import org.apache.mina.filter.codec.ProtocolCodecFilter;
9   -import org.apache.mina.transport.socket.nio.NioSocketConnector;
10   -
11   -import java.net.InetSocketAddress;
12   -import java.util.concurrent.LinkedBlockingQueue;
13   -import java.util.concurrent.TimeUnit;
14   -
15   -/**
16   - * @author dy
17   - * @date 2021/3/10
18   - */
19   -@Slf4j
20   -public abstract class AbstractClient {
21   -
22   - /**
23   - * 客户端与设备端连接的配置信息
24   - */
25   - protected BoxConnectConfig boxConnectConfig;
26   -
27   - /**
28   - * 存储客户端的请求指令的队列
29   - */
30   - protected LinkedBlockingQueue<BaseRequest> requestsPool;
31   -
32   - protected IoSession ioSession;
33   -
34   - private CommandSender sender;
35   -
36   - public AbstractClient(BoxConnectConfig connectConfig) {
37   - this.boxConnectConfig = connectConfig;
38   - this.requestsPool = new LinkedBlockingQueue<>();
39   - this.sender = new CommandSender(this, this.requestsPool);
40   - }
41   -
42   - /**
43   - *获取设备的类型
44   - */
45   - public String getDeviceType() {
46   - return boxConnectConfig.getDeviceType();
47   - }
48   -
49   - /**
50   - * 获取设备ID
51   - */
52   - public String getDeviceId() {
53   - return boxConnectConfig.getDeviceId();
54   - }
55   -
56   - /**
57   - * 获取设备名称
58   - */
59   - public String getDeviceName() {
60   - return boxConnectConfig.getDeviceName();
61   - }
62   -
63   - /**
64   - * 返回客户端对响应的Handler
65   - * @return
66   - */
67   - abstract protected AbstractVirtualBoxClientHandler getIoHandler();
68   -
69   - /**
70   - * 返回该盒子的编解码器,默认为tcp的协议,如果是其它协议,可以在自类中重写改方法
71   - * @return
72   - */
73   - protected void setFilterChain(DefaultIoFilterChainBuilder filterChainBuilder) {
74   - filterChainBuilder.addLast("codec", new ProtocolCodecFilter(new ByteProtocolFactory()));
75   - }
76   -
77   - public void start() {
78   - //1、创建客户端IoService
79   - IoConnector connector = new NioSocketConnector();
80   - //客户端链接超时时间
81   - connector.setConnectTimeoutMillis(30000);
82   - //2、客户端过滤器
83   - setFilterChain(connector.getFilterChain());
84   - //3、客户端IoHandler,发生消息
85   - connector.setHandler(getIoHandler());
86   -
87   - //连接服务端
88   - ConnectFuture connectFuture = connector.connect(new InetSocketAddress(boxConnectConfig.getHost(),
89   - boxConnectConfig.getPort()));
90   - // 等待建立连接
91   - connectFuture.awaitUninterruptibly();
92   - // 获取连接会话
93   - this.ioSession = connectFuture.getSession();
94   - this.ioSession.setAttribute(BoxConstants.SESSION_CONFIG_NAME, boxConnectConfig);
95   - //将数据发送线程启动
96   - this.sender.start();
97   - }
98   -
99   - /**
100   - * 执行指令
101   - * @param request
102   - */
103   - public <Q extends BaseRequest<R>, R extends BaseResponse> R execute(Q request) {
104   - requestsPool.offer(request);
105   - ioSession.write(request);
106   -
107   - return request.execute();
108   - }
109   -
110   - /**
111   - * 客户端启动的时候自动为每个客户端启动一个发送线程,将缓存的指令发送出去
112   - */
113   - private static class CommandSender extends Thread {
114   - private AbstractClient client;
115   - private LinkedBlockingQueue<BaseRequest> commands;
116   -
117   - public CommandSender(AbstractClient client, LinkedBlockingQueue commands ) {
118   - this.client = client;
119   - this.commands = commands;
120   - }
121   -
122   - @Override
123   - public void run() {
124   - while (true) {
125   - try {
126   - BaseRequest command = commands.poll(5, TimeUnit.SECONDS);
127   -
128   - client.ioSession.write(command);
129   - }catch (Exception e) {
130   - log.error("error occurred when send the request to device, device: " + client.boxConnectConfig, e);
131   - }
132   - }
133   - }
134   - }
135   -}
src/main/java/com/example/mina/client/base/AbstractClientFactory.java 0 → 100644
... ... @@ -0,0 +1,38 @@
  1 +package com.example.mina.client.base;
  2 +
  3 +import lombok.extern.slf4j.Slf4j;
  4 +import org.apache.mina.core.filterchain.IoFilterChain;
  5 +import org.apache.mina.filter.codec.ProtocolCodecFilter;
  6 +
  7 +@Slf4j
  8 +public abstract class AbstractClientFactory {
  9 +
  10 + protected MatrixDataProxy matrixDataProxy;
  11 +
  12 + public void setMatrixDataProxy(MatrixDataProxy matrixDataProxy) {
  13 + this.matrixDataProxy = matrixDataProxy;
  14 + }
  15 +
  16 + abstract public AbstractMatrixIoHandler getClientHandler();
  17 +
  18 + public void buildFilterChain(IoFilterChain ioFilterChain){
  19 + ioFilterChain.addLast("codec", new ProtocolCodecFilter(new ByteProtocolFactory()));
  20 + }
  21 +
  22 + public MatrixClient createClient(MatrixConnectConfig connectConfig){
  23 +
  24 +
  25 + try{
  26 +
  27 + MatrixClient client = new MatrixClient(connectConfig);
  28 +
  29 + //TODO initiate the client
  30 + buildFilterChain(client.getFilterChain());
  31 + client.setClientHandler(getClientHandler());
  32 + return client;
  33 + }catch (Exception e) {
  34 + log.error("Can not create the client, error: ", e);
  35 + throw new RuntimeException("create client error!");
  36 + }
  37 + }
  38 +}
... ...
src/main/java/com/example/mina/client/base/AbstractMatrixIoHandler.java 0 → 100644
... ... @@ -0,0 +1,79 @@
  1 +package com.example.mina.client.base;
  2 +
  3 +
  4 +import lombok.Data;
  5 +import lombok.extern.slf4j.Slf4j;
  6 +import org.apache.mina.core.buffer.IoBuffer;
  7 +import org.apache.mina.core.service.IoHandlerAdapter;
  8 +import org.apache.mina.core.session.IdleStatus;
  9 +import org.apache.mina.core.session.IoSession;
  10 +
  11 +/**
  12 + * @author dy
  13 + */
  14 +@Data
  15 +@Slf4j
  16 +public abstract class AbstractMatrixIoHandler extends IoHandlerAdapter {
  17 +
  18 + protected MatrixDataProxy matrixDataProxy;
  19 +
  20 + public AbstractMatrixIoHandler(MatrixDataProxy matrixDataProxy) {
  21 + this.matrixDataProxy = matrixDataProxy;
  22 + }
  23 +
  24 + protected MatrixConnectConfig getConnectConfig(IoSession session) {
  25 + Object boxConnectConfig = session.getAttribute(MatrixConstants.SESSION_CONFIG_NAME);
  26 +
  27 + if( boxConnectConfig instanceof MatrixConnectConfig) {
  28 + return (MatrixConnectConfig) boxConnectConfig;
  29 + }
  30 + return null;
  31 + }
  32 +
  33 + @Override
  34 + public void exceptionCaught(IoSession session, Throwable throwable) {
  35 + }
  36 +
  37 + @Override
  38 + public void messageReceived(IoSession session, Object message) {
  39 + if (!(message instanceof IoBuffer)) {
  40 + log.error("客户端接收到的消息不为定义的响应类! message:" + message);
  41 + throw new RuntimeException("Unsupported response message");
  42 + }
  43 +
  44 + //TODO 客户端将数据服务器的字符串数组读取出来
  45 + byte[] response = null;
  46 + log.info("the client recieved the device response, device:{}, response is: {}",
  47 + getConnectConfig(session), response);
  48 + handleCommandResponse(response);
  49 + }
  50 +
  51 + @Override
  52 + public void messageSent(IoSession session, Object message) {
  53 + }
  54 +
  55 + @Override
  56 + public void inputClosed(IoSession session) {
  57 + }
  58 +
  59 + @Override
  60 + public void sessionClosed(IoSession session) {
  61 + }
  62 +
  63 + @Override
  64 + public void sessionCreated(IoSession session) {
  65 +
  66 + }
  67 +
  68 + @Override
  69 + public void sessionIdle(IoSession session, IdleStatus status) {
  70 + log.info("the session with box was idle, device is {}", getConnectConfig(session));
  71 + }
  72 +
  73 + @Override
  74 + public void sessionOpened(IoSession session) {
  75 + log.info("the system has connected to device, device is {}", getConnectConfig(session));
  76 + }
  77 +
  78 + abstract public boolean handleCommandResponse(byte[] response);
  79 +}
... ...
src/main/java/com/example/mina/client/base/AbstractVirtualBoxClientHandler.java
... ... @@ -1,79 +0,0 @@
1   -package com.example.mina.client.base;
2   -
3   -
4   -import lombok.Data;
5   -import lombok.extern.slf4j.Slf4j;
6   -import org.apache.mina.core.buffer.IoBuffer;
7   -import org.apache.mina.core.service.IoHandlerAdapter;
8   -import org.apache.mina.core.session.IdleStatus;
9   -import org.apache.mina.core.session.IoSession;
10   -
11   -/**
12   - * @author dy
13   - */
14   -@Data
15   -@Slf4j
16   -public abstract class AbstractVirtualBoxClientHandler extends IoHandlerAdapter {
17   -
18   - protected DownloadListener downloadListener;
19   -
20   - public AbstractVirtualBoxClientHandler(DownloadListener downloadListener) {
21   - this.downloadListener = downloadListener;
22   - }
23   -
24   - protected BoxConnectConfig getConnectConfig(IoSession session) {
25   - Object boxConnectConfig = session.getAttribute(BoxConstants.SESSION_CONFIG_NAME);
26   -
27   - if( boxConnectConfig instanceof BoxConnectConfig) {
28   - return (BoxConnectConfig) boxConnectConfig;
29   - }
30   - return null;
31   - }
32   -
33   - @Override
34   - public void exceptionCaught(IoSession session, Throwable throwable) {
35   - }
36   -
37   - @Override
38   - public void messageReceived(IoSession session, Object message) {
39   - if (!(message instanceof IoBuffer)) {
40   - log.error("客户端接收到的消息不为定义的响应类! message:" + message);
41   - throw new RuntimeException("Unsupported response message");
42   - }
43   -
44   - //TODO 客户端将数据服务器的字符串数组读取出来
45   - byte[] response = null;
46   - log.info("the client recieved the device response, device:{}, response is: {}",
47   - getConnectConfig(session), response);
48   - handleCommandResponse(response);
49   - }
50   -
51   - @Override
52   - public void messageSent(IoSession session, Object message) {
53   - }
54   -
55   - @Override
56   - public void inputClosed(IoSession session) {
57   - }
58   -
59   - @Override
60   - public void sessionClosed(IoSession session) {
61   - }
62   -
63   - @Override
64   - public void sessionCreated(IoSession session) {
65   -
66   - }
67   -
68   - @Override
69   - public void sessionIdle(IoSession session, IdleStatus status) {
70   - log.info("the session with box was idle, device is {}", getConnectConfig(session));
71   - }
72   -
73   - @Override
74   - public void sessionOpened(IoSession session) {
75   - log.info("the system has connected to device, device is {}", getConnectConfig(session));
76   - }
77   -
78   - abstract public boolean handleCommandResponse(byte[] response);
79   -}
src/main/java/com/example/mina/client/base/BaseRequest.java
... ... @@ -1,28 +0,0 @@
1   -package com.example.mina.client.base;
2   -
3   -import lombok.AllArgsConstructor;
4   -import lombok.Data;
5   -import lombok.NoArgsConstructor;
6   -import lombok.experimental.SuperBuilder;
7   -
8   -/**
9   - * @author dy
10   - * @date 2021/3/12
11   - */
12   -@SuperBuilder
13   -@Data
14   -@AllArgsConstructor
15   -@NoArgsConstructor
16   -public abstract class BaseRequest <R extends BaseResponse>{
17   -
18   - protected AbstractClient client;
19   -
20   - /**
21   - * 指令名称
22   - * @return
23   - */
24   - abstract public String getCommandName();
25   -
26   - abstract public R execute();
27   -
28   -}
src/main/java/com/example/mina/client/base/BaseResponse.java
... ... @@ -1,9 +0,0 @@
1   -package com.example.mina.client.base;
2   -
3   -import lombok.Data;
4   -
5   -@Data
6   -public abstract class BaseResponse {
7   - private String responseType;
8   -
9   -}
src/main/java/com/example/mina/client/base/BoxConnectConfig.java
... ... @@ -1,30 +0,0 @@
1   -package com.example.mina.client.base;
2   -
3   -
4   -import lombok.Builder;
5   -import lombok.Data;
6   -
7   -import java.util.Map;
8   -
9   -@Data
10   -@Builder
11   -public class BoxConnectConfig {
12   -
13   - private String deviceId;
14   -
15   - private String deviceType;
16   -
17   - private String deviceName;
18   -
19   - private String host;
20   -
21   - private Integer port;
22   -
23   - private Integer rows;
24   -
25   - private Integer cols;
26   -
27   - private Integer maxAttenuation;
28   -
29   - private Map<String, Object> options;
30   -}
src/main/java/com/example/mina/client/base/BoxConstants.java
... ... @@ -1,13 +0,0 @@
1   -package com.example.mina.client.base;
2   -
3   -public class BoxConstants {
4   -
5   - public static final String SESSION_CONFIG_NAME = "SESSION_CONFIG";
6   -
7   - public static final String DEVICE_TYPE_AERO = "AEROFLEX";
8   -
9   - public static final String DEVICE_TYPE_LTE3000 = "LTE3000";
10   -
11   - public static final String COMMAND_RESET = "RESET";
12   -
13   -}
src/main/java/com/example/mina/client/base/ClientManager.java 0 → 100644
... ... @@ -0,0 +1,57 @@
  1 +package com.example.mina.client.base;
  2 +
  3 +import lombok.extern.slf4j.Slf4j;
  4 +import org.springframework.stereotype.Component;
  5 +import org.springframework.util.Assert;
  6 +
  7 +import java.util.Collections;
  8 +import java.util.Map;
  9 +import java.util.Set;
  10 +
  11 +@Component
  12 +@Slf4j
  13 +public class ClientManager<T extends AbstractClientFactory> {
  14 +
  15 + private Map<String, T> clientFactories;
  16 +
  17 + private Map<String, MatrixClient> clients;
  18 +
  19 + public ClientManager() {
  20 + this.clients = Collections.emptyMap();
  21 + }
  22 +
  23 + public void setClientFactories(Map<String, T> clientFactories) {
  24 + this.clientFactories = clientFactories;
  25 + }
  26 +
  27 + public Set<String> getSupportedMatrix() {
  28 + return this.clientFactories.keySet();
  29 + }
  30 +
  31 + public T getClientFactory(String name) {
  32 + return this.clientFactories.get(name);
  33 + }
  34 +
  35 + public MatrixClient getOrCreateClient(MatrixConnectConfig connectConfig) {
  36 + Assert.notNull(connectConfig, "matrix connect config is null");
  37 + Assert.hasLength(connectConfig.getDeviceId(), "deviceId can not be empty");
  38 +
  39 + String deviceId = connectConfig.getDeviceId();
  40 +
  41 + if(clients.containsKey(deviceId)) {
  42 + return clients.get(deviceId);
  43 + }
  44 +
  45 + String deviceType = connectConfig.getDeviceType();
  46 + //check the device type is supported by system
  47 + T clientFactory = getClientFactory(deviceType);
  48 + if(clientFactory == null) {
  49 + log.error("the matrix type is not supported by the system! matrix type is: " + deviceType);
  50 + throw new RuntimeException("the matrix type is not supported by the system! matrix type is: " + deviceType);
  51 + }
  52 + MatrixClient client = clientFactory.createClient(connectConfig);
  53 +
  54 + clients.put(deviceId, client);
  55 + return client;
  56 + }
  57 +}
... ...
src/main/java/com/example/mina/client/base/DownloadListener.java
... ... @@ -1,17 +0,0 @@
1   -package com.example.mina.client.base;
2   -
3   -public class DownloadListener {
4   -
5   - /**
6   - * 当从设备端将数据同步到客户端的时候,将数据直接存入数据中,
7   - * 该类会在ioHandler及其子类中被调用
8   - * 设置attenuation的值,并将其存储到matrix_data的数据表中
9   - */
10   - public void setAttenuation(String deviceId, int row, int col, int val) {
11   - //TODO 实现将数据写入到数据库的逻辑
12   - }
13   -
14   - private String getCurrentThreadId() {
15   - return Thread.currentThread().getName();
16   - }
17   -}
src/main/java/com/example/mina/client/base/Experiment.java 0 → 100644
... ... @@ -0,0 +1,81 @@
  1 +package com.example.mina.client.base;
  2 +
  3 +import lombok.extern.slf4j.Slf4j;
  4 +
  5 +import java.time.ZonedDateTime;
  6 +
  7 +@Slf4j
  8 +public abstract class Experiment <T extends MatrixClient, O extends ExperimentOptions> extends Thread{
  9 +
  10 + protected T client;
  11 + protected O options;
  12 + protected int iteratedTimes;
  13 +
  14 + //this is used for stop the task manually
  15 + private volatile boolean isRunning;
  16 +
  17 + protected ExperimentProgressListner progressListner;
  18 +
  19 + public Experiment() {
  20 + }
  21 +
  22 + public Experiment(T client, O options) {
  23 + this.client = client;
  24 + this.options = options;
  25 + this.setName(getExperimentTaskName());
  26 + this.iteratedTimes = 0;
  27 + }
  28 +
  29 + private String getExperimentTaskName() {
  30 + return String.format("%s-%s", this.client.getDeviceType(), this.client.getDeviceName());
  31 + }
  32 +
  33 + public void setClient(T t) {
  34 + this.client = t;
  35 + }
  36 + public void setOptions(O options) {
  37 + this.options = options;
  38 + }
  39 + public void setProgressListner(ExperimentProgressListner experimentProgressListner) {
  40 + this.progressListner = experimentProgressListner;
  41 + }
  42 +
  43 + public String getExperimentId() {
  44 + return options.getExperimentId();
  45 + }
  46 +
  47 + abstract public String getExperimentName();
  48 +
  49 + abstract public void schedule();
  50 +
  51 + public void sendCommand(MatrixCommand command){
  52 + client.sendCommand(command);
  53 + }
  54 +
  55 + public boolean isContinue() {
  56 + return iteratedTimes < options.getMaxIterCount() &&
  57 + ZonedDateTime.now().isBefore(options.getFinishAt()) &&
  58 + isRunning;
  59 + }
  60 +
  61 + public void start() {
  62 +
  63 + progressListner.startExperiment(getExperimentId());
  64 + while (isContinue()) {
  65 + try{
  66 + schedule();
  67 + iteratedTimes++ ;
  68 + progressListner.updateProgress(getExperimentId(), iteratedTimes);
  69 + Thread.sleep(options.getPause() * 1000);
  70 + }catch (Exception e) {
  71 + log.info("thread sleep error", e);
  72 + }
  73 + }
  74 + progressListner.stopExperiment(getExperimentId());
  75 + }
  76 +
  77 + public void finish() {
  78 + isRunning = false;
  79 + }
  80 +
  81 +}
... ...
src/main/java/com/example/mina/client/base/ExperimentFactory.java 0 → 100644
... ... @@ -0,0 +1,62 @@
  1 +package com.example.mina.client.base;
  2 +
  3 +import lombok.extern.slf4j.Slf4j;
  4 +import org.springframework.util.CollectionUtils;
  5 +
  6 +import java.util.Map;
  7 +
  8 +@Slf4j
  9 +public abstract class ExperimentFactory < T extends Experiment> {
  10 +
  11 + private Map<String, T> experiments;
  12 +
  13 + public ExperimentFactory() {
  14 + experiments = CollectionUtils.newHashMap(10);
  15 + }
  16 +
  17 + public T getExperiment(String matrixId) {
  18 +
  19 + return experiments.getOrDefault(matrixId, null);
  20 + }
  21 +
  22 + public void stopExperiment(String matrixId) {
  23 + T t = getExperiment(matrixId);
  24 +
  25 + if( t != null && t.isContinue()){
  26 + t.finish();
  27 + experiments.remove(matrixId);
  28 + }
  29 + }
  30 +
  31 + abstract public boolean isSupport();
  32 +
  33 + abstract Class<T> getClazz();
  34 +
  35 + public ExperimentProgressListner getListner() {
  36 + return null; //TODO 生成任务进度更新监听器
  37 + }
  38 +
  39 + public T createExperiment(MatrixClient client, ExperimentOptions options){
  40 +
  41 + if(isSupport()) {
  42 + log.error("the experiment is not supported by the matrix! matrix id: " + client.getDeviceId());
  43 + throw new RuntimeException("the experiment is not supported by the matrix! matrix id: " + client.getDeviceId());
  44 + }
  45 +
  46 + Class<T> tClass = getClazz();
  47 +
  48 + try{
  49 +
  50 + T t = tClass.newInstance();
  51 +
  52 + t.setClient(client);
  53 + t.setOptions(options);
  54 + t.setProgressListner(getListner());
  55 + return t;
  56 + }catch (Exception e) {
  57 + log.error("create a experiment task is failed, matrix id: " + client.getDeviceId());
  58 + throw new RuntimeException("create a experiment task is failed, matrix id: " + client.getDeviceId());
  59 + }
  60 + }
  61 +
  62 +}
... ...
src/main/java/com/example/mina/client/base/ExperimentOptions.java 0 → 100644
... ... @@ -0,0 +1,24 @@
  1 +package com.example.mina.client.base;
  2 +
  3 +import lombok.Data;
  4 +
  5 +import java.time.ZonedDateTime;
  6 +
  7 +@Data
  8 +public abstract class ExperimentOptions {
  9 + private String experimentId;
  10 +
  11 + private int startAttenuation;
  12 +
  13 + private int endAttenuation;
  14 +
  15 + private int step;
  16 +
  17 + private int pause;
  18 +
  19 + private int maxIterCount;
  20 +
  21 + private ZonedDateTime finishAt;
  22 +
  23 + abstract boolean checkOptions();
  24 +}
... ...
src/main/java/com/example/mina/client/base/ExperimentProgressListner.java 0 → 100644
... ... @@ -0,0 +1,21 @@
  1 +package com.example.mina.client.base;
  2 +
  3 +
  4 +public interface ExperimentProgressListner {
  5 +
  6 + /**
  7 + * start the experiment
  8 + */
  9 + void startExperiment(String experimentId);
  10 +
  11 + /**
  12 + * update iteration
  13 + */
  14 + void updateProgress(String experimentId, int iters);
  15 +
  16 +
  17 + /**
  18 + * Stop the experiment
  19 + */
  20 + void stopExperiment(String experimentId);
  21 +}
... ...
src/main/java/com/example/mina/client/base/MatrixClient.java 0 → 100644
... ... @@ -0,0 +1,175 @@
  1 +package com.example.mina.client.base;
  2 +
  3 +import lombok.extern.slf4j.Slf4j;
  4 +import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
  5 +import org.apache.mina.core.filterchain.IoFilterChain;
  6 +import org.apache.mina.core.future.ConnectFuture;
  7 +import org.apache.mina.core.service.IoConnector;
  8 +import org.apache.mina.core.session.IoSession;
  9 +import org.apache.mina.filter.codec.ProtocolCodecFilter;
  10 +import org.apache.mina.transport.socket.nio.NioSocketConnector;
  11 +import org.springframework.util.Assert;
  12 +
  13 +import java.net.InetSocketAddress;
  14 +import java.util.concurrent.LinkedBlockingQueue;
  15 +import java.util.concurrent.TimeUnit;
  16 +
  17 +/**
  18 + * @author dy
  19 + * @date 2021/3/10
  20 + */
  21 +@Slf4j
  22 +public class MatrixClient {
  23 +
  24 + /**
  25 + * 客户端与设备端连接的配置信息
  26 + */
  27 + protected MatrixConnectConfig matrixConnectConfig;
  28 +
  29 + /**
  30 + * 存储客户端的请求指令的队列
  31 + */
  32 + protected LinkedBlockingQueue<MatrixCommand> requestsPool;
  33 +
  34 + protected AbstractMatrixIoHandler clientHandler;
  35 +
  36 + protected IoSession ioSession;
  37 +
  38 + private CommandSender sender;
  39 +
  40 + public MatrixClient(MatrixConnectConfig connectConfig) {
  41 + this.matrixConnectConfig = connectConfig;
  42 + this.requestsPool = new LinkedBlockingQueue<>();
  43 + this.sender = new CommandSender(this, this.requestsPool);
  44 +
  45 + this.sender.start();
  46 + }
  47 +
  48 + /**
  49 + *获取设备的类型
  50 + */
  51 + public String getDeviceType() {
  52 + return matrixConnectConfig.getDeviceType();
  53 + }
  54 +
  55 + /**
  56 + * 获取设备ID
  57 + */
  58 + public String getDeviceId() {
  59 + return matrixConnectConfig.getDeviceId();
  60 + }
  61 +
  62 + /**
  63 + * 获取设备名称
  64 + */
  65 + public String getDeviceName() {
  66 + return matrixConnectConfig.getDeviceName();
  67 + }
  68 +
  69 + /**
  70 + * 返回客户端对响应的Handler
  71 + * @return
  72 + */
  73 + public void setClientHandler(AbstractMatrixIoHandler clientHandler) {
  74 + this.clientHandler = clientHandler;
  75 + }
  76 +
  77 +
  78 + /**
  79 + * 返回该盒子的编解码器,默认为tcp的协议,如果是其它协议,可以在自类中重写改方法
  80 + * @return
  81 + */
  82 + protected void setFilterChain(DefaultIoFilterChainBuilder filterChainBuilder) {
  83 + filterChainBuilder.addLast("codec", new ProtocolCodecFilter(new ByteProtocolFactory()));
  84 + }
  85 +
  86 + public IoFilterChain getFilterChain() {
  87 + return ioSession.getFilterChain();
  88 + }
  89 +
  90 + public void start() {
  91 + //1、创建客户端IoService
  92 + IoConnector connector = new NioSocketConnector();
  93 + //客户端链接超时时间
  94 + connector.setConnectTimeoutMillis(30000);
  95 + //2、客户端过滤器
  96 +// setFilterChain(connector.getFilterChain());
  97 + //3、客户端IoHandler,发生消息
  98 + Assert.notNull(clientHandler, "client handler must not be null");
  99 + connector.setHandler(clientHandler);
  100 +
  101 + //连接服务端
  102 + ConnectFuture connectFuture = connector.connect(new InetSocketAddress(matrixConnectConfig.getHost(),
  103 + matrixConnectConfig.getPort()));
  104 + // 等待建立连接
  105 + connectFuture.awaitUninterruptibly();
  106 + // 获取连接会话
  107 + this.ioSession = connectFuture.getSession();
  108 + this.ioSession.setAttribute(MatrixConstants.SESSION_CONFIG_NAME, matrixConnectConfig);
  109 + //将数据发送线程启动
  110 + this.sender.start();
  111 + }
  112 +
  113 + /**
  114 + * the belows method can be overrided by its child class
  115 + */
  116 + public void resetAttenuations() {
  117 + MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_RESET).build();
  118 +
  119 + sendCommand(command);
  120 + }
  121 +
  122 + public void setAttenuation(int row, int col, int attn) {
  123 + MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_SET_ATTN)
  124 + .col(col).row(row).attn(attn).build();
  125 +
  126 + sendCommand(command);
  127 + }
  128 +
  129 + public void setOffset(int row, int offset) {
  130 + MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_SET_OFFSET)
  131 + .row(row).offset(offset).build();
  132 +
  133 + sendCommand(command);
  134 + }
  135 +
  136 + /**
  137 + * 执行命令
  138 + */
  139 + public boolean sendCommand(MatrixCommand command) {
  140 + try{
  141 + requestsPool.put(command);
  142 + return true;
  143 + }catch (Exception e) {
  144 + log.error("error ocurred, when put a matrix command!", e);
  145 + return false;
  146 + }
  147 + }
  148 +
  149 + /**
  150 + * 客户端启动的时候自动为每个客户端启动一个发送线程,将缓存的指令发送出去
  151 + */
  152 + private static class CommandSender extends Thread {
  153 + private MatrixClient client;
  154 + private LinkedBlockingQueue<MatrixCommand> commands;
  155 +
  156 + public CommandSender(MatrixClient client, LinkedBlockingQueue commands ) {
  157 + this.client = client;
  158 + this.commands = commands;
  159 + this.setName(String.format("Sender-%s-%s", client.getDeviceType(), client.getDeviceName()));
  160 + }
  161 +
  162 + @Override
  163 + public void run() {
  164 + while (true) {
  165 + try {
  166 + MatrixCommand command = commands.poll(5, TimeUnit.SECONDS);
  167 +
  168 + client.ioSession.write(command);
  169 + }catch (Exception e) {
  170 + log.error("error occurred when send the request to device, device: " + client.matrixConnectConfig, e);
  171 + }
  172 + }
  173 + }
  174 + }
  175 +}
... ...
src/main/java/com/example/mina/client/base/MatrixCommand.java 0 → 100644
... ... @@ -0,0 +1,21 @@
  1 +package com.example.mina.client.base;
  2 +
  3 +import lombok.Builder;
  4 +import lombok.Data;
  5 +
  6 +@Data
  7 +@Builder
  8 +public class MatrixCommand {
  9 +
  10 + private String matrixId;
  11 +
  12 + private Integer command;
  13 +
  14 + private Integer row;
  15 +
  16 + private Integer col;
  17 +
  18 + private Integer attn;
  19 +
  20 + private Integer offset;
  21 +}
... ...
src/main/java/com/example/mina/client/base/MatrixConnectConfig.java 0 → 100644
... ... @@ -0,0 +1,30 @@
  1 +package com.example.mina.client.base;
  2 +
  3 +
  4 +import lombok.Builder;
  5 +import lombok.Data;
  6 +
  7 +import java.util.Map;
  8 +
  9 +@Data
  10 +@Builder
  11 +public class MatrixConnectConfig {
  12 +
  13 + private String deviceId;
  14 +
  15 + private String deviceType;
  16 +
  17 + private String deviceName;
  18 +
  19 + private String host;
  20 +
  21 + private Integer port;
  22 +
  23 + private Integer rows;
  24 +
  25 + private Integer cols;
  26 +
  27 + private Integer maxAttenuation;
  28 +
  29 + private Map<String, Object> options;
  30 +}
... ...
src/main/java/com/example/mina/client/base/MatrixConstants.java 0 → 100644
... ... @@ -0,0 +1,23 @@
  1 +package com.example.mina.client.base;
  2 +
  3 +public class MatrixConstants {
  4 +
  5 + public static final String SESSION_CONFIG_NAME = "SESSION_CONFIG";
  6 +
  7 + public static final String MATRIX_TYPE_AERO = "AEROFLEX";
  8 +
  9 + public static final String MATRIX_TYPE_LTE3000 = "LTE3000";
  10 +
  11 + public static final int COMMAND_RESET = 1;
  12 +
  13 + public static final int COMMAND_SET_ATTN = 2;
  14 +
  15 + public static final int COMMAND_GET_ATTN = 3;
  16 +
  17 + public static final int COMMAND_SET_OFFSET = 4;
  18 +
  19 + public static final int COMMAND_GET_OFFSET = 5;
  20 +
  21 + public static final String EXPERIMENT_HANDOVER = "HANDOVER";
  22 +
  23 +}
... ...
src/main/java/com/example/mina/client/base/MatrixDataProxy.java 0 → 100644
... ... @@ -0,0 +1,33 @@
  1 +package com.example.mina.client.base;
  2 +
  3 +public class MatrixDataProxy {
  4 +
  5 + /**
  6 + * 当从设备端将数据同步到客户端的时候,将数据直接存入数据中,
  7 + * 该类会在ioHandler及其子类中被调用
  8 + * 设置attenuation的值,并将其存储到matrix_data的数据表中
  9 + */
  10 + public void setAttenuation(String deviceId, int row, int col, int val) {
  11 + //TODO 实现将数据写入到数据库的逻辑
  12 + }
  13 +
  14 + public int getAttenuation(String deviceId, int row, int col) {
  15 + return -1;
  16 + }
  17 +
  18 + public void setOffset(int row, int val) {
  19 +
  20 + }
  21 +
  22 + public int getOffset(int row) {
  23 + return -1;
  24 + }
  25 +
  26 + public void resetAttenuation() {
  27 +
  28 + }
  29 +
  30 + private String getCurrentThreadId() {
  31 + return Thread.currentThread().getName();
  32 + }
  33 +}
... ...
src/main/java/com/example/mina/client/base/MatrixResponse.java 0 → 100644
... ... @@ -0,0 +1,4 @@
  1 +package com.example.mina.client.base;
  2 +
  3 +public class MatrixResponse {
  4 +}
... ...
src/main/java/com/example/mina/client/base/request/ResetRequest.java
... ... @@ -1,17 +0,0 @@
1   -package com.example.mina.client.base.request;
2   -
3   -import com.example.mina.client.base.BaseRequest;
4   -import com.example.mina.client.base.BoxConstants;
5   -import com.example.mina.client.base.response.ResetResponse;
6   -
7   -public class ResetRequest extends BaseRequest<ResetResponse> {
8   - @Override
9   - public String getCommandName() {
10   - return BoxConstants.COMMAND_RESET;
11   - }
12   -
13   - @Override
14   - public ResetResponse execute() {
15   - return null;
16   - }
17   -}
src/main/java/com/example/mina/client/base/response/ResetResponse.java
... ... @@ -1,7 +0,0 @@
1   -package com.example.mina.client.base.response;
2   -
3   -import com.example.mina.client.base.BaseResponse;
4   -
5   -public class ResetResponse extends BaseResponse {
6   -
7   -}
src/main/java/com/example/mina/client/box/aeroflex/AeroFlexClientFactory.java 0 → 100644
... ... @@ -0,0 +1,15 @@
  1 +package com.example.mina.client.box.aeroflex;
  2 +
  3 +import com.example.mina.client.base.AbstractClientFactory;
  4 +import com.example.mina.client.base.AbstractMatrixIoHandler;
  5 +import com.example.mina.client.base.MatrixConstants;
  6 +import org.springframework.stereotype.Component;
  7 +
  8 +@Component(MatrixConstants.MATRIX_TYPE_AERO)
  9 +public class AeroFlexClientFactory extends AbstractClientFactory {
  10 +
  11 + @Override
  12 + public AbstractMatrixIoHandler getClientHandler() {
  13 + return null;
  14 + }
  15 +}
... ...
src/main/java/com/example/mina/client/box/aeroflex/AeroflexClient.java
... ... @@ -1,23 +0,0 @@
1   -package com.example.mina.client.box.aeroflex;
2   -
3   -import com.example.mina.client.base.AbstractClient;
4   -import com.example.mina.client.base.AbstractVirtualBoxClientHandler;
5   -import com.example.mina.client.base.BoxConnectConfig;
6   -
7   -/**
8   - * @author dy
9   - * @date 2021/3/10
10   - */
11   -public class AeroflexClient extends AbstractClient {
12   -
13   - public AeroflexClient(BoxConnectConfig boxConnectConfig) {
14   - super(boxConnectConfig);
15   - }
16   -
17   - @Override
18   - protected AbstractVirtualBoxClientHandler getIoHandler() {
19   - return null;
20   - }
21   -
22   -
23   -}
src/main/java/com/example/mina/client/box/aeroflex/AeroflexClientHandler.java
... ... @@ -1,22 +0,0 @@
1   -package com.example.mina.client.box.aeroflex;
2   -
3   -import com.example.mina.client.base.AbstractVirtualBoxClientHandler;
4   -import com.example.mina.client.base.DownloadListener;
5   -
6   -/**
7   - * @author dy
8   - * @date 2021/3/10
9   - */
10   -public class AeroflexClientHandler extends AbstractVirtualBoxClientHandler {
11   -
12   - public AeroflexClientHandler(DownloadListener downloadListener) {
13   - super(downloadListener);
14   - }
15   -
16   - @Override
17   - public boolean handleCommandResponse(byte[] response) {
18   -
19   - return false;
20   - }
21   -
22   -}
src/main/java/com/example/mina/client/box/aeroflex/AeroflexClientIoHandler.java 0 → 100644
... ... @@ -0,0 +1,22 @@
  1 +package com.example.mina.client.box.aeroflex;
  2 +
  3 +import com.example.mina.client.base.AbstractMatrixIoHandler;
  4 +import com.example.mina.client.base.MatrixDataProxy;
  5 +
  6 +/**
  7 + * @author dy
  8 + * @date 2021/3/10
  9 + */
  10 +public class AeroflexClientIoHandler extends AbstractMatrixIoHandler {
  11 +
  12 + public AeroflexClientIoHandler(MatrixDataProxy matrixDataProxy) {
  13 + super(matrixDataProxy);
  14 + }
  15 +
  16 + @Override
  17 + public boolean handleCommandResponse(byte[] response) {
  18 +
  19 + return false;
  20 + }
  21 +
  22 +}
... ...
src/main/java/com/example/mina/client/box/lte3000/Lte3000Client.java
... ... @@ -1,21 +0,0 @@
1   -package com.example.mina.client.box.lte3000;
2   -
3   -import com.example.mina.client.base.AbstractClient;
4   -import com.example.mina.client.base.AbstractVirtualBoxClientHandler;
5   -import com.example.mina.client.base.BoxConnectConfig;
6   -
7   -/**
8   - * @author dy
9   - * @date 2021/3/10
10   - */
11   -public class Lte3000Client extends AbstractClient {
12   - public Lte3000Client(BoxConnectConfig connectConfig) {
13   - super(connectConfig);
14   - }
15   -
16   - @Override
17   - protected AbstractVirtualBoxClientHandler getIoHandler() {
18   - return null;
19   - }
20   -
21   -}
src/main/java/com/example/mina/client/box/lte3000/Lte3000ClientFactory.java 0 → 100644
... ... @@ -0,0 +1,15 @@
  1 +package com.example.mina.client.box.lte3000;
  2 +
  3 +import com.example.mina.client.base.AbstractClientFactory;
  4 +import com.example.mina.client.base.AbstractMatrixIoHandler;
  5 +import com.example.mina.client.base.MatrixConstants;
  6 +import org.springframework.stereotype.Component;
  7 +
  8 +@Component(MatrixConstants.MATRIX_TYPE_LTE3000)
  9 +public class Lte3000ClientFactory extends AbstractClientFactory {
  10 +
  11 + @Override
  12 + public AbstractMatrixIoHandler getClientHandler() {
  13 + return null;
  14 + }
  15 +}
... ...
src/main/java/com/example/mina/client/box/lte3000/Lte3000ClientHandler.java
No preview for this file type
src/main/java/com/example/mina/client/box/lte3000/Lte3000ClientIoHandler.java 0 → 100644
No preview for this file type
src/main/java/com/example/mina/client/experiment/HandoverExperiment.java 0 → 100644
... ... @@ -0,0 +1,23 @@
  1 +package com.example.mina.client.experiment;
  2 +
  3 +import com.example.mina.client.base.MatrixClient;
  4 +import com.example.mina.client.base.MatrixConstants;
  5 +import com.example.mina.client.base.Experiment;
  6 +import com.example.mina.client.base.ExperimentOptions;
  7 +
  8 +public class HandoverExperiment extends Experiment {
  9 +
  10 + public HandoverExperiment(MatrixClient client, ExperimentOptions options) {
  11 + super(client, options);
  12 + }
  13 +
  14 + @Override
  15 + public String getExperimentName() {
  16 + return MatrixConstants.EXPERIMENT_HANDOVER;
  17 + }
  18 +
  19 + @Override
  20 + public void schedule() {
  21 + }
  22 +
  23 +}
... ...