protocol补充

hz_1122
yineng.huang 2025-02-28 13:58:06 +08:00
parent 3f7e65f70a
commit a464050e05
12 changed files with 936 additions and 0 deletions

View File

@ -0,0 +1,63 @@
package net.northking.cctp.upperComputer.driver.protocol.packet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class AsyncPacketListener implements PacketHandler {
private final Logger logger = LoggerFactory.getLogger(AsyncPacketListener.class);
private final int commandId;
private final int requestId;
private final PacketHandler handler;
public AsyncPacketListener(int commandId) {
this.commandId = commandId;
this.requestId = 0;
this.handler = this;
}
public AsyncPacketListener(int commandId, PacketHandler handler) {
this.commandId = commandId;
this.requestId = 0;
this.handler = handler;
}
public AsyncPacketListener(int commandId, int requestId, PacketHandler handler) {
this.commandId = commandId;
this.requestId = requestId;
this.handler = handler;
}
public int getCommandId() {
return commandId;
}
public int getRequestId() {
return requestId;
}
public boolean tryHandle(PacketTransfer transfer, ICommandPacket packet) {
boolean result = packet.commandId() == commandId && requestId == packet.getRequestId();
if (result) {
try {
handler.handle(transfer, packet);
} catch (Exception e) {
logger.error("指令处理器处理指令数据时发生异常", e);
}
}
return result;
}
@Override
public void handle(PacketTransfer transfer, ICommandPacket packet) {
}
}

View File

@ -0,0 +1,40 @@
package net.northking.cctp.upperComputer.driver.protocol.packet;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class BooleanCommandData implements ICommandData {
boolean success = false;
public BooleanCommandData() {
}
public BooleanCommandData(boolean success) {
this.success = success;
}
public boolean isSuccess() {
return success;
}
public void setSuccess(boolean success) {
this.success = success;
}
@Override
public void buildFromInput(DataInput dataInput) throws IOException {
success = dataInput.readBoolean();
}
@Override
public void encodeToOutput(DataOutput dataOutput) throws IOException {
dataOutput.writeBoolean(success);
}
@Override
public String toString() {
return success ? "true" : "false";
}
}

View File

@ -0,0 +1,27 @@
package net.northking.cctp.upperComputer.driver.protocol.packet;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class ByteCommandData implements ICommandData {
public byte aByte;
public void setaByte(byte aByte) {
this.aByte = aByte;
}
public byte getaByte() {
return aByte;
}
@Override
public void buildFromInput(DataInput dataInput) throws IOException {
aByte = dataInput.readByte();
}
@Override
public void encodeToOutput(DataOutput dataOutput) throws IOException {
dataOutput.writeByte(aByte);
}
}

View File

@ -0,0 +1,82 @@
package net.northking.cctp.upperComputer.driver.protocol.packet;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
public class CodecUtils {
/**
* <br>
* DataOutput<br>
* +
*
* @param output
* @param text
* @throws IOException
*/
public static void writeText(DataOutput output, String text) throws IOException {
byte[] textByteArray = text.getBytes();
output.writeInt(textByteArray.length);
output.write(textByteArray);
}
/**
* <br>
* DataInput<br>
*
*
* @param input
* @return
* @throws IOException
*/
public static String readText(DataInput input) throws IOException {
int textLength = input.readInt();
byte[] textByteArray = new byte[textLength];
input.readFully(textByteArray);
return new String(textByteArray);
}
public static int[] readIntArray(DataInput input) throws IOException {
int length = input.readInt();
int[] array = new int[length];
for (int i = 0; i < length; i++) {
array[i] = input.readInt();
}
return array;
}
public static <K> void writeNullable(DataOutput output, K data, WriteNullableData<K> writer) throws IOException {
if (data == null) {
output.writeBoolean(false);
} else {
output.writeBoolean(true);
writer.write(output, data);
}
}
public static <K> K readNullable(DataInput input, ReadNullableData<K> reader) throws IOException {
if (input.readBoolean()) {
return reader.read(input);
}
return null;
}
public static ArrayList<String> readStringArrayList(DataInput input) throws IOException {
int length = input.readInt();
ArrayList<String> list = new ArrayList<String>(length);
for (int i = 0; i < length; i++) {
list.add(readText(input));
}
return list;
}
public interface ReadNullableData<K> {
K read(DataInput input) throws IOException;
}
public interface WriteNullableData<K> {
void write(DataOutput output, K data) throws IOException;
}
}

View File

@ -0,0 +1,83 @@
package net.northking.cctp.upperComputer.driver.protocol.packet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.nio.ByteBuffer;
public class CommandPacket implements ICommandPacket {
private final Logger logger = LoggerFactory.getLogger(CommandPacket.class);
private final int commandId;
private int requestId = 0;
private byte[] carryingData = new byte[0];
public CommandPacket(int commandId) {
this.commandId = commandId;
}
@Override
public int commandId() {
return commandId;
}
@Override
public int getRequestId() {
return requestId;
}
@Override
public void setRequestId(int id) {
if (requestId != 0) throw new IllegalStateException("请求ID已经设置不应修改");
requestId = id;
}
@Override
public byte[] getCarryingData() {
return carryingData;
}
@Override
public void setCarryingData(byte[] data) {
if (data == null) throw new IllegalArgumentException("携带的数据不应为null请使用new byte[0]");
this.carryingData = data;
}
@Override
public ICommandPacket createResponse(byte[] carryingData) {
CommandPacket packet = new CommandPacket(commandId);
packet.setRequestId(requestId);
packet.carryingData = carryingData;
return packet;
}
@Override
public byte[] toByteArray() {
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + 4 + carryingData.length);
buffer.putInt(commandId());
buffer.putInt(requestId);
buffer.putInt(carryingData.length);
if (carryingData.length > 0) {
buffer.put(carryingData);
}
return buffer.array();
}
@Override
public <T extends ICommandData> boolean fillICommandData(T commandData) {
try {
commandData.buildFromInput(new DataInputStream(new ByteArrayInputStream(carryingData)));
} catch (Exception e) {
logger.error("填充对象" + commandData.getClass().getSimpleName() + "时发生异常", e);
return false;
}
return true;
}
}

View File

@ -0,0 +1,17 @@
package net.northking.cctp.upperComputer.driver.protocol.packet;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class EmptyCommandData implements ICommandData {
@Override
public void buildFromInput(DataInput dataInput) throws IOException {
}
@Override
public void encodeToOutput(DataOutput dataOutput) throws IOException {
}
}

View File

@ -0,0 +1,25 @@
package net.northking.cctp.upperComputer.driver.protocol.packet;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
*
*/
public interface ICommandData {
/**
* DataInput
*
* @param dataInput
*/
void buildFromInput(DataInput dataInput) throws IOException;
/**
* DataOutput
*
* @param dataOutput
*/
void encodeToOutput(DataOutput dataOutput) throws IOException;
}

View File

@ -0,0 +1,69 @@
package net.northking.cctp.upperComputer.driver.protocol.packet;
/**
*
* <br>
*
*/
public interface ICommandPacket {
/**
* ID
*
* @return ID
*/
int commandId();
/**
* ID
*
* @return ID
*/
int getRequestId();
/**
* ID
* <br>
*
*
* @param id ID
*/
void setRequestId(int id);
/**
*
*
* @return
*/
byte[] getCarryingData();
/**
*
*
* @param data
*/
void setCarryingData(byte[] data);
/**
*
*
* @param carryingData
* @return
*/
ICommandPacket createResponse(byte[] carryingData);
/**
* CommandPacketByteArray
*
* @return CommandPacketByteArray
*/
byte[] toByteArray();
/**
* ICommandData
*
* @param commandData
* @param <T> ICommandData
* @return
*/
<T extends ICommandData> boolean fillICommandData(T commandData);
}

View File

@ -0,0 +1,28 @@
package net.northking.cctp.upperComputer.driver.protocol.packet;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class IntCommandData implements ICommandData {
public int anInt;
public void setAnInt(int anInt) {
this.anInt = anInt;
}
public int getAnInt() {
return anInt;
}
@Override
public void buildFromInput(DataInput dataInput) throws IOException {
anInt = dataInput.readInt();
}
@Override
public void encodeToOutput(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(anInt);
}
}

View File

@ -0,0 +1,5 @@
package net.northking.cctp.upperComputer.driver.protocol.packet;
public interface PacketHandler {
void handle(PacketTransfer transfer, ICommandPacket packet);
}

View File

@ -0,0 +1,461 @@
package net.northking.cctp.upperComputer.driver.protocol.packet;
import net.northking.cctp.upperComputer.utils.ByteUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.Socket;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* <br>
* Socket
*/
public class PacketTransfer {
private final Logger logger = LoggerFactory.getLogger(PacketTransfer.class);
/**
*
*/
public static final int SYNC_TIMEOUT = 3600;
private final AtomicInteger autoRequestId = new AtomicInteger();
private final Socket socket;
private final DataInputStream inputStream;
private final DataOutputStream outputStream;
private final SendHandler sendHandler;
private final ReceiveHandler receiveHandler;
private final AsyncPacketHandler asyncPacketHandler;
private final ArrayBlockingQueue<SendTarget> sendQueue = new ArrayBlockingQueue<>(20);
private final ArrayList<SendTarget> syncWaitResponseTargetList = new ArrayList<>();
private final ArrayList<AsyncPacketListener> asyncPacketListeners = new ArrayList<>();
/**
*
*/
private boolean connected;
/**
*
*/
private Runnable onDisconnectListener = null;
/**
* TAGPacketTransfer
*/
private final String TAG;
public PacketTransfer(Socket socket, String tag) throws IOException {
this.socket = socket;
this.inputStream = new DataInputStream(socket.getInputStream());
this.outputStream = new DataOutputStream(socket.getOutputStream());
this.TAG = tag;
this.sendHandler = new SendHandler();
this.receiveHandler = new ReceiveHandler();
this.asyncPacketHandler = new AsyncPacketHandler();
this.connected = true;
}
public PacketTransfer(InputStream inputStream, OutputStream outputStream, String tag) {
this(null, inputStream, outputStream, tag);
}
public PacketTransfer(Socket socket, InputStream inputStream, OutputStream outputStream, String tag) {
this.socket = socket;
this.inputStream = new DataInputStream(inputStream);
this.outputStream = new DataOutputStream(outputStream);
this.TAG = tag;
this.sendHandler = new SendHandler();
this.receiveHandler = new ReceiveHandler();
this.asyncPacketHandler = new AsyncPacketHandler();
this.connected = true;
}
public ICommandPacket syncSend(int commandId) {
return syncSend(commandId, new EmptyCommandData());
}
public ICommandPacket syncSend(int commandId, ICommandData data) {
return syncSend(commandId, autoRequestId.getAndIncrement(), data);
}
public ICommandPacket syncSend(int commandId, int requestId, ICommandData data) {
ICommandPacket sendPacket;
try {
sendPacket = buildPacketFromData(commandId, requestId, data);
} catch (IOException e) {
logger.error("序列化数据时发生错误", e);
return null;
}
return syncSend(sendPacket);
}
public void asyncSend(int commandId) {
asyncSend(commandId, new EmptyCommandData());
}
public void asyncSend(int commandId, ICommandData data) {
asyncSend(commandId, autoRequestId.getAndIncrement(), data);
}
public void asyncSend(int commandId, int requestId, ICommandData data) {
ICommandPacket sendPacket;
try {
sendPacket = buildPacketFromData(commandId, requestId, data);
} catch (IOException e) {
logger.error("序列化数据时发生错误", e);
return;
}
asyncSend(sendPacket);
}
private ICommandPacket buildPacketFromData(int commandId, int requestId, ICommandData data) throws IOException {
CommandPacket packet = new CommandPacket(commandId);
packet.setRequestId(requestId);
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
DataOutputStream dataOutput = new DataOutputStream(outStream);
data.encodeToOutput(dataOutput);
packet.setCarryingData(outStream.toByteArray());
return packet;
}
public ICommandPacket syncSend(ICommandPacket sendPacket) {
logger.debug("设备的nkAgent连接状态connected = " + connected);
if (!connected) return null;
SendTarget sendTarget = new SendTarget(sendPacket, false);
sendQueue.offer(sendTarget);
try {
synchronized (sendTarget.lock) {
sendTarget.lock.wait(SYNC_TIMEOUT * 1000);
}
} catch (InterruptedException e) {
return null;
}
return sendTarget.responsePacket;
}
public void asyncSend(ICommandPacket sendPacket) {
if (!connected) return;
SendTarget sendTarget = new SendTarget(sendPacket, true);
sendQueue.offer(sendTarget);
}
/**
*
*
* @param listener
*/
public void addAsyncListener(AsyncPacketListener listener) {
synchronized (asyncPacketListeners) {
asyncPacketListeners.add(listener);
}
}
public void setOnDisconnectListener(Runnable onDisconnectListener) {
this.onDisconnectListener = onDisconnectListener;
}
/**
*
*
* @param listener
*/
public void removeAsyncListener(AsyncPacketListener listener) {
synchronized (asyncPacketListeners) {
asyncPacketListeners.remove(listener);
}
}
public void close() {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
logger.error("关闭Socket时发生IO异常", e);
}
}
}
public boolean isConnected() {
return connected;
}
private synchronized void onDisconnected() {
if (!connected) return;
connected = false;
sendHandler.stop();
receiveHandler.stop();
asyncPacketHandler.stop();
sendQueue.clear();
synchronized (syncWaitResponseTargetList) {
for (SendTarget target : syncWaitResponseTargetList) {
synchronized (target.lock) {
target.lock.notifyAll();
}
}
syncWaitResponseTargetList.clear();
}
synchronized (asyncPacketListeners) {
asyncPacketListeners.clear();
}
if (onDisconnectListener != null) {
try {
onDisconnectListener.run();
} catch (Exception e) {
logger.error("执行连接断开回调时发生异常", e);
}
}
}
/**
*
* <br>
*
*/
public class SendTarget implements PacketHandler {
private final Object lock = new Object();
final ICommandPacket packet;
final long time = System.currentTimeMillis();
private ICommandPacket responsePacket = null;
final boolean isAsync;
public SendTarget(ICommandPacket packet, boolean isAsync) {
this.packet = packet;
this.isAsync = isAsync;
}
public ICommandPacket getResponsePacket() throws SyncRequestTimeoutException {
if (isTimeout()) throw new SyncRequestTimeoutException(this);
return responsePacket;
}
public boolean isTimeout() {
return System.currentTimeMillis() - time > SYNC_TIMEOUT * 1000;
}
@Override
public void handle(PacketTransfer transfer, ICommandPacket packet) {
this.responsePacket = packet;
synchronized (lock) {
lock.notifyAll();
}
}
}
/**
*
*/
private class SendHandler implements Runnable {
private final Thread thread;
public SendHandler() {
thread = new Thread(this);
thread.setName("PacketTransfer-" + TAG + "-SendHandler");
thread.start();
}
@Override
public void run() {
while (!thread.isInterrupted()) {
SendTarget sendTarget;
try {
sendTarget = sendQueue.take();
} catch (InterruptedException e) {
break;
}
if (!sendTarget.isAsync) {
synchronized (syncWaitResponseTargetList) {
syncWaitResponseTargetList.add(sendTarget);
}
}
byte[] sendByteArray = sendTarget.packet.toByteArray();
logger.debug("发送commandId:" + sendTarget.packet.commandId() + " requestId:" + sendTarget.packet.getRequestId() + " dataLength:" + (sendByteArray.length - 12) + ":" + ByteUtils.bytesToHex(sendByteArray).substring(24));
try {
outputStream.write(sendByteArray);
logger.debug("发送commandId:" + sendTarget.packet.commandId() + " requestId:" + sendTarget.packet.getRequestId()+"数据完成");
} catch (IOException e) {
logger.error("PacketTransfer-" + TAG + "发送数据时发生IO异常", e);
break;
}
}
onDisconnected();
}
public void stop() {
if (!thread.isInterrupted()) {
thread.interrupt();
}
}
}
/**
*
*/
private class ReceiveHandler implements Runnable {
private final Thread thread;
public ReceiveHandler() {
thread = new Thread(this);
thread.setName("PacketTransfer-" + TAG + "-ReceiveHandler");
thread.start();
}
@Override
public void run() {
while (!thread.isInterrupted()) {
int commandId;
try {
commandId = inputStream.readInt();
} catch (IOException e) {
logger.error("PacketTransfer-" + TAG + "读取指令ID阶段发生IO异常", e);
break;
}
int requestId;
try {
requestId = inputStream.readInt();
} catch (IOException e) {
logger.error("PacketTransfer-" + TAG + "读取请求ID阶段发生IO异常", e);
break;
}
int dataLength;
try {
dataLength = inputStream.readInt();
} catch (IOException e) {
logger.error("PacketTransfer-" + TAG + "读取数据长度阶段发生IO异常", e);
break;
}
byte[] carryingData = new byte[dataLength];
try {
inputStream.readFully(carryingData);
} catch (IOException e) {
logger.error("PacketTransfer-" + TAG + "读取携带数据阶段发生IO异常", e);
break;
}
logger.debug("收到commandId:" + commandId + " requestId:" + requestId + " dataLength:" + dataLength + ":" + ByteUtils.bytesToHex(carryingData));
SendTarget sendTarget = null;
synchronized (syncWaitResponseTargetList) {
int foundIndex = -1;
for (int i = 0; i < syncWaitResponseTargetList.size(); i++) {
SendTarget target = syncWaitResponseTargetList.get(i);
if (target.packet.commandId() == commandId && requestId == target.packet.getRequestId()) {
sendTarget = target;
foundIndex = i;
break;
}
}
if (sendTarget != null) {
syncWaitResponseTargetList.remove(foundIndex);
}
}
if (sendTarget != null) {
//同步请求响应
ICommandPacket packet = sendTarget.packet.createResponse(carryingData);
sendTarget.handle(PacketTransfer.this, packet);
} else {
//异步请求响应
ArrayList<AsyncPacketListener> matchListener = new ArrayList<>();
synchronized (asyncPacketListeners) {
for (AsyncPacketListener listener : asyncPacketListeners) {
if (listener.getCommandId() == commandId && listener.getRequestId() == requestId) {
matchListener.add(listener);
}
}
}
System.out.println("match listener:" + matchListener.size());
if (!matchListener.isEmpty()) {
CommandPacket packet = new CommandPacket(commandId);
packet.setRequestId(requestId);
packet.setCarryingData(carryingData);
for (AsyncPacketListener listener : matchListener) {
asyncPacketHandler.jobQueue.offer(new AsyncPacketHandleJob(listener, packet));
}
}
}
}
onDisconnected();
}
public void stop() {
if (!thread.isInterrupted()) {
thread.interrupt();
}
}
}
/**
* 线线
*/
private class AsyncPacketHandler implements Runnable {
private final Thread thread;
private final ArrayBlockingQueue<AsyncPacketHandleJob> jobQueue = new ArrayBlockingQueue<>(10000);
public AsyncPacketHandler() {
thread = new Thread(this);
thread.setName("PacketTransfer-" + TAG + "-AsyncPacketHandler");
thread.start();
}
@Override
public void run() {
while (!thread.isInterrupted()) {
AsyncPacketHandleJob job;
try {
job = jobQueue.take();
} catch (InterruptedException e) {
break;
}
job.listener.tryHandle(PacketTransfer.this, job.packet);
}
}
public void stop() {
if (!thread.isInterrupted()) {
thread.interrupt();
}
}
}
private static class AsyncPacketHandleJob {
final AsyncPacketListener listener;
final ICommandPacket packet;
public AsyncPacketHandleJob(AsyncPacketListener listener, ICommandPacket packet) {
this.listener = listener;
this.packet = packet;
}
}
public class SyncRequestTimeoutException extends Exception {
public SyncRequestTimeoutException(SendTarget sendTarget) {
super("PacketTransfer-" + TAG + "请求发生超时指令ID" + sendTarget.packet.commandId() + " 请求ID" + sendTarget.packet.getRequestId() + " 创建时间:" + sendTarget.time);
}
}
}

View File

@ -0,0 +1,36 @@
package net.northking.cctp.upperComputer.driver.protocol.packet;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class TextCommandData implements ICommandData {
private String text = "";
public String getText() {
return text;
}
public void setText(String text) {
if (text == null) throw new IllegalArgumentException("此处text不能为null");
this.text = text;
}
@Override
public void buildFromInput(DataInput dataInput) throws IOException {
text = CodecUtils.readText(dataInput);
}
@Override
public void encodeToOutput(DataOutput dataOutput) throws IOException {
CodecUtils.writeText(dataOutput, text);
}
@Override
public String toString() {
return "TextCommandData{" +
"text='" + text + '\'' +
'}';
}
}