From a464050e05cb00f1cd8cc25644ce04cad587866d Mon Sep 17 00:00:00 2001 From: "yineng.huang" Date: Fri, 28 Feb 2025 13:58:06 +0800 Subject: [PATCH] =?UTF-8?q?protocol=E8=A1=A5=E5=85=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../protocol/packet/AsyncPacketListener.java | 63 +++ .../protocol/packet/BooleanCommandData.java | 40 ++ .../protocol/packet/ByteCommandData.java | 27 + .../driver/protocol/packet/CodecUtils.java | 82 ++++ .../driver/protocol/packet/CommandPacket.java | 83 ++++ .../protocol/packet/EmptyCommandData.java | 17 + .../driver/protocol/packet/ICommandData.java | 25 + .../protocol/packet/ICommandPacket.java | 69 +++ .../protocol/packet/IntCommandData.java | 28 ++ .../driver/protocol/packet/PacketHandler.java | 5 + .../protocol/packet/PacketTransfer.java | 461 ++++++++++++++++++ .../protocol/packet/TextCommandData.java | 36 ++ 12 files changed, 936 insertions(+) create mode 100644 cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/AsyncPacketListener.java create mode 100644 cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/BooleanCommandData.java create mode 100644 cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/ByteCommandData.java create mode 100644 cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/CodecUtils.java create mode 100644 cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/CommandPacket.java create mode 100644 cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/EmptyCommandData.java create mode 100644 cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/ICommandData.java create mode 100644 cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/ICommandPacket.java create mode 100644 cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/IntCommandData.java create mode 100644 cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/PacketHandler.java create mode 100644 cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/PacketTransfer.java create mode 100644 cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/TextCommandData.java diff --git a/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/AsyncPacketListener.java b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/AsyncPacketListener.java new file mode 100644 index 0000000..bb6f55f --- /dev/null +++ b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/AsyncPacketListener.java @@ -0,0 +1,63 @@ +package net.northking.cctp.upperComputer.driver.protocol.packet; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 异步指令数据包监听器 + */ +public class AsyncPacketListener implements PacketHandler { + + private final Logger logger = LoggerFactory.getLogger(AsyncPacketListener.class); + + private final int commandId; + + private final int requestId; + + private final PacketHandler handler; + + public AsyncPacketListener(int commandId) { + this.commandId = commandId; + this.requestId = 0; + this.handler = this; + } + + public AsyncPacketListener(int commandId, PacketHandler handler) { + this.commandId = commandId; + this.requestId = 0; + this.handler = handler; + } + + public AsyncPacketListener(int commandId, int requestId, PacketHandler handler) { + this.commandId = commandId; + this.requestId = requestId; + this.handler = handler; + } + + public int getCommandId() { + return commandId; + } + + public int getRequestId() { + return requestId; + } + + public boolean tryHandle(PacketTransfer transfer, ICommandPacket packet) { + boolean result = packet.commandId() == commandId && requestId == packet.getRequestId(); + if (result) { + try { + handler.handle(transfer, packet); + } catch (Exception e) { + logger.error("指令处理器处理指令数据时发生异常", e); + } + } + + return result; + } + + @Override + public void handle(PacketTransfer transfer, ICommandPacket packet) { + + } +} diff --git a/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/BooleanCommandData.java b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/BooleanCommandData.java new file mode 100644 index 0000000..b761eff --- /dev/null +++ b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/BooleanCommandData.java @@ -0,0 +1,40 @@ +package net.northking.cctp.upperComputer.driver.protocol.packet; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class BooleanCommandData implements ICommandData { + boolean success = false; + + public BooleanCommandData() { + } + + public BooleanCommandData(boolean success) { + this.success = success; + } + + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + @Override + public void buildFromInput(DataInput dataInput) throws IOException { + success = dataInput.readBoolean(); + } + + @Override + public void encodeToOutput(DataOutput dataOutput) throws IOException { + dataOutput.writeBoolean(success); + } + + @Override + public String toString() { + return success ? "true" : "false"; + } + +} diff --git a/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/ByteCommandData.java b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/ByteCommandData.java new file mode 100644 index 0000000..0c1e542 --- /dev/null +++ b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/ByteCommandData.java @@ -0,0 +1,27 @@ +package net.northking.cctp.upperComputer.driver.protocol.packet; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class ByteCommandData implements ICommandData { + public byte aByte; + + public void setaByte(byte aByte) { + this.aByte = aByte; + } + + public byte getaByte() { + return aByte; + } + + @Override + public void buildFromInput(DataInput dataInput) throws IOException { + aByte = dataInput.readByte(); + } + + @Override + public void encodeToOutput(DataOutput dataOutput) throws IOException { + dataOutput.writeByte(aByte); + } +} diff --git a/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/CodecUtils.java b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/CodecUtils.java new file mode 100644 index 0000000..f48f11e --- /dev/null +++ b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/CodecUtils.java @@ -0,0 +1,82 @@ +package net.northking.cctp.upperComputer.driver.protocol.packet; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; + +public class CodecUtils { + + /** + * 发送文本
+ * 作为DataOutput方法的补充
+ * 文本字节长度+文本字节 + * + * @param output 发送操作对象 + * @param text 要发送的文本内容 + * @throws IOException 发送过程中发生输入输出错误 + */ + public static void writeText(DataOutput output, String text) throws IOException { + byte[] textByteArray = text.getBytes(); + output.writeInt(textByteArray.length); + output.write(textByteArray); + } + + /** + * 读取文本
+ * 作为DataInput方法的补充
+ * 会读取一个整型作为接下来要读取的文本字节长度 + * + * @param input 读取操作对象 + * @return 读取到的文本 + * @throws IOException 读取过程中发生输入输出错误 + */ + public static String readText(DataInput input) throws IOException { + int textLength = input.readInt(); + byte[] textByteArray = new byte[textLength]; + input.readFully(textByteArray); + return new String(textByteArray); + } + + public static int[] readIntArray(DataInput input) throws IOException { + int length = input.readInt(); + int[] array = new int[length]; + for (int i = 0; i < length; i++) { + array[i] = input.readInt(); + } + return array; + } + + public static void writeNullable(DataOutput output, K data, WriteNullableData writer) throws IOException { + if (data == null) { + output.writeBoolean(false); + } else { + output.writeBoolean(true); + writer.write(output, data); + } + } + + public static K readNullable(DataInput input, ReadNullableData reader) throws IOException { + if (input.readBoolean()) { + return reader.read(input); + } + return null; + } + + public static ArrayList readStringArrayList(DataInput input) throws IOException { + int length = input.readInt(); + ArrayList list = new ArrayList(length); + for (int i = 0; i < length; i++) { + list.add(readText(input)); + } + return list; + } + + public interface ReadNullableData { + K read(DataInput input) throws IOException; + } + + public interface WriteNullableData { + void write(DataOutput output, K data) throws IOException; + } +} diff --git a/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/CommandPacket.java b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/CommandPacket.java new file mode 100644 index 0000000..2810e46 --- /dev/null +++ b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/CommandPacket.java @@ -0,0 +1,83 @@ +package net.northking.cctp.upperComputer.driver.protocol.packet; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.nio.ByteBuffer; + + +public class CommandPacket implements ICommandPacket { + + private final Logger logger = LoggerFactory.getLogger(CommandPacket.class); + + private final int commandId; + + private int requestId = 0; + + private byte[] carryingData = new byte[0]; + + public CommandPacket(int commandId) { + this.commandId = commandId; + } + + @Override + public int commandId() { + return commandId; + } + + @Override + public int getRequestId() { + return requestId; + } + + @Override + public void setRequestId(int id) { + if (requestId != 0) throw new IllegalStateException("请求ID已经设置,不应修改"); + requestId = id; + } + + @Override + public byte[] getCarryingData() { + return carryingData; + } + + @Override + public void setCarryingData(byte[] data) { + if (data == null) throw new IllegalArgumentException("携带的数据不应为null,请使用new byte[0]"); + this.carryingData = data; + } + + @Override + public ICommandPacket createResponse(byte[] carryingData) { + CommandPacket packet = new CommandPacket(commandId); + packet.setRequestId(requestId); + packet.carryingData = carryingData; + return packet; + } + + @Override + public byte[] toByteArray() { + ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + 4 + carryingData.length); + buffer.putInt(commandId()); + buffer.putInt(requestId); + buffer.putInt(carryingData.length); + if (carryingData.length > 0) { + buffer.put(carryingData); + } + return buffer.array(); + } + + @Override + public boolean fillICommandData(T commandData) { + try { + commandData.buildFromInput(new DataInputStream(new ByteArrayInputStream(carryingData))); + } catch (Exception e) { + logger.error("填充对象" + commandData.getClass().getSimpleName() + "时发生异常", e); + return false; + } + return true; + } +} diff --git a/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/EmptyCommandData.java b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/EmptyCommandData.java new file mode 100644 index 0000000..9ac6a4b --- /dev/null +++ b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/EmptyCommandData.java @@ -0,0 +1,17 @@ +package net.northking.cctp.upperComputer.driver.protocol.packet; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class EmptyCommandData implements ICommandData { + @Override + public void buildFromInput(DataInput dataInput) throws IOException { + + } + + @Override + public void encodeToOutput(DataOutput dataOutput) throws IOException { + + } +} diff --git a/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/ICommandData.java b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/ICommandData.java new file mode 100644 index 0000000..3d3f008 --- /dev/null +++ b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/ICommandData.java @@ -0,0 +1,25 @@ +package net.northking.cctp.upperComputer.driver.protocol.packet; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * 可编码数据 + */ +public interface ICommandData { + + /** + * 从DataInput中反序列化出此数据 + * + * @param dataInput 读取数据流 + */ + void buildFromInput(DataInput dataInput) throws IOException; + + /** + * 将此数据序列化输出到DataOutput + * + * @param dataOutput 输出数据流 + */ + void encodeToOutput(DataOutput dataOutput) throws IOException; +} diff --git a/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/ICommandPacket.java b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/ICommandPacket.java new file mode 100644 index 0000000..ff913a9 --- /dev/null +++ b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/ICommandPacket.java @@ -0,0 +1,69 @@ +package net.northking.cctp.upperComputer.driver.protocol.packet; + +/** + * 指令数据包 + *
+ * 用于处理发送和接收 + */ +public interface ICommandPacket { + /** + * 指令ID,应为常量 + * + * @return 指令ID + */ + int commandId(); + + /** + * 获得请求ID + * + * @return 请求ID + */ + int getRequestId(); + + /** + * 设置请求ID + *
+ * 仅在发送和接收时调用 + * + * @param id 请求ID + */ + void setRequestId(int id); + + /** + * 获得指令携带的数据 + * + * @return 指令携带数据 + */ + byte[] getCarryingData(); + + /** + * 设置指令携带的数据 + * + * @param data 指令携带的数据 + */ + void setCarryingData(byte[] data); + + /** + * 创建指令的回复指令 + * + * @param carryingData 回复的数据 + * @return 新的指令对象 + */ + ICommandPacket createResponse(byte[] carryingData); + + /** + * 将此CommandPacket编码为ByteArray + * + * @return 此CommandPacket编码为的ByteArray + */ + byte[] toByteArray(); + + /** + * 将数据填充至ICommandData对象 + * + * @param commandData 要填充的对象 + * @param ICommandData的具体实现类型 + * @return 是否填充成功 + */ + boolean fillICommandData(T commandData); +} diff --git a/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/IntCommandData.java b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/IntCommandData.java new file mode 100644 index 0000000..12b7f72 --- /dev/null +++ b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/IntCommandData.java @@ -0,0 +1,28 @@ +package net.northking.cctp.upperComputer.driver.protocol.packet; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class IntCommandData implements ICommandData { + + public int anInt; + + public void setAnInt(int anInt) { + this.anInt = anInt; + } + + public int getAnInt() { + return anInt; + } + + @Override + public void buildFromInput(DataInput dataInput) throws IOException { + anInt = dataInput.readInt(); + } + + @Override + public void encodeToOutput(DataOutput dataOutput) throws IOException { + dataOutput.writeInt(anInt); + } +} diff --git a/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/PacketHandler.java b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/PacketHandler.java new file mode 100644 index 0000000..71b2b00 --- /dev/null +++ b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/PacketHandler.java @@ -0,0 +1,5 @@ +package net.northking.cctp.upperComputer.driver.protocol.packet; + +public interface PacketHandler { + void handle(PacketTransfer transfer, ICommandPacket packet); +} diff --git a/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/PacketTransfer.java b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/PacketTransfer.java new file mode 100644 index 0000000..3bc172d --- /dev/null +++ b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/PacketTransfer.java @@ -0,0 +1,461 @@ +package net.northking.cctp.upperComputer.driver.protocol.packet; + + +import net.northking.cctp.upperComputer.utils.ByteUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.net.Socket; +import java.util.ArrayList; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 指令数据包传送 + *
+ * 提供同步和异步发送方法,不包含Socket创建 + */ +public class PacketTransfer { + + private final Logger logger = LoggerFactory.getLogger(PacketTransfer.class); + + /** + * 同步请求超时响应时间,单位:秒 + */ + public static final int SYNC_TIMEOUT = 3600; + + private final AtomicInteger autoRequestId = new AtomicInteger(); + + private final Socket socket; + + private final DataInputStream inputStream; + + private final DataOutputStream outputStream; + + private final SendHandler sendHandler; + + private final ReceiveHandler receiveHandler; + + private final AsyncPacketHandler asyncPacketHandler; + + private final ArrayBlockingQueue sendQueue = new ArrayBlockingQueue<>(20); + + private final ArrayList syncWaitResponseTargetList = new ArrayList<>(); + + private final ArrayList asyncPacketListeners = new ArrayList<>(); + + /** + * 是否已连接 + */ + private boolean connected; + + /** + * 连接断开监听 + */ + private Runnable onDisconnectListener = null; + + + /** + * 一个TAG,用于区分多个PacketTransfer + */ + private final String TAG; + + public PacketTransfer(Socket socket, String tag) throws IOException { + this.socket = socket; + this.inputStream = new DataInputStream(socket.getInputStream()); + this.outputStream = new DataOutputStream(socket.getOutputStream()); + this.TAG = tag; + this.sendHandler = new SendHandler(); + this.receiveHandler = new ReceiveHandler(); + this.asyncPacketHandler = new AsyncPacketHandler(); + this.connected = true; + } + + + public PacketTransfer(InputStream inputStream, OutputStream outputStream, String tag) { + this(null, inputStream, outputStream, tag); + } + + public PacketTransfer(Socket socket, InputStream inputStream, OutputStream outputStream, String tag) { + this.socket = socket; + this.inputStream = new DataInputStream(inputStream); + this.outputStream = new DataOutputStream(outputStream); + this.TAG = tag; + this.sendHandler = new SendHandler(); + this.receiveHandler = new ReceiveHandler(); + this.asyncPacketHandler = new AsyncPacketHandler(); + this.connected = true; + } + + public ICommandPacket syncSend(int commandId) { + return syncSend(commandId, new EmptyCommandData()); + } + + public ICommandPacket syncSend(int commandId, ICommandData data) { + return syncSend(commandId, autoRequestId.getAndIncrement(), data); + } + + public ICommandPacket syncSend(int commandId, int requestId, ICommandData data) { + ICommandPacket sendPacket; + try { + sendPacket = buildPacketFromData(commandId, requestId, data); + } catch (IOException e) { + logger.error("序列化数据时发生错误", e); + return null; + } + + return syncSend(sendPacket); + } + + public void asyncSend(int commandId) { + asyncSend(commandId, new EmptyCommandData()); + } + + public void asyncSend(int commandId, ICommandData data) { + asyncSend(commandId, autoRequestId.getAndIncrement(), data); + } + + public void asyncSend(int commandId, int requestId, ICommandData data) { + ICommandPacket sendPacket; + try { + sendPacket = buildPacketFromData(commandId, requestId, data); + } catch (IOException e) { + logger.error("序列化数据时发生错误", e); + return; + } + asyncSend(sendPacket); + } + + private ICommandPacket buildPacketFromData(int commandId, int requestId, ICommandData data) throws IOException { + CommandPacket packet = new CommandPacket(commandId); + packet.setRequestId(requestId); + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + DataOutputStream dataOutput = new DataOutputStream(outStream); + data.encodeToOutput(dataOutput); + packet.setCarryingData(outStream.toByteArray()); + return packet; + } + + public ICommandPacket syncSend(ICommandPacket sendPacket) { + logger.debug("设备的nkAgent连接状态:connected = " + connected); + if (!connected) return null; + SendTarget sendTarget = new SendTarget(sendPacket, false); + sendQueue.offer(sendTarget); + try { + synchronized (sendTarget.lock) { + sendTarget.lock.wait(SYNC_TIMEOUT * 1000); + } + } catch (InterruptedException e) { + return null; + } + return sendTarget.responsePacket; + } + + public void asyncSend(ICommandPacket sendPacket) { + if (!connected) return; + SendTarget sendTarget = new SendTarget(sendPacket, true); + sendQueue.offer(sendTarget); + } + + /** + * 添加一个异步结果监听器 + * + * @param listener 监听器 + */ + public void addAsyncListener(AsyncPacketListener listener) { + synchronized (asyncPacketListeners) { + asyncPacketListeners.add(listener); + } + } + + public void setOnDisconnectListener(Runnable onDisconnectListener) { + this.onDisconnectListener = onDisconnectListener; + } + + + + + /** + * 移除一个异步结果监听器 + * + * @param listener 监听器 + */ + public void removeAsyncListener(AsyncPacketListener listener) { + synchronized (asyncPacketListeners) { + asyncPacketListeners.remove(listener); + } + } + + public void close() { + if (socket != null) { + try { + socket.close(); + } catch (IOException e) { + logger.error("关闭Socket时发生IO异常", e); + } + } + } + + public boolean isConnected() { + return connected; + } + + private synchronized void onDisconnected() { + if (!connected) return; + connected = false; + sendHandler.stop(); + receiveHandler.stop(); + asyncPacketHandler.stop(); + sendQueue.clear(); + synchronized (syncWaitResponseTargetList) { + for (SendTarget target : syncWaitResponseTargetList) { + synchronized (target.lock) { + target.lock.notifyAll(); + } + } + syncWaitResponseTargetList.clear(); + } + synchronized (asyncPacketListeners) { + asyncPacketListeners.clear(); + } + if (onDisconnectListener != null) { + try { + onDisconnectListener.run(); + } catch (Exception e) { + logger.error("执行连接断开回调时发生异常", e); + } + } + } + + /** + * 执行发送操作的对象 + *
+ * 内部处理了同步响应锁 + */ + public class SendTarget implements PacketHandler { + private final Object lock = new Object(); + final ICommandPacket packet; + final long time = System.currentTimeMillis(); + private ICommandPacket responsePacket = null; + final boolean isAsync; + + public SendTarget(ICommandPacket packet, boolean isAsync) { + this.packet = packet; + this.isAsync = isAsync; + } + + public ICommandPacket getResponsePacket() throws SyncRequestTimeoutException { + if (isTimeout()) throw new SyncRequestTimeoutException(this); + return responsePacket; + } + + public boolean isTimeout() { + return System.currentTimeMillis() - time > SYNC_TIMEOUT * 1000; + } + + @Override + public void handle(PacketTransfer transfer, ICommandPacket packet) { + this.responsePacket = packet; + synchronized (lock) { + lock.notifyAll(); + } + } + } + + /** + * 发送数据处理器 + */ + private class SendHandler implements Runnable { + + private final Thread thread; + + public SendHandler() { + thread = new Thread(this); + thread.setName("PacketTransfer-" + TAG + "-SendHandler"); + thread.start(); + } + + @Override + public void run() { + while (!thread.isInterrupted()) { + SendTarget sendTarget; + try { + sendTarget = sendQueue.take(); + } catch (InterruptedException e) { + break; + } + if (!sendTarget.isAsync) { + synchronized (syncWaitResponseTargetList) { + syncWaitResponseTargetList.add(sendTarget); + } + } + byte[] sendByteArray = sendTarget.packet.toByteArray(); + logger.debug("发送commandId:" + sendTarget.packet.commandId() + " requestId:" + sendTarget.packet.getRequestId() + " dataLength:" + (sendByteArray.length - 12) + ":" + ByteUtils.bytesToHex(sendByteArray).substring(24)); + try { + outputStream.write(sendByteArray); + logger.debug("发送commandId:" + sendTarget.packet.commandId() + " requestId:" + sendTarget.packet.getRequestId()+"数据完成"); + } catch (IOException e) { + logger.error("PacketTransfer-" + TAG + "发送数据时发生IO异常", e); + break; + } + } + onDisconnected(); + } + + public void stop() { + if (!thread.isInterrupted()) { + thread.interrupt(); + } + } + } + + /** + * 接收数据处理器 + */ + private class ReceiveHandler implements Runnable { + private final Thread thread; + + public ReceiveHandler() { + thread = new Thread(this); + thread.setName("PacketTransfer-" + TAG + "-ReceiveHandler"); + thread.start(); + } + + @Override + public void run() { + while (!thread.isInterrupted()) { + int commandId; + try { + commandId = inputStream.readInt(); + } catch (IOException e) { + logger.error("PacketTransfer-" + TAG + "读取指令ID阶段发生IO异常", e); + break; + } + int requestId; + try { + requestId = inputStream.readInt(); + } catch (IOException e) { + logger.error("PacketTransfer-" + TAG + "读取请求ID阶段发生IO异常", e); + break; + } + int dataLength; + try { + dataLength = inputStream.readInt(); + } catch (IOException e) { + logger.error("PacketTransfer-" + TAG + "读取数据长度阶段发生IO异常", e); + break; + } + byte[] carryingData = new byte[dataLength]; + try { + inputStream.readFully(carryingData); + } catch (IOException e) { + logger.error("PacketTransfer-" + TAG + "读取携带数据阶段发生IO异常", e); + break; + } + logger.debug("收到commandId:" + commandId + " requestId:" + requestId + " dataLength:" + dataLength + ":" + ByteUtils.bytesToHex(carryingData)); + + SendTarget sendTarget = null; + synchronized (syncWaitResponseTargetList) { + int foundIndex = -1; + for (int i = 0; i < syncWaitResponseTargetList.size(); i++) { + SendTarget target = syncWaitResponseTargetList.get(i); + if (target.packet.commandId() == commandId && requestId == target.packet.getRequestId()) { + sendTarget = target; + foundIndex = i; + break; + } + } + if (sendTarget != null) { + syncWaitResponseTargetList.remove(foundIndex); + } + } + if (sendTarget != null) { + //同步请求响应 + ICommandPacket packet = sendTarget.packet.createResponse(carryingData); + sendTarget.handle(PacketTransfer.this, packet); + } else { + //异步请求响应 + ArrayList matchListener = new ArrayList<>(); + synchronized (asyncPacketListeners) { + for (AsyncPacketListener listener : asyncPacketListeners) { + if (listener.getCommandId() == commandId && listener.getRequestId() == requestId) { + matchListener.add(listener); + } + } + } + System.out.println("match listener:" + matchListener.size()); + if (!matchListener.isEmpty()) { + CommandPacket packet = new CommandPacket(commandId); + packet.setRequestId(requestId); + packet.setCarryingData(carryingData); + + for (AsyncPacketListener listener : matchListener) { + asyncPacketHandler.jobQueue.offer(new AsyncPacketHandleJob(listener, packet)); + } + } + + } + } + onDisconnected(); + } + + public void stop() { + if (!thread.isInterrupted()) { + thread.interrupt(); + } + } + } + + /** + * 用一个单独线程处理异步数据响应,防止一些响应时间较长且没有另开线程从而导致阻止之后的数据读取 + */ + private class AsyncPacketHandler implements Runnable { + + private final Thread thread; + + private final ArrayBlockingQueue jobQueue = new ArrayBlockingQueue<>(10000); + + public AsyncPacketHandler() { + thread = new Thread(this); + thread.setName("PacketTransfer-" + TAG + "-AsyncPacketHandler"); + thread.start(); + } + + @Override + public void run() { + while (!thread.isInterrupted()) { + AsyncPacketHandleJob job; + try { + job = jobQueue.take(); + } catch (InterruptedException e) { + break; + } + job.listener.tryHandle(PacketTransfer.this, job.packet); + } + } + + public void stop() { + if (!thread.isInterrupted()) { + thread.interrupt(); + } + } + } + + private static class AsyncPacketHandleJob { + final AsyncPacketListener listener; + final ICommandPacket packet; + + public AsyncPacketHandleJob(AsyncPacketListener listener, ICommandPacket packet) { + this.listener = listener; + this.packet = packet; + } + } + + public class SyncRequestTimeoutException extends Exception { + public SyncRequestTimeoutException(SendTarget sendTarget) { + super("PacketTransfer-" + TAG + "请求发生超时,指令ID:" + sendTarget.packet.commandId() + " 请求ID:" + sendTarget.packet.getRequestId() + " 创建时间:" + sendTarget.time); + } + } + +} diff --git a/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/TextCommandData.java b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/TextCommandData.java new file mode 100644 index 0000000..ca35736 --- /dev/null +++ b/cctp-atu/atu-upper-computer/src/main/java/net/northking/cctp/upperComputer/driver/protocol/packet/TextCommandData.java @@ -0,0 +1,36 @@ +package net.northking.cctp.upperComputer.driver.protocol.packet; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class TextCommandData implements ICommandData { + + private String text = ""; + + public String getText() { + return text; + } + + public void setText(String text) { + if (text == null) throw new IllegalArgumentException("此处text不能为null"); + this.text = text; + } + + @Override + public void buildFromInput(DataInput dataInput) throws IOException { + text = CodecUtils.readText(dataInput); + } + + @Override + public void encodeToOutput(DataOutput dataOutput) throws IOException { + CodecUtils.writeText(dataOutput, text); + } + + @Override + public String toString() { + return "TextCommandData{" + + "text='" + text + '\'' + + '}'; + } +}