package com.example.mina.client.base; import lombok.extern.slf4j.Slf4j; import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder; import org.apache.mina.core.filterchain.IoFilterChain; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.future.WriteFuture; import org.apache.mina.core.service.IoConnector; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.transport.socket.nio.NioSocketConnector; import org.springframework.util.Assert; import java.net.InetSocketAddress; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; /** * @author dy * @date 2021/3/10 */ @Slf4j public class MatrixClient { /** * 客户端与设备端连接的配置信息 */ protected MatrixConnectConfig matrixConnectConfig; /** * 存储客户端的请求指令的队列 */ protected LinkedBlockingQueue requestsPool; protected AbstractMatrixIoHandler clientHandler; protected IoSession ioSession; private CommandSender sender; public MatrixClient(MatrixConnectConfig connectConfig) { this.matrixConnectConfig = connectConfig; this.requestsPool = new LinkedBlockingQueue<>(); this.sender = new CommandSender(this, this.requestsPool); this.sender.start(); } /** *获取设备的类型 */ public String getDeviceType() { return matrixConnectConfig.getDeviceType(); } /** * 获取设备ID */ public String getDeviceId() { return matrixConnectConfig.getDeviceId(); } /** * 获取设备名称 */ public String getDeviceName() { return matrixConnectConfig.getDeviceName(); } /** * 返回客户端对响应的Handler * @return */ public void setClientHandler(AbstractMatrixIoHandler clientHandler) { this.clientHandler = clientHandler; } /** * 返回该盒子的编解码器,默认为tcp的协议,如果是其它协议,可以在自类中重写改方法 * @return */ protected void setFilterChain(DefaultIoFilterChainBuilder filterChainBuilder) { filterChainBuilder.addLast("codec", new ProtocolCodecFilter(new ByteProtocolFactory())); } public IoFilterChain getFilterChain() { //1、创建客户端IoService IoConnector connector = new NioSocketConnector(); //客户端链接超时时间 connector.setConnectTimeoutMillis(30000); //2、客户端过滤器 // setFilterChain(connector.getFilterChain()); //3、客户端IoHandler,发生消息 Assert.notNull(clientHandler, "client handler must not be null"); connector.setHandler(clientHandler); //连接服务端 ConnectFuture connectFuture = connector.connect(new InetSocketAddress(matrixConnectConfig.getHost(), matrixConnectConfig.getPort())); // 等待建立连接 connectFuture.awaitUninterruptibly(); // 获取连接会话 this.ioSession = connectFuture.getSession(); this.ioSession.setAttribute(MatrixConstants.SESSION_CONFIG_NAME, matrixConnectConfig); return ioSession.getFilterChain(); } public void start() { //将数据发送线程启动 this.sender.start(); } /** * the belows method can be overrided by its child class */ public void resetAttenuations() { MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_RESET).build(); sendCommand(command); } public void setAttenuation(int row, int col, int attn) { MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_SET_ATTN) .col(col).row(row).attn(attn).build(); sendCommand(command); } public void getAttenuation(Integer row, Integer col) { MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_GET_ATTN) .col(col).row(row).build(); sendCommand(command); } public void setOffset(int col, int offset) { MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_SET_OFFSET) .col(col).offset(offset).build(); sendCommand(command); } public void getOffset(int col) { MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_GET_OFFSET) .col(col).build(); sendCommand(command); } /** * 执行命令 */ public boolean sendCommand(MatrixCommand command) { try{ requestsPool.put(command); return true; }catch (Exception e) { log.error("error ocurred, when put a matrix command!", e); return false; } } /** * 客户端启动的时候自动为每个客户端启动一个发送线程,将缓存的指令发送出去 */ private static class CommandSender extends Thread { private MatrixClient client; private LinkedBlockingQueue commands; public CommandSender(MatrixClient client, LinkedBlockingQueue commands ) { this.client = client; this.commands = commands; this.setName(String.format("Sender-%s-%s", client.getDeviceType(), client.getDeviceName())); } @Override public void run() { while (true) { try { MatrixCommand command = commands.poll(5, TimeUnit.SECONDS); if(command != null){ log.info("======================"); WriteFuture a =client.ioSession.write(command); } }catch (Exception e) { log.error("error occurred when send the request to device, device: " + client.matrixConnectConfig, e); } } } } }