Merge branch '2.7.3'
# Conflicts: # src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
This commit is contained in:
@@ -98,7 +98,7 @@ public class SubscribeHolder {
|
||||
for (Platform platform : platformList) {
|
||||
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platform.getServerGBId());
|
||||
if (redisTemplate.hasKey(key)) {
|
||||
result.add(platform.getServerId());
|
||||
result.add(platform.getServerGBId());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
@@ -112,7 +112,7 @@ public class SubscribeHolder {
|
||||
for (Platform platform : platformList) {
|
||||
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platform.getServerGBId());
|
||||
if (redisTemplate.hasKey(key)) {
|
||||
result.add(platform.getServerId());
|
||||
result.add(platform.getServerGBId());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
|
||||
@@ -89,8 +89,8 @@ public interface PlatformMapper {
|
||||
@Select("SELECT * FROM wvp_platform WHERE id=#{id}")
|
||||
Platform query(int id);
|
||||
|
||||
@Update("UPDATE wvp_platform SET status=#{online} WHERE id=#{id}" )
|
||||
int updateStatus(@Param("id") int id, @Param("online") boolean online);
|
||||
@Update("UPDATE wvp_platform SET status=#{online}, server_id = #{serverId} WHERE id=#{id}" )
|
||||
int updateStatus(@Param("id") int id, @Param("online") boolean online, @Param("serverId") String serverId);
|
||||
|
||||
@Select("SELECT server_id FROM wvp_platform WHERE enable=true and server_id != #{serverId} group by server_id")
|
||||
List<String> queryServerIdsWithEnableAndNotInServer(@Param("serverId") String serverId);
|
||||
@@ -104,7 +104,7 @@ public interface PlatformMapper {
|
||||
@Select("SELECT * FROM wvp_platform WHERE enable=true and server_id = #{serverId}")
|
||||
List<Platform> queryServerIdsWithEnableAndServer(@Param("serverId") String serverId);
|
||||
|
||||
@Update("UPDATE wvp_platform SET status=false" )
|
||||
void offlineAll();
|
||||
@Update("UPDATE wvp_platform SET status=false where server_id = #{serverId}" )
|
||||
void offlineAll(@Param("serverId") String serverId);
|
||||
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.media.bean.MediaServer;
|
||||
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent;
|
||||
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent;
|
||||
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.stereotype.Component;
|
||||
@@ -21,11 +22,12 @@ import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
/**
|
||||
* @description:Event事件通知推送器,支持推送在线事件、离线事件
|
||||
* @author: swwheihei
|
||||
* @date: 2020年5月6日 上午11:30:50
|
||||
* @date: 2020年5月6日 上午11:30:50
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class EventPublisher {
|
||||
|
||||
@@ -72,12 +74,7 @@ public class EventPublisher {
|
||||
}
|
||||
public void catalogEventPublish(Platform platform, List<CommonGBChannel> deviceChannels, String type, boolean share) {
|
||||
if (platform != null && !userSetting.getServerId().equals(platform.getServerId())) {
|
||||
// 指定了上级平台的推送,则发送到指定的设备,未指定的则全部发送, 接收后各自处理自己的
|
||||
CatalogEvent outEvent = new CatalogEvent(this);
|
||||
outEvent.setChannels(deviceChannels);
|
||||
outEvent.setType(type);
|
||||
outEvent.setPlatform(platform);
|
||||
redisRpcService.catalogEventPublish(platform.getServerId(), outEvent);
|
||||
log.info("[国标级联] 目录状态推送, 此上级平台由其他服务处理,消息已经忽略");
|
||||
return;
|
||||
}
|
||||
CatalogEvent outEvent = new CatalogEvent(this);
|
||||
@@ -96,12 +93,11 @@ public class EventPublisher {
|
||||
}
|
||||
outEvent.setChannels(channels);
|
||||
outEvent.setType(type);
|
||||
outEvent.setPlatform(platform);
|
||||
applicationEventPublisher.publishEvent(outEvent);
|
||||
if (platform == null && share) {
|
||||
// 如果没指定上级平台,则推送消息到所有在线的wvp处理自己含有的平台的目录更新
|
||||
redisRpcService.catalogEventPublish(null, outEvent);
|
||||
if (platform != null) {
|
||||
outEvent.setPlatform(platform);
|
||||
}
|
||||
applicationEventPublisher.publishEvent(outEvent);
|
||||
|
||||
}
|
||||
|
||||
public void mobilePositionEventPublish(MobilePosition mobilePosition) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.genersoft.iot.vmp.gb28181.event.subscribe.catalog;
|
||||
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Platform;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
|
||||
@@ -39,22 +40,30 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
|
||||
@Autowired
|
||||
private SubscribeHolder subscribeHolder;
|
||||
|
||||
@Autowired
|
||||
private UserSetting userSetting;
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(CatalogEvent event) {
|
||||
SubscribeInfo subscribe = null;
|
||||
Platform parentPlatform = null;
|
||||
|
||||
Map<String, List<Platform>> parentPlatformMap = new HashMap<>();
|
||||
log.info("[Catalog事件: {}] 通道数量: {}", event.getType(), event.getChannels().size());
|
||||
Map<String, List<Platform>> platformMap = new HashMap<>();
|
||||
Map<String, CommonGBChannel> channelMap = new HashMap<>();
|
||||
if (event.getPlatform() != null) {
|
||||
parentPlatform = event.getPlatform();
|
||||
if (parentPlatform.getServerGBId() == null) {
|
||||
log.info("[Catalog事件: {}] 平台服务国标编码未找到", event.getType());
|
||||
return;
|
||||
}
|
||||
subscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId());
|
||||
if (subscribe == null) {
|
||||
log.info("[Catalog事件: {}] 未订阅目录事件", event.getType());
|
||||
return;
|
||||
}
|
||||
|
||||
}else {
|
||||
List<Platform> allPlatform = platformService.queryAll();
|
||||
List<Platform> allPlatform = platformService.queryAll(userSetting.getServerId());
|
||||
// 获取所用订阅
|
||||
List<String> platforms = subscribeHolder.getAllCatalogSubscribePlatform(allPlatform);
|
||||
if (event.getChannels() != null) {
|
||||
@@ -62,10 +71,14 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
|
||||
for (CommonGBChannel deviceChannel : event.getChannels()) {
|
||||
List<Platform> parentPlatformsForGB = platformChannelService.queryPlatFormListByChannelDeviceId(
|
||||
deviceChannel.getGbId(), platforms);
|
||||
parentPlatformMap.put(deviceChannel.getGbDeviceId(), parentPlatformsForGB);
|
||||
platformMap.put(deviceChannel.getGbDeviceId(), parentPlatformsForGB);
|
||||
channelMap.put(deviceChannel.getGbDeviceId(), deviceChannel);
|
||||
}
|
||||
}else {
|
||||
log.info("[Catalog事件: {}] 未订阅目录事件", event.getType());
|
||||
}
|
||||
}else {
|
||||
log.info("[Catalog事件: {}] 事件内通道数为0", event.getType());
|
||||
}
|
||||
}
|
||||
switch (event.getType()) {
|
||||
@@ -74,32 +87,32 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
|
||||
case CatalogEvent.DEL:
|
||||
|
||||
if (parentPlatform != null) {
|
||||
List<CommonGBChannel> deviceChannelList = new ArrayList<>();
|
||||
List<CommonGBChannel> channels = new ArrayList<>();
|
||||
if (event.getChannels() != null) {
|
||||
deviceChannelList.addAll(event.getChannels());
|
||||
channels.addAll(event.getChannels());
|
||||
}
|
||||
if (!deviceChannelList.isEmpty()) {
|
||||
log.info("[Catalog事件: {}]平台:{},影响通道{}个", event.getType(), parentPlatform.getServerGBId(), deviceChannelList.size());
|
||||
if (!channels.isEmpty()) {
|
||||
log.info("[Catalog事件: {}]平台:{},影响通道{}个", event.getType(), parentPlatform.getServerGBId(), channels.size());
|
||||
try {
|
||||
sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), parentPlatform, deviceChannelList, subscribe, null);
|
||||
sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), parentPlatform, channels, subscribe, null);
|
||||
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
|
||||
IllegalAccessException e) {
|
||||
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
}else if (!parentPlatformMap.keySet().isEmpty()) {
|
||||
for (String gbId : parentPlatformMap.keySet()) {
|
||||
List<Platform> parentPlatforms = parentPlatformMap.get(gbId);
|
||||
if (parentPlatforms != null && !parentPlatforms.isEmpty()) {
|
||||
for (Platform platform : parentPlatforms) {
|
||||
}else if (!platformMap.keySet().isEmpty()) {
|
||||
for (String serverGbId : platformMap.keySet()) {
|
||||
List<Platform> platformList = platformMap.get(serverGbId);
|
||||
if (platformList != null && !platformList.isEmpty()) {
|
||||
for (Platform platform : platformList) {
|
||||
SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(platform.getServerGBId());
|
||||
if (subscribeInfo == null) {
|
||||
continue;
|
||||
}
|
||||
log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);
|
||||
log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), serverGbId);
|
||||
List<CommonGBChannel> deviceChannelList = new ArrayList<>();
|
||||
CommonGBChannel deviceChannel = new CommonGBChannel();
|
||||
deviceChannel.setGbDeviceId(gbId);
|
||||
deviceChannel.setGbDeviceId(serverGbId);
|
||||
deviceChannelList.add(deviceChannel);
|
||||
try {
|
||||
sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), platform, deviceChannelList, subscribeInfo, null);
|
||||
@@ -108,6 +121,8 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
|
||||
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
}else {
|
||||
log.info("[Catalog事件: {}] 未找到上级平台: {}", event.getType(), serverGbId);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -132,9 +147,9 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
|
||||
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
}else if (!parentPlatformMap.keySet().isEmpty()) {
|
||||
for (String gbId : parentPlatformMap.keySet()) {
|
||||
List<Platform> parentPlatforms = parentPlatformMap.get(gbId);
|
||||
}else if (!platformMap.keySet().isEmpty()) {
|
||||
for (String gbId : platformMap.keySet()) {
|
||||
List<Platform> parentPlatforms = platformMap.get(gbId);
|
||||
if (parentPlatforms != null && !parentPlatforms.isEmpty()) {
|
||||
for (Platform platform : parentPlatforms) {
|
||||
SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(platform.getServerGBId());
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition;
|
||||
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Platform;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
|
||||
@@ -37,12 +38,15 @@ public class MobilePositionEventLister implements ApplicationListener<MobilePosi
|
||||
@Autowired
|
||||
private SubscribeHolder subscribeHolder;
|
||||
|
||||
@Autowired
|
||||
private UserSetting userSetting;
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(MobilePositionEvent event) {
|
||||
if (event.getMobilePosition().getChannelId() == 0) {
|
||||
return;
|
||||
}
|
||||
List<Platform> allPlatforms = platformService.queryAll();
|
||||
List<Platform> allPlatforms = platformService.queryAll(userSetting.getServerId());
|
||||
// 获取所用订阅
|
||||
List<String> platforms = subscribeHolder.getAllMobilePositionSubscribePlatform(allPlatforms);
|
||||
if (platforms.isEmpty()) {
|
||||
|
||||
@@ -80,6 +80,6 @@ public interface IPlatformService {
|
||||
|
||||
void delete(Integer platformId, CommonCallback<Object> callback);
|
||||
|
||||
List<Platform> queryAll();
|
||||
List<Platform> queryAll(String serverId);
|
||||
|
||||
}
|
||||
|
||||
@@ -192,6 +192,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
||||
}
|
||||
sync(device);
|
||||
}else {
|
||||
device.setServerId(userSetting.getServerId());
|
||||
if(!device.isOnLine()){
|
||||
device.setOnLine(true);
|
||||
device.setCreateTime(now);
|
||||
@@ -307,15 +308,15 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
||||
return;
|
||||
}
|
||||
for (Device device : deviceList) {
|
||||
if (device == null || !device.isOnLine()) {
|
||||
if (device == null || !device.isOnLine() || !device.getServerId().equals(userSetting.getServerId())) {
|
||||
continue;
|
||||
}
|
||||
if (device.getSubscribeCycleForCatalog() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForCatalog.getKey(device))) {
|
||||
log.info("[订阅丢失] 目录订阅, 编号: {}, 重新发起订阅", device.getDeviceId());
|
||||
log.debug("[订阅丢失] 目录订阅, 编号: {}, 重新发起订阅", device.getDeviceId());
|
||||
addCatalogSubscribe(device, null);
|
||||
}
|
||||
if (device.getSubscribeCycleForMobilePosition() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
|
||||
log.info("[订阅丢失] 移动位置订阅, 编号: {}, 重新发起订阅", device.getDeviceId());
|
||||
log.debug("[订阅丢失] 移动位置订阅, 编号: {}, 重新发起订阅", device.getDeviceId());
|
||||
addMobilePositionSubscribe(device, null);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -160,30 +160,26 @@ public class GbChannelServiceImpl implements IGbChannelService {
|
||||
log.warn("[多个通道离线] 通道数量为0,更新失败");
|
||||
return 0;
|
||||
}
|
||||
List<CommonGBChannel> onlineChannelList = commonGBChannelMapper.queryInListByStatus(commonGBChannelList, "ON");
|
||||
if (onlineChannelList.isEmpty()) {
|
||||
log.info("[多个通道离线] 更新失败, 参数内通道已经离线, 无需更新");
|
||||
return 0;
|
||||
}
|
||||
log.info("[通道离线] 共 {} 个", commonGBChannelList.size());
|
||||
int limitCount = 1000;
|
||||
int result = 0;
|
||||
if (onlineChannelList.size() > limitCount) {
|
||||
for (int i = 0; i < onlineChannelList.size(); i += limitCount) {
|
||||
if (commonGBChannelList.size() > limitCount) {
|
||||
for (int i = 0; i < commonGBChannelList.size(); i += limitCount) {
|
||||
int toIndex = i + limitCount;
|
||||
if (i + limitCount > onlineChannelList.size()) {
|
||||
toIndex = onlineChannelList.size();
|
||||
if (i + limitCount > commonGBChannelList.size()) {
|
||||
toIndex = commonGBChannelList.size();
|
||||
}
|
||||
result += commonGBChannelMapper.updateStatusForListById(onlineChannelList.subList(i, toIndex), "OFF");
|
||||
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "OFF");
|
||||
}
|
||||
} else {
|
||||
result += commonGBChannelMapper.updateStatusForListById(onlineChannelList, "OFF");
|
||||
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "OFF");
|
||||
}
|
||||
if (result > 0) {
|
||||
try {
|
||||
// 发送catalog
|
||||
eventPublisher.catalogEventPublish(null, onlineChannelList, CatalogEvent.OFF);
|
||||
eventPublisher.catalogEventPublish(null, commonGBChannelList, CatalogEvent.OFF);
|
||||
} catch (Exception e) {
|
||||
log.warn("[多个通道离线] 发送失败,数量:{}", onlineChannelList.size(), e);
|
||||
log.warn("[多个通道离线] 发送失败,数量:{}", commonGBChannelList.size(), e);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
@@ -214,32 +210,25 @@ public class GbChannelServiceImpl implements IGbChannelService {
|
||||
log.warn("[多个通道上线] 通道数量为0,更新失败");
|
||||
return 0;
|
||||
}
|
||||
List<CommonGBChannel> offlineChannelList = commonGBChannelMapper.queryInListByStatus(commonGBChannelList, "OFF");
|
||||
if (offlineChannelList.isEmpty()) {
|
||||
log.warn("[多个通道上线] 更新失败, 参数内通道已经上线");
|
||||
return 0;
|
||||
}
|
||||
// 批量更新
|
||||
int limitCount = 1000;
|
||||
int result = 0;
|
||||
if (offlineChannelList.size() > limitCount) {
|
||||
for (int i = 0; i < offlineChannelList.size(); i += limitCount) {
|
||||
if (commonGBChannelList.size() > limitCount) {
|
||||
for (int i = 0; i < commonGBChannelList.size(); i += limitCount) {
|
||||
int toIndex = i + limitCount;
|
||||
if (i + limitCount > offlineChannelList.size()) {
|
||||
toIndex = offlineChannelList.size();
|
||||
if (i + limitCount > commonGBChannelList.size()) {
|
||||
toIndex = commonGBChannelList.size();
|
||||
}
|
||||
result += commonGBChannelMapper.updateStatusForListById(offlineChannelList.subList(i, toIndex), "ON");
|
||||
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "ON");
|
||||
}
|
||||
} else {
|
||||
result += commonGBChannelMapper.updateStatusForListById(offlineChannelList, "ON");
|
||||
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "ON");
|
||||
}
|
||||
if (result > 0) {
|
||||
try {
|
||||
// 发送catalog
|
||||
eventPublisher.catalogEventPublish(null, offlineChannelList, CatalogEvent.ON);
|
||||
} catch (Exception e) {
|
||||
log.warn("[多个通道上线] 发送失败,数量:{}", offlineChannelList.size(), e);
|
||||
}
|
||||
try {
|
||||
// 发送catalog
|
||||
eventPublisher.catalogEventPublish(null, commonGBChannelList, CatalogEvent.ON);
|
||||
} catch (Exception e) {
|
||||
log.warn("[多个通道上线] 发送失败,数量:{}", commonGBChannelList.size(), e);
|
||||
}
|
||||
|
||||
return result;
|
||||
|
||||
@@ -63,11 +63,6 @@ import java.util.concurrent.TimeUnit;
|
||||
@Order(value=15)
|
||||
public class PlatformServiceImpl implements IPlatformService, CommandLineRunner {
|
||||
|
||||
private final static String REGISTER_KEY_PREFIX = "platform_register_";
|
||||
|
||||
private final static String REGISTER_FAIL_AGAIN_KEY_PREFIX = "platform_register_fail_again_";
|
||||
private final static String KEEPALIVE_KEY_PREFIX = "platform_keepalive_";
|
||||
|
||||
@Autowired
|
||||
private PlatformMapper platformMapper;
|
||||
|
||||
@@ -133,7 +128,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
||||
sendUnRegister(platform, taskInfo.getSipTransactionInfo());
|
||||
}
|
||||
// 启动时所有平台默认离线
|
||||
platformMapper.offlineAll();
|
||||
platformMapper.offlineAll(userSetting.getServerId());
|
||||
}
|
||||
@Scheduled(fixedDelay = 20, timeUnit = TimeUnit.SECONDS) //每3秒执行一次
|
||||
public void statusLostCheck(){
|
||||
@@ -199,6 +194,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
||||
return;
|
||||
}
|
||||
log.info("[集群] 检测到 {} 已离线", serverId);
|
||||
redisCatchStorage.removeOfflineWVPInfo(serverId);
|
||||
String chooseServerId = redisCatchStorage.chooseOneServer(serverId);
|
||||
if (!userSetting.getServerId().equals(chooseServerId)){
|
||||
return;
|
||||
@@ -390,8 +386,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
||||
PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerGBId(), platform.getKeepTimeout() * 1000L,
|
||||
this::keepaliveExpire);
|
||||
statusTaskRunner.addKeepAliveTask(keepaliveTask);
|
||||
|
||||
platformMapper.updateStatus(platform.getId(), true);
|
||||
platformMapper.updateStatus(platform.getId(), true, userSetting.getServerId());
|
||||
|
||||
if (platform.getAutoPushChannel() != null && platform.getAutoPushChannel()) {
|
||||
if (subscribeHolder.getCatalogSubscribe(platform.getServerGBId()) == null) {
|
||||
@@ -481,7 +476,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
||||
subscribeHolder.removeCatalogSubscribe(platform.getServerGBId());
|
||||
subscribeHolder.removeMobilePositionSubscribe(platform.getServerGBId());
|
||||
|
||||
platformMapper.updateStatus(platform.getId(), false);
|
||||
platformMapper.updateStatus(platform.getId(), false, userSetting.getServerId());
|
||||
|
||||
// 停止所有推流
|
||||
log.info("[平台离线] {}({}), 停止所有推流", platform.getName(), platform.getServerGBId());
|
||||
@@ -521,7 +516,6 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
||||
gpsMsgInfo = null;
|
||||
}
|
||||
|
||||
|
||||
if (gpsMsgInfo == null && !userSetting.isSendPositionOnDemand()){
|
||||
gpsMsgInfo = new GPSMsgInfo();
|
||||
gpsMsgInfo.setId(channel.getGbDeviceId());
|
||||
@@ -870,7 +864,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Platform> queryAll() {
|
||||
return platformMapper.queryAll();
|
||||
public List<Platform> queryAll(String serverId) {
|
||||
return platformMapper.queryByServerId(serverId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -340,7 +340,7 @@ public class PlayServiceImpl implements IPlayService {
|
||||
InviteInfo inviteInfoInCatch = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId());
|
||||
if (inviteInfoInCatch != null ) {
|
||||
if (inviteInfoInCatch.getStreamInfo() == null) {
|
||||
// 释放生成的ssrc,使用上一次申请的322
|
||||
// 释放生成的ssrc,使用上一次申请的
|
||||
|
||||
ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
|
||||
// 点播发起了但是尚未成功, 仅注册回调等待结果即可
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.session;
|
||||
|
||||
import com.genersoft.iot.vmp.conf.SipConfig;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
@@ -13,6 +14,7 @@ import java.util.Set;
|
||||
/**
|
||||
* ssrc使用
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class SSRCFactory {
|
||||
|
||||
@@ -93,6 +95,7 @@ public class SSRCFactory {
|
||||
String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId;
|
||||
Long size = redisTemplate.opsForSet().size(redisKey);
|
||||
if (size == null || size == 0) {
|
||||
log.info("[获取 SSRC 失败] redisKey: {}", redisKey);
|
||||
throw new RuntimeException("ssrc已经用完");
|
||||
} else {
|
||||
// 在集合中移除并返回一个随机成员。
|
||||
|
||||
@@ -143,39 +143,44 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
|
||||
mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
|
||||
}
|
||||
}
|
||||
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
|
||||
if (mediaServer != null) {
|
||||
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getChannelId());
|
||||
if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
|
||||
// 来自上级平台的停止对讲
|
||||
log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getTargetId(), sendRtpItem.getChannelId());
|
||||
audioBroadcastManager.del(sendRtpItem.getChannelId());
|
||||
}
|
||||
if (sendRtpItem.getServerId().equals(userSetting.getServerId())) {
|
||||
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
|
||||
if (mediaServer != null) {
|
||||
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getChannelId());
|
||||
if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
|
||||
// 来自上级平台的停止对讲
|
||||
log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getTargetId(), sendRtpItem.getChannelId());
|
||||
audioBroadcastManager.del(sendRtpItem.getChannelId());
|
||||
}
|
||||
|
||||
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), streamId);
|
||||
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), streamId);
|
||||
|
||||
if (mediaInfo.getReaderCount() <= 0) {
|
||||
log.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
|
||||
if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
|
||||
Device device = deviceService.getDeviceByDeviceId(sendRtpItem.getTargetId());
|
||||
if (device == null) {
|
||||
log.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
|
||||
return;
|
||||
}
|
||||
DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(sendRtpItem.getChannelId());
|
||||
if (deviceChannel == null) {
|
||||
log.info("[收到bye] {} 通知设备停止推流时未找到通道信息", streamId);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
log.info("[停止点播] {}/{}", sendRtpItem.getTargetId(), sendRtpItem.getChannelId());
|
||||
cmder.streamByeCmd(device, deviceChannel.getDeviceId(), sendRtpItem.getApp(), sendRtpItem.getStream(), null, null);
|
||||
} catch (InvalidArgumentException | ParseException | SipException |
|
||||
SsrcTransactionNotFoundException e) {
|
||||
log.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
|
||||
if (mediaInfo != null && mediaInfo.getReaderCount() <= 0) {
|
||||
log.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
|
||||
if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
|
||||
Device device = deviceService.getDeviceByDeviceId(sendRtpItem.getTargetId());
|
||||
if (device == null) {
|
||||
log.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
|
||||
return;
|
||||
}
|
||||
DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(sendRtpItem.getChannelId());
|
||||
if (deviceChannel == null) {
|
||||
log.info("[收到bye] {} 通知设备停止推流时未找到通道信息", streamId);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
log.info("[停止点播] {}/{}", sendRtpItem.getTargetId(), sendRtpItem.getChannelId());
|
||||
cmder.streamByeCmd(device, deviceChannel.getDeviceId(), sendRtpItem.getApp(), sendRtpItem.getStream(), null, null);
|
||||
} catch (InvalidArgumentException | ParseException | SipException |
|
||||
SsrcTransactionNotFoundException e) {
|
||||
log.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// TODO 流再其他wvp上时应该通知这个wvp停止推流和发送BYE
|
||||
|
||||
}
|
||||
}
|
||||
// 可能是设备发送的停止
|
||||
|
||||
@@ -172,10 +172,13 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
||||
// 点播成功, TODO 可以在此处检测cancel命令是否存在,存在则不发送
|
||||
if (userSetting.getUseCustomSsrcForParentInvite()) {
|
||||
// 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
|
||||
String ssrc = "Play".equalsIgnoreCase(inviteInfo.getSessionName())
|
||||
MediaServer mediaServer = mediaServerService.getOne(streamInfo.getMediaServer().getId());
|
||||
if (mediaServer != null) {
|
||||
String ssrc = "Play".equalsIgnoreCase(inviteInfo.getSessionName())
|
||||
? ssrcFactory.getPlaySsrc(streamInfo.getMediaServer().getId())
|
||||
: ssrcFactory.getPlayBackSsrc(streamInfo.getMediaServer().getId());
|
||||
inviteInfo.setSsrc(ssrc);
|
||||
: ssrcFactory.getPlayBackSsrc(streamInfo.getMediaServer().getId());
|
||||
inviteInfo.setSsrc(ssrc);
|
||||
}
|
||||
}
|
||||
// 构建sendRTP内容
|
||||
SendRtpInfo sendRtpItem = sendRtpServerService.createSendRtpInfo(streamInfo.getMediaServer(),
|
||||
|
||||
Reference in New Issue
Block a user