Commit 6c09d340798519959be6a37852c36682547be2be
1 parent
69bfaf7f
Exists in
develop
feat: 重构客户端
1,完善client端的配置信息; 2,设计将同步数据从内存转入数据库的设计; 3,为客户端增加一个同步线程自动同步数据;
Showing
8 changed files
with
111 additions
and
77 deletions
Show diff stats
src/main/java/com/example/mina/client/base/AbstractClient.java
1 | package com.example.mina.client.base; | 1 | package com.example.mina.client.base; |
2 | 2 | ||
3 | -import com.example.mina.server.base.AbstractHardwareDataBuffer; | 3 | +import lombok.extern.slf4j.Slf4j; |
4 | +import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder; | ||
4 | import org.apache.mina.core.future.ConnectFuture; | 5 | import org.apache.mina.core.future.ConnectFuture; |
5 | import org.apache.mina.core.service.IoConnector; | 6 | import org.apache.mina.core.service.IoConnector; |
6 | import org.apache.mina.core.session.IoSession; | 7 | import org.apache.mina.core.session.IoSession; |
7 | -import org.apache.mina.filter.codec.ProtocolCodecFactory; | ||
8 | import org.apache.mina.filter.codec.ProtocolCodecFilter; | 8 | import org.apache.mina.filter.codec.ProtocolCodecFilter; |
9 | import org.apache.mina.transport.socket.nio.NioSocketConnector; | 9 | import org.apache.mina.transport.socket.nio.NioSocketConnector; |
10 | 10 | ||
11 | import java.net.InetSocketAddress; | 11 | import java.net.InetSocketAddress; |
12 | -import java.util.concurrent.ArrayBlockingQueue; | 12 | +import java.util.concurrent.LinkedBlockingQueue; |
13 | +import java.util.concurrent.TimeUnit; | ||
13 | 14 | ||
14 | /** | 15 | /** |
15 | * @author dy | 16 | * @author dy |
16 | * @date 2021/3/10 | 17 | * @date 2021/3/10 |
17 | */ | 18 | */ |
19 | +@Slf4j | ||
18 | public abstract class AbstractClient { | 20 | public abstract class AbstractClient { |
19 | 21 | ||
20 | /** | 22 | /** |
@@ -23,58 +25,62 @@ public abstract class AbstractClient { | @@ -23,58 +25,62 @@ public abstract class AbstractClient { | ||
23 | protected BoxConnectConfig boxConnectConfig; | 25 | protected BoxConnectConfig boxConnectConfig; |
24 | 26 | ||
25 | /** | 27 | /** |
26 | - * 存储设备返回的响应数据缓存 | ||
27 | - */ | ||
28 | - protected AbstractHardwareDataBuffer responsePool; | ||
29 | - | ||
30 | - /** | ||
31 | * 存储客户端的请求指令的队列 | 28 | * 存储客户端的请求指令的队列 |
32 | */ | 29 | */ |
33 | - protected ArrayBlockingQueue<BaseRequest> requestsPool; | 30 | + protected LinkedBlockingQueue<BaseRequest> requestsPool; |
34 | 31 | ||
35 | protected IoSession ioSession; | 32 | protected IoSession ioSession; |
36 | 33 | ||
34 | + private CommandSender sender; | ||
37 | 35 | ||
38 | public AbstractClient(BoxConnectConfig connectConfig) { | 36 | public AbstractClient(BoxConnectConfig connectConfig) { |
39 | this.boxConnectConfig = connectConfig; | 37 | this.boxConnectConfig = connectConfig; |
40 | - this.requestsPool = new ArrayBlockingQueue<BaseRequest>(100); | ||
41 | - //TODO 初始化响应缓存 | ||
42 | - this.responsePool = null; | 38 | + this.requestsPool = new LinkedBlockingQueue<>(); |
39 | + this.sender = new CommandSender(this, this.requestsPool); | ||
43 | } | 40 | } |
44 | 41 | ||
45 | /** | 42 | /** |
46 | *获取设备的类型 | 43 | *获取设备的类型 |
47 | */ | 44 | */ |
48 | - abstract public String getDeviceType(); | 45 | + public String getDeviceType() { |
46 | + return boxConnectConfig.getDeviceType(); | ||
47 | + } | ||
49 | 48 | ||
50 | /** | 49 | /** |
51 | - * 返回客户端对响应的Handler | ||
52 | - * @return | 50 | + * 获取设备ID |
53 | */ | 51 | */ |
54 | - abstract protected AbstractVirtualBoxClientHandler getIoHandler(); | 52 | + public String getDeviceId() { |
53 | + return boxConnectConfig.getDeviceId(); | ||
54 | + } | ||
55 | 55 | ||
56 | /** | 56 | /** |
57 | - * 返回该盒子的编解码器 | 57 | + * 获取设备名称 |
58 | + */ | ||
59 | + public String getDeviceName() { | ||
60 | + return boxConnectConfig.getDeviceName(); | ||
61 | + } | ||
62 | + | ||
63 | + /** | ||
64 | + * 返回客户端对响应的Handler | ||
58 | * @return | 65 | * @return |
59 | */ | 66 | */ |
60 | - abstract protected ProtocolCodecFactory getCodecFactory(); | 67 | + abstract protected AbstractVirtualBoxClientHandler getIoHandler(); |
61 | 68 | ||
62 | /** | 69 | /** |
63 | - * 返回缓存数据,提供给Request执行; | 70 | + * 返回该盒子的编解码器,默认为tcp的协议,如果是其它协议,可以在自类中重写改方法 |
64 | * @return | 71 | * @return |
65 | */ | 72 | */ |
66 | - public AbstractHardwareDataBuffer getResponsePool() { | ||
67 | - return this.responsePool; | 73 | + protected void setFilterChain(DefaultIoFilterChainBuilder filterChainBuilder) { |
74 | + filterChainBuilder.addLast("codec", new ProtocolCodecFilter(new ByteProtocolFactory())); | ||
68 | } | 75 | } |
69 | 76 | ||
70 | - | ||
71 | public void start() { | 77 | public void start() { |
72 | //1、创建客户端IoService | 78 | //1、创建客户端IoService |
73 | IoConnector connector = new NioSocketConnector(); | 79 | IoConnector connector = new NioSocketConnector(); |
74 | //客户端链接超时时间 | 80 | //客户端链接超时时间 |
75 | connector.setConnectTimeoutMillis(30000); | 81 | connector.setConnectTimeoutMillis(30000); |
76 | //2、客户端过滤器 | 82 | //2、客户端过滤器 |
77 | - connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(getCodecFactory())); | 83 | + setFilterChain(connector.getFilterChain()); |
78 | //3、客户端IoHandler,发生消息 | 84 | //3、客户端IoHandler,发生消息 |
79 | connector.setHandler(getIoHandler()); | 85 | connector.setHandler(getIoHandler()); |
80 | 86 | ||
@@ -86,7 +92,8 @@ public abstract class AbstractClient { | @@ -86,7 +92,8 @@ public abstract class AbstractClient { | ||
86 | // 获取连接会话 | 92 | // 获取连接会话 |
87 | this.ioSession = connectFuture.getSession(); | 93 | this.ioSession = connectFuture.getSession(); |
88 | this.ioSession.setAttribute(BoxConstants.SESSION_CONFIG_NAME, boxConnectConfig); | 94 | this.ioSession.setAttribute(BoxConstants.SESSION_CONFIG_NAME, boxConnectConfig); |
89 | - | 95 | + //将数据发送线程启动 |
96 | + this.sender.start(); | ||
90 | } | 97 | } |
91 | 98 | ||
92 | /** | 99 | /** |
@@ -100,4 +107,29 @@ public abstract class AbstractClient { | @@ -100,4 +107,29 @@ public abstract class AbstractClient { | ||
100 | return request.execute(); | 107 | return request.execute(); |
101 | } | 108 | } |
102 | 109 | ||
110 | + /** | ||
111 | + * 客户端启动的时候自动为每个客户端启动一个发送线程,将缓存的指令发送出去 | ||
112 | + */ | ||
113 | + private static class CommandSender extends Thread { | ||
114 | + private AbstractClient client; | ||
115 | + private LinkedBlockingQueue<BaseRequest> commands; | ||
116 | + | ||
117 | + public CommandSender(AbstractClient client, LinkedBlockingQueue commands ) { | ||
118 | + this.client = client; | ||
119 | + this.commands = commands; | ||
120 | + } | ||
121 | + | ||
122 | + @Override | ||
123 | + public void run() { | ||
124 | + while (true) { | ||
125 | + try { | ||
126 | + BaseRequest command = commands.poll(5, TimeUnit.SECONDS); | ||
127 | + | ||
128 | + client.ioSession.write(command); | ||
129 | + }catch (Exception e) { | ||
130 | + log.error("error occurred when send the request to device, device: " + client.boxConnectConfig, e); | ||
131 | + } | ||
132 | + } | ||
133 | + } | ||
134 | + } | ||
103 | } | 135 | } |
src/main/java/com/example/mina/client/base/AbstractVirtualBoxClientHandler.java
1 | package com.example.mina.client.base; | 1 | package com.example.mina.client.base; |
2 | 2 | ||
3 | 3 | ||
4 | -import com.example.mina.server.base.AbstractHardwareDataBuffer; | ||
5 | import lombok.Data; | 4 | import lombok.Data; |
6 | import lombok.extern.slf4j.Slf4j; | 5 | import lombok.extern.slf4j.Slf4j; |
7 | import org.apache.mina.core.buffer.IoBuffer; | 6 | import org.apache.mina.core.buffer.IoBuffer; |
@@ -16,14 +15,10 @@ import org.apache.mina.core.session.IoSession; | @@ -16,14 +15,10 @@ import org.apache.mina.core.session.IoSession; | ||
16 | @Slf4j | 15 | @Slf4j |
17 | public abstract class AbstractVirtualBoxClientHandler extends IoHandlerAdapter { | 16 | public abstract class AbstractVirtualBoxClientHandler extends IoHandlerAdapter { |
18 | 17 | ||
19 | - protected String deviceType; | 18 | + protected DownloadListener downloadListener; |
20 | 19 | ||
21 | - protected AbstractHardwareDataBuffer responsePool; | ||
22 | - | ||
23 | - | ||
24 | - public AbstractVirtualBoxClientHandler(String deviceType, AbstractHardwareDataBuffer abstractHardwareDataBuffer) { | ||
25 | - this.deviceType = deviceType; | ||
26 | - this.responsePool = abstractHardwareDataBuffer; | 20 | + public AbstractVirtualBoxClientHandler(DownloadListener downloadListener) { |
21 | + this.downloadListener = downloadListener; | ||
27 | } | 22 | } |
28 | 23 | ||
29 | protected BoxConnectConfig getConnectConfig(IoSession session) { | 24 | protected BoxConnectConfig getConnectConfig(IoSession session) { |
@@ -48,7 +43,7 @@ public abstract class AbstractVirtualBoxClientHandler extends IoHandlerAdapter { | @@ -48,7 +43,7 @@ public abstract class AbstractVirtualBoxClientHandler extends IoHandlerAdapter { | ||
48 | 43 | ||
49 | //TODO 客户端将数据服务器的字符串数组读取出来 | 44 | //TODO 客户端将数据服务器的字符串数组读取出来 |
50 | byte[] response = null; | 45 | byte[] response = null; |
51 | - log.info("the client recieved the device response, deviceType: {}, device:{}, response is: {}", deviceType, | 46 | + log.info("the client recieved the device response, device:{}, response is: {}", |
52 | getConnectConfig(session), response); | 47 | getConnectConfig(session), response); |
53 | handleCommandResponse(response); | 48 | handleCommandResponse(response); |
54 | } | 49 | } |
@@ -72,12 +67,12 @@ public abstract class AbstractVirtualBoxClientHandler extends IoHandlerAdapter { | @@ -72,12 +67,12 @@ public abstract class AbstractVirtualBoxClientHandler extends IoHandlerAdapter { | ||
72 | 67 | ||
73 | @Override | 68 | @Override |
74 | public void sessionIdle(IoSession session, IdleStatus status) { | 69 | public void sessionIdle(IoSession session, IdleStatus status) { |
75 | - log.info("the session with box was idle, deviceType:{}, device is {}", deviceType, getConnectConfig(session)); | 70 | + log.info("the session with box was idle, device is {}", getConnectConfig(session)); |
76 | } | 71 | } |
77 | 72 | ||
78 | @Override | 73 | @Override |
79 | public void sessionOpened(IoSession session) { | 74 | public void sessionOpened(IoSession session) { |
80 | - log.info("the system has connected to device, deviceType: {}, device is {}", deviceType, getConnectConfig(session)); | 75 | + log.info("the system has connected to device, device is {}", getConnectConfig(session)); |
81 | } | 76 | } |
82 | 77 | ||
83 | abstract public boolean handleCommandResponse(byte[] response); | 78 | abstract public boolean handleCommandResponse(byte[] response); |
src/main/java/com/example/mina/client/base/BoxConnectConfig.java
@@ -10,6 +10,12 @@ import java.util.Map; | @@ -10,6 +10,12 @@ import java.util.Map; | ||
10 | @Builder | 10 | @Builder |
11 | public class BoxConnectConfig { | 11 | public class BoxConnectConfig { |
12 | 12 | ||
13 | + private String deviceId; | ||
14 | + | ||
15 | + private String deviceType; | ||
16 | + | ||
17 | + private String deviceName; | ||
18 | + | ||
13 | private String host; | 19 | private String host; |
14 | 20 | ||
15 | private Integer port; | 21 | private Integer port; |
src/main/java/com/example/mina/client/base/DownloadListener.java
0 → 100644
@@ -0,0 +1,17 @@ | @@ -0,0 +1,17 @@ | ||
1 | +package com.example.mina.client.base; | ||
2 | + | ||
3 | +public class DownloadListener { | ||
4 | + | ||
5 | + /** | ||
6 | + * 当从设备端将数据同步到客户端的时候,将数据直接存入数据中, | ||
7 | + * 该类会在ioHandler及其子类中被调用 | ||
8 | + * 设置attenuation的值,并将其存储到matrix_data的数据表中 | ||
9 | + */ | ||
10 | + public void setAttenuation(String deviceId, int row, int col, int val) { | ||
11 | + //TODO 实现将数据写入到数据库的逻辑 | ||
12 | + } | ||
13 | + | ||
14 | + private String getCurrentThreadId() { | ||
15 | + return Thread.currentThread().getName(); | ||
16 | + } | ||
17 | +} |
src/main/java/com/example/mina/client/box/aeroflex/AeroflexClient.java
@@ -3,8 +3,6 @@ package com.example.mina.client.box.aeroflex; | @@ -3,8 +3,6 @@ package com.example.mina.client.box.aeroflex; | ||
3 | import com.example.mina.client.base.AbstractClient; | 3 | import com.example.mina.client.base.AbstractClient; |
4 | import com.example.mina.client.base.AbstractVirtualBoxClientHandler; | 4 | import com.example.mina.client.base.AbstractVirtualBoxClientHandler; |
5 | import com.example.mina.client.base.BoxConnectConfig; | 5 | import com.example.mina.client.base.BoxConnectConfig; |
6 | -import com.example.mina.client.base.BoxConstants; | ||
7 | -import org.apache.mina.filter.codec.ProtocolCodecFactory; | ||
8 | 6 | ||
9 | /** | 7 | /** |
10 | * @author dy | 8 | * @author dy |
@@ -17,16 +15,6 @@ public class AeroflexClient extends AbstractClient { | @@ -17,16 +15,6 @@ public class AeroflexClient extends AbstractClient { | ||
17 | } | 15 | } |
18 | 16 | ||
19 | @Override | 17 | @Override |
20 | - public String getDeviceType() { | ||
21 | - return BoxConstants.DEVICE_TYPE_AERO; | ||
22 | - } | ||
23 | - | ||
24 | - @Override | ||
25 | - protected ProtocolCodecFactory getCodecFactory() { | ||
26 | - return null; | ||
27 | - } | ||
28 | - | ||
29 | - @Override | ||
30 | protected AbstractVirtualBoxClientHandler getIoHandler() { | 18 | protected AbstractVirtualBoxClientHandler getIoHandler() { |
31 | return null; | 19 | return null; |
32 | } | 20 | } |
src/main/java/com/example/mina/client/box/aeroflex/AeroflexClientHandler.java
1 | package com.example.mina.client.box.aeroflex; | 1 | package com.example.mina.client.box.aeroflex; |
2 | 2 | ||
3 | import com.example.mina.client.base.AbstractVirtualBoxClientHandler; | 3 | import com.example.mina.client.base.AbstractVirtualBoxClientHandler; |
4 | -import com.example.mina.client.base.BaseResponse; | 4 | +import com.example.mina.client.base.DownloadListener; |
5 | 5 | ||
6 | /** | 6 | /** |
7 | * @author dy | 7 | * @author dy |
@@ -9,6 +9,10 @@ import com.example.mina.client.base.BaseResponse; | @@ -9,6 +9,10 @@ import com.example.mina.client.base.BaseResponse; | ||
9 | */ | 9 | */ |
10 | public class AeroflexClientHandler extends AbstractVirtualBoxClientHandler { | 10 | public class AeroflexClientHandler extends AbstractVirtualBoxClientHandler { |
11 | 11 | ||
12 | + public AeroflexClientHandler(DownloadListener downloadListener) { | ||
13 | + super(downloadListener); | ||
14 | + } | ||
15 | + | ||
12 | @Override | 16 | @Override |
13 | public boolean handleCommandResponse(byte[] response) { | 17 | public boolean handleCommandResponse(byte[] response) { |
14 | 18 |
src/main/java/com/example/mina/client/box/lte3000/Lte3000Client.java
@@ -3,7 +3,6 @@ package com.example.mina.client.box.lte3000; | @@ -3,7 +3,6 @@ package com.example.mina.client.box.lte3000; | ||
3 | import com.example.mina.client.base.AbstractClient; | 3 | import com.example.mina.client.base.AbstractClient; |
4 | import com.example.mina.client.base.AbstractVirtualBoxClientHandler; | 4 | import com.example.mina.client.base.AbstractVirtualBoxClientHandler; |
5 | import com.example.mina.client.base.BoxConnectConfig; | 5 | import com.example.mina.client.base.BoxConnectConfig; |
6 | -import org.apache.mina.filter.codec.ProtocolCodecFactory; | ||
7 | 6 | ||
8 | /** | 7 | /** |
9 | * @author dy | 8 | * @author dy |
@@ -15,17 +14,8 @@ public class Lte3000Client extends AbstractClient { | @@ -15,17 +14,8 @@ public class Lte3000Client extends AbstractClient { | ||
15 | } | 14 | } |
16 | 15 | ||
17 | @Override | 16 | @Override |
18 | - public String getDeviceType() { | ||
19 | - return null; | ||
20 | - } | ||
21 | - | ||
22 | - @Override | ||
23 | protected AbstractVirtualBoxClientHandler getIoHandler() { | 17 | protected AbstractVirtualBoxClientHandler getIoHandler() { |
24 | return null; | 18 | return null; |
25 | } | 19 | } |
26 | 20 | ||
27 | - @Override | ||
28 | - protected ProtocolCodecFactory getCodecFactory() { | ||
29 | - return null; | ||
30 | - } | ||
31 | } | 21 | } |
src/main/java/com/example/mina/client/box/lte3000/Lte3000ClientHandler.java
No preview for this file type