增加RPC录像列表查询

This commit is contained in:
648540858
2024-12-13 14:35:44 +08:00
parent 152a4954aa
commit 3e78af64de
25 changed files with 409 additions and 204 deletions

View File

@@ -4,7 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEventListener;
import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEventListener;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
@@ -52,7 +52,7 @@ public class RecordInfoQueryMessageHandler extends SIPRequestProcessorParent imp
private SIPCommander commander;
@Autowired
private RecordEndEventListener recordEndEventListener;
private RecordInfoEventListener recordInfoEventListener;
@Override
public void afterPropertiesSet() throws Exception {
@@ -126,7 +126,7 @@ public class RecordInfoQueryMessageHandler extends SIPRequestProcessorParent imp
// 获取通道的原始信息
DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(channel.getGbId());
// 接收录像数据
recordEndEventListener.addEndEventHandler(device.getDeviceId(), deviceChannel.getDeviceId(), (recordInfo)->{
recordInfoEventListener.addEndEventHandler(device.getDeviceId(), deviceChannel.getDeviceId(), (recordInfo)->{
try {
log.info("[国标级联] 录像查询收到数据, 通道: {},准备转发===", channelId);
cmderFroPlatform.recordInfo(channel, platform, request.getFromTag(), recordInfo);

View File

@@ -6,6 +6,8 @@ import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
import com.genersoft.iot.vmp.gb28181.bean.RecordItem;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEndEvent;
import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEvent;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
@@ -19,6 +21,7 @@ import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
@@ -48,14 +51,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
private ResponseMessageHandler responseMessageHandler;
@Autowired
private DeferredResultHolder deferredResultHolder;
@Autowired
private EventPublisher eventPublisher;
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
private ApplicationEventPublisher applicationEventPublisher;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@@ -75,88 +71,89 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
}catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 国标级联 国标录像: {}", e.getMessage());
}
taskExecutor.execute(()->{
try {
String sn = getText(rootElement, "SN");
String channelId = getText(rootElement, "DeviceID");
RecordInfo recordInfo = new RecordInfo();
recordInfo.setChannelId(channelId);
recordInfo.setDeviceId(device.getDeviceId());
recordInfo.setSn(sn);
recordInfo.setName(getText(rootElement, "Name"));
String sumNumStr = getText(rootElement, "SumNum");
int sumNum = 0;
if (!ObjectUtils.isEmpty(sumNumStr)) {
sumNum = Integer.parseInt(sumNumStr);
}
recordInfo.setSumNum(sumNum);
Element recordListElement = rootElement.element("RecordList");
if (recordListElement == null || sumNum == 0) {
log.info("无录像数据");
recordInfo.setCount(sumNum);
eventPublisher.recordEndEventPush(recordInfo);
releaseRequest(device.getDeviceId(), sn,recordInfo);
} else {
Iterator<Element> recordListIterator = recordListElement.elementIterator();
if (recordListIterator != null) {
List<RecordItem> recordList = new ArrayList<>();
// 遍历DeviceList
while (recordListIterator.hasNext()) {
Element itemRecord = recordListIterator.next();
Element recordElement = itemRecord.element("DeviceID");
if (recordElement == null) {
log.info("记录为空,下一个...");
continue;
}
RecordItem record = new RecordItem();
record.setDeviceId(getText(itemRecord, "DeviceID"));
record.setName(getText(itemRecord, "Name"));
record.setFilePath(getText(itemRecord, "FilePath"));
record.setFileSize(getText(itemRecord, "FileSize"));
record.setAddress(getText(itemRecord, "Address"));
String startTimeStr = getText(itemRecord, "StartTime");
record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr));
String endTimeStr = getText(itemRecord, "EndTime");
record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr));
record.setSecrecy(itemRecord.element("Secrecy") == null ? 0
: Integer.parseInt(getText(itemRecord, "Secrecy")));
record.setType(getText(itemRecord, "Type"));
record.setRecorderId(getText(itemRecord, "RecorderID"));
recordList.add(record);
}
Map<String, String> map = recordList.stream()
.filter(record -> record.getDeviceId() != null)
.collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson));
// 获取任务结果数据
String resKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_PRE + channelId + sn;
redisTemplate.opsForHash().putAll(resKey, map);
redisTemplate.expire(resKey, recordInfoTtl, TimeUnit.SECONDS);
String resCountKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_COUNT_PRE + channelId + sn;
long incr = redisTemplate.opsForValue().increment(resCountKey, map.size());
redisTemplate.expire(resCountKey, recordInfoTtl, TimeUnit.SECONDS);
recordInfo.setRecordList(recordList);
recordInfo.setCount(Math.toIntExact(incr));
eventPublisher.recordEndEventPush(recordInfo);
if (incr < sumNum) {
return;
}
// 已接收完成
List<RecordItem> resList = redisTemplate.opsForHash().entries(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList());
if (resList.size() < sumNum) {
return;
}
recordInfo.setRecordList(resList);
releaseRequest(device.getDeviceId(), sn,recordInfo);
}
}
} catch (Exception e) {
log.error("[国标录像] 发现未处理的异常, \r\n{}", evt.getRequest());
log.error("[国标录像] 异常内容: ", e);
try {
String sn = getText(rootElement, "SN");
String channelId = getText(rootElement, "DeviceID");
RecordInfo recordInfo = new RecordInfo();
recordInfo.setChannelId(channelId);
recordInfo.setDeviceId(device.getDeviceId());
recordInfo.setSn(sn);
recordInfo.setName(getText(rootElement, "Name"));
String sumNumStr = getText(rootElement, "SumNum");
int sumNum = 0;
if (!ObjectUtils.isEmpty(sumNumStr)) {
sumNum = Integer.parseInt(sumNumStr);
}
});
recordInfo.setSumNum(sumNum);
Element recordListElement = rootElement.element("RecordList");
if (recordListElement == null || sumNum == 0) {
log.info("无录像数据");
recordInfo.setCount(sumNum);
recordInfoEventPush(recordInfo);
recordInfoEndEventPush(recordInfo);
} else {
Iterator<Element> recordListIterator = recordListElement.elementIterator();
if (recordListIterator != null) {
List<RecordItem> recordList = new ArrayList<>();
// 遍历DeviceList
while (recordListIterator.hasNext()) {
Element itemRecord = recordListIterator.next();
Element recordElement = itemRecord.element("DeviceID");
if (recordElement == null) {
log.info("记录为空,下一个...");
continue;
}
RecordItem record = new RecordItem();
record.setDeviceId(getText(itemRecord, "DeviceID"));
record.setName(getText(itemRecord, "Name"));
record.setFilePath(getText(itemRecord, "FilePath"));
record.setFileSize(getText(itemRecord, "FileSize"));
record.setAddress(getText(itemRecord, "Address"));
String startTimeStr = getText(itemRecord, "StartTime");
record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr));
String endTimeStr = getText(itemRecord, "EndTime");
record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr));
record.setSecrecy(itemRecord.element("Secrecy") == null ? 0
: Integer.parseInt(getText(itemRecord, "Secrecy")));
record.setType(getText(itemRecord, "Type"));
record.setRecorderId(getText(itemRecord, "RecorderID"));
recordList.add(record);
}
Map<String, String> map = recordList.stream()
.filter(record -> record.getDeviceId() != null)
.collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson));
// 获取任务结果数据
String resKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_PRE + channelId + sn;
redisTemplate.opsForHash().putAll(resKey, map);
redisTemplate.expire(resKey, recordInfoTtl, TimeUnit.SECONDS);
String resCountKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_COUNT_PRE + channelId + sn;
Long incr = redisTemplate.opsForValue().increment(resCountKey, map.size());
if (incr == null) {
incr = 0L;
}
redisTemplate.expire(resCountKey, recordInfoTtl, TimeUnit.SECONDS);
recordInfo.setRecordList(recordList);
recordInfo.setCount(Math.toIntExact(incr));
recordInfoEventPush(recordInfo);
if (incr < sumNum) {
return;
}
// 已接收完成
List<RecordItem> resList = redisTemplate.opsForHash().entries(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList());
if (resList.size() < sumNum) {
return;
}
recordInfo.setRecordList(resList);
recordInfoEndEventPush(recordInfo);
}
}
} catch (Exception e) {
log.error("[国标录像] 发现未处理的异常, \r\n{}", evt.getRequest());
log.error("[国标录像] 异常内容: ", e);
}
}
@Override
@@ -164,18 +161,31 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
}
public void releaseRequest(String deviceId, String sn,RecordInfo recordInfo){
String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn;
// 对数据进行排序
if(recordInfo!=null && recordInfo.getRecordList()!=null) {
private void recordInfoEventPush(RecordInfo recordInfo) {
if (recordInfo == null) {
return;
}
if(recordInfo.getRecordList() != null) {
Collections.sort(recordInfo.getRecordList());
}else{
recordInfo.setRecordList(new ArrayList<>());
}
RecordInfoEvent outEvent = new RecordInfoEvent(this);
outEvent.setRecordInfo(recordInfo);
applicationEventPublisher.publishEvent(outEvent);
}
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setData(recordInfo);
deferredResultHolder.invokeAllResult(msg);
private void recordInfoEndEventPush(RecordInfo recordInfo) {
if (recordInfo == null) {
return;
}
if(recordInfo.getRecordList() != null) {
Collections.sort(recordInfo.getRecordList());
}else{
recordInfo.setRecordList(new ArrayList<>());
}
RecordInfoEndEvent outEvent = new RecordInfoEndEvent(this);
outEvent.setRecordInfo(recordInfo);
applicationEventPublisher.publishEvent(outEvent);
}
}