From 6c09d340798519959be6a37852c36682547be2be Mon Sep 17 00:00:00 2001 From: kaizhi.wu Date: Tue, 16 Mar 2021 00:51:06 +0800 Subject: [PATCH] feat: 重构客户端 1,完善client端的配置信息; 2,设计将同步数据从内存转入数据库的设计; 3,为客户端增加一个同步线程自动同步数据; --- src/main/java/com/example/mina/client/base/AbstractClient.java | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------ src/main/java/com/example/mina/client/base/AbstractVirtualBoxClientHandler.java | 17 ++++++----------- src/main/java/com/example/mina/client/base/BoxConnectConfig.java | 6 ++++++ src/main/java/com/example/mina/client/base/DownloadListener.java | 17 +++++++++++++++++ src/main/java/com/example/mina/client/box/aeroflex/AeroflexClient.java | 12 ------------ src/main/java/com/example/mina/client/box/aeroflex/AeroflexClientHandler.java | 6 +++++- src/main/java/com/example/mina/client/box/lte3000/Lte3000Client.java | 10 ---------- src/main/java/com/example/mina/client/box/lte3000/Lte3000ClientHandler.java | 40 +++++++++++++++++++++------------------- 8 files changed, 111 insertions(+), 77 deletions(-) create mode 100644 src/main/java/com/example/mina/client/base/DownloadListener.java diff --git a/src/main/java/com/example/mina/client/base/AbstractClient.java b/src/main/java/com/example/mina/client/base/AbstractClient.java index 6690654..8bac343 100644 --- a/src/main/java/com/example/mina/client/base/AbstractClient.java +++ b/src/main/java/com/example/mina/client/base/AbstractClient.java @@ -1,20 +1,22 @@ package com.example.mina.client.base; -import com.example.mina.server.base.AbstractHardwareDataBuffer; +import lombok.extern.slf4j.Slf4j; +import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.service.IoConnector; import org.apache.mina.core.session.IoSession; -import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.transport.socket.nio.NioSocketConnector; import java.net.InetSocketAddress; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; /** * @author dy * @date 2021/3/10 */ +@Slf4j public abstract class AbstractClient { /** @@ -23,58 +25,62 @@ public abstract class AbstractClient { protected BoxConnectConfig boxConnectConfig; /** - * 存储设备返回的响应数据缓存 - */ - protected AbstractHardwareDataBuffer responsePool; - - /** * 存储客户端的请求指令的队列 */ - protected ArrayBlockingQueue requestsPool; + protected LinkedBlockingQueue requestsPool; protected IoSession ioSession; + private CommandSender sender; public AbstractClient(BoxConnectConfig connectConfig) { this.boxConnectConfig = connectConfig; - this.requestsPool = new ArrayBlockingQueue(100); - //TODO 初始化响应缓存 - this.responsePool = null; + this.requestsPool = new LinkedBlockingQueue<>(); + this.sender = new CommandSender(this, this.requestsPool); } /** *获取设备的类型 */ - abstract public String getDeviceType(); + public String getDeviceType() { + return boxConnectConfig.getDeviceType(); + } /** - * 返回客户端对响应的Handler - * @return + * 获取设备ID */ - abstract protected AbstractVirtualBoxClientHandler getIoHandler(); + public String getDeviceId() { + return boxConnectConfig.getDeviceId(); + } /** - * 返回该盒子的编解码器 + * 获取设备名称 + */ + public String getDeviceName() { + return boxConnectConfig.getDeviceName(); + } + + /** + * 返回客户端对响应的Handler * @return */ - abstract protected ProtocolCodecFactory getCodecFactory(); + abstract protected AbstractVirtualBoxClientHandler getIoHandler(); /** - * 返回缓存数据,提供给Request执行; + * 返回该盒子的编解码器,默认为tcp的协议,如果是其它协议,可以在自类中重写改方法 * @return */ - public AbstractHardwareDataBuffer getResponsePool() { - return this.responsePool; + protected void setFilterChain(DefaultIoFilterChainBuilder filterChainBuilder) { + filterChainBuilder.addLast("codec", new ProtocolCodecFilter(new ByteProtocolFactory())); } - public void start() { //1、创建客户端IoService IoConnector connector = new NioSocketConnector(); //客户端链接超时时间 connector.setConnectTimeoutMillis(30000); //2、客户端过滤器 - connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(getCodecFactory())); + setFilterChain(connector.getFilterChain()); //3、客户端IoHandler,发生消息 connector.setHandler(getIoHandler()); @@ -86,7 +92,8 @@ public abstract class AbstractClient { // 获取连接会话 this.ioSession = connectFuture.getSession(); this.ioSession.setAttribute(BoxConstants.SESSION_CONFIG_NAME, boxConnectConfig); - + //将数据发送线程启动 + this.sender.start(); } /** @@ -100,4 +107,29 @@ public abstract class AbstractClient { return request.execute(); } + /** + * 客户端启动的时候自动为每个客户端启动一个发送线程,将缓存的指令发送出去 + */ + private static class CommandSender extends Thread { + private AbstractClient client; + private LinkedBlockingQueue commands; + + public CommandSender(AbstractClient client, LinkedBlockingQueue commands ) { + this.client = client; + this.commands = commands; + } + + @Override + public void run() { + while (true) { + try { + BaseRequest command = commands.poll(5, TimeUnit.SECONDS); + + client.ioSession.write(command); + }catch (Exception e) { + log.error("error occurred when send the request to device, device: " + client.boxConnectConfig, e); + } + } + } + } } diff --git a/src/main/java/com/example/mina/client/base/AbstractVirtualBoxClientHandler.java b/src/main/java/com/example/mina/client/base/AbstractVirtualBoxClientHandler.java index 6d593b9..1554ccd 100644 --- a/src/main/java/com/example/mina/client/base/AbstractVirtualBoxClientHandler.java +++ b/src/main/java/com/example/mina/client/base/AbstractVirtualBoxClientHandler.java @@ -1,7 +1,6 @@ package com.example.mina.client.base; -import com.example.mina.server.base.AbstractHardwareDataBuffer; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.mina.core.buffer.IoBuffer; @@ -16,14 +15,10 @@ import org.apache.mina.core.session.IoSession; @Slf4j public abstract class AbstractVirtualBoxClientHandler extends IoHandlerAdapter { - protected String deviceType; + protected DownloadListener downloadListener; - protected AbstractHardwareDataBuffer responsePool; - - - public AbstractVirtualBoxClientHandler(String deviceType, AbstractHardwareDataBuffer abstractHardwareDataBuffer) { - this.deviceType = deviceType; - this.responsePool = abstractHardwareDataBuffer; + public AbstractVirtualBoxClientHandler(DownloadListener downloadListener) { + this.downloadListener = downloadListener; } protected BoxConnectConfig getConnectConfig(IoSession session) { @@ -48,7 +43,7 @@ public abstract class AbstractVirtualBoxClientHandler extends IoHandlerAdapter { //TODO 客户端将数据服务器的字符串数组读取出来 byte[] response = null; - log.info("the client recieved the device response, deviceType: {}, device:{}, response is: {}", deviceType, + log.info("the client recieved the device response, device:{}, response is: {}", getConnectConfig(session), response); handleCommandResponse(response); } @@ -72,12 +67,12 @@ public abstract class AbstractVirtualBoxClientHandler extends IoHandlerAdapter { @Override public void sessionIdle(IoSession session, IdleStatus status) { - log.info("the session with box was idle, deviceType:{}, device is {}", deviceType, getConnectConfig(session)); + log.info("the session with box was idle, device is {}", getConnectConfig(session)); } @Override public void sessionOpened(IoSession session) { - log.info("the system has connected to device, deviceType: {}, device is {}", deviceType, getConnectConfig(session)); + log.info("the system has connected to device, device is {}", getConnectConfig(session)); } abstract public boolean handleCommandResponse(byte[] response); diff --git a/src/main/java/com/example/mina/client/base/BoxConnectConfig.java b/src/main/java/com/example/mina/client/base/BoxConnectConfig.java index 3a7c30e..8b4bea8 100644 --- a/src/main/java/com/example/mina/client/base/BoxConnectConfig.java +++ b/src/main/java/com/example/mina/client/base/BoxConnectConfig.java @@ -10,6 +10,12 @@ import java.util.Map; @Builder public class BoxConnectConfig { + private String deviceId; + + private String deviceType; + + private String deviceName; + private String host; private Integer port; diff --git a/src/main/java/com/example/mina/client/base/DownloadListener.java b/src/main/java/com/example/mina/client/base/DownloadListener.java new file mode 100644 index 0000000..7aebbb1 --- /dev/null +++ b/src/main/java/com/example/mina/client/base/DownloadListener.java @@ -0,0 +1,17 @@ +package com.example.mina.client.base; + +public class DownloadListener { + + /** + * 当从设备端将数据同步到客户端的时候,将数据直接存入数据中, + * 该类会在ioHandler及其子类中被调用 + * 设置attenuation的值,并将其存储到matrix_data的数据表中 + */ + public void setAttenuation(String deviceId, int row, int col, int val) { + //TODO 实现将数据写入到数据库的逻辑 + } + + private String getCurrentThreadId() { + return Thread.currentThread().getName(); + } +} diff --git a/src/main/java/com/example/mina/client/box/aeroflex/AeroflexClient.java b/src/main/java/com/example/mina/client/box/aeroflex/AeroflexClient.java index cd20785..064feae 100644 --- a/src/main/java/com/example/mina/client/box/aeroflex/AeroflexClient.java +++ b/src/main/java/com/example/mina/client/box/aeroflex/AeroflexClient.java @@ -3,8 +3,6 @@ package com.example.mina.client.box.aeroflex; import com.example.mina.client.base.AbstractClient; import com.example.mina.client.base.AbstractVirtualBoxClientHandler; import com.example.mina.client.base.BoxConnectConfig; -import com.example.mina.client.base.BoxConstants; -import org.apache.mina.filter.codec.ProtocolCodecFactory; /** * @author dy @@ -17,16 +15,6 @@ public class AeroflexClient extends AbstractClient { } @Override - public String getDeviceType() { - return BoxConstants.DEVICE_TYPE_AERO; - } - - @Override - protected ProtocolCodecFactory getCodecFactory() { - return null; - } - - @Override protected AbstractVirtualBoxClientHandler getIoHandler() { return null; } diff --git a/src/main/java/com/example/mina/client/box/aeroflex/AeroflexClientHandler.java b/src/main/java/com/example/mina/client/box/aeroflex/AeroflexClientHandler.java index 27816d8..6771817 100644 --- a/src/main/java/com/example/mina/client/box/aeroflex/AeroflexClientHandler.java +++ b/src/main/java/com/example/mina/client/box/aeroflex/AeroflexClientHandler.java @@ -1,7 +1,7 @@ package com.example.mina.client.box.aeroflex; import com.example.mina.client.base.AbstractVirtualBoxClientHandler; -import com.example.mina.client.base.BaseResponse; +import com.example.mina.client.base.DownloadListener; /** * @author dy @@ -9,6 +9,10 @@ import com.example.mina.client.base.BaseResponse; */ public class AeroflexClientHandler extends AbstractVirtualBoxClientHandler { + public AeroflexClientHandler(DownloadListener downloadListener) { + super(downloadListener); + } + @Override public boolean handleCommandResponse(byte[] response) { diff --git a/src/main/java/com/example/mina/client/box/lte3000/Lte3000Client.java b/src/main/java/com/example/mina/client/box/lte3000/Lte3000Client.java index 49d1e00..4633202 100644 --- a/src/main/java/com/example/mina/client/box/lte3000/Lte3000Client.java +++ b/src/main/java/com/example/mina/client/box/lte3000/Lte3000Client.java @@ -3,7 +3,6 @@ package com.example.mina.client.box.lte3000; import com.example.mina.client.base.AbstractClient; import com.example.mina.client.base.AbstractVirtualBoxClientHandler; import com.example.mina.client.base.BoxConnectConfig; -import org.apache.mina.filter.codec.ProtocolCodecFactory; /** * @author dy @@ -15,17 +14,8 @@ public class Lte3000Client extends AbstractClient { } @Override - public String getDeviceType() { - return null; - } - - @Override protected AbstractVirtualBoxClientHandler getIoHandler() { return null; } - @Override - protected ProtocolCodecFactory getCodecFactory() { - return null; - } } diff --git a/src/main/java/com/example/mina/client/box/lte3000/Lte3000ClientHandler.java b/src/main/java/com/example/mina/client/box/lte3000/Lte3000ClientHandler.java index c423558..f5270e2 100644 --- a/src/main/java/com/example/mina/client/box/lte3000/Lte3000ClientHandler.java +++ b/src/main/java/com/example/mina/client/box/lte3000/Lte3000ClientHandler.java @@ -1,11 +1,9 @@ package com.example.mina.client.box.lte3000; import com.example.mina.client.base.AbstractVirtualBoxClientHandler; -import com.example.mina.client.base.BaseRequest; -import com.example.mina.server.base.AbstractHardwareDataBuffer; +import com.example.mina.client.base.DownloadListener; import com.example.mina.server.util.Lte3000CommandHelper; import lombok.extern.slf4j.Slf4j; -import org.apache.mina.core.session.IoSession; /** * @author dy @@ -14,6 +12,10 @@ import org.apache.mina.core.session.IoSession; @Slf4j public class Lte3000ClientHandler extends AbstractVirtualBoxClientHandler { + public Lte3000ClientHandler(DownloadListener downloadListener) { + super(downloadListener); + } + protected boolean isSameValue(byte value, int expect) { return ((0xFF & value) == expect); } @@ -33,13 +35,13 @@ public class Lte3000ClientHandler extends AbstractVirtualBoxClientHandler { int col = Lte3000CommandHelper.getBcdPort(response[4], response[5], response[6]); int row = Lte3000CommandHelper.getBcdPort(response[4], response[5], response[6]); // limits only one set "ON" in a column, others should be OFF - for (int i = 1; i <= responsePool.getMaxRow(); i++) { - if (i == row) { - responsePool.setAttenuation(i, col, 1); - } else { - responsePool.setAttenuation(i, col, 0); - } - } +// for (int i = 1; i <= responsePool.getMaxRow(); i++) { +// if (i == row) { +// responsePool.setAttenuation(i, col, 1); +// } else { +// responsePool.setAttenuation(i, col, 0); +// } +// } TODO 再次调用DownloadListener的方法来设置衰减值 log.info("RESPONSE Ox4F. ROW = {}, COL = {}", row, col); return true; } @@ -50,13 +52,13 @@ public class Lte3000ClientHandler extends AbstractVirtualBoxClientHandler { int col = Lte3000CommandHelper.getBcdPort(response[4], response[5], response[6]); int row = Lte3000CommandHelper.getBcdPort(response[7], response[8], response[9]); // limits only one set "ON" in a column, others should be OFF - for (int i = 1; i <= responsePool.getMaxRow(); i++) { - if (i == row) { - responsePool.setAttenuation(i, col, 1); - } else { - responsePool.setAttenuation(i, col, 0); - } - } +// for (int i = 1; i <= responsePool.getMaxRow(); i++) { +// if (i == row) { +// responsePool.setAttenuation(i, col, 1); +// } else { +// responsePool.setAttenuation(i, col, 0); +// } +// } TODO 再次调用DownloadListener的方法来设置衰减值 log.info("RESPONSE Ox53. ROW = {}, COL = {}", row, col); return true; } @@ -70,7 +72,7 @@ public class Lte3000ClientHandler extends AbstractVirtualBoxClientHandler { int col = Lte3000CommandHelper.getBcdPort(response[7], response[8], response[9]); int attn = Lte3000CommandHelper.getBcdAttn(response[11], response[12], response[13], response[14], response[15]); - responsePool.setOffset(col, attn); + //responsePool.setOffset(col, attn); TODO 再次调用DownloadListener的方法来设置衰减值 log.info("RESPONSE XGMO. ATTN = {}, COL = {}", attn, col); return true; } @@ -92,7 +94,7 @@ public class Lte3000ClientHandler extends AbstractVirtualBoxClientHandler { */ int col = Lte3000CommandHelper.getBcdPort(response[7], response[8], response[9]); int attn = Lte3000CommandHelper.getBcdAttn(response[9], response[10], response[11], response[12], response[13]); - responsePool.setOffset(col, attn); + //responsePool.setOffset(col, attn); TODO 再次调用DownloadListener的方法来设置衰减值 log.info("RESPONSE XGRO. ATTN = {}, COL = {}", attn, col); return true; } -- libgit2 0.21.2