临时提交

This commit is contained in:
648540858
2024-09-03 18:00:35 +08:00
parent de0ad2b32e
commit fa47f619ba
40 changed files with 472 additions and 720 deletions

View File

@@ -93,6 +93,8 @@ public interface IDeviceChannelService {
void stopPlay(String deviceId, String channelId);
void stopPlay(Integer channelId);
void batchUpdateChannelGPS(List<DeviceChannel> channelList);
void batchAddMobilePosition(List<MobilePosition> addMobilePositionList);
@@ -118,4 +120,5 @@ public interface IDeviceChannelService {
DeviceChannel getRawChannel(int id);
DeviceChannel getOneById(Integer channelId);
}

View File

@@ -5,6 +5,8 @@ import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import java.util.List;
/**
* 记录国标点播的状态,包括实时预览,下载,录像回放
*/
@@ -22,18 +24,12 @@ public interface IInviteStreamService {
/**
* 获取点播的状态信息
*/
InviteInfo getInviteInfo(InviteSessionType type,
String deviceId,
String channelId,
String stream);
InviteInfo getInviteInfo(InviteSessionType type, Integer channelId, String stream);
/**
* 移除点播的状态信息
*/
void removeInviteInfo(InviteSessionType type,
String deviceId,
String channelId,
String stream);
void removeInviteInfo(InviteSessionType type, Integer channelId, String stream);
/**
* 移除点播的状态信息
*/
@@ -41,14 +37,14 @@ public interface IInviteStreamService {
/**
* 移除点播的状态信息
*/
void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId);
void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, Integer channelId);
List<InviteInfo> getAllInviteInfo(InviteSessionType type, Integer channelId, String stream);
/**
* 获取点播的状态信息
*/
InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type,
String deviceId,
String channelId);
InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, Integer channelId);
/**
* 获取点播的状态信息
@@ -59,12 +55,12 @@ public interface IInviteStreamService {
/**
* 添加一个invite回调
*/
void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback<StreamInfo> callback);
void once(InviteSessionType type, Integer channelId, String stream, ErrorCallback<StreamInfo> callback);
/**
* 调用一个invite回调
*/
void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, StreamInfo data);
void call(InviteSessionType type, Integer channelId, String stream, int code, String msg, StreamInfo data);
/**
* 清空一个设备的所有invite信息

View File

@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.service;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.Group;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.PlatformChannel;
import com.github.pagehelper.PageInfo;
@@ -36,4 +37,12 @@ public interface IPlatformChannelService {
void removeChannelByDevice(Integer platformId, List<Integer> deviceIds);
void updateCustomChannel(PlatformChannel channel);
void checkGroupRemove(List<CommonGBChannel> channelList, List<Group> groups);
void checkGroupAdd(List<CommonGBChannel> channelList);
List<Platform> queryPlatFormListByChannelDeviceId(Integer channelId, List<String> platforms);
CommonGBChannel queryChannelByPlatformIdAndChannelId(Integer platformId, Integer channelId);
}

View File

@@ -75,7 +75,7 @@ public interface IPlatformService {
* @param errorEvent 信令错误事件
* @param timeoutCallback 超时事件
*/
void broadcastInvite(Platform platform, String channelId, MediaServer mediaServerItem, HookSubscribe.Event hookEvent,
void broadcastInvite(Platform platform, CommonGBChannel channelId, MediaServer mediaServerItem, HookSubscribe.Event hookEvent,
SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException;
/**

View File

@@ -21,7 +21,7 @@ import java.text.ParseException;
*/
public interface IPlayService {
void play(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channelId,
void play(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel,
ErrorCallback<StreamInfo> callback);
SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<StreamInfo> callback);
@@ -30,7 +30,6 @@ public interface IPlayService {
MediaServer getNewMediaServerItem(Device device);
void playBack(String deviceId, String channelId, String startTime, String endTime, ErrorCallback<StreamInfo> callback);
void playBack(MediaServer mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, ErrorCallback<StreamInfo> callback);
void zlmServerOffline(String mediaServerId);
void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<StreamInfo> callback);

View File

@@ -208,7 +208,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
@Override
public List<Device> getDeviceByChannelId(String channelId) {
return channelMapper.getDeviceByChannelId(channelId);
return channelMapper.getDeviceByChannelDeviceId(channelId);
}
@Override
@@ -340,7 +340,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
}
for (DeviceChannel channel : deviceChannels) {
// 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
mobilePosition.setChannelId(channel.getDeviceId());
mobilePosition.setChannelId(channel.getId());
try {
eventPublisher.mobilePositionEventPublish(mobilePosition);
}catch (Exception e) {
@@ -378,6 +378,11 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
channelMapper.stopPlay(device.getId(), channelId);
}
@Override
public void stopPlay(Integer channelId) {
channelMapper.stopPlayById(channelId);
}
@Override
@Transactional
public void batchUpdateChannelGPS(List<DeviceChannel> channelList) {
@@ -596,4 +601,9 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
public DeviceChannel getRawChannel(int id) {
return deviceMapper.getRawChannel(id);
}
@Override
public DeviceChannel getOneById(Integer channelId) {
return channelMapper.getOne(channelId);
}
}

View File

@@ -500,12 +500,15 @@ public class GbChannelServiceImpl implements IGbChannelService {
}
@Override
@Transactional
public void removeParentIdByBusinessGroup(String businessGroup) {
List<CommonGBChannel> channelList = commonGBChannelMapper.queryByBusinessGroup(businessGroup);
if (channelList.isEmpty()) {
return;
}
int result = commonGBChannelMapper.removeParentIdByChannels(channelList);
List<Group> groupList = groupMapper.queryByBusinessGroup(businessGroup);
platformChannelService.checkGroupRemove(channelList, groupList);
}
@@ -516,7 +519,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
return;
}
commonGBChannelMapper.removeParentIdByChannels(channelList);
// TODO 可能需要发送通道更新通知
platformChannelService.checkGroupRemove(channelList, groupList);
}
@Override
@@ -560,18 +563,21 @@ public class GbChannelServiceImpl implements IGbChannelService {
}
@Override
@Transactional
public void addChannelToGroup(String parentId, String businessGroup, List<Integer> channelIds) {
List<CommonGBChannel> channelList = commonGBChannelMapper.queryByIds(channelIds);
if (channelList.isEmpty()) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "所有通道Id不存在");
}
int result = commonGBChannelMapper.updateGroup(parentId, businessGroup, channelList);
for (CommonGBChannel commonGBChannel : channelList) {
commonGBChannel.setGbParentId(parentId);
commonGBChannel.setGbBusinessGroupId(businessGroup);
}
// 发送通知
if (result > 0) {
for (CommonGBChannel channel : channelList) {
channel.setGbBusinessGroupId(businessGroup);
channel.setGbParentId(parentId);
}
platformChannelService.checkGroupAdd(channelList);
try {
// 发送catalog
eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.UPDATE);
@@ -587,10 +593,20 @@ public class GbChannelServiceImpl implements IGbChannelService {
if (channelList.isEmpty()) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "所有通道Id不存在");
}
int result = commonGBChannelMapper.removeParentIdByChannels(channelList);
commonGBChannelMapper.removeParentIdByChannels(channelList);
Group group = groupMapper.queryOneByDeviceId(parentId, businessGroup);
if (group == null) {
platformChannelService.checkGroupRemove(channelList, null);
}else {
List<Group> groupList = new ArrayList<>();
groupList.add(group);
platformChannelService.checkGroupRemove(channelList, groupList);
}
}
@Override
@Transactional
public void addChannelToGroupByGbDevice(String parentId, String businessGroup, List<Integer> deviceIds) {
List<CommonGBChannel> channelList = commonGBChannelMapper.queryByGbDeviceIds(deviceIds);
if (channelList.isEmpty()) {
@@ -601,12 +617,14 @@ public class GbChannelServiceImpl implements IGbChannelService {
channel.setGbBusinessGroupId(businessGroup);
}
int result = commonGBChannelMapper.updateGroup(parentId, businessGroup, channelList);
for (CommonGBChannel commonGBChannel : channelList) {
commonGBChannel.setGbParentId(parentId);
commonGBChannel.setGbBusinessGroupId(businessGroup);
}
// 发送通知
if (result > 0) {
for (CommonGBChannel channel : channelList) {
channel.setGbBusinessGroupId(businessGroup);
channel.setGbParentId(parentId);
}
platformChannelService.checkGroupAdd(channelList);
try {
// 发送catalog
eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.UPDATE);
@@ -623,10 +641,12 @@ public class GbChannelServiceImpl implements IGbChannelService {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "所有通道Id不存在");
}
commonGBChannelMapper.removeParentIdByChannels(channelList);
platformChannelService.checkGroupRemove(channelList, null);
}
@Override
public CommonGBChannel queryOneWithPlatform(Integer platformId, String channelDeviceId) {
// 防止共享的通道编号重复
List<CommonGBChannel> channelList = platformChannelMapper.queryOneWithPlatform(platformId, channelDeviceId);
if (!channelList.isEmpty()) {
return channelList.get(channelList.size() - 1);

View File

@@ -7,9 +7,8 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import lombok.extern.slf4j.Slf4j;
@@ -19,6 +18,7 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -44,17 +44,6 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
@Autowired
private DeviceChannelMapper deviceChannelMapper;
/**
* 流到来的处理
*/
@Async("taskExecutor")
@org.springframework.context.event.EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
// if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
//
// }
}
/**
* 流离开的处理
*/
@@ -67,7 +56,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
removeInviteInfo(inviteInfo);
Device device = deviceMapper.getDeviceByDeviceId(inviteInfo.getDeviceId());
if (device != null) {
deviceChannelMapper.stopPlay(device.getId(), inviteInfo.getChannelId());
deviceChannelMapper.stopPlayById(inviteInfo.getChannelId());
}
}
}
@@ -87,7 +76,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
log.warn("[更新Invite信息],参数不全: {}", JSON.toJSON(inviteInfo));
return;
}
InviteInfo inviteInfoForUpdate = null;
InviteInfo inviteInfoForUpdate;
if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
if (inviteInfo.getDeviceId() == null
@@ -99,8 +88,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
}
inviteInfoForUpdate = inviteInfo;
} else {
InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
inviteInfo.getChannelId(), inviteInfo.getStream());
InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getChannelId(), inviteInfo.getStream());
if (inviteInfoInRedis == null) {
log.warn("[更新Invite信息]未从缓存中读取到Invite信息 deviceId: {}, channel: {}, stream: {}",
inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
@@ -144,7 +132,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
@Override
public InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream) {
InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getChannelId(), inviteInfo.getStream());
if (inviteInfoInDb == null) {
return null;
}
@@ -169,10 +157,9 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
}
@Override
public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
public InviteInfo getInviteInfo(InviteSessionType type, Integer channelId, String stream) {
String key = VideoManagerConstants.INVITE_PREFIX +
":" + (type != null ? type : "*") +
":" + (deviceId != null ? deviceId : "*") +
":" + (channelId != null ? channelId : "*") +
":" + (stream != null ? stream : "*")
+ ":*";
@@ -188,25 +175,42 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
}
@Override
public InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, String deviceId, String channelId) {
return getInviteInfo(type, deviceId, channelId, null);
public List<InviteInfo> getAllInviteInfo(InviteSessionType type, Integer channelId, String stream) {
String key = VideoManagerConstants.INVITE_PREFIX +
":" + (type != null ? type : "*") +
":" + (channelId != null ? channelId : "*") +
":" + (stream != null ? stream : "*")
+ ":*";
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.isEmpty()) {
return new ArrayList<>();
}
List<InviteInfo> result = new ArrayList<>();
for (Object keyObj : scanResult) {
result.add((InviteInfo) redisTemplate.opsForValue().get(keyObj));
}
return result;
}
@Override
public InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, Integer channelId) {
return getInviteInfo(type, channelId, null);
}
@Override
public InviteInfo getInviteInfoByStream(InviteSessionType type, String stream) {
return getInviteInfo(type, null, null, stream);
return getInviteInfo(type, null, stream);
}
@Override
public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
public void removeInviteInfo(InviteSessionType type, Integer channelId, String stream) {
String scanKey = VideoManagerConstants.INVITE_PREFIX +
":" + (type != null ? type : "*") +
":" + (deviceId != null ? deviceId : "*") +
":" + (channelId != null ? channelId : "*") +
":" + (stream != null ? stream : "*") +
":*";
List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey);
if (scanResult.size() > 0) {
if (!scanResult.isEmpty()) {
for (Object keyObj : scanResult) {
String key = (String) keyObj;
InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(key);
@@ -214,35 +218,31 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
continue;
}
redisTemplate.delete(key);
inviteErrorCallbackMap.remove(buildKey(type, deviceId, channelId, inviteInfo.getStream()));
inviteErrorCallbackMap.remove(buildKey(type,channelId, inviteInfo.getStream()));
}
}
}
@Override
public void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId) {
removeInviteInfo(inviteSessionType, deviceId, channelId, null);
public void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, Integer channelId) {
removeInviteInfo(inviteSessionType, channelId, null);
}
@Override
public void removeInviteInfo(InviteInfo inviteInfo) {
removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
removeInviteInfo(inviteInfo.getType(), inviteInfo.getChannelId(), inviteInfo.getStream());
}
@Override
public void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback<StreamInfo> callback) {
String key = buildKey(type, deviceId, channelId, stream);
List<ErrorCallback<StreamInfo>> callbacks = inviteErrorCallbackMap.get(key);
if (callbacks == null) {
callbacks = new CopyOnWriteArrayList<>();
inviteErrorCallbackMap.put(key, callbacks);
}
public void once(InviteSessionType type, Integer channelId, String stream, ErrorCallback<StreamInfo> callback) {
String key = buildKey(type, channelId, stream);
List<ErrorCallback<StreamInfo>> callbacks = inviteErrorCallbackMap.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>());
callbacks.add(callback);
}
private String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) {
String key = type + ":" + deviceId + ":" + channelId;
private String buildKey(InviteSessionType type, Integer channelId, String stream) {
String key = type + ":" + channelId;
// 如果ssrc未null那么可以实现一个通道只能一次操作ssrc不为null则可以支持一个通道多次invite
if (stream != null) {
key += (":" + stream);
@@ -253,7 +253,12 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
@Override
public void clearInviteInfo(String deviceId) {
removeInviteInfo(null, deviceId, null, null);
List<InviteInfo> inviteInfoList = getAllInviteInfo(null, null, null);
for (InviteInfo inviteInfo : inviteInfoList) {
if (inviteInfo.getDeviceId().equals(deviceId)) {
removeInviteInfo(inviteInfo);
}
}
}
@Override
@@ -261,7 +266,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
int count = 0;
String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:*";
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.size() == 0) {
if (scanResult.isEmpty()) {
return 0;
}else {
for (Object keyObj : scanResult) {
@@ -282,8 +287,8 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
}
@Override
public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, StreamInfo data) {
String key = buildSubStreamKey(type, deviceId, channelId, stream);
public void call(InviteSessionType type, Integer channelId, String stream, int code, String msg, StreamInfo data) {
String key = buildSubStreamKey(type, channelId, stream);
List<ErrorCallback<StreamInfo>> callbacks = inviteErrorCallbackMap.get(key);
if (callbacks == null) {
return;
@@ -295,8 +300,8 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
}
private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, String stream) {
String key = type + ":" + ":" + deviceId + ":" + channelId;
private String buildSubStreamKey(InviteSessionType type, Integer channelId, String stream) {
String key = type + ":" + channelId;
// 如果ssrc为null那么可以实现一个通道只能一次操作ssrc不为null则可以支持一个通道多次invite
if (stream != null) {
key += (":" + stream);
@@ -317,7 +322,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
@Override
public InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrc) {
InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getChannelId(), inviteInfo.getStream());
if (inviteInfoInDb == null) {
return null;
}

View File

@@ -63,7 +63,8 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
/**
* 获取通道使用的分组中未分享的
*/
private Set<Group> getGroupNotShareByChannelList(List<CommonGBChannel> channelList, Integer platformId) {
@Transactional
public Set<Group> getGroupNotShareByChannelList(List<CommonGBChannel> channelList, Integer platformId) {
// 获取分组中未分享的节点
Set<Group> groupList = groupMapper.queryNotShareGroupForPlatformByChannelList(channelList, platformId);
// 获取这些节点的所有父节点
@@ -92,16 +93,19 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
return regionMapper.queryNotShareRegionForPlatformByRegionList(allRegion, platformId);
}
/**
* 移除空的共享,并返回移除的分组
*/
private Set<Group> deleteEmptyGroup(Set<Group> groupSet, Integer platformId) {
@Transactional
public Set<Group> deleteEmptyGroup(Set<Group> groupSet, Integer platformId) {
Iterator<Group> iterator = groupSet.iterator();
while (iterator.hasNext()) {
Group group = iterator.next();
// groupSet 为当前通道直接使用的分组,如果已经没有子分组与其他的通道,则可以移除
// 获取分组子节点
Set<Group> children = platformChannelMapper.queryShareChildrenGroup(group.getDeviceId(), platformId);
Set<Group> children = platformChannelMapper.queryShareChildrenGroup(group.getId(), platformId);
if (!children.isEmpty()) {
iterator.remove();
continue;
@@ -427,4 +431,85 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
channel.getGbName(), channel.getGbDeviceDbId(), e);
}
}
@Override
@Transactional
public void checkGroupRemove(List<CommonGBChannel> channelList, List<Group> groupList) {
List<Integer> channelIds = new ArrayList<>();
channelList.stream().forEach(commonGBChannel -> {
channelIds.add(commonGBChannel.getGbId());
});
// 获取关联这些通道的平台
List<Platform> platformList = platformChannelMapper.queryPlatFormListByChannelList(channelIds);
if (platformList.isEmpty()) {
return;
}
for (Platform platform : platformList) {
Set<Group> groupSet;
if (groupList == null || groupList.isEmpty()) {
groupSet = platformChannelMapper.queryShareGroup(platform.getId());
}else {
groupSet = new HashSet<>(groupList);
}
// 清理空的分组并发送消息
Set<Group> deleteGroup = deleteEmptyGroup(groupSet, platform.getId());
List<CommonGBChannel> channelListForEvent = new ArrayList<>();
if (!deleteGroup.isEmpty()) {
for (Group group : deleteGroup) {
channelListForEvent.add(0, CommonGBChannel.build(group));
}
}
// 发送消息
try {
// 发送catalog
eventPublisher.catalogEventPublish(platform.getId(), channelListForEvent, CatalogEvent.DEL);
} catch (Exception e) {
log.warn("[移除关联通道] 发送失败,数量:{}", channelList.size(), e);
}
}
}
@Override
@Transactional
public void checkGroupAdd(List<CommonGBChannel> channelList) {
List<Integer> channelIds = new ArrayList<>();
channelList.stream().forEach(commonGBChannel -> {
channelIds.add(commonGBChannel.getGbId());
});
List<Platform> platformList = platformChannelMapper.queryPlatFormListByChannelList(channelIds);
if (platformList.isEmpty()) {
return;
}
for (Platform platform : platformList) {
Set<Group> addGroup = getGroupNotShareByChannelList(channelList, platform.getId());
List<CommonGBChannel> channelListForEvent = new ArrayList<>();
if (!addGroup.isEmpty()) {
for (Group group : addGroup) {
channelListForEvent.add(0, CommonGBChannel.build(group));
}
platformChannelMapper.addPlatformGroup(addGroup, platform.getId());
// 发送消息
try {
// 发送catalog
eventPublisher.catalogEventPublish(platform.getId(), channelListForEvent, CatalogEvent.ADD);
} catch (Exception e) {
log.warn("[移除关联通道] 发送失败,数量:{}", channelList.size(), e);
}
}
}
}
@Override
public List<Platform> queryPlatFormListByChannelDeviceId(Integer channelId, List<String> platforms) {
return platformChannelMapper.queryPlatFormListForGBWithGBId(channelId, platforms);
}
@Override
public CommonGBChannel queryChannelByPlatformIdAndChannelId(Integer platformId, Integer channelId) {
return platformChannelMapper.queryShareChannel(platformId, channelId);
}
}

View File

@@ -466,9 +466,13 @@ public class PlatformServiceImpl implements IPlatformService {
if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) {
continue;
}
CommonGBChannel commonGBChannel = platformChannelMapper.queryShareChannel(platform.getId(), deviceChannel.getId());
if (commonGBChannel == null) {
continue;
}
// 发送GPS消息
try {
commanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, subscribe);
commanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, commonGBChannel, subscribe);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
IllegalAccessException e) {
log.error("[命令发送失败] 国标级联 移动位置通知: {}", e.getMessage());
@@ -479,14 +483,14 @@ public class PlatformServiceImpl implements IPlatformService {
}
@Override
public void broadcastInvite(Platform platform, String channelId, MediaServer mediaServerItem, HookSubscribe.Event hookEvent,
public void broadcastInvite(Platform platform, CommonGBChannel channel, MediaServer mediaServerItem, HookSubscribe.Event hookEvent,
SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException {
if (mediaServerItem == null) {
log.info("[国标级联] 语音喊话未找到可用的zlm. platform: {}", platform.getServerGBId());
return;
}
InviteInfo inviteInfoForOld = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, platform.getServerGBId(), channelId);
InviteInfo inviteInfoForOld = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, platform.getServerGBId(), channel.getGbDeviceId());
if (inviteInfoForOld != null && inviteInfoForOld.getStreamInfo() != null) {
// 如果zlm不存在这个流则删除数据即可
@@ -510,7 +514,7 @@ public class PlatformServiceImpl implements IPlatformService {
String streamId = null;
if (mediaServerItem.isRtpEnable()) {
streamId = String.format("%s_%s", platform.getServerGBId(), channelId);
streamId = String.format("%s_%s", platform.getServerGBId(), channel.getGbDeviceId());
}
// 默认不进行SSRC校验 TODO 后续可改为配置
boolean ssrcCheck = false;
@@ -524,7 +528,7 @@ public class PlatformServiceImpl implements IPlatformService {
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, ssrcCheck, false, null, true, false, false, tcpMode);
if (ssrcInfo == null || ssrcInfo.getPort() < 0) {
log.info("[国标级联] 发起语音喊话 开启端口监听失败, platform: {}, channel {}", platform.getServerGBId(), channelId);
log.info("[国标级联] 发起语音喊话 开启端口监听失败, platform: {}, channel {}", platform.getServerGBId(), channel.getGbDeviceId());
SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>();
eventResult.statusCode = -1;
eventResult.msg = "端口监听失败";
@@ -533,45 +537,45 @@ public class PlatformServiceImpl implements IPlatformService {
return;
}
log.info("[国标级联] 语音喊话发起Invite消息 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验{}",
platform.getServerGBId(), channelId, ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), ssrcInfo.getSsrc(), ssrcCheck);
platform.getServerGBId(), channel.getGbDeviceId(), ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), ssrcInfo.getSsrc(), ssrcCheck);
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(platform.getServerGBId(), channelId, ssrcInfo.getStream(), ssrcInfo,
InviteInfo inviteInfo = InviteInfo.getInviteInfo(platform.getServerGBId(), channel.getGbId(), ssrcInfo.getStream(), ssrcInfo,
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), InviteSessionType.BROADCAST,
InviteSessionStatus.ready);
inviteStreamService.updateInviteInfo(inviteInfo);
String timeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(timeOutTaskKey, () -> {
// 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
InviteInfo inviteInfoForBroadcast = inviteStreamService.getInviteInfo(InviteSessionType.BROADCAST, platform.getServerGBId(), channelId, null);
InviteInfo inviteInfoForBroadcast = inviteStreamService.getInviteInfo(InviteSessionType.BROADCAST, platform.getServerGBId(), channel.getGbDeviceId(), null);
if (inviteInfoForBroadcast == null) {
log.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
log.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channel.getGbDeviceId(), ssrcInfo.getPort(), ssrcInfo.getSsrc());
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
try {
commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null);
commanderForPlatform.streamByeCmd(platform, channel.getGbDeviceId(), ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
log.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} finally {
timeoutCallback.run(1, "收流超时");
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
streamSession.remove(platform.getServerGBId(), channel.getGbDeviceId(), ssrcInfo.getStream());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
}
}
}, userSetting.getPlayTimeout());
commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (hookData)->{
log.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId);
commanderForPlatform.broadcastInviteCmd(platform, channel.getGbDeviceId(), mediaServerItem, ssrcInfo, (hookData)->{
log.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channel.getGbDeviceId());
dynamicTask.stop(timeOutTaskKey);
// hook响应
onPublishHandlerForBroadcast(hookData.getMediaServer(), hookData.getMediaInfo(), platform.getServerGBId(), channelId);
onPublishHandlerForBroadcast(hookData.getMediaServer(), hookData.getMediaInfo(), platform.getServerGBId(), channel.getGbDeviceId());
// 收到流
if (hookEvent != null) {
hookEvent.response(hookData);
}
}, event -> {
inviteOKHandler(event, ssrcInfo, tcpMode, ssrcCheck, mediaServerItem, platform, channelId, timeOutTaskKey,
inviteOKHandler(event, ssrcInfo, tcpMode, ssrcCheck, mediaServerItem, platform, channel, timeOutTaskKey,
null, inviteInfo, InviteSessionType.BROADCAST);
// // 收到200OK 检测ssrc是否有变化防止上级自定义了ssrc
// ResponseEvent responseEvent = (ResponseEvent) event.event;
@@ -633,7 +637,7 @@ public class PlatformServiceImpl implements IPlatformService {
}
private void inviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, int tcpMode, boolean ssrcCheck, MediaServer mediaServerItem,
Platform platform, String channelId, String timeOutTaskKey, ErrorCallback<Object> callback,
Platform platform, CommonGBChannel channel, String timeOutTaskKey, ErrorCallback<Object> callback,
InviteInfo inviteInfo, InviteSessionType inviteSessionType){
inviteInfo.setStatus(InviteSessionStatus.ok);
ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
@@ -648,7 +652,7 @@ public class PlatformServiceImpl implements IPlatformService {
if (mediaServerItem.isRtpEnable()) {
// 多端口
if (tcpMode == 2) {
tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck,
tcpActiveHandler(platform, channel.getGbDeviceId(), contentString, mediaServerItem, tcpMode, ssrcCheck,
timeOutTaskKey, ssrcInfo, callback);
}
}else {
@@ -671,8 +675,8 @@ public class PlatformServiceImpl implements IPlatformService {
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
if (!result) {
try {
log.warn("[Invite 200OK] 更新ssrc失败停止喊话 {}/{}", platform.getServerGBId(), channelId);
commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null);
log.warn("[Invite 200OK] 更新ssrc失败停止喊话 {}/{}", platform.getServerGBId(), channel.getGbDeviceId());
commanderForPlatform.streamByeCmd(platform, channel.getGbDeviceId(), ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage());
}
@@ -681,11 +685,11 @@ public class PlatformServiceImpl implements IPlatformService {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
streamSession.remove(platform.getServerGBId(), channel.getGbDeviceId(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
inviteStreamService.call(inviteSessionType, platform.getServerGBId(), channelId, null,
inviteStreamService.call(inviteSessionType, platform.getServerGBId(), channel.getGbDeviceId(), null,
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
@@ -695,7 +699,7 @@ public class PlatformServiceImpl implements IPlatformService {
inviteInfo.setStream(ssrcInfo.getStream());
if (tcpMode == 2) {
if (mediaServerItem.isRtpEnable()) {
tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck,
tcpActiveHandler(platform, channel.getGbDeviceId(), contentString, mediaServerItem, tcpMode, ssrcCheck,
timeOutTaskKey, ssrcInfo, callback);
}else {
log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
@@ -709,7 +713,7 @@ public class PlatformServiceImpl implements IPlatformService {
inviteInfo.setStream(ssrcInfo.getStream());
if (tcpMode == 2) {
if (mediaServerItem.isRtpEnable()) {
tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck,
tcpActiveHandler(platform, channel.getGbDeviceId(), contentString, mediaServerItem, tcpMode, ssrcCheck,
timeOutTaskKey, ssrcInfo, callback);
}else {
log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
@@ -721,13 +725,20 @@ public class PlatformServiceImpl implements IPlatformService {
if (ssrcInResponse != null) {
// 单端口
// 重新订阅流上线
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(inviteInfo.getDeviceId(),
inviteInfo.getChannelId(), null, inviteInfo.getStream());
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(inviteInfo.getChannelId(), null, inviteInfo.getStream());
streamSession.remove(inviteInfo.getDeviceId(),
inviteInfo.getChannelId(), inviteInfo.getStream());
inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse);
streamSession.put(platform.getServerGBId(), channelId, ssrcTransaction.getCallId(),
inviteInfo.getStream(), ssrcInResponse, mediaServerItem.getId(), (SIPResponse) responseEvent.getResponse(), inviteSessionType);
ssrcTransaction.setPlatformId(platform.getServerGBId());
ssrcTransaction.setChannelId(channel.getGbId());
ssrcTransaction.setStream(inviteInfo.getStream());
ssrcTransaction.setSsrc(ssrcInResponse);
ssrcTransaction.setMediaServerId(mediaServerItem.getId());
ssrcTransaction.setSipTransactionInfo(new SipTransactionInfo((SIPResponse) responseEvent.getResponse()));
ssrcTransaction.setType(inviteSessionType);
streamSession.put(ssrcTransaction);
}
}
}

View File

@@ -261,7 +261,7 @@ public class PlayServiceImpl implements IPlayService {
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(event.getMediaServer(), event.getStream(), null,
device.isSsrcCheck(), true, 0, false, !deviceChannel.isHasAudio(), false, tcpMode);
playBack(event.getMediaServer(), ssrcInfo, deviceId, channelId, startTime, endTime, null);
playBack(event.getMediaServer(), ssrcInfo, deviceId, deviceChannel, startTime, endTime, null);
}
}
@@ -494,7 +494,7 @@ public class PlayServiceImpl implements IPlayService {
}
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getDeviceId(), ssrcInfo.getStream(), ssrcInfo,
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo,
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
InviteSessionStatus.ready);
inviteStreamService.updateInviteInfo(inviteInfo);
@@ -780,12 +780,11 @@ public class PlayServiceImpl implements IPlayService {
String stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr;
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, !channel.isHasAudio(), false, tcpMode);
playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback);
playBack(newMediaServerItem, ssrcInfo, deviceId, channel, startTime, endTime, callback);
}
@Override
public void playBack(MediaServer mediaServerItem, SSRCInfo ssrcInfo,
String deviceId, String channelId, String startTime,
String deviceId, DeviceChannel channel, String startTime,
String endTime, ErrorCallback<StreamInfo> callback) {
if (mediaServerItem == null || ssrcInfo == null) {
callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
@@ -799,28 +798,28 @@ public class PlayServiceImpl implements IPlayService {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备: " + deviceId + "不存在");
}
log.info("[录像回放] deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验{}",
device.getDeviceId(), channelId, startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(),
device.getDeviceId(), channel.getGbDeviceId(), startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(),
ssrcInfo.getSsrc(), device.isSsrcCheck());
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo,
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK,
InviteSessionStatus.ready);
inviteStreamService.updateInviteInfo(inviteInfo);
String playBackTimeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(playBackTimeOutTaskKey, () -> {
log.warn("[录像回放] 超时deviceId{} channelId{}", deviceId, channelId);
log.warn("[录像回放] 超时deviceId{} channelId{}", deviceId, channel.getGbDeviceId());
inviteStreamService.removeInviteInfo(inviteInfo);
callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
try {
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
cmder.streamByeCmd(device, channel.getGbDeviceId(), ssrcInfo.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException e) {
log.error("[录像回放] 超时 发送BYE失败 {}", e.getMessage());
} catch (SsrcTransactionNotFoundException e) {
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
streamSession.remove(deviceId, channel.getGbDeviceId(), ssrcInfo.getStream());
}
}, userSetting.getPlayTimeout());
@@ -831,14 +830,14 @@ public class PlayServiceImpl implements IPlayService {
String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg), null);
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
streamSession.remove(device.getDeviceId(), channel.getGbDeviceId(), ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo);
};
HookSubscribe.Event hookEvent = (hookData) -> {
log.info("收到回放订阅消息: " + hookData);
dynamicTask.stop(playBackTimeOutTaskKey);
StreamInfo streamInfo = onPublishHandlerForPlayback(hookData.getMediaServer(), hookData.getMediaInfo(), deviceId, channelId, startTime, endTime);
StreamInfo streamInfo = onPublishHandlerForPlayback(hookData.getMediaServer(), hookData.getMediaInfo(), deviceId, channel.getGbDeviceId(), startTime, endTime);
if (streamInfo == null) {
log.warn("设备回放API调用失败");
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
@@ -846,14 +845,14 @@ public class PlayServiceImpl implements IPlayService {
return;
}
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
log.info("[录像回放] 成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime);
log.info("[录像回放] 成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channel.getGbDeviceId(), startTime, endTime);
};
try {
cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime,
cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channel.getGbDeviceId(), startTime, endTime,
hookEvent, eventResult -> {
// 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId,
InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel.getGbDeviceId(),
playBackTimeOutTaskKey, callback, inviteInfo, InviteSessionType.PLAYBACK);
}, errorEvent);
} catch (InvalidArgumentException | SipException | ParseException e) {