添加zlm集群支持

This commit is contained in:
64850858
2021-07-16 16:34:51 +08:00
parent 06d78575cc
commit 89a9ab4534
75 changed files with 2431 additions and 914 deletions

View File

@@ -2,10 +2,11 @@ package com.genersoft.iot.vmp.storager;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.IMediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import java.util.List;
import java.util.Map;
@@ -40,19 +41,6 @@ public interface IRedisCatchStorage {
StreamInfo queryPlayByDevice(String deviceId, String channelId);
/**
* 更新流媒体信息
* @param ZLMServerConfig
* @return
*/
boolean updateMediaInfo(ZLMServerConfig ZLMServerConfig);
/**
* 获取流媒体信息
* @return
*/
ZLMServerConfig getMediaInfo();
Map<String, StreamInfo> queryPlayByDeviceId(String deviceId);
boolean startPlayback(StreamInfo stream);
@@ -114,6 +102,13 @@ public interface IRedisCatchStorage {
*/
void clearCatchByDeviceId(String deviceId);
/**
* 获取mediaServer节点
* @param mediaServerId
* @return
*/
// MediaServerItem getMediaInfo(String mediaServerId);
/**
* 设置所有设备离线
*/

View File

@@ -3,6 +3,8 @@ package com.genersoft.iot.vmp.storager;
import java.util.List;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
@@ -365,4 +367,12 @@ public interface IVideoManagerStorager {
* @param online
*/
void updateParentPlatformStatus(String platformGbID, boolean online);
/**
* 更新媒体节点
* @param mediaServerItem
*/
void updateMediaServer(MediaServerItem mediaServerItem);
List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id, boolean b);
}

View File

@@ -12,9 +12,10 @@ import java.util.List;
public interface GbStreamMapper {
@Insert("INSERT INTO gb_stream (app, stream, gbId, name, " +
"longitude, latitude, streamType, status) VALUES" +
"longitude, latitude, streamType, mediaServerId, status) VALUES" +
"('${app}', '${stream}', '${gbId}', '${name}', " +
"'${longitude}', '${latitude}', '${streamType}', ${status})")
"'${longitude}', '${latitude}', '${streamType}', " +
"'${mediaServerId}', ${status})")
int add(GbStream gbStream);
@Update("UPDATE gb_stream " +
@@ -25,6 +26,7 @@ public interface GbStreamMapper {
"streamType=#{streamType}," +
"longitude=#{longitude}, " +
"latitude=#{latitude}," +
"mediaServerId=#{mediaServerId}," +
"status=${status} " +
"WHERE app=#{app} AND stream=#{stream} AND gbId=#{gbId}")
int update(GbStream gbStream);
@@ -52,4 +54,7 @@ public interface GbStreamMapper {
"SET status=${status} " +
"WHERE app=#{app} AND stream=#{stream}")
void setStatus(String app, String stream, boolean status);
@Select("SELECT gs.*, pgs.platformId FROM gb_stream gs LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream WHERE mediaServerId=#{mediaServerId} ")
List<GbStream> selectAllByMediaServerId(String mediaServerId);
}

View File

@@ -0,0 +1,99 @@
package com.genersoft.iot.vmp.storager.dao;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.media.zlm.dto.IMediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Repository;
import java.util.List;
@Mapper
@Repository
public interface MediaServerMapper {
@Insert("INSERT INTO media_server (" +
"id, " +
"ip, " +
"hookIp, " +
"sdpIp, " +
"streamIp, " +
"httpPort, " +
"httpSSlPort, " +
"rtmpPort, " +
"rtmpSSlPort, " +
"rtpProxyPort, " +
"rtspPort, " +
"rtspSSLPort, " +
"autoConfig, " +
"secret, " +
"streamNoneReaderDelayMS, " +
"rtpEnable, " +
"rtpPortRange, " +
"recordAssistPort, " +
"createTime, " +
"updateTime" +
") VALUES " +
"(" +
"'${id}', " +
"'${ip}', " +
"'${hookIp}', " +
"'${sdpIp}', " +
"'${streamIp}', " +
"${httpPort}, " +
"${httpSSlPort}, " +
"${rtmpPort}, " +
"${rtmpSSlPort}, " +
"${rtpProxyPort}, " +
"${rtspPort}, " +
"${rtspSSLPort}, " +
"${autoConfig}, " +
"'${secret}', " +
"${streamNoneReaderDelayMS}, " +
"${rtpEnable}, " +
"'${rtpPortRange}', " +
"${recordAssistPort}, " +
"'${createTime}', " +
"'${updateTime}')")
int add(IMediaServerItem mediaServerItem);
@Update(value = {" <script>" +
"UPDATE media_server " +
"SET updateTime='${updateTime}'" +
"<if test=\"ip != null\">, ip='${ip}'</if>" +
"<if test=\"hookIp != null\">, hookIp='${hookIp}'</if>" +
"<if test=\"sdpIp != null\">, sdpIp='${sdpIp}'</if>" +
"<if test=\"streamIp != null\">, streamIp='${streamIp}'</if>" +
"<if test=\"httpPort != null\">, httpPort=${httpPort}</if>" +
"<if test=\"httpSSlPort != null\">, httpSSlPort=${httpSSlPort}</if>" +
"<if test=\"rtmpPort != null\">, rtmpPort=${rtmpPort}</if>" +
"<if test=\"rtmpSSlPort != null\">, rtmpSSlPort=${rtmpSSlPort}</if>" +
"<if test=\"rtpProxyPort != null\">, rtpProxyPort=${rtpProxyPort}</if>" +
"<if test=\"rtspPort != null\">, rtspPort=${rtspPort}</if>" +
"<if test=\"rtspSSLPort != null\">, rtspSSLPort=${rtspSSLPort}</if>" +
"<if test=\"autoConfig != null\">, autoConfig=${autoConfig}</if>" +
"<if test=\"streamNoneReaderDelayMS != null\">, streamNoneReaderDelayMS=${streamNoneReaderDelayMS}</if>" +
"<if test=\"rtpEnable != null\">, rtpEnable=${rtpEnable}</if>" +
"<if test=\"rtpPortRange != null\">, rtpPortRange='${rtpPortRange}'</if>" +
"<if test=\"secret != null\">, secret='${secret}'</if>" +
"<if test=\"recordAssistPort != null\">, recordAssistPort=${recordAssistPort}</if>" +
"WHERE id='${id}'"+
" </script>"})
int update(IMediaServerItem mediaServerItem);
@Select("SELECT * FROM media_server WHERE id='${id}'")
MediaServerItem queryOne(String id);
@Select("SELECT * FROM media_server")
List<MediaServerItem> queryAll();
@Select("DELETE FROM media_server WHERE id='${id}'")
int delOne(String secret);
@Select("SELECT * FROM media_server WHERE ip='${host}' and httpPort=${port}")
MediaServerItem queryOneByHostAndPort(String host, int port);
}

View File

@@ -10,10 +10,10 @@ import java.util.List;
@Repository
public interface StreamProxyMapper {
@Insert("INSERT INTO stream_proxy (type, app, stream, url, src_url, dst_url, " +
"timeout_ms, ffmpeg_cmd_key, rtp_type, enable_hls, enable_mp4, enable) VALUES" +
"('${type}','${app}', '${stream}', '${url}', '${src_url}', '${dst_url}', " +
"'${timeout_ms}', '${ffmpeg_cmd_key}', '${rtp_type}', ${enable_hls}, ${enable_mp4}, ${enable} )")
@Insert("INSERT INTO stream_proxy (type, app, stream,mediaServerId, url, src_url, dst_url, " +
"timeout_ms, ffmpeg_cmd_key, rtp_type, enable_hls, enable_mp4, enable, createTime) VALUES" +
"('${type}','${app}', '${stream}', '${mediaServerId}','${url}', '${src_url}', '${dst_url}', " +
"'${timeout_ms}', '${ffmpeg_cmd_key}', '${rtp_type}', ${enable_hls}, ${enable_mp4}, ${enable}, '${createTime}' )")
int add(StreamProxyItem streamProxyDto);
@Update("UPDATE stream_proxy " +
@@ -21,6 +21,7 @@ public interface StreamProxyMapper {
"app=#{app}," +
"stream=#{stream}," +
"url=#{url}, " +
"mediaServerId=#{mediaServerId}, " +
"src_url=#{src_url}," +
"dst_url=#{dst_url}, " +
"timeout_ms=#{timeout_ms}, " +
@@ -35,12 +36,17 @@ public interface StreamProxyMapper {
@Delete("DELETE FROM stream_proxy WHERE app=#{app} AND stream=#{stream}")
int del(String app, String stream);
@Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream")
@Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream order by st.createTime desc")
List<StreamProxyItem> selectAll();
@Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable=${enable}")
@Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable=${enable} order by st.createTime desc")
List<StreamProxyItem> selectForEnable(boolean enable);
@Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.app=#{app} AND st.stream=#{stream}")
@Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.app=#{app} AND st.stream=#{stream} order by st.createTime desc")
StreamProxyItem selectOne(String app, String stream);
@Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st " +
"LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " +
"WHERE st.enable=${enable} and st.mediaServerId = '${id}' order by st.createTime desc")
List<StreamProxyItem> selectForEnableInMediaServer(String id, boolean enable);
}

View File

@@ -11,14 +11,15 @@ import java.util.List;
public interface StreamPushMapper {
@Insert("INSERT INTO stream_push (app, stream, totalReaderCount, originType, originTypeStr, " +
"createStamp, aliveSecond) VALUES" +
"createStamp, aliveSecond, mediaServerId) VALUES" +
"('${app}', '${stream}', '${totalReaderCount}', '${originType}', '${originTypeStr}', " +
"'${createStamp}', '${aliveSecond}' )")
"'${createStamp}', '${aliveSecond}', '${mediaServerId}' )")
int add(StreamPushItem streamPushItem);
@Update("UPDATE stream_push " +
"SET app=#{app}," +
"stream=#{stream}," +
"mediaServerId=#{mediaServerId}," +
"totalReaderCount=#{totalReaderCount}, " +
"originType=#{originType}," +
"originTypeStr=#{originTypeStr}, " +
@@ -41,10 +42,10 @@ public interface StreamPushMapper {
@Insert("<script>" +
"INSERT INTO stream_push (app, stream, totalReaderCount, originType, originTypeStr, " +
"createStamp, aliveSecond) " +
"createStamp, aliveSecond, mediaServerId) " +
"VALUES <foreach collection='streamPushItems' item='item' index='index' >" +
"( '${item.app}', '${item.stream}', '${item.totalReaderCount}', '${item.originType}', " +
"'${item.originTypeStr}','${item.createStamp}', '${item.aliveSecond}' )" +
"'${item.originTypeStr}','${item.createStamp}', '${item.aliveSecond}', '${item.mediaServerId}' )" +
" </foreach>" +
"</script>")
void addAll(List<StreamPushItem> streamPushItems);

View File

@@ -5,6 +5,8 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.IMediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
@@ -87,26 +89,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
return (StreamInfo)redis.get(playLeys.get(0).toString());
}
/**
* 更新流媒体信息
* @param ZLMServerConfig
* @return
*/
@Override
public boolean updateMediaInfo(ZLMServerConfig ZLMServerConfig) {
ZLMServerConfig.setUpdateTime(format.format(new Date(System.currentTimeMillis())));
return redis.set(VideoManagerConstants.MEDIA_SERVER_PREFIX, ZLMServerConfig);
}
/**
* 获取流媒体信息
* @return
*/
@Override
public ZLMServerConfig getMediaInfo() {
return (ZLMServerConfig)redis.get(VideoManagerConstants.MEDIA_SERVER_PREFIX);
}
@Override
public Map<String, StreamInfo> queryPlayByDeviceId(String deviceId) {
Map<String, StreamInfo> streamInfos = new HashMap<>();
@@ -297,7 +279,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override
public void outlineForAll() {
List<Object> onlineDevices = redis.scan(String.format("%S*", VideoManagerConstants.KEEPLIVEKEY_PREFIX));
List<Object> onlineDevices = redis.scan(VideoManagerConstants.KEEPLIVEKEY_PREFIX + "*" );
for (int i = 0; i < onlineDevices.size(); i++) {
String key = (String) onlineDevices.get(i);
redis.del(key);
@@ -308,4 +290,5 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
public void updateWVPInfo(JSONObject jsonObject) {
}
}

View File

@@ -5,6 +5,7 @@ import java.util.*;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -70,6 +71,9 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
@Autowired
private VideoStreamSessionManager streamSession;
@Autowired
private MediaServerMapper mediaServerMapper;
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@@ -459,6 +463,8 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
boolean result = false;
streamProxyItem.setStreamType("proxy");
streamProxyItem.setStatus(true);
String now = this.format.format(new Date(System.currentTimeMillis()));
streamProxyItem.setCreateTime(now);
try {
if (gbStreamMapper.add(streamProxyItem)<0 || streamProxyMapper.add(streamProxyItem) < 0) {
//事务回滚
@@ -467,6 +473,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
result = true;
dataSourceTransactionManager.commit(transactionStatus); //手动提交
}catch (Exception e) {
logger.error("向数据库添加流代理失败:", e);
dataSourceTransactionManager.rollback(transactionStatus);
}
return result;
@@ -599,4 +606,21 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
public void updateParentPlatformStatus(String platformGbID, boolean online) {
platformMapper.updateParentPlatformStatus(platformGbID, online);
}
@Override
public void updateMediaServer(MediaServerItem mediaServerItem) {
String now = this.format.format(new Date(System.currentTimeMillis()));
mediaServerItem.setUpdateTime(now);
if (mediaServerMapper.queryOne(mediaServerItem.getId()) != null) {
mediaServerMapper.update(mediaServerItem);
}else {
mediaServerItem.setCreateTime(now);
mediaServerMapper.add(mediaServerItem);
}
}
@Override
public List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id, boolean enable) {
return streamProxyMapper.selectForEnableInMediaServer(id, enable);
}
}