使用Slf4j注解打印日志

This commit is contained in:
648540858
2024-07-03 17:24:35 +08:00
parent adf040ec4b
commit b98cfd1fed
66 changed files with 804 additions and 1039 deletions

View File

@@ -20,9 +20,8 @@ import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
@@ -35,13 +34,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Slf4j
@Service
@DS("share")
public class CloudRecordServiceImpl implements ICloudRecordService {
private final static Logger logger = LoggerFactory.getLogger(CloudRecordServiceImpl.class);
@Autowired
private CloudRecordServiceMapper cloudRecordServiceMapper;
@@ -116,7 +113,7 @@ public class CloudRecordServiceImpl implements ICloudRecordService {
cloudRecordItem.setCallId(streamAuthorityInfo.getCallId());
}
}
logger.info("[添加录像记录] {}/{}, callId: {}, 内容:{}", event.getApp(), event.getStream(), cloudRecordItem.getCallId(), event.getRecordInfo());
log.info("[添加录像记录] {}/{}, callId: {}, 内容:{}", event.getApp(), event.getStream(), cloudRecordItem.getCallId(), event.getRecordInfo());
cloudRecordServiceMapper.add(cloudRecordItem);
}

View File

@@ -20,8 +20,7 @@ import com.genersoft.iot.vmp.storager.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -35,12 +34,11 @@ import java.util.concurrent.CopyOnWriteArrayList;
/**
* @author lin
*/
@Slf4j
@Service
@DS("master")
public class DeviceChannelServiceImpl implements IDeviceChannelService {
private final static Logger logger = LoggerFactory.getLogger(DeviceChannelServiceImpl.class);
@Autowired
private EventPublisher eventPublisher;
@@ -278,9 +276,9 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
assert !ObjectUtils.isEmpty(channel.getDeviceId());
assert !ObjectUtils.isEmpty(channel.getStreamIdentification());
if (ObjectUtils.isEmpty(channel.getStreamIdentification())) {
logger.info("[重置通道码流类型] 设备: {}, 码流: {}", channel.getDeviceId(), channel.getStreamIdentification());
log.info("[重置通道码流类型] 设备: {}, 码流: {}", channel.getDeviceId(), channel.getStreamIdentification());
}else {
logger.info("[更新通道码流类型] 设备: {}, 通道:{} 码流: {}", channel.getDeviceId(), channel.getDeviceId(),
log.info("[更新通道码流类型] 设备: {}, 通道:{} 码流: {}", channel.getDeviceId(), channel.getDeviceId(),
channel.getStreamIdentification());
}
channelMapper.updateChannelStreamIdentification(channel);
@@ -321,7 +319,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
return;
}
if (deviceChannels.size() > 100) {
logger.warn("[更新通道位置信息后发送通知] 设备可能是平台,上报的位置信息未标明通道编号," +
log.warn("[更新通道位置信息后发送通知] 设备可能是平台,上报的位置信息未标明通道编号," +
"导致所有通道被更新位置, deviceId:{}", device.getDeviceId());
}
for (DeviceChannel channel : deviceChannels) {
@@ -330,7 +328,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
try {
eventPublisher.mobilePositionEventPublish(mobilePosition);
}catch (Exception e) {
logger.error("[向上级转发移动位置失败] ", e);
log.error("[向上级转发移动位置失败] ", e);
}
// 发送redis消息。 通知位置信息的变化
JSONObject jsonObject = new JSONObject();
@@ -473,10 +471,10 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
}
if (stringBuilder.length() > 0) {
logger.info("[目录查询]收到的数据存在重复: {}" , stringBuilder);
log.info("[目录查询]收到的数据存在重复: {}" , stringBuilder);
}
if(CollectionUtils.isEmpty(channels)){
logger.info("通道重设,数据为空={}" , deviceChannelList);
log.info("通道重设,数据为空={}" , deviceChannelList);
return false;
}
int limitCount = 50;

View File

@@ -25,8 +25,7 @@ import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
@@ -45,12 +44,11 @@ import java.util.concurrent.TimeUnit;
/**
* 设备业务(目录订阅)
*/
@Slf4j
@Service
@DS("master")
public class DeviceServiceImpl implements IDeviceService {
private final static Logger logger = LoggerFactory.getLogger(DeviceServiceImpl.class);
@Autowired
private SIPCommander cmder;
@@ -104,7 +102,7 @@ public class DeviceServiceImpl implements IDeviceService {
@Override
public void online(Device device, SipTransactionInfo sipTransactionInfo) {
logger.info("[设备上线] deviceId{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort());
log.info("[设备上线] deviceId{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort());
Device deviceInRedis = redisCatchStorage.getDevice(device.getDeviceId());
Device deviceInDb = deviceMapper.getDeviceByDeviceId(device.getDeviceId());
@@ -132,13 +130,13 @@ public class DeviceServiceImpl implements IDeviceService {
device.setOnLine(true);
device.setCreateTime(now);
device.setUpdateTime(now);
logger.info("[设备上线,首次注册]: {},查询设备信息以及通道信息", device.getDeviceId());
log.info("[设备上线,首次注册]: {},查询设备信息以及通道信息", device.getDeviceId());
deviceMapper.add(device);
redisCatchStorage.updateDevice(device);
try {
commander.deviceInfoQuery(device);
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 查询设备信息: {}", e.getMessage());
log.error("[命令发送失败] 查询设备信息: {}", e.getMessage());
}
sync(device);
}else {
@@ -148,11 +146,11 @@ public class DeviceServiceImpl implements IDeviceService {
deviceMapper.update(device);
redisCatchStorage.updateDevice(device);
if (userSetting.getSyncChannelOnDeviceOnline()) {
logger.info("[设备上线,离线状态下重新注册]: {},查询设备信息以及通道信息", device.getDeviceId());
log.info("[设备上线,离线状态下重新注册]: {},查询设备信息以及通道信息", device.getDeviceId());
try {
commander.deviceInfoQuery(device);
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 查询设备信息: {}", e.getMessage());
log.error("[命令发送失败] 查询设备信息: {}", e.getMessage());
}
sync(device);
// TODO 如果设备下的通道级联到了其他平台那么需要发送事件或者notify给上级平台
@@ -172,7 +170,7 @@ public class DeviceServiceImpl implements IDeviceService {
}else {
if (deviceChannelMapper.queryAllChannels(device.getDeviceId()).size() == 0) {
logger.info("[设备上线]: {}通道数为0,查询通道信息", device.getDeviceId());
log.info("[设备上线]: {}通道数为0,查询通道信息", device.getDeviceId());
sync(device);
}
@@ -202,7 +200,7 @@ public class DeviceServiceImpl implements IDeviceService {
@Override
public void offline(String deviceId, String reason) {
logger.warn("[设备离线]{}, device{}", reason, deviceId);
log.warn("[设备离线]{}, device{}", reason, deviceId);
Device device = deviceMapper.getDeviceByDeviceId(deviceId);
if (device == null) {
return;
@@ -255,7 +253,7 @@ public class DeviceServiceImpl implements IDeviceService {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
return false;
}
logger.info("[添加目录订阅] 设备{}", device.getDeviceId());
log.info("[添加目录订阅] 设备{}", device.getDeviceId());
// 添加目录订阅
CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander, dynamicTask);
// 刷新订阅
@@ -272,7 +270,7 @@ public class DeviceServiceImpl implements IDeviceService {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
return false;
}
logger.info("[移除目录订阅]: {}", device.getDeviceId());
log.info("[移除目录订阅]: {}", device.getDeviceId());
String taskKey = device.getDeviceId() + "catalog";
if (device.isOnLine()) {
Runnable runnable = dynamicTask.get(taskKey);
@@ -290,7 +288,7 @@ public class DeviceServiceImpl implements IDeviceService {
if (device == null || device.getSubscribeCycleForMobilePosition() < 0) {
return false;
}
logger.info("[添加移动位置订阅] 设备{}", device.getDeviceId());
log.info("[添加移动位置订阅] 设备{}", device.getDeviceId());
// 添加目录订阅
MobilePositionSubscribeTask mobilePositionSubscribeTask = new MobilePositionSubscribeTask(device, sipCommander, dynamicTask);
// 设置最小值为30
@@ -306,7 +304,7 @@ public class DeviceServiceImpl implements IDeviceService {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
return false;
}
logger.info("[移除移动位置订阅]: {}", device.getDeviceId());
log.info("[移除移动位置订阅]: {}", device.getDeviceId());
String taskKey = device.getDeviceId() + "mobile_position";
if (device.isOnLine()) {
Runnable runnable = dynamicTask.get(taskKey);
@@ -332,7 +330,7 @@ public class DeviceServiceImpl implements IDeviceService {
@Override
public void sync(Device device) {
if (catalogResponseMessageHandler.isSyncRunning(device.getDeviceId())) {
logger.info("开启同步时发现同步已经存在");
log.info("开启同步时发现同步已经存在");
return;
}
int sn = (int)((Math.random()*9+1)*100000);
@@ -343,7 +341,7 @@ public class DeviceServiceImpl implements IDeviceService {
catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg);
});
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[同步通道], 信令发送失败:{}", e.getMessage() );
log.error("[同步通道], 信令发送失败:{}", e.getMessage() );
String errorMsg = String.format("同步通道失败,信令发送失败: %s", e.getMessage());
catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg);
}
@@ -381,7 +379,7 @@ public class DeviceServiceImpl implements IDeviceService {
try {
sipCommander.deviceStatusQuery(device, null);
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 设备状态查询: {}", e.getMessage());
log.error("[命令发送失败] 设备状态查询: {}", e.getMessage());
}
}
@@ -446,7 +444,7 @@ public class DeviceServiceImpl implements IDeviceService {
public void updateCustomDevice(Device device) {
Device deviceInStore = deviceMapper.getDeviceByDeviceId(device.getDeviceId());
if (deviceInStore == null) {
logger.warn("更新设备时未找到设备信息");
log.warn("更新设备时未找到设备信息");
return;
}

View File

@@ -12,8 +12,7 @@ import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.RedisTemplate;
@@ -26,12 +25,11 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
@DS("master")
public class InviteStreamServiceImpl implements IInviteStreamService {
private final Logger logger = LoggerFactory.getLogger(InviteStreamServiceImpl.class);
private final Map<String, List<ErrorCallback<Object>>> inviteErrorCallbackMap = new ConcurrentHashMap<>();
@Autowired
@@ -73,7 +71,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
@Override
public void updateInviteInfo(InviteInfo inviteInfo, Long time) {
if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) {
logger.warn("[更新Invite信息],参数不全: {}", JSON.toJSON(inviteInfo));
log.warn("[更新Invite信息],参数不全: {}", JSON.toJSON(inviteInfo));
return;
}
InviteInfo inviteInfoForUpdate = null;
@@ -91,7 +89,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
inviteInfo.getChannelId(), inviteInfo.getStream());
if (inviteInfoInRedis == null) {
logger.warn("[更新Invite信息]未从缓存中读取到Invite信息 deviceId: {}, channel: {}, stream: {}",
log.warn("[更新Invite信息]未从缓存中读取到Invite信息 deviceId: {}, channel: {}, stream: {}",
inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
return;
}
@@ -165,7 +163,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
return null;
}
if (scanResult.size() != 1) {
logger.warn("[获取InviteInfo] 发现 key: {}存在多条", key);
log.warn("[获取InviteInfo] 发现 key: {}存在多条", key);
}
return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));

View File

@@ -13,18 +13,20 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.service.IUserService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.MediaServerUtils;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo;
import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@@ -35,11 +37,10 @@ import java.text.ParseException;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
public class MediaServiceImpl implements IMediaService {
private final static Logger logger = LoggerFactory.getLogger(MediaServiceImpl.class);
@Autowired
private IRedisCatchStorage redisCatchStorage;
@@ -104,13 +105,13 @@ public class MediaServiceImpl implements IMediaService {
Map<String, String> paramMap = MediaServerUtils.urlParamToMap(params);
// 推流鉴权
if (params == null) {
logger.info("推流鉴权失败: 缺少必要参数sign=md5(user表的pushKey)");
log.info("推流鉴权失败: 缺少必要参数sign=md5(user表的pushKey)");
throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized");
}
String sign = paramMap.get("sign");
if (sign == null) {
logger.info("推流鉴权失败: 缺少必要参数sign=md5(user表的pushKey)");
log.info("推流鉴权失败: 缺少必要参数sign=md5(user表的pushKey)");
throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized");
}
// 推流自定义播放鉴权码
@@ -118,7 +119,7 @@ public class MediaServiceImpl implements IMediaService {
// 鉴权配置
boolean hasAuthority = userService.checkPushAuthority(callId, sign);
if (!hasAuthority) {
logger.info("推流鉴权失败: sign 无权限: callId={}. sign={}", callId, sign);
log.info("推流鉴权失败: sign 无权限: callId={}. sign={}", callId, sign);
throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized");
}
StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(app, stream, mediaServer.getId());
@@ -150,7 +151,7 @@ public class MediaServiceImpl implements IMediaService {
inviteInfo = inviteStreamService.getInviteInfoBySSRC(ssrc);
if (inviteInfo != null) {
result.setStream_replace(inviteInfo.getStream());
logger.info("[ZLM HOOK]推流鉴权 stream: {} 替换为 {}", stream, inviteInfo.getStream());
log.info("[ZLM HOOK]推流鉴权 stream: {} 替换为 {}", stream, inviteInfo.getStream());
stream = inviteInfo.getStream();
}
}
@@ -234,7 +235,7 @@ public class MediaServiceImpl implements IMediaService {
try {
commanderForPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStream());
@@ -254,14 +255,14 @@ public class MediaServiceImpl implements IMediaService {
commander.streamByeCmd(device, inviteInfo.getChannelId(),
inviteInfo.getStream(), null);
} else {
logger.info("[无人观看] 未找到设备的点播信息: {} 流:{}", inviteInfo.getDeviceId(), stream);
log.info("[无人观看] 未找到设备的点播信息: {} 流:{}", inviteInfo.getDeviceId(), stream);
}
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
logger.error("[无人观看]点播, 发送BYE失败 {}", e.getMessage());
log.error("[无人观看]点播, 发送BYE失败 {}", e.getMessage());
}
} else {
logger.info("[无人观看] 未找到设备: {},流:{}", inviteInfo.getDeviceId(), stream);
log.info("[无人观看] 未找到设备: {},流:{}", inviteInfo.getDeviceId(), stream);
}
inviteStreamService.removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
@@ -284,7 +285,7 @@ public class MediaServiceImpl implements IMediaService {
// 无人观看自动移除
result = true;
streamProxyService.del(app, stream);
logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", app, stream, streamProxy.getSrcUrl());
log.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", app, stream, streamProxy.getSrcUrl());
} else if (streamProxy.isEnableDisableNoneReader()) {
// 无人观看停用
result = true;

View File

@@ -6,8 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.service.IMobilePositionService;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMobilePositionMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
@@ -19,7 +18,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
public class MobilePositionServiceImpl implements IMobilePositionService {
@@ -35,8 +34,6 @@ public class MobilePositionServiceImpl implements IMobilePositionService {
@Autowired
private RedisTemplate<String, MobilePosition> redisTemplate;
private final static Logger logger = LoggerFactory.getLogger(MobilePositionServiceImpl.class);
private final String REDIS_MOBILE_POSITION_LIST = "redis_mobile_position_list";
@Override
@@ -78,7 +75,7 @@ public class MobilePositionServiceImpl implements IMobilePositionService {
if (userSetting.getSavePositionHistory()) {
mobilePositionMapper.batchadd(mobilePositions);
}
logger.info("[移动位置订阅]更新通道位置: {}", mobilePositions.size());
log.info("[移动位置订阅]更新通道位置: {}", mobilePositions.size());
Map<String, DeviceChannel> updateChannelMap = new HashMap<>();
for (MobilePosition mobilePosition : mobilePositions) {
DeviceChannel deviceChannel = new DeviceChannel();

View File

@@ -10,8 +10,7 @@ import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformCatalogMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
@@ -27,12 +26,11 @@ import java.util.Map;
/**
* @author lin
*/
@Slf4j
@Service
@DS("master")
public class PlatformChannelServiceImpl implements IPlatformChannelService {
private final static Logger logger = LoggerFactory.getLogger(PlatformChannelServiceImpl.class);
@Autowired
private PlatformChannelMapper platformChannelMapper;
@@ -62,7 +60,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
public int updateChannelForGB(String platformId, List<ChannelReduce> channelReduces, String catalogId) {
ParentPlatform platform = platformMapper.getParentPlatByServerGBId(platformId);
if (platform == null) {
logger.warn("更新级联通道信息时未找到平台{}的信息", platformId);
log.warn("更新级联通道信息时未找到平台{}的信息", platformId);
return 0;
}
Map<Integer, ChannelReduce> deviceAndChannels = new HashMap<>();
@@ -95,12 +93,12 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
int count = platformChannelMapper.addChannels(platformId, channelReducesToAdd.subList(i, toIndex));
result = result || count < 0;
allCount += count;
logger.info("[关联通道]国标通道 平台:{}, 共需关联通道数:{}, 已关联:{}", platformId, channelReducesToAdd.size(), toIndex);
log.info("[关联通道]国标通道 平台:{}, 共需关联通道数:{}, 已关联:{}", platformId, channelReducesToAdd.size(), toIndex);
}
}else {
allCount = platformChannelMapper.addChannels(platformId, channelReducesToAdd);
result = result || allCount < 0;
logger.info("[关联通道]国标通道 平台:{}, 关联通道数:{}", platformId, channelReducesToAdd.size());
log.info("[关联通道]国标通道 平台:{}, 关联通道数:{}", platformId, channelReducesToAdd.size());
}
if (result) {
@@ -108,7 +106,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
dataSourceTransactionManager.rollback(transactionStatus);
allCount = 0;
}else {
logger.info("[关联通道]国标通道 平台:{}, 正在存入数据库", platformId);
log.info("[关联通道]国标通道 平台:{}, 正在存入数据库", platformId);
dataSourceTransactionManager.commit(transactionStatus);
}
@@ -119,7 +117,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.ADD);
}
}
logger.info("[关联通道]国标通道 平台:{}, 存入数据库成功", platformId);
log.info("[关联通道]国标通道 平台:{}, 存入数据库成功", platformId);
}
return allCount;
}
@@ -137,7 +135,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
}
return deviceChannelList;
} else if (catalog == null && !catalogId.equals(platform.getDeviceGBId())) {
logger.warn("未查询到目录{}的信息", catalogId);
log.warn("未查询到目录{}的信息", catalogId);
return null;
}
for (ChannelReduce channelReduce : channelReduces) {

View File

@@ -30,8 +30,7 @@ import com.genersoft.iot.vmp.utils.DateUtil;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import gov.nist.javax.sip.message.SIPResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
@@ -49,6 +48,7 @@ import java.util.Vector;
/**
* @author lin
*/
@Slf4j
@Service
@DS("master")
public class PlatformServiceImpl implements IPlatformService {
@@ -58,8 +58,6 @@ public class PlatformServiceImpl implements IPlatformService {
private final static String REGISTER_FAIL_AGAIN_KEY_PREFIX = "platform_register_fail_again_";
private final static String KEEPALIVE_KEY_PREFIX = "platform_keepalive_";
private final static Logger logger = LoggerFactory.getLogger(PlatformServiceImpl.class);
@Autowired
private ParentPlatformMapper platformMapper;
@@ -117,7 +115,7 @@ public class PlatformServiceImpl implements IPlatformService {
sendRtpItem.getCallId(), sendRtpItem.getStream());
}
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
log.error("[命令发送失败] 发送BYE: {}", e.getMessage());
}
}
}
@@ -139,7 +137,7 @@ public class PlatformServiceImpl implements IPlatformService {
try {
commanderForPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStream());
@@ -184,10 +182,10 @@ public class PlatformServiceImpl implements IPlatformService {
// 注册成功时由程序直接调用了online方法
try {
commanderForPlatform.register(parentPlatform, eventResult -> {
logger.info("[国标级联] {},添加向上级注册失败,请确定上级平台可用时重新保存", parentPlatform.getServerGBId());
log.info("[国标级联] {},添加向上级注册失败,请确定上级平台可用时重新保存", parentPlatform.getServerGBId());
}, null);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 国标级联: {}", e.getMessage());
log.error("[命令发送失败] 国标级联: {}", e.getMessage());
}
}
return result > 0;
@@ -195,7 +193,7 @@ public class PlatformServiceImpl implements IPlatformService {
@Override
public boolean update(ParentPlatform parentPlatform) {
logger.info("[国标级联]更新平台 {}", parentPlatform.getDeviceGBId());
log.info("[国标级联]更新平台 {}", parentPlatform.getDeviceGBId());
parentPlatform.setCharacterSet(parentPlatform.getCharacterSet().toUpperCase());
ParentPlatform parentPlatformOld = platformMapper.getParentPlatById(parentPlatform.getId());
ParentPlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatformOld.getServerGBId());
@@ -210,13 +208,13 @@ public class PlatformServiceImpl implements IPlatformService {
// 注销旧的
try {
if (parentPlatformOld.isStatus() && parentPlatformCatchOld != null) {
logger.info("保存平台{}时发现旧平台在线,发送注销命令", parentPlatformOld.getServerGBId());
log.info("保存平台{}时发现旧平台在线,发送注销命令", parentPlatformOld.getServerGBId());
commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> {
logger.info("[国标级联] 注销成功, 平台:{}", parentPlatformOld.getServerGBId());
log.info("[国标级联] 注销成功, 平台:{}", parentPlatformOld.getServerGBId());
});
}
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 国标级联 注销: {}", e.getMessage());
log.error("[命令发送失败] 国标级联 注销: {}", e.getMessage());
}
// 更新数据库
@@ -239,12 +237,12 @@ public class PlatformServiceImpl implements IPlatformService {
// 保存时启用就发送注册
// 注册成功时由程序直接调用了online方法
try {
logger.info("[国标级联] 平台注册 {}", parentPlatform.getDeviceGBId());
log.info("[国标级联] 平台注册 {}", parentPlatform.getDeviceGBId());
commanderForPlatform.register(parentPlatform, eventResult -> {
logger.info("[国标级联] {},添加向上级注册失败,请确定上级平台可用时重新保存", parentPlatform.getServerGBId());
log.info("[国标级联] {},添加向上级注册失败,请确定上级平台可用时重新保存", parentPlatform.getServerGBId());
}, null);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 国标级联: {}", e.getMessage());
log.error("[命令发送失败] 国标级联: {}", e.getMessage());
}
}
@@ -255,7 +253,7 @@ public class PlatformServiceImpl implements IPlatformService {
@Override
public void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo) {
logger.info("[国标级联]{}, 平台上线", parentPlatform.getServerGBId());
log.info("[国标级联]{}, 平台上线", parentPlatform.getServerGBId());
final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId();
dynamicTask.stop(registerFailAgainTaskKey);
@@ -275,7 +273,7 @@ public class PlatformServiceImpl implements IPlatformService {
final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
if (!dynamicTask.isAlive(registerTaskKey)) {
logger.info("[国标级联]{}, 添加定时注册任务", parentPlatform.getServerGBId());
log.info("[国标级联]{}, 添加定时注册任务", parentPlatform.getServerGBId());
// 添加注册任务
dynamicTask.startCron(registerTaskKey,
// 注册失败注册成功时由程序直接调用了online方法
@@ -286,7 +284,7 @@ public class PlatformServiceImpl implements IPlatformService {
final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
if (!dynamicTask.contains(keepaliveTaskKey)) {
logger.info("[国标级联]{}, 添加定时心跳任务", parentPlatform.getServerGBId());
log.info("[国标级联]{}, 添加定时心跳任务", parentPlatform.getServerGBId());
// 添加心跳任务
dynamicTask.startCron(keepaliveTaskKey,
()-> {
@@ -294,14 +292,14 @@ public class PlatformServiceImpl implements IPlatformService {
commanderForPlatform.keepalive(parentPlatform, eventResult -> {
// 心跳失败
if (eventResult.type != SipSubscribe.EventResultType.timeout) {
logger.warn("[国标级联]发送心跳收到错误code {}, msg: {}", eventResult.statusCode, eventResult.msg);
log.warn("[国标级联]发送心跳收到错误code {}, msg: {}", eventResult.statusCode, eventResult.msg);
}
// 心跳失败
ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
// 此时是第三次心跳超时, 平台离线
if (platformCatch.getKeepAliveReply() == 2) {
// 设置平台离线,并重新注册
logger.info("[国标级联] 三次心跳失败, 平台{}({})离线", parentPlatform.getName(), parentPlatform.getServerGBId());
log.info("[国标级联] 三次心跳失败, 平台{}({})离线", parentPlatform.getName(), parentPlatform.getServerGBId());
offline(parentPlatform, false);
}else {
platformCatch.setKeepAliveReply(platformCatch.getKeepAliveReply() + 1);
@@ -316,17 +314,17 @@ public class PlatformServiceImpl implements IPlatformService {
platformCatch.setKeepAliveReply(0);
redisCatchStorage.updatePlatformCatchInfo(platformCatch);
}
logger.info("[发送心跳] 国标级联 发送心跳, code {}, msg: {}", eventResult.statusCode, eventResult.msg);
log.info("[发送心跳] 国标级联 发送心跳, code {}, msg: {}", eventResult.statusCode, eventResult.msg);
});
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送心跳: {}", e.getMessage());
log.error("[命令发送失败] 国标级联 发送心跳: {}", e.getMessage());
}
},
(parentPlatform.getKeepTimeout())*1000);
}
if (parentPlatform.isAutoPushChannel()) {
if (subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId()) == null) {
logger.info("[国标级联]{}, 添加自动通道推送模拟订阅信息", parentPlatform.getServerGBId());
log.info("[国标级联]{}, 添加自动通道推送模拟订阅信息", parentPlatform.getServerGBId());
addSimulatedSubscribeInfo(parentPlatform);
}
}else {
@@ -360,24 +358,24 @@ public class PlatformServiceImpl implements IPlatformService {
}
if (sipTransactionInfo == null) {
logger.info("[国标级联] 平台:{}注册即将到期,开始重新注册", parentPlatform.getServerGBId());
log.info("[国标级联] 平台:{}注册即将到期,开始重新注册", parentPlatform.getServerGBId());
}else {
logger.info("[国标级联] 平台:{}注册即将到期,开始续订", parentPlatform.getServerGBId());
log.info("[国标级联] 平台:{}注册即将到期,开始续订", parentPlatform.getServerGBId());
}
commanderForPlatform.register(parentPlatform, sipTransactionInfo, eventResult -> {
logger.info("[国标级联] 平台:{}注册失败,{}:{}", parentPlatform.getServerGBId(),
log.info("[国标级联] 平台:{}注册失败,{}:{}", parentPlatform.getServerGBId(),
eventResult.statusCode, eventResult.msg);
offline(parentPlatform, false);
}, null);
} catch (Exception e) {
logger.error("[命令发送失败] 国标级联定时注册: {}", e.getMessage());
log.error("[命令发送失败] 国标级联定时注册: {}", e.getMessage());
}
}
@Override
public void offline(ParentPlatform parentPlatform, boolean stopRegister) {
logger.info("[平台离线]{}", parentPlatform.getServerGBId());
log.info("[平台离线]{}", parentPlatform.getServerGBId());
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
parentPlatformCatch.setKeepAliveReply(0);
parentPlatformCatch.setRegisterAliveReply(0);
@@ -388,17 +386,17 @@ public class PlatformServiceImpl implements IPlatformService {
platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), false);
// 停止所有推流
logger.info("[平台离线] {}, 停止所有推流", parentPlatform.getServerGBId());
log.info("[平台离线] {}, 停止所有推流", parentPlatform.getServerGBId());
stopAllPush(parentPlatform.getServerGBId());
// 清除注册定时
logger.info("[平台离线] {}, 停止定时注册任务", parentPlatform.getServerGBId());
log.info("[平台离线] {}, 停止定时注册任务", parentPlatform.getServerGBId());
final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
if (dynamicTask.contains(registerTaskKey)) {
dynamicTask.stop(registerTaskKey);
}
// 清除心跳定时
logger.info("[平台离线] {}, 停止定时发送心跳任务", parentPlatform.getServerGBId());
log.info("[平台离线] {}, 停止定时发送心跳任务", parentPlatform.getServerGBId());
final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
if (dynamicTask.contains(keepaliveTaskKey)) {
// 清除心跳任务
@@ -408,11 +406,11 @@ public class PlatformServiceImpl implements IPlatformService {
SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId());
if (catalogSubscribe != null) {
if (catalogSubscribe.getExpires() > 0) {
logger.info("[平台离线] {}, 停止目录订阅回复", parentPlatform.getServerGBId());
log.info("[平台离线] {}, 停止目录订阅回复", parentPlatform.getServerGBId());
subscribeHolder.removeCatalogSubscribe(parentPlatform.getServerGBId());
}
}
logger.info("[平台离线] {}, 停止移动位置订阅回复", parentPlatform.getServerGBId());
log.info("[平台离线] {}, 停止移动位置订阅回复", parentPlatform.getServerGBId());
subscribeHolder.removeMobilePositionSubscribe(parentPlatform.getServerGBId());
// 发起定时自动重新注册
if (!stopRegister) {
@@ -444,15 +442,15 @@ public class PlatformServiceImpl implements IPlatformService {
final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
try {
commanderForPlatform.register(parentPlatform, eventResult1 -> {
logger.info("[国标级联] {}开始定时发起注册间隔为1分钟", parentPlatform.getServerGBId());
log.info("[国标级联] {}开始定时发起注册间隔为1分钟", parentPlatform.getServerGBId());
// 添加注册任务
dynamicTask.startCron(registerTaskKey,
// 注册失败注册成功时由程序直接调用了online方法
()->logger.info("[国标级联] {},平台离线后持续发起注册,失败", parentPlatform.getServerGBId()),
()-> log.info("[国标级联] {},平台离线后持续发起注册,失败", parentPlatform.getServerGBId()),
60*1000);
}, null);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 国标级联注册: {}", e.getMessage());
log.error("[命令发送失败] 国标级联注册: {}", e.getMessage());
}
}
@@ -484,7 +482,7 @@ public class PlatformServiceImpl implements IPlatformService {
commanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, subscribe);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
IllegalAccessException e) {
logger.error("[命令发送失败] 国标级联 移动位置通知: {}", e.getMessage());
log.error("[命令发送失败] 国标级联 移动位置通知: {}", e.getMessage());
}
}
}
@@ -496,7 +494,7 @@ public class PlatformServiceImpl implements IPlatformService {
SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException {
if (mediaServerItem == null) {
logger.info("[国标级联] 语音喊话未找到可用的zlm. platform: {}", platform.getServerGBId());
log.info("[国标级联] 语音喊话未找到可用的zlm. platform: {}", platform.getServerGBId());
return;
}
InviteInfo inviteInfoForOld = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, platform.getServerGBId(), channelId);
@@ -537,7 +535,7 @@ public class PlatformServiceImpl implements IPlatformService {
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, ssrcCheck, false, null, true, false, false, tcpMode);
if (ssrcInfo == null || ssrcInfo.getPort() < 0) {
logger.info("[国标级联] 发起语音喊话 开启端口监听失败, platform: {}, channel {}", platform.getServerGBId(), channelId);
log.info("[国标级联] 发起语音喊话 开启端口监听失败, platform: {}, channel {}", platform.getServerGBId(), channelId);
SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>();
eventResult.statusCode = -1;
eventResult.msg = "端口监听失败";
@@ -545,7 +543,7 @@ public class PlatformServiceImpl implements IPlatformService {
errorEvent.response(eventResult);
return;
}
logger.info("[国标级联] 语音喊话发起Invite消息 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验{}",
log.info("[国标级联] 语音喊话发起Invite消息 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验{}",
platform.getServerGBId(), channelId, ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), ssrcInfo.getSsrc(), ssrcCheck);
// 初始化redis中的invite消息状态
@@ -558,12 +556,12 @@ public class PlatformServiceImpl implements IPlatformService {
// 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
InviteInfo inviteInfoForBroadcast = inviteStreamService.getInviteInfo(InviteSessionType.BROADCAST, platform.getServerGBId(), channelId, null);
if (inviteInfoForBroadcast == null) {
logger.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
log.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
try {
commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
logger.error("[点播超时] 发送BYE失败 {}", e.getMessage());
log.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} finally {
timeoutCallback.run(1, "收流超时");
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
@@ -574,7 +572,7 @@ public class PlatformServiceImpl implements IPlatformService {
}
}, userSetting.getPlayTimeout());
commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (hookData)->{
logger.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId);
log.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId);
dynamicTask.stop(timeOutTaskKey);
// hook响应
playService.onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), platform.getServerGBId(), channelId);
@@ -656,27 +654,27 @@ public class PlatformServiceImpl implements IPlatformService {
}else {
// 单端口
if (tcpMode == 2) {
logger.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
}
}
}else {
logger.info("[Invite 200OK] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
log.info("[Invite 200OK] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
// ssrc 不一致
if (mediaServerItem.isRtpEnable()) {
// 多端口
if (ssrcCheck) {
// ssrc检验
// 更新ssrc
logger.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
log.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
if (!result) {
try {
logger.warn("[Invite 200OK] 更新ssrc失败停止喊话 {}/{}", platform.getServerGBId(), channelId);
log.warn("[Invite 200OK] 更新ssrc失败停止喊话 {}/{}", platform.getServerGBId(), channelId);
commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
logger.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage());
log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage());
}
dynamicTask.stop(timeOutTaskKey);
@@ -700,7 +698,7 @@ public class PlatformServiceImpl implements IPlatformService {
tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck,
timeOutTaskKey, ssrcInfo, callback);
}else {
logger.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
}
}
inviteStreamService.updateInviteInfo(inviteInfo);
@@ -714,7 +712,7 @@ public class PlatformServiceImpl implements IPlatformService {
tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck,
timeOutTaskKey, ssrcInfo, callback);
}else {
logger.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
}
}
inviteStreamService.updateInviteInfo(inviteInfo);
@@ -763,12 +761,12 @@ public class PlatformServiceImpl implements IPlatformService {
break;
}
}
logger.info("[TCP主动连接对方] serverGbId: {}, channelId: {}, 连接对方的地址:{}:{}, SSRC: {}, SSRC校验{}",
log.info("[TCP主动连接对方] serverGbId: {}, channelId: {}, 连接对方的地址:{}:{}, SSRC: {}, SSRC校验{}",
platform.getServerGBId(), channelId, sdp.getConnection().getAddress(), port, ssrcInfo.getSsrc(), ssrcCheck);
Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
logger.info("[TCP主动连接对方] 结果: {}", result);
log.info("[TCP主动连接对方] 结果: {}", result);
} catch (SdpException e) {
logger.error("[TCP主动连接对方] serverGbId: {}, channelId: {}, 解析200OK的SDP信息失败", platform.getServerGBId(), channelId, e);
log.error("[TCP主动连接对方] serverGbId: {}, channelId: {}, 解析200OK的SDP信息失败", platform.getServerGBId(), channelId, e);
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// 释放ssrc
@@ -792,7 +790,7 @@ public class PlatformServiceImpl implements IPlatformService {
commanderForPlatform.streamByeCmd(platform, channel.getDeviceId(), stream, null, null);
}
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
logger.warn("[消息发送失败] 停止语音对讲, 平台:{},通道:{}", platform.getId(), channel.getDeviceId() );
log.warn("[消息发送失败] 停止语音对讲, 平台:{},通道:{}", platform.getId(), channel.getDeviceId() );
} finally {
mediaServerService.closeRTPServer(mediaServerItem, stream);
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, platform.getServerGBId(), channel.getDeviceId(), stream);

View File

@@ -37,6 +37,7 @@ import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -57,12 +58,11 @@ import java.text.ParseException;
import java.util.*;
@SuppressWarnings(value = {"rawtypes", "unchecked"})
@Slf4j
@Service
@DS("master")
public class PlayServiceImpl implements IPlayService {
private final static Logger logger = LoggerFactory.getLogger(PlayServiceImpl.class);
@Autowired
private IVideoManagerStorage storager;
@@ -125,7 +125,7 @@ public class PlayServiceImpl implements IPlayService {
String channelId = streamArray[1];
Device device = deviceService.getDevice(deviceId);
if (device == null) {
logger.info("[语音对讲/喊话] 未找到设备:{}", deviceId);
log.info("[语音对讲/喊话] 未找到设备:{}", deviceId);
return;
}
if ("broadcast".equals(event.getApp())) {
@@ -136,15 +136,15 @@ public class PlayServiceImpl implements IPlayService {
try {
audioBroadcastCmd(device, channelId, event.getMediaServer(),
event.getApp(), event.getStream(), 60, false, (msg) -> {
logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
log.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
});
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 语音对讲: {}", e.getMessage());
log.error("[命令发送失败] 语音对讲: {}", e.getMessage());
}
}else if ("talk".equals(event.getApp())) {
// 开启语音对讲通道
talkCmd(device, channelId, event.getMediaServer(), event.getStream(), (msg) -> {
logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
log.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
});
}
}
@@ -174,14 +174,14 @@ public class PlayServiceImpl implements IPlayService {
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
if (audioBroadcastCatch != null) {
// 来自上级平台的停止对讲
logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
}
}
}
} catch (SipException | InvalidArgumentException | ParseException |
SsrcTransactionNotFoundException e) {
logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
log.error("[命令发送失败] 发送BYE: {}", e.getMessage());
}
}
}
@@ -195,7 +195,7 @@ public class PlayServiceImpl implements IPlayService {
String channelId = streamArray[1];
Device device = deviceService.getDevice(deviceId);
if (device == null) {
logger.info("[语音对讲/喊话] 未找到设备:{}", deviceId);
log.info("[语音对讲/喊话] 未找到设备:{}", deviceId);
return;
}
if ("broadcast".equals(event.getApp())) {
@@ -232,7 +232,7 @@ public class PlayServiceImpl implements IPlayService {
return;
}
if (s.length == 2) {
logger.info("[ZLM HOOK] 预览流未找到, 发起自动点播:{}->{}->{}/{}", event.getMediaServer().getId(), event.getSchema(), event.getApp(), event.getStream());
log.info("[ZLM HOOK] 预览流未找到, 发起自动点播:{}->{}->{}/{}", event.getMediaServer().getId(), event.getSchema(), event.getApp(), event.getStream());
play(event.getMediaServer(), deviceId, channelId, null, null);
} else if (s.length == 4) {
// 此时为录像回放, 录像回放格式为> 设备ID_通道ID_开始时间_结束时间
@@ -243,7 +243,7 @@ public class PlayServiceImpl implements IPlayService {
}
String startTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(startTimeStr);
String endTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(endTimeStr);
logger.info("[ZLM HOOK] 回放流未找到, 发起自动点播:{}->{}->{}/{}-{}-{}",
log.info("[ZLM HOOK] 回放流未找到, 发起自动点播:{}->{}->{}/{}-{}-{}",
event.getMediaServer().getId(), event.getSchema(),
event.getApp(), event.getStream(),
startTime, endTime
@@ -260,17 +260,17 @@ public class PlayServiceImpl implements IPlayService {
@Override
public SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback) {
if (mediaServerItem == null) {
logger.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", deviceId, channelId);
log.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", deviceId, channelId);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
}
Device device = redisCatchStorage.getDevice(deviceId);
if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && !mediaServerItem.isRtpEnable()) {
logger.warn("[点播] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
log.warn("[点播] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "单端口收流时不支持TCP主动方式收流");
}
DeviceChannel channel = channelService.getOne(deviceId, channelId);
if (channel == null) {
logger.warn("[点播] 未找到通道 deviceId: {},channelId:{}", deviceId, channelId);
log.warn("[点播] 未找到通道 deviceId: {},channelId:{}", deviceId, channelId);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到通道");
}
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
@@ -280,7 +280,7 @@ public class PlayServiceImpl implements IPlayService {
ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
// 点播发起了但是尚未成功, 仅注册回调等待结果即可
inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
logger.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
log.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
return inviteInfo.getSsrcInfo();
}else {
StreamInfo streamInfo = inviteInfo.getStreamInfo();
@@ -302,7 +302,7 @@ public class PlayServiceImpl implements IPlayService {
InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(),
streamInfo);
logger.info("[点播已存在] 直接返回, deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
log.info("[点播已存在] 直接返回, deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
return inviteInfo.getSsrcInfo();
}else {
// 点播发起了但是尚未成功, 仅注册回调等待结果即可
@@ -359,24 +359,24 @@ public class PlayServiceImpl implements IPlayService {
int port = sendRtpPortManager.getNextPort(mediaServerItem);
//端口获取失败的ssrcInfo 没有必要发送点播指令
if (port <= 0) {
logger.info("[语音对讲] 端口分配异常deviceId={},channelId={}", device.getDeviceId(), channelId);
log.info("[语音对讲] 端口分配异常deviceId={},channelId={}", device.getDeviceId(), channelId);
audioEvent.call("端口分配异常");
return;
}
sendRtpItem.setLocalPort(port);
sendRtpItem.setPort(port);
logger.info("[语音对讲]开始 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, sendRtpItem.getLocalPort(), device.getStreamMode(), sendRtpItem.getSsrc(), false);
log.info("[语音对讲]开始 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, sendRtpItem.getLocalPort(), device.getStreamMode(), sendRtpItem.getSsrc(), false);
// 超时处理
String timeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(timeOutTaskKey, () -> {
logger.info("[语音对讲] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, sendRtpItem.getPort(), sendRtpItem.getSsrc());
log.info("[语音对讲] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, sendRtpItem.getPort(), sendRtpItem.getSsrc());
timeoutCallback.run();
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
try {
cmder.streamByeCmd(device, channelId, sendRtpItem.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
logger.error("[语音对讲]超时, 发送BYE失败 {}", e.getMessage());
log.error("[语音对讲]超时, 发送BYE失败 {}", e.getMessage());
} finally {
timeoutCallback.run();
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
@@ -388,7 +388,7 @@ public class PlayServiceImpl implements IPlayService {
mediaServerService.startSendRtpPassive(mediaServerItem, sendRtpItem, userSetting.getPlayTimeout() * 1000);
}catch (ControllerException e) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
logger.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
log.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
audioEvent.call("失败, " + e.getMessage());
// 查看是否已经建立了通道存在则发送bye
stopTalk(device, channelId);
@@ -398,11 +398,11 @@ public class PlayServiceImpl implements IPlayService {
// 查看设备是否已经在推流
try {
cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (hookData) -> {
logger.info("[语音对讲] 流已生成, 开始推流: " + hookData);
log.info("[语音对讲] 流已生成, 开始推流: " + hookData);
dynamicTask.stop(timeOutTaskKey);
// TODO 暂不做处理
}, (hookData) -> {
logger.info("[语音对讲] 设备开始推流: " + hookData);
log.info("[语音对讲] 设备开始推流: " + hookData);
dynamicTask.stop(timeOutTaskKey);
}, (event) -> {
@@ -421,10 +421,10 @@ public class PlayServiceImpl implements IPlayService {
sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(),
response, InviteSessionType.TALK);
} else {
logger.error("[语音对讲]收到的消息错误response不是SIPResponse");
log.error("[语音对讲]收到的消息错误response不是SIPResponse");
}
} else {
logger.error("[语音对讲]收到的消息错误event不是ResponseEvent");
log.error("[语音对讲]收到的消息错误event不是ResponseEvent");
}
}, (event) -> {
@@ -437,7 +437,7 @@ public class PlayServiceImpl implements IPlayService {
});
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 对讲消息: {}", e.getMessage());
log.error("[命令发送失败] 对讲消息: {}", e.getMessage());
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream());
// 释放ssrc
@@ -466,12 +466,12 @@ public class PlayServiceImpl implements IPlayService {
null);
return;
}
logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 码流:{}, 收流模式:{}, SSRC: {}, SSRC校验{}",
log.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 码流:{}, 收流模式:{}, SSRC: {}, SSRC校验{}",
device.getDeviceId(), channel.getDeviceId(), channel.getStreamIdentification(), ssrcInfo.getPort(), ssrcInfo.getStream(),
device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
//端口获取失败的ssrcInfo 没有必要发送点播指令
if (ssrcInfo.getPort() <= 0) {
logger.info("[点播端口分配异常]deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channel.getDeviceId(), ssrcInfo);
log.info("[点播端口分配异常]deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channel.getDeviceId(), ssrcInfo);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channel.getDeviceId(), ssrcInfo.getStream());
@@ -493,7 +493,7 @@ public class PlayServiceImpl implements IPlayService {
// 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getDeviceId());
if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStreamInfo() == null) {
logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}",
log.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}",
device.getDeviceId(), channel.getDeviceId(), channel.getStreamIdentification(),
ssrcInfo.getPort(), ssrcInfo.getSsrc());
@@ -505,7 +505,7 @@ public class PlayServiceImpl implements IPlayService {
try {
cmder.streamByeCmd(device, channel.getDeviceId(), ssrcInfo.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
logger.error("[点播超时] 发送BYE失败 {}", e.getMessage());
log.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} finally {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
@@ -515,7 +515,7 @@ public class PlayServiceImpl implements IPlayService {
subscribe.removeSubscribe(Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream(), mediaServerItem.getId()));
}
}else {
logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}",
log.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}",
device.getDeviceId(), channel.getDeviceId(), channel.getStreamIdentification(),
ssrcInfo.getPort(), ssrcInfo.getSsrc());
@@ -528,7 +528,7 @@ public class PlayServiceImpl implements IPlayService {
try {
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (hookData ) -> {
logger.info("收到订阅消息: " + hookData);
log.info("收到订阅消息: " + hookData);
dynamicTask.stop(timeOutTaskKey);
// hook响应
StreamInfo streamInfo = onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), device.getDeviceId(), channel.getDeviceId());
@@ -545,7 +545,7 @@ public class PlayServiceImpl implements IPlayService {
InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(),
streamInfo);
logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channel.getDeviceId(),
log.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channel.getDeviceId(),
channel.getStreamIdentification());
snapOnPlay(hookData.getMediaServer(), device.getDeviceId(), channel.getDeviceId(), ssrcInfo.getStream());
}, (eventResult) -> {
@@ -553,7 +553,7 @@ public class PlayServiceImpl implements IPlayService {
InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel.getDeviceId(),
timeOutTaskKey, callback, inviteInfo, InviteSessionType.PLAY);
}, (event) -> {
logger.info("[点播失败] deviceId: {}, channelId:{}, {}: {}", device.getDeviceId(), channel.getDeviceId(), event.statusCode, event.msg);
log.info("[点播失败] deviceId: {}, channelId:{}, {}: {}", device.getDeviceId(), channel.getDeviceId(), event.statusCode, event.msg);
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// 释放ssrc
@@ -570,7 +570,7 @@ public class PlayServiceImpl implements IPlayService {
});
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 点播消息: {}", e.getMessage());
log.error("[命令发送失败] 点播消息: {}", e.getMessage());
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// 释放ssrc
@@ -615,9 +615,9 @@ public class PlayServiceImpl implements IPlayService {
break;
}
}
logger.info("[TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
log.info("[TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
logger.info("[TCP主动连接对方] 结果: {}" , result);
log.info("[TCP主动连接对方] 结果: {}" , result);
if (!result) {
// 主动连接失败,结束流程, 清理数据
dynamicTask.stop(timeOutTaskKey);
@@ -634,7 +634,7 @@ public class PlayServiceImpl implements IPlayService {
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
}
} catch (SdpException e) {
logger.error("[TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
log.error("[TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// 释放ssrc
@@ -668,7 +668,7 @@ public class PlayServiceImpl implements IPlayService {
String path = "snap";
String fileName = deviceId + "_" + channelId + ".jpg";
// 请求截图
logger.info("[请求截图]: " + fileName);
log.info("[请求截图]: " + fileName);
mediaServerService.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
}
@@ -727,7 +727,7 @@ public class PlayServiceImpl implements IPlayService {
mediaServerItem = mediaServerService.getOne(device.getMediaServerId());
}
if (mediaServerItem == null) {
logger.warn("点播时未找到可使用的ZLM...");
log.warn("点播时未找到可使用的ZLM...");
}
return mediaServerItem;
}
@@ -737,19 +737,19 @@ public class PlayServiceImpl implements IPlayService {
String endTime, ErrorCallback<Object> callback) {
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
logger.warn("[录像回放] 未找到设备 deviceId: {},channelId:{}", deviceId, channelId);
log.warn("[录像回放] 未找到设备 deviceId: {},channelId:{}", deviceId, channelId);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到设备:" + deviceId);
}
DeviceChannel channel = channelService.getOne(deviceId, channelId);
if (channel == null) {
logger.warn("[录像回放] 未找到通道 deviceId: {},channelId:{}", deviceId, channelId);
log.warn("[录像回放] 未找到通道 deviceId: {},channelId:{}", deviceId, channelId);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到通道:" + channelId);
}
MediaServer newMediaServerItem = getNewMediaServerItem(device);
if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && ! newMediaServerItem.isRtpEnable()) {
logger.warn("[录像回放] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
log.warn("[录像回放] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "单端口收流时不支持TCP主动方式收流");
}
String startTimeStr = startTime.replace("-", "")
@@ -779,7 +779,7 @@ public class PlayServiceImpl implements IPlayService {
if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备: " + deviceId + "不存在");
}
logger.info("[录像回放] deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验{}",
log.info("[录像回放] deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验{}",
device.getDeviceId(), channelId, startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(),
ssrcInfo.getSsrc(), device.isSsrcCheck());
// 初始化redis中的invite消息状态
@@ -789,14 +789,14 @@ public class PlayServiceImpl implements IPlayService {
inviteStreamService.updateInviteInfo(inviteInfo);
String playBackTimeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(playBackTimeOutTaskKey, () -> {
logger.warn("[录像回放] 超时deviceId{} channelId{}", deviceId, channelId);
log.warn("[录像回放] 超时deviceId{} channelId{}", deviceId, channelId);
inviteStreamService.removeInviteInfo(inviteInfo);
callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
try {
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[录像回放] 超时 发送BYE失败 {}", e.getMessage());
log.error("[录像回放] 超时 发送BYE失败 {}", e.getMessage());
} catch (SsrcTransactionNotFoundException e) {
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
@@ -806,7 +806,7 @@ public class PlayServiceImpl implements IPlayService {
}, userSetting.getPlayTimeout());
SipSubscribe.Event errorEvent = event -> {
logger.info("[录像回放] 失败,{} {}", event.statusCode, event.msg);
log.info("[录像回放] 失败,{} {}", event.statusCode, event.msg);
dynamicTask.stop(playBackTimeOutTaskKey);
callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg), null);
@@ -817,17 +817,17 @@ public class PlayServiceImpl implements IPlayService {
};
HookSubscribe.Event hookEvent = (hookData) -> {
logger.info("收到回放订阅消息: " + hookData);
log.info("收到回放订阅消息: " + hookData);
dynamicTask.stop(playBackTimeOutTaskKey);
StreamInfo streamInfo = onPublishHandlerForPlayback(hookData.getMediaServer(), hookData.getMediaInfo(), deviceId, channelId, startTime, endTime);
if (streamInfo == null) {
logger.warn("设备回放API调用失败");
log.warn("设备回放API调用失败");
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
return;
}
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
logger.info("[录像回放] 成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime);
log.info("[录像回放] 成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime);
};
try {
@@ -838,7 +838,7 @@ public class PlayServiceImpl implements IPlayService {
playBackTimeOutTaskKey, callback, inviteInfo, InviteSessionType.PLAYBACK);
}, errorEvent);
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 录像回放: {}", e.getMessage());
log.error("[命令发送失败] 录像回放: {}", e.getMessage());
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
@@ -870,28 +870,28 @@ public class PlayServiceImpl implements IPlayService {
}else {
// 单端口
if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
logger.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
}
}
}else {
logger.info("[Invite 200OK] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
log.info("[Invite 200OK] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
// ssrc 不一致
if (mediaServerItem.isRtpEnable()) {
// 多端口
if (device.isSsrcCheck()) {
// ssrc检验
// 更新ssrc
logger.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
log.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
if (!result) {
try {
logger.warn("[Invite 200OK] 更新ssrc失败停止点播 {}/{}", device.getDeviceId(), channelId);
log.warn("[Invite 200OK] 更新ssrc失败停止点播 {}/{}", device.getDeviceId(), channelId);
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
logger.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage());
log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage());
}
dynamicTask.stop(timeOutTaskKey);
@@ -914,7 +914,7 @@ public class PlayServiceImpl implements IPlayService {
if (mediaServerItem.isRtpEnable()) {
tcpActiveHandler(device, channelId, contentString, mediaServerItem, timeOutTaskKey, ssrcInfo, callback);
}else {
logger.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
}
}
inviteStreamService.updateInviteInfo(inviteInfo);
@@ -978,7 +978,7 @@ public class PlayServiceImpl implements IPlayService {
null);
return;
}
logger.info("[录像下载] deviceId: {}, channelId: {}, 下载速度:{}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, downloadSpeed, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
log.info("[录像下载] deviceId: {}, channelId: {}, 下载速度:{}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, downloadSpeed, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.DOWNLOAD,
@@ -986,7 +986,7 @@ public class PlayServiceImpl implements IPlayService {
inviteStreamService.updateInviteInfo(inviteInfo);
String downLoadTimeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(downLoadTimeOutTaskKey, () -> {
logger.warn(String.format("录像下载请求超时deviceId%s channelId%s", deviceId, channelId));
log.warn(String.format("录像下载请求超时deviceId%s channelId%s", deviceId, channelId));
inviteStreamService.removeInviteInfo(inviteInfo);
callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
@@ -995,7 +995,7 @@ public class PlayServiceImpl implements IPlayService {
try {
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[录像流]录像下载请求超时, 发送BYE失败 {}", e.getMessage());
log.error("[录像流]录像下载请求超时, 发送BYE失败 {}", e.getMessage());
} catch (SsrcTransactionNotFoundException e) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
@@ -1012,17 +1012,17 @@ public class PlayServiceImpl implements IPlayService {
inviteStreamService.removeInviteInfo(inviteInfo);
};
HookSubscribe.Event hookEvent = (hookData) -> {
logger.info("[录像下载]收到订阅消息: " + hookData);
log.info("[录像下载]收到订阅消息: " + hookData);
dynamicTask.stop(downLoadTimeOutTaskKey);
StreamInfo streamInfo = onPublishHandlerForDownload(hookData.getMediaServer(), hookData.getMediaInfo(), deviceId, channelId, startTime, endTime);
if (streamInfo == null) {
logger.warn("[录像下载] 获取流地址信息失败");
log.warn("[录像下载] 获取流地址信息失败");
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
return;
}
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
logger.info("[录像下载] 调用成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime);
log.info("[录像下载] 调用成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime);
};
try {
cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed,
@@ -1033,9 +1033,9 @@ public class PlayServiceImpl implements IPlayService {
// 注册录像回调事件,录像下载结束后写入下载地址
HookSubscribe.Event hookEventForRecord = (hookData) -> {
logger.info("[录像下载] 收到录像写入磁盘消息: {}/{}-{}",
log.info("[录像下载] 收到录像写入磁盘消息: {}/{}-{}",
inviteInfo.getDeviceId(), inviteInfo.getChannelId(), ssrcInfo.getStream());
logger.info("[录像下载] 收到录像写入磁盘消息内容: " + hookData);
log.info("[录像下载] 收到录像写入磁盘消息内容: " + hookData);
RecordInfo recordInfo = hookData.getRecordInfo();
String filePath = recordInfo.getFilePath();
DownloadFileInfo downloadFileInfo = CloudRecordUtils.getDownloadFilePath(mediaServerItem, filePath);
@@ -1051,7 +1051,7 @@ public class PlayServiceImpl implements IPlayService {
subscribe.addSubscribe(hook, hookEventForRecord);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 录像下载: {}", e.getMessage());
log.error("[命令发送失败] 录像下载: {}", e.getMessage());
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
@@ -1065,7 +1065,7 @@ public class PlayServiceImpl implements IPlayService {
public StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream);
if (inviteInfo == null || inviteInfo.getStreamInfo() == null) {
logger.warn("[获取下载进度] 未查询到录像下载的信息");
log.warn("[获取下载进度] 未查询到录像下载的信息");
return null;
}
@@ -1077,13 +1077,13 @@ public class PlayServiceImpl implements IPlayService {
String mediaServerId = inviteInfo.getStreamInfo().getMediaServerId();
MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem == null) {
logger.warn("[获取下载进度] 查询录像信息时发现节点不存在");
log.warn("[获取下载进度] 查询录像信息时发现节点不存在");
return null;
}
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream);
if (ssrcTransaction == null) {
logger.warn("[获取下载进度] 下载已结束");
log.warn("[获取下载进度] 下载已结束");
return null;
}
String app = "rtp";
@@ -1118,7 +1118,7 @@ public class PlayServiceImpl implements IPlayService {
streamInfo.setEndTime(endTime);
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, streamInfo.getStream());
if (inviteInfo != null) {
logger.info("[录像下载] 更新invite消息中的stream信息");
log.info("[录像下载] 更新invite消息中的stream信息");
inviteInfo.setStatus(InviteSessionStatus.ok);
inviteInfo.setStreamInfo(streamInfo);
inviteStreamService.updateInviteInfo(inviteInfo);
@@ -1147,7 +1147,7 @@ public class PlayServiceImpl implements IPlayService {
try {
sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
}
}
@@ -1166,7 +1166,7 @@ public class PlayServiceImpl implements IPlayService {
ssrcTransaction.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
logger.error("[zlm离线]为正在使用此zlm的设备 发送BYE失败 {}", e.getMessage());
log.error("[zlm离线]为正在使用此zlm的设备 发送BYE失败 {}", e.getMessage());
}
}
}
@@ -1179,10 +1179,10 @@ public class PlayServiceImpl implements IPlayService {
if (device == null || channelId == null) {
return null;
}
logger.info("[语音喊话] device {}, channel: {}", device.getDeviceId(), channelId);
log.info("[语音喊话] device {}, channel: {}", device.getDeviceId(), channelId);
DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId);
if (deviceChannel == null) {
logger.warn("开启语音广播的时候未找到通道: {}", channelId);
log.warn("开启语音广播的时候未找到通道: {}", channelId);
return null;
}
MediaServer mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
@@ -1204,10 +1204,10 @@ public class PlayServiceImpl implements IPlayService {
if (device == null || channelId == null) {
return false;
}
logger.info("[语音喊话] device {}, channel: {}", device.getDeviceId(), channelId);
log.info("[语音喊话] device {}, channel: {}", device.getDeviceId(), channelId);
DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId);
if (deviceChannel == null) {
logger.warn("开启语音广播的时候未找到通道: {}", channelId);
log.warn("开启语音广播的时候未找到通道: {}", channelId);
event.call("开启语音广播的时候未找到通道");
return false;
}
@@ -1218,7 +1218,7 @@ public class PlayServiceImpl implements IPlayService {
// 查询流是否存在,不存在则认为是异常状态
Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
if (streamReady) {
logger.warn("语音广播已经开启: {}", channelId);
log.warn("语音广播已经开启: {}", channelId);
event.call("语音广播已经开启");
return false;
} else {
@@ -1238,12 +1238,12 @@ public class PlayServiceImpl implements IPlayService {
key += audioBroadcastCatch.getChannelId();
}
dynamicTask.startDelay(key, ()->{
logger.info("[语音广播]等待invite消息超时{}/{}", device.getDeviceId(), channelId);
log.info("[语音广播]等待invite消息超时{}/{}", device.getDeviceId(), channelId);
stopAudioBroadcast(device.getDeviceId(), channelId);
}, 10*1000);
}, eventResultForError -> {
// 发送失败
logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg);
log.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg);
event.call("语音广播发送失败");
stopAudioBroadcast(device.getDeviceId(), channelId);
});
@@ -1259,7 +1259,7 @@ public class PlayServiceImpl implements IPlayService {
MediaServer mediaServerServiceOne = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Boolean streamReady = mediaServerService.isStreamReady(mediaServerServiceOne, sendRtpItem.getApp(), sendRtpItem.getStream());
if (streamReady) {
logger.warn("语音广播通道使用中: {}", channelId);
log.warn("语音广播通道使用中: {}", channelId);
return true;
}
}
@@ -1270,7 +1270,7 @@ public class PlayServiceImpl implements IPlayService {
@Override
public void stopAudioBroadcast(String deviceId, String channelId) {
logger.info("[停止对讲] 设备:{}, 通道:{}", deviceId, channelId);
log.info("[停止对讲] 设备:{}, 通道:{}", deviceId, channelId);
List<AudioBroadcastCatch> audioBroadcastCatchList = new ArrayList<>();
if (channelId == null) {
audioBroadcastCatchList.addAll(audioBroadcastManager.get(deviceId));
@@ -1292,7 +1292,7 @@ public class PlayServiceImpl implements IPlayService {
cmder.streamByeCmdForDeviceInvite(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
logger.error("[消息发送失败] 发送语音喊话BYE失败");
log.error("[消息发送失败] 发送语音喊话BYE失败");
}
}
@@ -1358,14 +1358,14 @@ public class PlayServiceImpl implements IPlayService {
public void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
if (null == inviteInfo || inviteInfo.getStreamInfo() == null) {
logger.warn("streamId不存在!");
log.warn("streamId不存在!");
throw new ServiceException("streamId不存在");
}
inviteInfo.getStreamInfo().setPause(true);
inviteStreamService.updateInviteInfo(inviteInfo);
MediaServer mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
if (null == mediaServerItem) {
logger.warn("mediaServer 不存在!");
log.warn("mediaServer 不存在!");
throw new ServiceException("mediaServer不存在");
}
// zlm 暂停RTP超时检查
@@ -1386,14 +1386,14 @@ public class PlayServiceImpl implements IPlayService {
public void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
if (null == inviteInfo || inviteInfo.getStreamInfo() == null) {
logger.warn("streamId不存在!");
log.warn("streamId不存在!");
throw new ServiceException("streamId不存在");
}
inviteInfo.getStreamInfo().setPause(false);
inviteStreamService.updateInviteInfo(inviteInfo);
MediaServer mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
if (null == mediaServerItem) {
logger.warn("mediaServer 不存在!");
log.warn("mediaServer 不存在!");
throw new ServiceException("mediaServer不存在");
}
// zlm 暂停RTP超时检查
@@ -1424,12 +1424,12 @@ public class PlayServiceImpl implements IPlayService {
}
redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, platform);
}catch (ControllerException e) {
logger.error("RTP推流失败: {}", e.getMessage());
log.error("RTP推流失败: {}", e.getMessage());
startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader);
return;
}
logger.info("RTP推流成功[ {}/{} ]{}, ", sendRtpItem.getApp(), sendRtpItem.getStream(),
log.info("RTP推流成功[ {}/{} ]{}, ", sendRtpItem.getApp(), sendRtpItem.getStream(),
sendRtpItem.isTcpActive()?"被动发流": sendRtpItem.getIp() + ":" + sendRtpItem.getPort());
}
@@ -1445,7 +1445,7 @@ public class PlayServiceImpl implements IPlayService {
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
} catch (SipException | ParseException | InvalidArgumentException |
SsrcTransactionNotFoundException exception) {
logger.error("[命令发送失败] 停止语音对讲: {}", exception.getMessage());
log.error("[命令发送失败] 停止语音对讲: {}", exception.getMessage());
}
}
} else {
@@ -1454,7 +1454,7 @@ public class PlayServiceImpl implements IPlayService {
try {
commanderForPlatform.streamByeCmd(platform, callIdHeader.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
}
@@ -1467,10 +1467,10 @@ public class PlayServiceImpl implements IPlayService {
return;
}
// TODO 必须多端口模式才支持语音喊话鹤语音对讲
logger.info("[语音对讲] device {}, channel: {}", device.getDeviceId(), channelId);
log.info("[语音对讲] device {}, channel: {}", device.getDeviceId(), channelId);
DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId);
if (deviceChannel == null) {
logger.warn("开启语音对讲的时候未找到通道: {}", channelId);
log.warn("开启语音对讲的时候未找到通道: {}", channelId);
event.call("开启语音对讲的时候未找到通道");
return;
}
@@ -1482,7 +1482,7 @@ public class PlayServiceImpl implements IPlayService {
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Boolean streamReady = mediaServerService.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
if (streamReady) {
logger.warn("[语音对讲] 正在语音广播,无法开启语音通话: {}", channelId);
log.warn("[语音对讲] 正在语音广播,无法开启语音通话: {}", channelId);
event.call("正在语音广播");
return;
} else {
@@ -1496,7 +1496,7 @@ public class PlayServiceImpl implements IPlayService {
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Boolean streamReady = mediaServerService.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
if (streamReady) {
logger.warn("[语音对讲] 进行中: {}", channelId);
log.warn("[语音对讲] 进行中: {}", channelId);
event.call("语音对讲进行中");
return;
} else {
@@ -1505,16 +1505,16 @@ public class PlayServiceImpl implements IPlayService {
}
talk(mediaServerItem, device, channelId, stream, (hookData) -> {
logger.info("[语音对讲] 收到设备发来的流");
log.info("[语音对讲] 收到设备发来的流");
}, eventResult -> {
logger.warn("[语音对讲] 失败,{}/{}, 错误码 {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg);
log.warn("[语音对讲] 失败,{}/{}, 错误码 {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg);
event.call("失败,错误码 " + eventResult.statusCode + ", " + eventResult.msg);
}, () -> {
logger.warn("[语音对讲] 失败,{}/{} 超时", device.getDeviceId(), channelId);
log.warn("[语音对讲] 失败,{}/{} 超时", device.getDeviceId(), channelId);
event.call("失败,超时 ");
stopTalk(device, channelId);
}, errorMsg -> {
logger.warn("[语音对讲] 失败,{}/{} {}", device.getDeviceId(), channelId, errorMsg);
log.warn("[语音对讲] 失败,{}/{} {}", device.getDeviceId(), channelId, errorMsg);
event.call(errorMsg);
stopTalk(device, channelId);
});
@@ -1526,10 +1526,10 @@ public class PlayServiceImpl implements IPlayService {
@Override
public void stopTalk(Device device, String channelId, Boolean streamIsReady) {
logger.info("[语音对讲] 停止, {}/{}", device.getDeviceId(), channelId);
log.info("[语音对讲] 停止, {}/{}", device.getDeviceId(), channelId);
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
if (sendRtpItem == null) {
logger.info("[语音对讲] 停止失败, 未找到发送信息,可能已经停止");
log.info("[语音对讲] 停止失败, 未找到发送信息,可能已经停止");
return;
}
// 停止向设备推流
@@ -1551,7 +1551,7 @@ public class PlayServiceImpl implements IPlayService {
try {
cmder.streamByeCmd(device, channelId, sendRtpItem.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
logger.info("[语音对讲] 停止消息发送失败,可能已经停止");
log.info("[语音对讲] 停止消息发送失败,可能已经停止");
}
}
redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channelId,null, null);
@@ -1577,7 +1577,7 @@ public class PlayServiceImpl implements IPlayService {
}
String path = "snap";
// 请求截图
logger.info("[请求截图]: " + fileName);
log.info("[请求截图]: " + fileName);
mediaServerService.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
File snapFile = new File(path + File.separator + fileName);
if (snapFile.exists()) {
@@ -1612,10 +1612,10 @@ public class PlayServiceImpl implements IPlayService {
}
if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
try {
logger.info("[停止点播] {}/{}", device.getDeviceId(), channelId);
log.info("[停止点播] {}/{}", device.getDeviceId(), channelId);
cmder.streamByeCmd(device, channelId, inviteInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
log.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
}

View File

@@ -14,17 +14,15 @@ import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class RedisRpcServiceImpl implements IRedisRpcService {
private final static Logger logger = LoggerFactory.getLogger(RedisRpcServiceImpl.class);
@Autowired
private RedisRpcConfig redisRpcConfig;
@@ -60,7 +58,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
@Override
public WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem) {
logger.info("[请求其他WVP] 开始推流wvp{} {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
log.info("[请求其他WVP] 开始推流wvp{} {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItemKey);
request.setToId(sendRtpItem.getServerId());
RedisRpcResponse response = redisRpcConfig.request(request, 10);
@@ -71,10 +69,10 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
public WVPResult stopSendRtp(String sendRtpItemKey) {
SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
if (sendRtpItem == null) {
logger.info("[请求其他WVP] 停止推流, 未找到redis中的发流信息 key{}", sendRtpItemKey);
log.info("[请求其他WVP] 停止推流, 未找到redis中的发流信息 key{}", sendRtpItemKey);
return WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到发流信息");
}
logger.info("[请求其他WVP] 停止推流wvp{} {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
log.info("[请求其他WVP] 停止推流wvp{} {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItemKey);
request.setToId(sendRtpItem.getServerId());
RedisRpcResponse response = redisRpcConfig.request(request, 10);
@@ -83,7 +81,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
@Override
public long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback) {
logger.info("[请求所有WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
log.info("[请求所有WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem);
@@ -109,10 +107,10 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
redisRpcConfig.request(request, response -> {
if (response.getBody() == null) {
logger.info("[请求所有WVP监听流上线] 流上线,但是未找到发流信息:{}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
log.info("[请求所有WVP监听流上线] 流上线,但是未找到发流信息:{}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
return;
}
logger.info("[请求所有WVP监听流上线] 流上线 {}/{}->{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.toString());
log.info("[请求所有WVP监听流上线] 流上线 {}/{}->{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.toString());
if (callback != null) {
callback.run(response.getBody().toString());
@@ -124,7 +122,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
@Override
public void stopWaitePushStreamOnline(SendRtpItem sendRtpItem) {
logger.info("[停止WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
log.info("[停止WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
hookSubscribe.removeSubscribe(hook);
RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem);
@@ -136,7 +134,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
public void rtpSendStopped(String sendRtpItemKey) {
SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
if (sendRtpItem == null) {
logger.info("[停止WVP监听流上线] 未找到redis中的发流信息 key{}", sendRtpItemKey);
log.info("[停止WVP监听流上线] 未找到redis中的发流信息 key{}", sendRtpItemKey);
return;
}
RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItemKey);