新增JT1078 Template支持

This commit is contained in:
QingObject
2023-04-28 10:10:06 +08:00
parent 813fd772d1
commit 97b673d6ad
29 changed files with 1868 additions and 0 deletions

View File

@@ -0,0 +1,146 @@
package com.genersoft.iot.vmp.jt1078.codec.decode;
import com.genersoft.iot.vmp.jt1078.proc.Header;
import com.genersoft.iot.vmp.jt1078.proc.factory.CodecFactory;
import com.genersoft.iot.vmp.jt1078.proc.request.Re;
import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
import com.genersoft.iot.vmp.jt1078.session.Session;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
* @author QingtaiJiang
* @date 2023/4/27 18:10
* @email qingtaij@163.com
*/
public class Jt808Decoder extends ByteToMessageDecoder {
private final static Logger log = LoggerFactory.getLogger(Jt808Decoder.class);
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Session session = ctx.channel().attr(Session.KEY).get();
log.info("> {} hex:{}", session, ByteBufUtil.hexDump(in));
try {
ByteBuf buf = unEscapeAndCheck(in);
Header header = new Header();
header.setMsgId(ByteBufUtil.hexDump(buf.readSlice(2)));
header.setMsgPro(buf.readUnsignedShort());
if (header.is2019Version()) {
header.setVersion(buf.readUnsignedByte());
String devId = ByteBufUtil.hexDump(buf.readSlice(10));
header.setDevId(devId.replaceFirst("^0*", ""));
} else {
header.setDevId(ByteBufUtil.hexDump(buf.readSlice(6)).replaceFirst("^0*", ""));
}
header.setSn(buf.readUnsignedShort());
Re handler = CodecFactory.getHandler(header.getMsgId());
if (handler == null) {
log.error("get msgId is null {}", header.getMsgId());
return;
}
Rs decode = handler.decode(buf, header, session);
if (decode != null) {
out.add(decode);
}
} finally {
in.skipBytes(in.readableBytes());
}
}
/**
* 转义与验证校验码
*
* @param byteBuf 转义Buf
* @return 转义好的数据
*/
public ByteBuf unEscapeAndCheck(ByteBuf byteBuf) throws Exception {
int low = byteBuf.readerIndex();
int high = byteBuf.writerIndex();
byte checkSum = 0;
int calculationCheckSum = 0;
byte aByte = byteBuf.getByte(high - 2);
byte protocolEscapeFlag7d = 0x7d;
//0x7d转义
byte protocolEscapeFlag01 = 0x01;
//0x7e转义
byte protocolEscapeFlag02 = 0x02;
if (aByte == protocolEscapeFlag7d) {
byte b2 = byteBuf.getByte(high - 1);
if (b2 == protocolEscapeFlag01) {
checkSum = protocolEscapeFlag7d;
} else if (b2 == protocolEscapeFlag02) {
checkSum = 0x7e;
} else {
log.error("转义1异常:{}", ByteBufUtil.hexDump(byteBuf));
throw new Exception("转义错误");
}
high = high - 2;
} else {
high = high - 1;
checkSum = byteBuf.getByte(high);
}
List<ByteBuf> bufList = new ArrayList<>();
int index = low;
while (index < high) {
byte b = byteBuf.getByte(index);
if (b == protocolEscapeFlag7d) {
byte c = byteBuf.getByte(index + 1);
if (c == protocolEscapeFlag01) {
ByteBuf slice = slice0x01(byteBuf, low, index);
bufList.add(slice);
b = protocolEscapeFlag7d;
} else if (c == protocolEscapeFlag02) {
ByteBuf slice = slice0x02(byteBuf, low, index);
bufList.add(slice);
b = 0x7e;
} else {
log.error("转义2异常:{}", ByteBufUtil.hexDump(byteBuf));
throw new Exception("转义错误");
}
index += 2;
low = index;
} else {
index += 1;
}
calculationCheckSum = calculationCheckSum ^ b;
}
if (calculationCheckSum == checkSum) {
if (bufList.size() == 0) {
return byteBuf.slice(low, high);
} else {
bufList.add(byteBuf.slice(low, high - low));
return new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, bufList.size(), bufList);
}
} else {
log.info("{} 解析校验码:{}--计算校验码:{}", ByteBufUtil.hexDump(byteBuf), checkSum, calculationCheckSum);
throw new Exception("校验码错误!");
}
}
private ByteBuf slice0x01(ByteBuf buf, int low, int sign) {
return buf.slice(low, sign - low + 1);
}
private ByteBuf slice0x02(ByteBuf buf, int low, int sign) {
buf.setByte(sign, 0x7e);
return buf.slice(low, sign - low + 1);
}
}

View File

@@ -0,0 +1,33 @@
package com.genersoft.iot.vmp.jt1078.codec.encode;
import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
import com.genersoft.iot.vmp.jt1078.session.Session;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author QingtaiJiang
* @date 2023/4/27 18:10
* @email qingtaij@163.com
*/
public class Jt808Encoder extends MessageToByteEncoder<Rs> {
private final static Logger log = LoggerFactory.getLogger(Jt808Encoder.class);
@Override
protected void encode(ChannelHandlerContext ctx, Rs msg, ByteBuf out) throws Exception {
Session session = ctx.channel().attr(Session.KEY).get();
ByteBuf encode = Jt808EncoderCmd.encode(msg, session, session.nextSerialNo());
if(encode!=null){
log.info("< {} hex:{}", session, ByteBufUtil.hexDump(encode));
out.writeBytes(encode);
}
}
}

View File

@@ -0,0 +1,151 @@
package com.genersoft.iot.vmp.jt1078.codec.encode;
import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
import com.genersoft.iot.vmp.jt1078.proc.Header;
import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd;
import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
import com.genersoft.iot.vmp.jt1078.session.Session;
import com.genersoft.iot.vmp.jt1078.util.Bin;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.ByteProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import java.util.LinkedList;
/**
* @author QingtaiJiang
* @date 2023/4/27 18:25
* @email qingtaij@163.com
*/
public class Jt808EncoderCmd extends MessageToByteEncoder<Cmd> {
private final static Logger log = LoggerFactory.getLogger(Jt808EncoderCmd.class);
@Override
protected void encode(ChannelHandlerContext ctx, Cmd cmd, ByteBuf out) throws Exception {
Session session = ctx.channel().attr(Session.KEY).get();
Rs msg = cmd.getRs();
ByteBuf encode = encode(msg, session, cmd.getPackageNo().intValue());
if (encode != null) {
log.info("< {} hex:{}", session, ByteBufUtil.hexDump(encode));
out.writeBytes(encode);
}
}
public static ByteBuf encode(Rs msg, Session session, Integer packageNo) {
String id = msg.getClass().getAnnotation(MsgId.class).id();
if (!StringUtils.hasLength(id)) {
log.error("Not find msgId");
return null;
}
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeBytes(ByteBufUtil.decodeHexDump(id));
ByteBuf encode = msg.encode();
Header header = msg.getHeader();
if (header == null) {
header = session.getHeader();
}
if (header.is2019Version()) {
// 消息体属性
byteBuf.writeShort(encode.readableBytes() | 1 << 14);
// 版本号
byteBuf.writeByte(header.getVersion());
// 终端手机号
byteBuf.writeBytes(ByteBufUtil.decodeHexDump(Bin.strHexPaddingLeft(header.getDevId(), 20)));
} else {
// 消息体属性
byteBuf.writeShort(encode.readableBytes());
byteBuf.writeBytes(ByteBufUtil.decodeHexDump(Bin.strHexPaddingLeft(header.getDevId(), 12)));
}
// 消息体流水号
byteBuf.writeShort(packageNo);
// 写入消息体
byteBuf.writeBytes(encode);
// 计算校验码,并反转义
byteBuf = escapeAndCheck0(byteBuf);
return byteBuf;
}
private static final ByteProcessor searcher = value -> !(value == 0x7d || value == 0x7e);
//转义与校验
public static ByteBuf escapeAndCheck0(ByteBuf source) {
sign(source);
int low = source.readerIndex();
int high = source.writerIndex();
LinkedList<ByteBuf> bufList = new LinkedList<>();
int mark, len;
while ((mark = source.forEachByte(low, high - low, searcher)) > 0) {
len = mark + 1 - low;
ByteBuf[] slice = slice(source, low, len);
bufList.add(slice[0]);
bufList.add(slice[1]);
low += len;
}
if (bufList.size() > 0) {
bufList.add(source.slice(low, high - low));
} else {
bufList.add(source);
}
ByteBuf delimiter = Unpooled.buffer(1, 1).writeByte(0x7e).retain();
bufList.addFirst(delimiter);
bufList.addLast(delimiter);
CompositeByteBuf byteBufLs = Unpooled.compositeBuffer(bufList.size());
byteBufLs.addComponents(true, bufList);
return byteBufLs;
}
public static void sign(ByteBuf buf) {
byte checkCode = bcc(buf);
buf.writeByte(checkCode);
}
public static byte bcc(ByteBuf byteBuf) {
byte cs = 0;
while (byteBuf.isReadable())
cs ^= byteBuf.readByte();
byteBuf.resetReaderIndex();
return cs;
}
protected static ByteBuf[] slice(ByteBuf byteBuf, int index, int length) {
byte first = byteBuf.getByte(index + length - 1);
ByteBuf[] byteBufList = new ByteBuf[2];
byteBufList[0] = byteBuf.retainedSlice(index, length);
if (first == 0x7d) {
byteBufList[1] = Unpooled.buffer(1, 1).writeByte(0x01);
} else {
byteBuf.setByte(index + length - 1, 0x7d);
byteBufList[1] = Unpooled.buffer(1, 1).writeByte(0x02);
}
return byteBufList;
}
}

View File

@@ -0,0 +1,72 @@
package com.genersoft.iot.vmp.jt1078.codec.netty;
import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
import com.genersoft.iot.vmp.jt1078.session.Session;
import com.genersoft.iot.vmp.jt1078.session.SessionManager;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author QingtaiJiang
* @date 2023/4/27 18:14
* @email qingtaij@163.com
*/
public class Jt808Handler extends ChannelInboundHandlerAdapter {
private final static Logger log = LoggerFactory.getLogger(Jt808Handler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof Rs) {
ctx.writeAndFlush(msg);
} else {
ctx.fireChannelRead(msg);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
Session session = SessionManager.INSTANCE.newSession(channel);
channel.attr(Session.KEY).set(session);
log.info("> Tcp connect {}", session);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
Session session = ctx.channel().attr(Session.KEY).get();
log.info("< Tcp disconnect {}", session);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
Session session = ctx.channel().attr(Session.KEY).get();
String message = e.getMessage();
if (message.toLowerCase().contains("Connection reset by peer".toLowerCase())) {
log.info("< exception{} {}", session, e.getMessage());
} else {
log.info("< exception{} {}", session, e.getMessage(), e);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
IdleState state = event.state();
if (state == IdleState.READER_IDLE || state == IdleState.WRITER_IDLE) {
Session session = ctx.channel().attr(Session.KEY).get();
log.warn("< Proactively disconnect{}", session);
ctx.close();
}
}
}
}

View File

@@ -0,0 +1,112 @@
package com.genersoft.iot.vmp.jt1078.codec.netty;
import com.genersoft.iot.vmp.jt1078.codec.decode.Jt808Decoder;
import com.genersoft.iot.vmp.jt1078.codec.encode.Jt808Encoder;
import com.genersoft.iot.vmp.jt1078.codec.encode.Jt808EncoderCmd;
import com.genersoft.iot.vmp.jt1078.proc.factory.CodecFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
* @author QingtaiJiang
* @date 2023/4/27 18:01
* @email qingtaij@163.com
*/
public class TcpServer {
private final static Logger log = LoggerFactory.getLogger(TcpServer.class);
private final Integer port;
private boolean isRunning = false;
private EventLoopGroup bossGroup = null;
private EventLoopGroup workerGroup = null;
private final ByteBuf DECODER_JT808 = Unpooled.wrappedBuffer(new byte[]{0x7e});
public TcpServer(Integer port) {
this.port = port;
}
private void startTcpServer() {
try {
CodecFactory.init();
this.bossGroup = new NioEventLoopGroup();
this.workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.group(bossGroup, workerGroup);
bootstrap.option(NioChannelOption.SO_BACKLOG, 1024)
.option(NioChannelOption.SO_REUSEADDR, true)
.childOption(NioChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
public void initChannel(NioSocketChannel channel) {
channel.pipeline()
.addLast(new IdleStateHandler(10, 0, 0, TimeUnit.MINUTES))
.addLast(new DelimiterBasedFrameDecoder(1024 * 2, DECODER_JT808))
.addLast(new Jt808Decoder())
.addLast(new Jt808Encoder())
.addLast(new Jt808EncoderCmd())
.addLast(new Jt808Handler());
}
});
ChannelFuture channelFuture = bootstrap.bind(port).sync();
// 监听设备TCP端口是否启动成功
channelFuture.addListener(future -> {
if (!future.isSuccess()) {
log.error("Binding port:{} fail! cause: {}", port, future.cause().getCause(), future.cause());
}
});
log.info("服务:JT808 Server 启动成功, port:{}", port);
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.warn("服务:JT808 Server 启动异常, port:{},{}", port, e.getMessage(), e);
} finally {
stop();
}
}
/**
* 开启一个新的线程,拉起来Netty
*/
public synchronized void start() {
if (this.isRunning) {
log.warn("服务:JT808 Server 已经启动, port:{}", port);
return;
}
this.isRunning = true;
new Thread(this::startTcpServer).start();
}
public synchronized void stop() {
if (!this.isRunning) {
log.warn("服务:JT808 Server 已经停止, port:{}", port);
}
this.isRunning = false;
Future<?> future = this.bossGroup.shutdownGracefully();
if (!future.isSuccess()) {
log.warn("bossGroup 无法正常停止", future.cause());
}
future = this.workerGroup.shutdownGracefully();
if (!future.isSuccess()) {
log.warn("workerGroup 无法正常停止", future.cause());
}
log.warn("服务:JT808 Server 已经停止, port:{}", port);
}
}