Merge branch 'wvp-28181-2.0' into main-dev
# Conflicts: # pom.xml # src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java # src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java # src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java # src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java # src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java # src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java # src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java # src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java # src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java # src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java # src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java # src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java # src/main/resources/all-application.yml # src/main/resources/application-dev.yml
This commit is contained in:
@@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||
import com.genersoft.iot.vmp.service.IMediaServerService;
|
||||
import com.genersoft.iot.vmp.service.bean.*;
|
||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
|
||||
import org.slf4j.Logger;
|
||||
@@ -77,6 +78,9 @@ public class RedisGbPlayMsgListener implements MessageListener {
|
||||
@Autowired
|
||||
private IMediaServerService mediaServerService;
|
||||
|
||||
@Autowired
|
||||
private IRedisCatchStorage redisCatchStorage;
|
||||
|
||||
|
||||
@Autowired
|
||||
private DynamicTask dynamicTask;
|
||||
@@ -113,8 +117,8 @@ public class RedisGbPlayMsgListener implements MessageListener {
|
||||
while (!taskQueue.isEmpty()) {
|
||||
Message msg = taskQueue.poll();
|
||||
try {
|
||||
JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class);
|
||||
WvpRedisMsg wvpRedisMsg = JSON.to(WvpRedisMsg.class, msgJSON);
|
||||
WvpRedisMsg wvpRedisMsg = JSON.parseObject(msg.getBody(), WvpRedisMsg.class);
|
||||
logger.info("[收到REDIS通知] 消息: {}", JSON.toJSONString(wvpRedisMsg));
|
||||
if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
|
||||
continue;
|
||||
}
|
||||
@@ -123,7 +127,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
|
||||
|
||||
switch (wvpRedisMsg.getCmd()){
|
||||
case WvpRedisMsgCmd.GET_SEND_ITEM:
|
||||
RequestSendItemMsg content = JSON.to(RequestSendItemMsg.class, wvpRedisMsg.getContent());
|
||||
RequestSendItemMsg content = JSON.parseObject(wvpRedisMsg.getContent(), RequestSendItemMsg.class);
|
||||
requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
|
||||
break;
|
||||
case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
|
||||
@@ -242,7 +246,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
|
||||
result.setData(content);
|
||||
|
||||
WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
|
||||
WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, result);
|
||||
WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result));
|
||||
JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
|
||||
redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
|
||||
}
|
||||
@@ -260,7 +264,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
|
||||
result.setMsg("流媒体不存在");
|
||||
|
||||
WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
|
||||
WvpRedisMsgCmd.GET_SEND_ITEM, serial, result);
|
||||
WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result));
|
||||
|
||||
JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
|
||||
redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
|
||||
@@ -283,7 +287,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
|
||||
WVPResult<SendRtpItem> result = new WVPResult<>();
|
||||
result.setCode(ERROR_CODE_TIMEOUT);
|
||||
WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
|
||||
userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result
|
||||
userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)
|
||||
);
|
||||
JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
|
||||
redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
|
||||
@@ -322,9 +326,10 @@ public class RedisGbPlayMsgListener implements MessageListener {
|
||||
responseSendItemMsg.setSendRtpItem(sendRtpItem);
|
||||
responseSendItemMsg.setMediaServerItem(mediaServerItem);
|
||||
result.setData(responseSendItemMsg);
|
||||
redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
||||
|
||||
WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
|
||||
userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result
|
||||
userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)
|
||||
);
|
||||
JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
|
||||
redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
|
||||
@@ -350,7 +355,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
|
||||
requestSendItemMsg.setServerId(serverId);
|
||||
String key = UUID.randomUUID().toString();
|
||||
WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM,
|
||||
key, requestSendItemMsg);
|
||||
key, JSON.toJSONString(requestSendItemMsg));
|
||||
|
||||
JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
|
||||
logger.info("[请求推流SendItem] {}: {}", serverId, jsonObject);
|
||||
@@ -375,7 +380,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
|
||||
public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) {
|
||||
String key = UUID.randomUUID().toString();
|
||||
WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
|
||||
WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, param);
|
||||
WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, JSON.toJSONString(param));
|
||||
|
||||
JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
|
||||
logger.info("[REDIS 请求其他平台推流] {}: {}", serverId, jsonObject);
|
||||
|
||||
@@ -50,11 +50,12 @@ public class RedisGpsMsgListener implements MessageListener {
|
||||
Message msg = taskQueue.poll();
|
||||
try {
|
||||
GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class);
|
||||
logger.info("[REDIS的位置变化通知], {}", JSON.toJSONString(gpsMsgInfo));
|
||||
// 只是放入redis缓存起来
|
||||
redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
|
||||
}catch (Exception e) {
|
||||
logger.warn("[REDIS的ALARM通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
|
||||
logger.error("[REDIS的ALARM通知] 异常内容: ", e);
|
||||
logger.warn("[REDIS的位置变化通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
|
||||
logger.error("[REDIS的位置变化通知] 异常内容: ", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -73,12 +73,20 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
|
||||
MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
|
||||
StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
|
||||
if (push != null) {
|
||||
if (redisCatchStorage.isChannelSendingRTP(push.getGbId())) {
|
||||
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
|
||||
push.getGbId());
|
||||
if (sendRtpItems.size() > 0) {
|
||||
for (SendRtpItem sendRtpItem : sendRtpItems) {
|
||||
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
|
||||
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
|
||||
push.getGbId());
|
||||
if (!sendRtpItems.isEmpty()) {
|
||||
for (SendRtpItem sendRtpItem : sendRtpItems) {
|
||||
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
|
||||
if (parentPlatform != null) {
|
||||
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId());
|
||||
try {
|
||||
commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem);
|
||||
} catch (SipException | InvalidArgumentException | ParseException e) {
|
||||
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
if (push.isSelf()) {
|
||||
// 停止向上级推流
|
||||
String streamId = sendRtpItem.getStream();
|
||||
Map<String, Object> param = new HashMap<>();
|
||||
@@ -90,12 +98,6 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
|
||||
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
|
||||
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream());
|
||||
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
|
||||
|
||||
try {
|
||||
commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem);
|
||||
} catch (SipException | InvalidArgumentException | ParseException e) {
|
||||
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
|
||||
}
|
||||
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
|
||||
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
|
||||
sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
|
||||
|
||||
Reference in New Issue
Block a user