MatrixClient.java 5.72 KB
package com.example.mina.client.base;

import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.filterchain.IoFilterChain;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.springframework.util.Assert;

import java.net.InetSocketAddress;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * @author dy
 * @date 2021/3/10
 */
@Slf4j
public class MatrixClient {

    /**
     * 客户端与设备端连接的配置信息
     */
    protected MatrixConnectConfig matrixConnectConfig;

    /**
     * 存储客户端的请求指令的队列
     */
    protected LinkedBlockingQueue<MatrixCommand> requestsPool;

    protected AbstractMatrixIoHandler clientHandler;

    protected IoSession ioSession;

    private CommandSender sender;

    public MatrixClient(MatrixConnectConfig connectConfig) {
        this.matrixConnectConfig = connectConfig;
        this.requestsPool = new LinkedBlockingQueue<>();
        this.sender = new CommandSender(this, this.requestsPool);

        this.sender.start();
    }

    /**
     *获取设备的类型
     */
    public String getDeviceType() {
        return matrixConnectConfig.getDeviceType();
    }

    /**
     * 获取设备ID
     */
    public String getDeviceId() {
        return matrixConnectConfig.getDeviceId();
    }

    /**
     * 获取设备名称
     */
    public String getDeviceName() {
        return matrixConnectConfig.getDeviceName();
    }

    /**
     * 返回客户端对响应的Handler
     * @return
     */
    public void setClientHandler(AbstractMatrixIoHandler clientHandler) {
        this.clientHandler = clientHandler;
    }


    /**
     * 返回该盒子的编解码器,默认为tcp的协议,如果是其它协议,可以在自类中重写改方法
     * @return
     */
    protected void setFilterChain(DefaultIoFilterChainBuilder filterChainBuilder) {
        filterChainBuilder.addLast("codec", new ProtocolCodecFilter(new ByteProtocolFactory()));
    }

    public IoFilterChain getFilterChain() {
        //1、创建客户端IoService
        IoConnector connector = new NioSocketConnector();
        //客户端链接超时时间
        connector.setConnectTimeoutMillis(30000);
        //2、客户端过滤器
//        setFilterChain(connector.getFilterChain());
        //3、客户端IoHandler,发生消息
        Assert.notNull(clientHandler, "client handler must not be null");
        connector.setHandler(clientHandler);

        //连接服务端
        ConnectFuture connectFuture = connector.connect(new InetSocketAddress(matrixConnectConfig.getHost(),
                matrixConnectConfig.getPort()));
        // 等待建立连接
        connectFuture.awaitUninterruptibly();
        // 获取连接会话
        this.ioSession = connectFuture.getSession();
        this.ioSession.setAttribute(MatrixConstants.SESSION_CONFIG_NAME, matrixConnectConfig);

        return ioSession.getFilterChain();
    }

    public void start() {
        //将数据发送线程启动
        this.sender.start();
    }

    /**
     * the belows method can be overrided by its child class
     */
    public void resetAttenuations() {
        MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_RESET).build();

        sendCommand(command);
    }

    public void setAttenuation(int row, int col, int attn, MatrixCommand.Type type) {
        MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_SET_ATTN)
                .col(col).row(row).attn(attn).type(type).build();

        sendCommand(command);
    }

    public void setOffset(int col, int offset) {
        MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_SET_OFFSET)
                .col(col).offset(offset).build();

        sendCommand(command);
    }

    public void getOffset(int col) {
        MatrixCommand command = MatrixCommand.builder().command(MatrixConstants.COMMAND_GET_OFFSET)
                .col(col).build();

        sendCommand(command);
    }

    /**
     * 执行命令
     */
    public boolean sendCommand(MatrixCommand command) {
        try{
            requestsPool.put(command);
            return true;
        }catch (Exception e) {
            log.error("error ocurred, when put a matrix command!", e);
            return false;
        }
    }

    /**
     * 客户端启动的时候自动为每个客户端启动一个发送线程,将缓存的指令发送出去
     */
    private static class CommandSender extends Thread {
        private MatrixClient client;
        private LinkedBlockingQueue<MatrixCommand> commands;

        public CommandSender(MatrixClient client, LinkedBlockingQueue commands ) {
            this.client = client;
            this.commands = commands;
            this.setName(String.format("Sender-%s-%s", client.getDeviceType(), client.getDeviceName()));
        }

        @Override
        public void run() {
            while (true) {
                try {
                    MatrixCommand command = commands.poll(5, TimeUnit.SECONDS);

                    if(command != null){
                        log.info("======================");
                        WriteFuture a =client.ioSession.write(command);
                    }

                }catch (Exception e) {
                    log.error("error occurred when send the request to device, device: " + client.matrixConnectConfig, e);
                }
            }
        }
    }
}