hook优化

This commit is contained in:
648540858
2024-03-31 00:28:45 +08:00
parent 0447b83c3a
commit 4548695a0b
63 changed files with 900 additions and 973 deletions

View File

@@ -6,8 +6,8 @@ import com.genersoft.iot.vmp.gb28181.event.device.RequestTimeoutEvent;
import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEvent;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent;
import com.genersoft.iot.vmp.media.event.MediaServerOfflineEvent;
import com.genersoft.iot.vmp.media.event.MediaServerOnlineEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

View File

@@ -1,26 +1,13 @@
package com.genersoft.iot.vmp.gb28181.session;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

View File

@@ -14,12 +14,11 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribeForStreamPush;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -279,11 +278,11 @@ public class SIPCommander implements ISIPCommander {
}
logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getSdpIp(), ssrcInfo.getPort());
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
subscribe.addSubscribe(hookSubscribe, (MediaServer mediaServerItemInUse, HookParam hookParam) -> {
Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServerItem.getId());
subscribe.addSubscribe(rtpHook, (hookData) -> {
if (event != null) {
event.response(mediaServerItemInUse, hookParam);
subscribe.removeSubscribe(hookSubscribe);
event.response(hookData);
subscribe.removeSubscribe(rtpHook);
}
});
String sdpIp;
@@ -453,13 +452,13 @@ public class SIPCommander implements ISIPCommander {
//ssrc
content.append("y=" + ssrcInfo.getSsrc() + "\r\n");
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream(), mediaServerItem.getId());
// 添加订阅
subscribe.addSubscribe(hookSubscribe, (MediaServer mediaServerItemInUse, HookParam hookParam) -> {
subscribe.addSubscribe(rtpHook, (hookData) -> {
if (hookEvent != null) {
hookEvent.response(mediaServerItemInUse, hookParam);
hookEvent.response(hookData);
}
subscribe.removeSubscribe(hookSubscribe);
subscribe.removeSubscribe(rtpHook);
});
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc());
@@ -554,19 +553,18 @@ public class SIPCommander implements ISIPCommander {
content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc
logger.debug("此时请求下载信令的ssrc===>{}",ssrcInfo.getSsrc());
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream(), mediaServerItem.getId());
// 添加订阅
CallIdHeader newCallIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport());
String callId= newCallIdHeader.getCallId();
subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
subscribe.addSubscribe(rtpHook, (hookData) -> {
logger.debug("sipc 添加订阅===callId {}",callId);
hookEvent.response(mediaServerItemInUse, hookParam);
subscribe.removeSubscribe(hookSubscribe);
hookSubscribe.getContent().put("regist", false);
hookSubscribe.getContent().put("schema", "rtsp");
hookEvent.response(hookData);
subscribe.removeSubscribe(rtpHook);
// 添加流注销的订阅注销了后向设备发送bye
subscribe.addSubscribe(hookSubscribe,
(mediaServerItemForEnd, hookParam1) -> {
Hook departureHook = Hook.getInstance(HookType.on_media_departure, "rtp", ssrcInfo.getStream(), mediaServerItem.getId());
subscribe.addSubscribe(departureHook,
(departureHookData) -> {
logger.info("[录像]下载结束, 发送BYE");
try {
streamByeCmd(device, channelId, ssrcInfo.getStream(), callId);
@@ -604,20 +602,20 @@ public class SIPCommander implements ISIPCommander {
}
logger.info("[语音喊话] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), sendRtpItem.getPort());
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
subscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItemInUse, hookParam) -> {
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServerItem.getId());
subscribe.addSubscribe(hook, (hookData) -> {
if (event != null) {
event.response(mediaServerItemInUse, hookParam);
subscribe.removeSubscribe(hookSubscribeForStreamChange);
event.response(hookData);
subscribe.removeSubscribe(hook);
}
});
CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport());
callIdHeader.setCallId(callId);
HookSubscribeForStreamPush hookSubscribeForStreamPush = HookSubscribeFactory.on_publish("rtp", stream, null, mediaServerItem.getId());
subscribe.addSubscribe(hookSubscribeForStreamPush, (mediaServerItemInUse, hookParam) -> {
Hook publishHook = Hook.getInstance(HookType.on_publish, "rtp", stream, mediaServerItem.getId());
subscribe.addSubscribe(publishHook, (hookData) -> {
if (eventForPush != null) {
eventForPush.response(mediaServerItemInUse, hookParam);
eventForPush.response(hookData);
}
});
//
@@ -1260,7 +1258,6 @@ public class SIPCommander implements ISIPCommander {
* @param startPriority 报警起始级别(可选)
* @param endPriority 报警终止级别(可选)
* @param alarmMethod 报警方式条件(可选)
* @param alarmType 报警类型
* @param startTime 报警发生起始时间(可选)
* @param endTime 报警发生终止时间(可选)
* @return true = 命令发送成功

View File

@@ -13,9 +13,9 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
@@ -905,11 +905,11 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
}
logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
subscribe.addSubscribe(hookSubscribe, (MediaServer mediaServerItemInUse, HookParam hookParam) -> {
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServerItem.getId());
subscribe.addSubscribe(hook, (hookData) -> {
if (event != null) {
event.response(mediaServerItemInUse, hookParam);
subscribe.removeSubscribe(hookSubscribe);
event.response(hookData);
subscribe.removeSubscribe(hook);
}
});
String sdpIp = mediaServerItem.getSdpIp();
@@ -949,7 +949,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> {
streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
subscribe.removeSubscribe(hookSubscribe);
subscribe.removeSubscribe(hook);
errorEvent.response(e);
}), e -> {
ResponseEvent responseEvent = (ResponseEvent) e.event;

View File

@@ -18,8 +18,8 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
@@ -115,7 +115,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
private IMediaServerService mediaServerService;
@Autowired
private HookSubscribe zlmHttpHookSubscribe;
private HookSubscribe hookSubscribe;
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@@ -723,17 +723,16 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// TODO 控制启用以使设备上线
logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
// 监听流上线
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId());
zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> {
OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
Hook hook = Hook.getInstance(HookType.on_media_arrival, gbStream.getApp(), gbStream.getStream(), mediaServerItem.getId());
this.hookSubscribe.addSubscribe(hook, (hookData) -> {
logger.info("[上级点播]拉流代理已经就绪, {}/{}", hookData.getApp(), hookData.getStream());
dynamicTask.stop(callIdHeader.getCallId());
pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
});
dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream());
zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
this.hookSubscribe.removeSubscribe(hook);
}, userSetting.getPlatformPlayTimeout());
boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
if (!start) {
@@ -742,7 +741,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
}
zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
this.hookSubscribe.removeSubscribe(hook);
dynamicTask.stop(callIdHeader.getCallId());
}
} else if ("push".equals(gbStream.getStreamType())) {

View File

@@ -127,8 +127,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
// 消息发送成功, 向上级发送invite获取推流
try {
platformService.broadcastInvite(platform, deviceChannel.getChannelId(), mediaServerForMinimumLoad, (mediaServerItem, hookParam)->{
OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
platformService.broadcastInvite(platform, deviceChannel.getChannelId(), mediaServerForMinimumLoad, (hookData)->{
// 上级平台推流成功
AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(device.getDeviceId(), targetId);
if (broadcastCatch != null ) {
@@ -136,20 +135,20 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
logger.info("[国标级联] 语音喊话 设备正在使用中 platform {} channel: {}",
platform.getServerGBId(), deviceChannel.getChannelId());
// 查看语音通道已经建立且已经占用 回复BYE
platformService.stopBroadcast(platform, deviceChannel, streamChangedHookParam.getStream(), true, mediaServerItem);
platformService.stopBroadcast(platform, deviceChannel, hookData.getStream(), true, hookData.getMediaServer());
}else {
// 查看语音通道已经建立但是未占用
broadcastCatch.setApp(streamChangedHookParam.getApp());
broadcastCatch.setStream(streamChangedHookParam.getStream());
broadcastCatch.setMediaServerItem(mediaServerItem);
broadcastCatch.setApp(hookData.getApp());
broadcastCatch.setStream(hookData.getStream());
broadcastCatch.setMediaServerItem(hookData.getMediaServer());
audioBroadcastManager.update(broadcastCatch);
// 推流到设备
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, streamChangedHookParam.getStream(), null);
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, hookData.getStream(), null);
if (sendRtpItem == null) {
logger.warn("[国标级联] 语音喊话 异常,未找到发流信息, channelId: {}, stream: {}", targetId, streamChangedHookParam.getStream());
logger.info("[国标级联] 语音喊话 重新开始channelId: {}, stream: {}", targetId, streamChangedHookParam.getStream());
logger.warn("[国标级联] 语音喊话 异常,未找到发流信息, channelId: {}, stream: {}", targetId, hookData.getStream());
logger.info("[国标级联] 语音喊话 重新开始channelId: {}, stream: {}", targetId, hookData.getStream());
try {
playService.audioBroadcastCmd(device, targetId, mediaServerItem, streamChangedHookParam.getApp(), streamChangedHookParam.getStream(), 60, true, msg -> {
playService.audioBroadcastCmd(device, targetId, hookData.getMediaServer(), hookData.getApp(), hookData.getStream(), 60, true, msg -> {
logger.info("[语音喊话] 通道建立成功, device: {}, channel: {}", device.getDeviceId(), targetId);
});
} catch (SipException | InvalidArgumentException | ParseException e) {
@@ -157,7 +156,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
}
}else {
// 发流
JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem);
JSONObject jsonObject = zlmServerFactory.startSendRtp(hookData.getMediaServer(), sendRtpItem);
if (jsonObject != null && jsonObject.getInteger("code") == 0 ) {
logger.info("[语音喊话] 自动推流成功, device: {}, channel: {}", device.getDeviceId(), targetId);
}else {
@@ -167,7 +166,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
}
}else {
try {
playService.audioBroadcastCmd(device, targetId, mediaServerItem, streamChangedHookParam.getApp(), streamChangedHookParam.getStream(), 60, true, msg -> {
playService.audioBroadcastCmd(device, targetId, hookData.getMediaServer(), hookData.getApp(), hookData.getStream(), 60, true, msg -> {
logger.info("[语音喊话] 通道建立成功, device: {}, channel: {}", device.getDeviceId(), targetId);
});
} catch (SipException | InvalidArgumentException | ParseException e) {

View File

@@ -13,9 +13,9 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -106,9 +106,8 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
logger.error("[录像流]推送完毕,收到关流通知, 发送BYE失败 {}", e.getMessage());
}
// 去除监听流注销自动停止下载的监听
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcTransaction.getStream(), false, "rtsp", ssrcTransaction.getMediaServerId());
subscribe.removeSubscribe(hookSubscribe);
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcTransaction.getStream(), ssrcTransaction.getMediaServerId());
subscribe.removeSubscribe(hook);
// 如果级联播放,需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题需要将点播CallId进行上下级绑定
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, ssrcTransaction.getChannelId(), null, null);
if (sendRtpItem != null) {