临时提交

This commit is contained in:
648540858
2024-06-27 16:02:43 +08:00
parent 1616a2a731
commit 400eef2d2e
25 changed files with 135 additions and 162 deletions

View File

@@ -4,7 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
import com.github.pagehelper.PageInfo;
import java.util.List;

View File

@@ -1,131 +0,0 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.github.pagehelper.PageInfo;
import java.util.List;
import java.util.Map;
/**
* @author lin
*/
public interface IStreamPushService {
/**
* 将应用名和流ID加入国标关联
* @param stream
* @return
*/
boolean saveToGB(GbStream stream);
/**
* 将应用名和流ID移出国标关联
* @param stream
* @return
*/
boolean removeFromGB(GbStream stream);
/**
* 获取
*/
PageInfo<StreamPush> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId);
List<StreamPush> getPushList(String mediaSererId);
StreamPush transform(OnStreamChangedHookParam item);
StreamPush getPush(String app, String streamId);
boolean stop(StreamPush streamPush);
/**
* 停止一路推流
* @param app 应用名
* @param stream 流ID
*/
boolean stopByAppAndStream(String app, String stream);
/**
* 新的节点加入
*/
void zlmServerOnline(String mediaServerId);
/**
* 节点离线
*/
void zlmServerOffline(String mediaServerId);
/**
* 清空
*/
void clean();
boolean saveToRandomGB();
/**
* 批量添加
*/
void batchAdd(List<StreamPush> streamPushExcelDtoList);
/**
* 中止多个推流
*/
boolean batchStop(List<GbStream> streamPushItems);
/**
* 导入时批量增加
*/
void batchAddForUpload(List<StreamPush> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll);
/**
* 全部离线
*/
void allStreamOffline();
/**
* 推流离线
*/
void offline(List<StreamPushItemFromRedis> offlineStreams);
/**
* 推流上线
*/
void online(List<StreamPushItemFromRedis> onlineStreams);
/**
* 增加推流
*/
boolean add(StreamPush stream);
boolean update(StreamPush stream);
/**
* 获取全部的app+Streanm 用于判断推流列表是新增还是修改
* @return
*/
List<String> getAllAppAndStream();
/**
* 获取统计信息
* @return
*/
ResourceBaseInfo getOverview();
Map<String, StreamPush> getAllAppAndStreamMap();
void updatePush(OnStreamChangedHookParam param);
Map<String, StreamPush> getAllGBId();
void updateStatus(StreamPush push);
void deleteByAppAndStream(String app, String stream);
void updatePushStatus(Integer streamPushId, boolean pushIng);
}

View File

@@ -4,7 +4,7 @@ import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;

View File

@@ -1,671 +0,0 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.*;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import java.util.*;
import java.util.stream.Collectors;
@Service
@Slf4j
@DS("master")
public class StreamPushServiceImpl implements IStreamPushService {
@Autowired
private StreamPushMapper streamPushMapper;
@Autowired
private StreamProxyMapper streamProxyMapper;
@Autowired
private ParentPlatformMapper parentPlatformMapper;
@Autowired
private PlatformCatalogMapper platformCatalogMapper;
@Autowired
private PlatformGbStreamMapper platformGbStreamMapper;
@Autowired
private EventPublisher eventPublisher;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private UserSetting userSetting;
@Autowired
private IMediaServerService mediaServerService;
@Autowired
DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
TransactionDefinition transactionDefinition;
@Autowired
private MediaConfig mediaConfig;
@Autowired
private IGbChannelService gbChannelService;
/**
* 流到来的处理
*/
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
MediaInfo mediaInfo = event.getMediaInfo();
if (mediaInfo == null) {
return;
}
if (mediaInfo.getOriginType() != OriginType.RTMP_PUSH.ordinal()
&& mediaInfo.getOriginType() != OriginType.RTSP_PUSH.ordinal()
&& mediaInfo.getOriginType() != OriginType.RTC_PUSH.ordinal()) {
return;
}
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(event.getApp(), event.getStream());
if (streamAuthorityInfo == null) {
streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(event);
} else {
streamAuthorityInfo.setOriginType(mediaInfo.getOriginType());
}
redisCatchStorage.updateStreamAuthorityInfo(event.getApp(), event.getStream(), streamAuthorityInfo);
StreamPush streamPushInDb = getPush(event.getApp(), event.getStream());
if (streamPushInDb == null) {
StreamPush streamPush = StreamPush.getInstance(event, userSetting.getServerId());
streamPush.setPushIng(true);
streamPush.setUpdateTime(DateUtil.getNow());
streamPush.setPushTime(DateUtil.getNow());
streamPush.setSelf(true);
add(streamPush);
}else {
updatePushStatus(streamPushInDb.getId(), true);
}
// 冗余数据,自己系统中自用
if (!"broadcast".equals(event.getApp()) && !"talk".equals(event.getApp())) {
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(
event.getMediaServer(), event.getApp(), event.getStream(), event.getMediaInfo(), event.getCallId());
event.getHookParam().setStreamInfo(new StreamContent(streamInfo));
redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event);
}
// 发送流变化redis消息
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetting.getServerId());
jsonObject.put("app", event.getApp());
jsonObject.put("stream", event.getStream());
jsonObject.put("register", true);
jsonObject.put("mediaServerId", event.getMediaServer().getId());
redisCatchStorage.sendStreamChangeMsg(OriginType.values()[event.getMediaInfo().getOriginType()].getType(), jsonObject);
}
/**
* 流离开的处理
*/
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
// 兼容流注销时类型从redis记录获取
MediaInfo mediaInfo = redisCatchStorage.getStreamInfo(
event.getApp(), event.getStream(), event.getMediaServer().getId());
if (mediaInfo != null) {
String type = OriginType.values()[mediaInfo.getOriginType()].getType();
redisCatchStorage.removeStream(event.getMediaServer().getId(), type, event.getApp(), event.getStream());
if ("PUSH".equalsIgnoreCase(type)) {
// 冗余数据,自己系统中自用
redisCatchStorage.removePushListItem(event.getApp(), event.getStream(), event.getMediaServer().getId());
}
if (type != null) {
// 发送流变化redis消息
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetting.getServerId());
jsonObject.put("app", event.getApp());
jsonObject.put("stream", event.getStream());
jsonObject.put("register", false);
jsonObject.put("mediaServerId", event.getMediaServer().getId());
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
}
}
StreamPush push = getPush(event.getApp(), event.getStream());
push.setPushIng(false);
if (push.getGbDeviceId() != null) {
if (userSetting.isUsePushingAsStatus()) {
push.setGbStatus(false);
updateStatus(push);
// streamPushMapper.updatePushStatus(event.getApp(), event.getStream(), false);
// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
}
}else {
deleteByAppAndStream(event.getApp(), event.getStream());
}
}
/**
* 流媒体节点上线
*/
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaServerOnlineEvent event) {
zlmServerOnline(event.getMediaServerId());
}
/**
* 流媒体节点离线
*/
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaServerOfflineEvent event) {
zlmServerOffline(event.getMediaServerId());
}
@Override
public PageInfo<StreamPush> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) {
PageHelper.startPage(page, count);
List<StreamPush> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId);
return new PageInfo<>(all);
}
@Override
public List<StreamPush> getPushList(String mediaServerId) {
return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
}
@Override
public StreamPush getPush(String app, String stream) {
return streamPushMapper.selectByAppAndStream(app, stream);
}
@Override
@Transactional
public boolean add(StreamPush stream) {
log.info("[添加推流] app: {}, stream: {}, 国标编号: {}", stream.getApp(), stream.getStream(), stream.getGbDeviceId());
stream.setUpdateTime(DateUtil.getNow());
stream.setCreateTime(DateUtil.getNow());
int addResult = streamPushMapper.add(stream);
if (addResult <= 0) {
return false;
}
if (ObjectUtils.isEmpty(stream.getGbDeviceId())) {
return true;
}
CommonGBChannel channel = gbChannelService.queryByDeviceId(stream.getGbDeviceId());
if (channel != null) {
log.info("[添加推流]失败,国标编号已存在: {} app: {}, stream: {}, ", stream.getGbDeviceId(), stream.getApp(), stream.getStream());
}
int addChannelResult = gbChannelService.add(stream.getCommonGBChannel());
return addChannelResult > 0;
}
@Override
@Transactional
public void deleteByAppAndStream(String app, String stream) {
log.info("[删除推流] app: {}, stream: {}, ", app, stream);
StreamPush streamPush = streamPushMapper.selectByAppAndStream(app, stream);
if (streamPush == null) {
log.info("[删除推流]失败, 不存在 app: {}, stream: {}, ", app, stream);
return;
}
if (streamPush.isPushIng()) {
stop(streamPush);
}
if (streamPush.getGbId() > 0) {
gbChannelService.delete(streamPush.getGbId());
}
}
@Override
@Transactional
public boolean update(StreamPush streamPush) {
log.info("[更新推流]id: {}, app: {}, stream: {}, ", streamPush.getId(), streamPush.getApp(), streamPush.getStream());
assert streamPush.getId() != null;
streamPush.setUpdateTime(DateUtil.getNow());
streamPushMapper.update(streamPush);
if (streamPush.getGbId() > 0) {
gbChannelService.update(streamPush.getCommonGBChannel());
}
return true;
}
@Override
@Transactional
public boolean stop(StreamPush streamPush) {
log.info("[主动停止推流] id: {}, app: {}, stream: {}, ", streamPush.getId(), streamPush.getApp(), streamPush.getStream());
MediaServer mediaServer = null;
if (streamPush.getMediaServerId() == null) {
log.info("[主动停止推流]未找到使用MediaServer开始自动检索 id: {}, app: {}, stream: {}, ", streamPush.getId(), streamPush.getApp(), streamPush.getStream());
mediaServer = mediaServerService.getMediaServerByAppAndStream(streamPush.getApp(), streamPush.getStream());
if (mediaServer != null) {
log.info("[主动停止推流] 检索到MediaServer为{} id: {}, app: {}, stream: {}, ", mediaServer.getId(), streamPush.getId(), streamPush.getApp(), streamPush.getStream());
}else {
log.info("[主动停止推流]未找到使用MediaServer id: {}, app: {}, stream: {}, ", streamPush.getId(), streamPush.getApp(), streamPush.getStream());
}
}else {
mediaServer = mediaServerService.getOne(streamPush.getMediaServerId());
if (mediaServer == null) {
log.info("[主动停止推流]未找到使用的MediaServer {},开始自动检索 id: {}, app: {}, stream: {}, ",streamPush.getMediaServerId(), streamPush.getId(), streamPush.getApp(), streamPush.getStream());
mediaServer = mediaServerService.getMediaServerByAppAndStream(streamPush.getApp(), streamPush.getStream());
if (mediaServer != null) {
log.info("[主动停止推流] 检索到MediaServer为{} id: {}, app: {}, stream: {}, ", mediaServer.getId(), streamPush.getId(), streamPush.getApp(), streamPush.getStream());
}else {
log.info("[主动停止推流]未找到使用MediaServer id: {}, app: {}, stream: {}, ", streamPush.getId(), streamPush.getApp(), streamPush.getStream());
}
}
}
if (mediaServer != null) {
mediaServerService.closeStreams(mediaServer, streamPush.getApp(), streamPush.getStream());
}
streamPush.setPushIng(false);
if (userSetting.isUsePushingAsStatus()) {
streamPush.setGbStatus(false);
gbChannelService.offline(streamPush.getCommonGBChannel());
}
gbChannelService.closeSend(streamPush.getCommonGBChannel());
streamPush.setUpdateTime(DateUtil.getNow());
streamPushMapper.update(streamPush);
return true;
}
@Override
@Transactional
public boolean stopByAppAndStream(String app, String stream) {
log.info("[主动停止推流] app: {}, stream: {}, ", app, stream);
StreamPush streamPushItem = streamPushMapper.selectByAppAndStream(app, stream);
if (streamPushItem != null) {
stop(streamPushItem);
}
return true;
}
@Override
@Transactional
public void zlmServerOnline(String mediaServerId) {
// 同步zlm推流信息
MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem == null) {
return;
}
// 数据库记录
List<StreamPush> pushList = getPushList(mediaServerId);
Map<String, StreamPush> pushItemMap = new HashMap<>();
// redis记录
List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PUSH");
Map<String, MediaInfo> streamInfoPushItemMap = new HashMap<>();
if (!pushList.isEmpty()) {
for (StreamPush streamPushItem : pushList) {
if (ObjectUtils.isEmpty(streamPushItem.getGbId())) {
pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
}
}
}
if (!mediaInfoList.isEmpty()) {
for (MediaInfo mediaInfo : mediaInfoList) {
streamInfoPushItemMap.put(mediaInfo.getApp() + mediaInfo.getStream(), mediaInfo);
}
}
// 获取所有推流鉴权信息,清理过期的
List<StreamAuthorityInfo> allStreamAuthorityInfo = redisCatchStorage.getAllStreamAuthorityInfo();
Map<String, StreamAuthorityInfo> streamAuthorityInfoInfoMap = new HashMap<>();
for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) {
streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo);
}
List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaServerItem, null, null, null);
if (mediaList == null) {
return;
}
List<StreamPush> streamPushItems = handleJSON(mediaList);
if (streamPushItems != null) {
for (StreamPush streamPushItem : streamPushItems) {
pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
}
}
List<StreamPush> changedStreamPushList = new ArrayList<>(pushItemMap.values());
if (!changedStreamPushList.isEmpty()) {
for (StreamPush streamPush : changedStreamPushList) {
stop(streamPush);
}
}
// if (!changedStreamPushList.isEmpty()) {
// String type = "PUSH";
// int runLimit = 300;
// if (changedStreamPushList.size() > runLimit) {
// for (int i = 0; i < changedStreamPushList.size(); i += runLimit) {
// int toIndex = i + runLimit;
// if (i + runLimit > changedStreamPushList.size()) {
// toIndex = changedStreamPushList.size();
// }
// List<StreamPush> streamPushItemsSub = changedStreamPushList.subList(i, toIndex);
// streamPushMapper.delAll(streamPushItemsSub);
// }
// }else {
// streamPushMapper.delAll(changedStreamPushList);
// }
//
// }
Collection<MediaInfo> mediaInfos = streamInfoPushItemMap.values();
if (!mediaInfos.isEmpty()) {
String type = "PUSH";
for (MediaInfo mediaInfo : mediaInfos) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetting.getServerId());
jsonObject.put("app", mediaInfo.getApp());
jsonObject.put("stream", mediaInfo.getStream());
jsonObject.put("register", false);
jsonObject.put("mediaServerId", mediaServerId);
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
// 移除redis内流的信息
redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", mediaInfo.getApp(), mediaInfo.getStream());
// 冗余数据,自己系统中自用
redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerItem.getId());
}
}
Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values();
if (!streamAuthorityInfos.isEmpty()) {
for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) {
// 移除redis内流的信息
redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream());
}
}
}
@Override
@Transactional
public void zlmServerOffline(String mediaServerId) {
List<StreamPush> streamPushItems = streamPushMapper.selectAllByMediaServerId(mediaServerId);
if (!streamPushItems.isEmpty()) {
for (StreamPush streamPushItem : streamPushItems) {
stop(streamPushItem);
}
}
// // 移除没有GBId的推流
// streamPushMapper.deleteWithoutGBId(mediaServerId);
// // 其他的流设置未启用
// streamPushMapper.updateStatusByMediaServerId(mediaServerId, false);
// streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
// 发送流停止消息
String type = "PUSH";
// 发送redis消息
List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type);
if (!mediaInfoList.isEmpty()) {
for (MediaInfo mediaInfo : mediaInfoList) {
// 移除redis内流的信息
redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream());
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetting.getServerId());
jsonObject.put("app", mediaInfo.getApp());
jsonObject.put("stream", mediaInfo.getStream());
jsonObject.put("register", false);
jsonObject.put("mediaServerId", mediaServerId);
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
// 冗余数据,自己系统中自用
redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerId);
}
}
}
@Override
@Transactional
public void batchAdd(List<StreamPush> streamPushItems) {
streamPushMapper.addAll(streamPushItems);
List<CommonGBChannel> commonGBChannels = new ArrayList<>();
for (StreamPush streamPush : streamPushItems) {
if (ObjectUtils.isEmpty(streamPush.getGbDeviceId())) {
commonGBChannels.add(streamPush.getCommonGBChannel());
}
}
gbChannelService.batchAdd(commonGBChannels);
}
@Override
public void batchAddForUpload(List<StreamPush> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
// 存储数据到stream_push表
streamPushMapper.addAll(streamPushItems);
List<StreamPush> streamPushItemForGbStream = streamPushItems.stream()
.filter(streamPushItem-> streamPushItem.getGbId() != null)
.collect(Collectors.toList());
// 存储数据到gb_stream表 id会返回到streamPushItemForGbStream里
if (streamPushItemForGbStream.size() > 0) {
gbStreamMapper.batchAdd(streamPushItemForGbStream);
}
// 去除没有ID也就是没有存储到数据库的数据
List<StreamPush> streamPushItemsForPlatform = streamPushItemForGbStream.stream()
.filter(streamPushItem-> streamPushItem.getGbStreamId() != null)
.collect(Collectors.toList());
if (streamPushItemsForPlatform.size() > 0) {
// 获取所有平台,平台和目录信息一般不会特别大量。
List<ParentPlatform> parentPlatformList = parentPlatformMapper.getParentPlatformList();
Map<String, Map<String, PlatformCatalog>> platformInfoMap = new HashMap<>();
if (parentPlatformList.size() == 0) {
return;
}
for (ParentPlatform platform : parentPlatformList) {
Map<String, PlatformCatalog> catalogMap = new HashMap<>();
// 创建根节点
PlatformCatalog platformCatalog = new PlatformCatalog();
platformCatalog.setId(platform.getServerGBId());
catalogMap.put(platform.getServerGBId(), platformCatalog);
// 查询所有节点信息
List<PlatformCatalog> platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId());
if (platformCatalogs.size() > 0) {
for (PlatformCatalog catalog : platformCatalogs) {
catalogMap.put(catalog.getId(), catalog);
}
}
platformInfoMap.put(platform.getServerGBId(), catalogMap);
}
List<StreamPush> streamPushItemListFroPlatform = new ArrayList<>();
Map<String, List<GbStream>> platformForEvent = new HashMap<>();
// 遍历存储结果查找app+Stream->platformId+catalogId的对应关系然后执行批量写入
for (StreamPush streamPushItem : streamPushItemsForPlatform) {
List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream());
if (platFormInfoList != null && platFormInfoList.size() > 0) {
for (String[] platFormInfoArray : platFormInfoList) {
StreamPush streamPushItemForPlatform = new StreamPush();
streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
if (platFormInfoArray.length > 0) {
// 数组 platFormInfoArray 0 为平台ID。 1为目录ID
// 不存在这个平台,则忽略导入此关联关系
if (platformInfoMap.get(platFormInfoArray[0]) == null
|| platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) {
log.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] );
continue;
}
streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
List<GbStream> gbStreamList = platformForEvent.get(platFormInfoArray[0]);
if (gbStreamList == null) {
gbStreamList = new ArrayList<>();
platformForEvent.put(platFormInfoArray[0], gbStreamList);
}
// 为发送通知整理数据
streamPushItemForPlatform.setName(streamPushItem.getName());
streamPushItemForPlatform.setApp(streamPushItem.getApp());
streamPushItemForPlatform.setStream(streamPushItem.getStream());
streamPushItemForPlatform.setGbId(streamPushItem.getGbId());
gbStreamList.add(streamPushItemForPlatform);
}
if (platFormInfoArray.length > 1) {
streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]);
}
streamPushItemListFroPlatform.add(streamPushItemForPlatform);
}
}
}
if (!streamPushItemListFroPlatform.isEmpty()) {
platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform);
// 发送通知
for (String platformId : platformForEvent.keySet()) {
eventPublisher.catalogEventPublishForStream(
platformId, platformForEvent.get(platformId), CatalogEvent.ADD);
}
}
}
}
@Override
public boolean batchStop(List<GbStream> gbStreams) {
if (gbStreams == null || gbStreams.size() == 0) {
return false;
}
gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL);
platformGbStreamMapper.delByGbStreams(gbStreams);
gbStreamMapper.batchDelForGbStream(gbStreams);
int delStream = streamPushMapper.delAllForGbStream(gbStreams);
if (delStream > 0) {
for (GbStream gbStream : gbStreams) {
MediaServer mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());
mediaServerService.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
}
}
return true;
}
@Override
public void allStreamOffline() {
List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGb();
if (onlinePushers.size() == 0) {
return;
}
streamPushMapper.setAllStreamOffline();
// 发送通知
eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
}
@Override
public void offline(List<StreamPushItemFromRedis> offlineStreams) {
// 更新部分设备离线
List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams);
streamPushMapper.offline(offlineStreams);
// 发送通知
eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
}
@Override
public void online(List<StreamPushItemFromRedis> onlineStreams) {
// 更新部分设备上线streamPushService
List<GbStream> onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams);
streamPushMapper.online(onlineStreams);
// 发送通知
eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON);
}
@Override
public List<String> getAllAppAndStream() {
return streamPushMapper.getAllAppAndStream();
}
@Override
public ResourceBaseInfo getOverview() {
int total = streamPushMapper.getAllCount();
int online = streamPushMapper.getAllOnline(userSetting.isUsePushingAsStatus());
return new ResourceBaseInfo(total, online);
}
@Override
public Map<String, StreamPush> getAllAppAndStreamMap() {
return streamPushMapper.getAllAppAndStreamMap();
}
@Override
public Map<String, StreamPush> getAllGBId() {
return streamPushMapper.getAllGBId();
}
@Override
public void updateStatus(StreamPush push) {
}
@Override
public void updatePushStatus(Integer streamPushId, boolean pushIng) {
streamPushInDb.setPushIng(true);
if (userSetting.isUsePushingAsStatus()) {
streamPushInDb.setGbStatus(true);
}
streamPushInDb.setPushTime(DateUtil.getNow());
}
private List<StreamPush> handleJSON(List<StreamInfo> streamInfoList) {
if (streamInfoList == null || streamInfoList.isEmpty()) {
return null;
}
Map<String, StreamPush> result = new HashMap<>();
for (StreamInfo streamInfo : streamInfoList) {
// 不保存国标推理以及拉流代理的流
if (streamInfo.getOriginType() == OriginType.RTSP_PUSH.ordinal()
|| streamInfo.getOriginType() == OriginType.RTMP_PUSH.ordinal()
|| streamInfo.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
String key = streamInfo.getApp() + "_" + streamInfo.getStream();
StreamPush streamPushItem = result.get(key);
if (streamPushItem == null) {
streamPushItem = streamPushItem.getInstance(streamInfo);
result.put(key, streamPushItem);
}
}
}
return new ArrayList<>(result.values());
}
}

View File

@@ -1,192 +0,0 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.event.AnalysisEventListener;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import org.springframework.util.ObjectUtils;
import java.util.*;
public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPushExcelDto> {
/**
* 错误数据的回调,用于将错误数据发送给页面
*/
private ErrorDataHandler errorDataHandler;
/**
* 推流的业务类用于存储数据
*/
private IStreamPushService pushService;
/**
* 默认流媒体节点ID
*/
private String defaultMediaServerId;
/**
* 用于存储不加过滤的所有数据
*/
private final List<StreamPush> streamPushItems = new ArrayList<>();
/**
* 用于存储更具APP+Stream过滤后的数据可以直接存入stream_push表与gb_stream表
*/
private final Map<String, StreamPush> streamPushItemForSave = new HashMap<>();
/**
* 用于存储按照APP+Stream为KEY 平台ID+目录Id 为value的数据用于存储到gb_stream表后获取app+Stream对应的平台与目录信息然后存入关联表
*/
private final Map<String, List<String[]>> streamPushItemsForPlatform = new HashMap<>();
/**
* 用于判断文件是否存在重复的app+Stream+平台ID
*/
private final Set<String> streamPushStreamSet = new HashSet<>();
/**
* 用于存储APP+Stream->国标ID 的数据结构, 数据一一对应全局判断APP+Stream->国标ID是否存在不对应
*/
private final BiMap<String,String> gBMap = HashBiMap.create();
/**
* 用于存储APP+Stream-> 在数据库中的数据
*/
private final BiMap<String,String> pushMapInDb = HashBiMap.create();
/**
* 记录错误的APP+Stream
*/
private final List<String> errorStreamList = new ArrayList<>();
/**
* 记录错误的国标ID
*/
private final List<String> errorInfoList = new ArrayList<>();
/**
* 读取数量计数器
*/
private int loadedSize = 0;
public StreamPushUploadFileHandler(IStreamPushService pushService, String defaultMediaServerId, ErrorDataHandler errorDataHandler) {
this.pushService = pushService;
this.defaultMediaServerId = defaultMediaServerId;
this.errorDataHandler = errorDataHandler;
// 获取数据库已有的数据,已经存在的则忽略
List<String> allAppAndStreams = pushService.getAllAppAndStream();
if (allAppAndStreams.size() > 0) {
for (String allAppAndStream : allAppAndStreams) {
pushMapInDb.put(allAppAndStream, allAppAndStream);
}
}
}
public interface ErrorDataHandler{
void handle(List<String> streams, List<String> gbId);
}
@Override
public void invoke(StreamPushExcelDto streamPushExcelDto, AnalysisContext analysisContext) {
if (ObjectUtils.isEmpty(streamPushExcelDto.getApp())
|| ObjectUtils.isEmpty(streamPushExcelDto.getStream())
|| ObjectUtils.isEmpty(streamPushExcelDto.getGbId())) {
return;
}
Integer rowIndex = analysisContext.readRowHolder().getRowIndex();
if (gBMap.get(streamPushExcelDto.getApp() + streamPushExcelDto.getStream()) == null) {
try {
gBMap.put(streamPushExcelDto.getApp() + streamPushExcelDto.getStream(), streamPushExcelDto.getGbId());
}catch (IllegalArgumentException e) {
errorInfoList.add("行:" + rowIndex + ", " + streamPushExcelDto.getGbId() + " 国标ID重复使用");
return;
}
}else {
if (!gBMap.get(streamPushExcelDto.getApp() + streamPushExcelDto.getStream()).equals(streamPushExcelDto.getGbId())) {
errorInfoList.add("行:" + rowIndex + ", " + streamPushExcelDto.getGbId() + " 同样的应用名和流ID使用了不同的国标ID");
return;
}
}
if (streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId())) {
errorStreamList.add("行:" + rowIndex + ", " + streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream()+ " 平台信息重复");
return;
}else {
if (pushMapInDb.get(streamPushExcelDto.getApp()+streamPushExcelDto.getStream()) != null) {
errorStreamList.add("行:" + rowIndex + ", " + streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream()+ " 数据已存在");
return;
}
streamPushStreamSet.add(streamPushExcelDto.getApp()+streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId());
}
StreamPush streamPushItem = new StreamPush();
streamPushItem.setApp(streamPushExcelDto.getApp());
streamPushItem.setStream(streamPushExcelDto.getStream());
streamPushItem.setGbId(streamPushExcelDto.getGbId());
streamPushItem.setStatus(streamPushExcelDto.getStatus());
streamPushItem.setStreamType("push");
streamPushItem.setCreateTime(DateUtil.getNow());
streamPushItem.setMediaServerId(defaultMediaServerId);
streamPushItem.setName(streamPushExcelDto.getName());
streamPushItem.setOriginType(2);
streamPushItem.setOriginTypeStr("rtsp_push");
streamPushItem.setTotalReaderCount(0);
streamPushItem.setPlatformId(streamPushExcelDto.getPlatformId());
streamPushItem.setCatalogId(streamPushExcelDto.getCatalogId());
// 存入所有的通道信息
streamPushItems.add(streamPushItem);
streamPushItemForSave.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
if (!ObjectUtils.isEmpty(streamPushExcelDto.getPlatformId())) {
List<String[]> platformList = streamPushItemsForPlatform.get(streamPushItem.getApp() + streamPushItem.getStream());
if (platformList == null) {
platformList = new ArrayList<>();
streamPushItemsForPlatform.put(streamPushItem.getApp() + streamPushItem.getStream(), platformList);
}
String platformId = streamPushExcelDto.getPlatformId();
String catalogId = streamPushExcelDto.getCatalogId();
if (ObjectUtils.isEmpty(streamPushExcelDto.getCatalogId())) {
catalogId = null;
}
String[] platFormInfoArray = new String[]{platformId, catalogId};
platformList.add(platFormInfoArray);
}
loadedSize ++;
if (loadedSize > 1000) {
saveData();
streamPushItems.clear();
streamPushItemForSave.clear();
streamPushItemsForPlatform.clear();
loadedSize = 0;
}
}
@Override
public void doAfterAllAnalysed(AnalysisContext analysisContext) {
// 这里也要保存数据,确保最后遗留的数据也存储到数据库
saveData();
streamPushItems.clear();
streamPushItemForSave.clear();
gBMap.clear();
streamPushStreamSet.clear();
streamPushItemsForPlatform.clear();
errorDataHandler.handle(errorStreamList, errorInfoList);
}
private void saveData(){
if (streamPushItemForSave.size() > 0) {
// 向数据库查询是否存在重复的app
pushService.batchAddForUpload(new ArrayList<>(streamPushItemForSave.values()), streamPushItemsForPlatform);
}
}
}

View File

@@ -2,7 +2,7 @@ package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;

View File

@@ -3,9 +3,9 @@ package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@@ -4,7 +4,7 @@ import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger;