临时提交

This commit is contained in:
648540858
2024-09-09 00:07:00 +08:00
parent 3fafe83e88
commit 489fbe31a5
21 changed files with 279 additions and 338 deletions

View File

@@ -16,6 +16,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
@@ -77,6 +78,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired
private IPlayService playService;
@Autowired
private ISendRtpServerService sendRtpServerService;
/**
* 处理 ACK请求
@@ -88,7 +92,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
String fromUserId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
String toUserId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
log.info("[收到ACK] 来自->{}", fromUserId);
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callIdHeader.getCallId());
if (sendRtpItem == null) {
log.warn("[收到ACK]:未找到来自{}callId: {}", fromUserId, callIdHeader.getCallId());
return;
@@ -112,7 +116,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
if (parentPlatform != null) {
DeviceChannel deviceChannel = deviceChannelService.getOneById(sendRtpItem.getChannelId());
if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem.getRedisKey(), sendRtpItem);
WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem.getChannelId(), sendRtpItem);
if (wvpResult.getCode() == 0) {
redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, deviceChannel, parentPlatform);
}

View File

@@ -16,6 +16,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -46,7 +47,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
private ISIPCommander cmder;
@Autowired
private ISIPCommanderForPlatform commanderForPlatform;
private ISendRtpServerService sendRtpServerService;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@@ -112,7 +113,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
log.error("[回复BYE信息失败]{}", e.getMessage());
}
CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callIdHeader.getCallId());
// 收流端发送的停止
if (sendRtpItem != null){
@@ -128,11 +129,11 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
if (platform != null) {
redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform, channel);
if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
redisRpcService.stopSendRtp(sendRtpItem.getRedisKey());
redisCatchStorage.deleteSendRTPServer(null, null, sendRtpItem.getCallId(), null);
redisRpcService.stopSendRtp(sendRtpItem.getChannelId());
sendRtpServerService.deleteByCallId(sendRtpItem.getCallId());
}else {
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(null, null, callIdHeader.getCallId(), null);
sendRtpServerService.deleteByCallId(callIdHeader.getCallId());
mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
if (userSetting.getUseCustomSsrcForParentInvite()) {
mediaServerService.releaseSsrc(mediaServer.getId(), sendRtpItem.getSsrc());
@@ -143,8 +144,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
}
}else {
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
callIdHeader.getCallId(), null);
sendRtpServerService.delete(sendRtpItem);
mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
if (userSetting.getUseCustomSsrcForParentInvite()) {
mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());

View File

@@ -20,8 +20,8 @@ import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -75,13 +75,13 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
private IGbChannelPlayService channelPlayService;
@Autowired
private IStreamProxyService streamProxyService;
private ISendRtpServerService sendRtpServerService;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IRedisRpcService redisRpcService;
private IMediaServerService mediaServerService;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@@ -101,9 +101,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired
private AudioBroadcastManager audioBroadcastManager;
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private HookSubscribe hookSubscribe;
@@ -182,7 +179,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}else {
// 点播成功, TODO 可以在此处检测cancel命令是否存在存在则不发送
// 构建sendRTP内容
SendRtpInfo sendRtpItem = mediaServerService.createSendRtpItem(streamInfo.getMediaServer(),
SendRtpInfo sendRtpItem = sendRtpServerService.createSendRtpInfo(streamInfo.getMediaServer(),
inviteInfo.getIp(), inviteInfo.getPort(), inviteInfo.getSsrc(), platform.getServerGBId(),
streamInfo.getApp(), streamInfo.getStream(),
channel.getGbId(), inviteInfo.isTcp(), platform.isRtcp());
@@ -193,7 +190,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setCallId(inviteInfo.getCallId());
sendRtpItem.setPlayType("Play".equalsIgnoreCase(inviteInfo.getSessionName()) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK);
redisCatchStorage.updateSendRTPSever(sendRtpItem);
sendRtpServerService.update(sendRtpItem);
String sdpIp = streamInfo.getMediaServer().getSdpIp();
if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
sdpIp = platform.getSendStreamIp();
@@ -733,7 +730,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
private void sendBye(Platform platform, String callId) {
try {
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), null, null, callId);
SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
if (sendRtpItem == null) {
return;
}
@@ -878,7 +875,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP", sdp.getSessionName().getValue());
CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
SendRtpInfo sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId,
SendRtpInfo sendRtpItem = sendRtpServerService.createSendRtpInfo(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId,
device.getDeviceId(), deviceChannel.getId(),
mediaTransmissionTCP, false);
@@ -894,7 +891,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
return;
}
sendRtpItem.setPlayType(InviteStreamType.BROADCAST);
sendRtpItem.setCallId(callIdHeader.getCallId());
sendRtpItem.setPlatformId(requesterId);
@@ -910,7 +906,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setTcpActive(tcpActive);
}
redisCatchStorage.updateSendRTPSever(sendRtpItem);
sendRtpServerService.update(sendRtpItem);
Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, broadcastCatch.getApp(), broadcastCatch.getStream());
if (streamReady) {
@@ -944,7 +940,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
SIPResponse sipResponse = null;
try {
sendRtpItem.setStatus(2);
redisCatchStorage.updateSendRTPSever(sendRtpItem);
sendRtpServerService.update(sendRtpItem);
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o=" + config.getId() + " " + sdp.getOrigin().getSessionId() + " " + sdp.getOrigin().getSessionVersion() + " IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");

View File

@@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
@@ -64,6 +65,9 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I
@Autowired
private SipInviteSessionManager sessionManager;
@Autowired
private ISendRtpServerService sendRtpServerService;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
@@ -108,7 +112,7 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I
String contentType = header.getContentType();
String contentSubType = header.getContentSubType();
if ("Application".equalsIgnoreCase(contentType) && "MANSRTSP".equalsIgnoreCase(contentSubType)) {
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callIdHeader.getCallId());
String streamId = sendRtpItem.getStream();
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
if (null == inviteInfo) {

View File

@@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessag
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
@@ -67,6 +68,9 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private ISendRtpServerService sendRtpServerService;
@Override
public void afterPropertiesSet() throws Exception {
notifyMessageHandler.addHandler(cmdType, this);
@@ -147,7 +151,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
broadcastCatch.setMediaServerItem(hookData.getMediaServer());
audioBroadcastManager.update(broadcastCatch);
// 推流到设备
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, hookData.getStream(), null);
SendRtpInfo sendRtpItem = sendRtpServerService.queryByStream(null, targetId, hookData.getStream(), null);
if (sendRtpItem == null) {
log.warn("[国标级联] 语音喊话 异常,未找到发流信息, channelId: {}, stream: {}", targetId, hookData.getStream());
log.info("[国标级联] 语音喊话 重新开始channelId: {}, stream: {}", targetId, hookData.getStream());

View File

@@ -16,6 +16,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
@@ -69,6 +70,9 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
@Autowired
private IDeviceChannelService deviceChannelService;
@Autowired
private ISendRtpServerService sendRtpServerService;
@Override
public void afterPropertiesSet() throws Exception {
notifyMessageHandler.addHandler(cmdType, this);
@@ -110,7 +114,7 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcTransaction.getStream(), ssrcTransaction.getMediaServerId());
subscribe.removeSubscribe(hook);
// 如果级联播放,需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题需要将点播CallId进行上下级绑定
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(null, ssrcTransaction.getChannelId(), null, null);
SendRtpInfo sendRtpItem = sendRtpServerService.queryByChannelId(ssrcTransaction.getChannelId());
if (sendRtpItem != null) {
Platform parentPlatform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
if (parentPlatform == null) {