package com.example.mina.client.base; import com.example.mina.server.base.AbstractHardwareDataBuffer; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.service.IoConnector; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.transport.socket.nio.NioSocketConnector; import java.net.InetSocketAddress; import java.util.concurrent.ArrayBlockingQueue; /** * @author dy * @date 2021/3/10 */ public abstract class AbstractClient { /** * 客户端与设备端连接的配置信息 */ protected BoxConnectConfig boxConnectConfig; /** * 存储设备返回的响应数据缓存 */ protected AbstractHardwareDataBuffer responsePool; /** * 存储客户端的请求指令的队列 */ protected ArrayBlockingQueue requestsPool; protected IoSession ioSession; public AbstractClient(BoxConnectConfig connectConfig) { this.boxConnectConfig = connectConfig; this.requestsPool = new ArrayBlockingQueue(100); //TODO 初始化响应缓存 this.responsePool = null; } /** *获取设备的类型 */ abstract public String getDeviceType(); /** * 返回客户端对响应的Handler * @return */ abstract protected AbstractVirtualBoxClientHandler getIoHandler(); /** * 返回该盒子的编解码器 * @return */ abstract protected ProtocolCodecFactory getCodecFactory(); /** * 返回缓存数据,提供给Request执行; * @return */ public AbstractHardwareDataBuffer getResponsePool() { return this.responsePool; } public void start() { //1、创建客户端IoService IoConnector connector = new NioSocketConnector(); //客户端链接超时时间 connector.setConnectTimeoutMillis(30000); //2、客户端过滤器 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(getCodecFactory())); //3、客户端IoHandler,发生消息 connector.setHandler(getIoHandler()); //连接服务端 ConnectFuture connectFuture = connector.connect(new InetSocketAddress(boxConnectConfig.getHost(), boxConnectConfig.getPort())); // 等待建立连接 connectFuture.awaitUninterruptibly(); // 获取连接会话 this.ioSession = connectFuture.getSession(); this.ioSession.setAttribute(BoxConstants.SESSION_CONFIG_NAME, boxConnectConfig); } /** * 执行指令 * @param request */ public , R extends BaseResponse> R execute(Q request) { requestsPool.offer(request); ioSession.write(request); return request.execute(); } }