优化文档, 优化notify-catalog大并发处理

This commit is contained in:
648540858
2024-10-11 15:24:46 +08:00
parent 028274a088
commit 3078162c58
45 changed files with 842 additions and 392 deletions

View File

@@ -1,15 +1,12 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.CatalogChannelEvent;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import lombok.extern.slf4j.Slf4j;
@@ -27,10 +24,7 @@ import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* SIP命令类型 NOTIFY请求中的目录请求处理
@@ -39,11 +33,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
@Component
public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent {
private final List<DeviceChannel> updateChannelForStatusChange = new CopyOnWriteArrayList<>();
private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>();
private final List<DeviceChannel> deleteChannelList = new CopyOnWriteArrayList<>();
private final ConcurrentLinkedQueue<NotifyCatalogChannel> channelList = new ConcurrentLinkedQueue<>();
private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
@@ -64,7 +54,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
// log.warn("[notify-目录订阅] 待处理消息数量: {}", taskQueue.size() );
// }
@Transactional
public void process(RequestEvent evt) {
if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
log.error("[notify-目录订阅] 待处理消息队列已满 {}返回486 BUSY_HERE消息不做处理", userSetting.getMaxNotifyCountQueue());
@@ -79,8 +68,12 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
return;
}
List<HandlerCatchData> handlerCatchDataList = new ArrayList<>();
while (!taskQueue.isEmpty()) {
handlerCatchDataList.add(taskQueue.poll());
int size = taskQueue.size();
for (int i = 0; i < size; i++) {
HandlerCatchData poll = taskQueue.poll();
if (poll != null) {
handlerCatchDataList.add(poll);
}
}
if (handlerCatchDataList.isEmpty()) {
return;
@@ -91,23 +84,23 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
}
RequestEvent evt = take.getEvt();
try {
long start = System.currentTimeMillis();
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
Device device = redisCatchStorage.getDevice(deviceId);
if (device == null || !device.isOnLine()) {
log.warn("[收到目录订阅]{}, 但是设备已经离线", (device != null ? device.getDeviceId() : ""));
return;
continue;
}
Element rootElement = getRootElement(evt, device.getCharset());
if (rootElement == null) {
log.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest());
return;
continue;
}
Element deviceListElement = rootElement.element("DeviceList");
if (deviceListElement == null) {
return;
log.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest());
continue;
}
Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
if (deviceListIterator != null) {
@@ -139,7 +132,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
// 上线
log.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
channel.setStatus("ON");
updateChannelForStatusChange.add(channel);
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel));
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), true);
@@ -152,7 +145,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
log.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
} else {
channel.setStatus("OFF");
updateChannelForStatusChange.add(channel);
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel));
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), false);
@@ -166,7 +159,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
log.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
} else {
channel.setStatus("OFF");
updateChannelForStatusChange.add(channel);
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel));
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), false);
@@ -180,7 +173,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
log.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
} else {
channel.setStatus("OFF");
updateChannelForStatusChange.add(channel);
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel));
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), false);
@@ -191,18 +184,17 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
// 增加
log.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
// 判断此通道是否存在
DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, catalogChannelEvent.getChannel().getDeviceId());
DeviceChannel deviceChannel = deviceChannelService.getOneForSource(device.getId(), catalogChannelEvent.getChannel().getDeviceId());
if (deviceChannel != null) {
log.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
channel.setId(deviceChannel.getId());
channel.setHasAudio(deviceChannel.isHasAudio());
channel.setUpdateTime(DateUtil.getNow());
updateChannelMap.put(catalogChannelEvent.getChannel().getDeviceId(), channel);
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.UPDATE, channel));
} else {
catalogChannelEvent.getChannel().setUpdateTime(DateUtil.getNow());
catalogChannelEvent.getChannel().setCreateTime(DateUtil.getNow());
addChannelMap.put(catalogChannelEvent.getChannel().getDeviceId(), catalogChannelEvent.getChannel());
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.ADD, channel));
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), true);
@@ -213,7 +205,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
case CatalogEvent.DEL:
// 删除
log.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
deleteChannelList.add(catalogChannelEvent.getChannel());
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.DELETE, channel));
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), false);
@@ -223,17 +215,17 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
// 更新
log.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
// 判断此通道是否存在
DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, catalogChannelEvent.getChannel().getDeviceId());
DeviceChannel deviceChannelForUpdate = deviceChannelService.getOneForSource(device.getId(), catalogChannelEvent.getChannel().getDeviceId());
if (deviceChannelForUpdate != null) {
channel.setId(deviceChannelForUpdate.getId());
channel.setHasAudio(deviceChannelForUpdate.isHasAudio());
channel.setUpdateTime(DateUtil.getNow());
channel.setUpdateTime(DateUtil.getNow());
updateChannelMap.put(catalogChannelEvent.getChannel().getDeviceId(), channel);
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.UPDATE, channel));
} else {
catalogChannelEvent.getChannel().setCreateTime(DateUtil.getNow());
catalogChannelEvent.getChannel().setUpdateTime(DateUtil.getNow());
addChannelMap.put(catalogChannelEvent.getChannel().getDeviceId(), catalogChannelEvent.getChannel());
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.ADD, channel));
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), true);
@@ -253,66 +245,39 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
log.error("未处理的异常 ", e);
}
}
taskQueue.clear();
if (!updateChannelMap.keySet().isEmpty()
|| !addChannelMap.keySet().isEmpty()
|| !updateChannelForStatusChange.isEmpty()
|| !deleteChannelList.isEmpty()) {
if (!channelList.isEmpty()) {
executeSave();
}
}
public void executeSave(){
try {
executeSaveForAdd();
} catch (Exception e) {
log.error("[存储收到的增加通道] 异常: ", e );
@Transactional
public void executeSave() {
int size = channelList.size();
List<NotifyCatalogChannel> channelListForSave = new ArrayList<>();
for (int i = 0; i < size; i++) {
channelListForSave.add(channelList.poll());
}
try {
executeSaveForStatus();
} catch (Exception e) {
log.error("[存储收到的通道状态变化] 异常: ", e );
}
try {
executeSaveForUpdate();
} catch (Exception e) {
log.error("[存储收到的更新通道] 异常: ", e );
}
try {
executeSaveForDelete();
} catch (Exception e) {
log.error("[存储收到的删除通道] 异常: ", e );
}
}
private void executeSaveForUpdate(){
if (!updateChannelMap.values().isEmpty()) {
log.info("[存储收到的更新通道], 数量: {}", updateChannelMap.size());
ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
deviceChannelService.batchUpdateChannelForNotify(deviceChannels);
updateChannelMap.clear();
}
}
private void executeSaveForAdd(){
if (!addChannelMap.values().isEmpty()) {
ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values());
addChannelMap.clear();
deviceChannelService.batchAddChannel(deviceChannels);
}
}
private void executeSaveForDelete(){
if (!deleteChannelList.isEmpty()) {
deviceChannelService.deleteChannelsForNotify(deleteChannelList);
deleteChannelList.clear();
}
}
private void executeSaveForStatus(){
if (!updateChannelForStatusChange.isEmpty()) {
deviceChannelService.updateChannelsStatus(updateChannelForStatusChange);
updateChannelForStatusChange.clear();
for (NotifyCatalogChannel notifyCatalogChannel : channelListForSave) {
try {
switch (notifyCatalogChannel.getType()) {
case STATUS_CHANGED:
deviceChannelService.updateChannelStatus(notifyCatalogChannel.getChannel());
break;
case ADD:
deviceChannelService.addChannel(notifyCatalogChannel.getChannel());
break;
case UPDATE:
deviceChannelService.updateChannelForNotify(notifyCatalogChannel.getChannel());
break;
case DELETE:
deviceChannelService.delete(notifyCatalogChannel.getChannel());
break;
}
}catch (Exception e) {
log.error("[存储收到的通道]类型:{},编号:{}", notifyCatalogChannel.getType(),
notifyCatalogChannel.getChannel().getDeviceId(), e);
}
}
}
}