@@ -6,6 +6,9 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants ;
import com.genersoft.iot.vmp.conf.DynamicTask ;
import com.genersoft.iot.vmp.conf.UserSetting ;
import com.genersoft.iot.vmp.conf.exception.ControllerException ;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform ;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem ;
import com.genersoft.iot.vmp.jt1078.bean.* ;
import com.genersoft.iot.vmp.jt1078.bean.common.ConfigAttribute ;
import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template ;
@@ -14,26 +17,37 @@ import com.genersoft.iot.vmp.jt1078.event.CallbackManager;
import com.genersoft.iot.vmp.jt1078.proc.request.J1205 ;
import com.genersoft.iot.vmp.jt1078.proc.response.* ;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service ;
import com.genersoft.iot.vmp.jt1078.util.SSRCUtil ;
import com.genersoft.iot.vmp.media.bean.MediaInfo ;
import com.genersoft.iot.vmp.media.bean.MediaServer ;
import com.genersoft.iot.vmp.media.event.hook.Hook ;
import com.genersoft.iot.vmp.media.event.hook.HookData ;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe ;
import com.genersoft.iot.vmp.media.event.hook.HookType ;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent ;
import com.genersoft.iot.vmp.media.service.IMediaServerService ;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils ;
import com.genersoft.iot.vmp.service.IMediaService ;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode ;
import com.genersoft.iot.vmp.service.bean.SSRCInfo ;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage ;
import com.genersoft.iot.vmp.utils.DateUtil ;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode ;
import com.github.pagehelper.PageHelper ;
import com.github.pagehelper.PageInfo ;
import org.apache.commons.lang3.ObjectUtils ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.context.event.EventListener ;
import org.springframework.data.redis.core.RedisTemplate ;
import org.springframework.scheduling.annotation.Async ;
import org.springframework.stereotype.Service ;
import javax.sip.InvalidArgumentException ;
import javax.sip.SipException ;
import java.lang.reflect.Field ;
import java.text.ParseException ;
import java.util.ArrayList ;
import java.util.List ;
import java.util.Map ;
@@ -74,6 +88,9 @@ public class jt1078ServiceImpl implements Ijt1078Service {
@Autowired
private CallbackManager callbackManager ;
@Autowired
private IRedisCatchStorage redisCatchStorage ;
@Override
public JTDevice getDevice ( String terminalId ) {
@@ -213,7 +230,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
}
// 清理回调
List < GeneralCallback < StreamInfo > > generalCallbacks = inviteErrorCallbackMap . get ( playKey ) ;
if ( ! generalCallbacks . isEmpty ( ) ) {
if ( generalCallbacks ! = null & & ! generalCallbacks . isEmpty ( ) ) {
for ( GeneralCallback < StreamInfo > callback : generalCallbacks ) {
callback . run ( InviteErrorCode . ERROR_FOR_FINISH . getCode ( ) , InviteErrorCode . ERROR_FOR_FINISH . getMsg ( ) , null ) ;
}
@@ -377,7 +394,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
}
// 清理回调
List < GeneralCallback < StreamInfo > > generalCallbacks = inviteErrorCallbackMap . get ( playKey ) ;
if ( ! generalCallbacks . isEmpty ( ) ) {
if ( generalCallbacks ! = null & & ! generalCallbacks . isEmpty ( ) ) {
for ( GeneralCallback < StreamInfo > callback : generalCallbacks ) {
callback . run ( InviteErrorCode . ERROR_FOR_FINISH . getCode ( ) , InviteErrorCode . ERROR_FOR_FINISH . getMsg ( ) , null ) ;
}
@@ -751,46 +768,74 @@ public class jt1078ServiceImpl implements Ijt1078Service {
return ( JTMediaAttribute ) jt1078Template . queryMediaAttribute ( deviceId , j9003 , 300 ) ;
}
/**
* 监听发流停止
*/
@EventListener
public void onApplicationEvent ( MediaSendRtpStoppedEvent event ) {
List < SendRtpItem > sendRtpItems = redisCatchStorage . querySendRTPServerByStream ( event . getStream ( ) ) ;
for ( SendRtpItem sendRtpItem : sendRtpItems ) {
if ( ! sendRtpItem . isOnlyAudio ( )
| | ObjectUtils . isEmpty ( sendRtpItem . getDeviceId ( ) )
| | ObjectUtils . isEmpty ( sendRtpItem . getChannelId ( ) ) ) {
continue ;
}
if ( ! sendRtpItem . getSsrc ( ) . contains ( " _ " ) ) {
continue ;
}
redisCatchStorage . deleteSendRTPServer ( sendRtpItem ) ;
String playKey = VideoManagerConstants . INVITE_INFO_1078_TALK + sendRtpItem . getDeviceId ( ) + " : " + sendRtpItem . getChannelId ( ) ;
redisTemplate . delete ( playKey ) ;
}
}
@Override
public void broadcast ( String deviceId , String channelId , String app , String stream , String mediaServerId , GeneralCallback < StreamInfo > callback ) {
public void startTalk ( String deviceId , String channelId , String app , String stream , String mediaServerId , GeneralCallback < StreamInfo > callback ) {
// 检查流是否已经存在,存在则返回
String playKey = VideoManagerConstants . INVITE_INFO_1078_BROADCAST + deviceId + " : " + channelId ;
String playKey = VideoManagerConstants . INVITE_INFO_1078_TALK + deviceId + " : " + channelId ;
List < GeneralCallback < StreamInfo > > errorCallbacks = inviteErrorCallbackMap . computeIfAbsent ( playKey , k - > new ArrayList < > ( ) ) ;
errorCallbacks . add ( callback ) ;
StreamInfo streamInfo = ( StreamInfo ) redisTemplate . opsForValue ( ) . get ( playKey ) ;
if ( streamInfo ! = null ) {
String mediaServerId = streamInfo . getMediaServerId ( ) ;
MediaServer mediaServer = mediaServerService . getOne ( mediaServerId ) ;
if ( mediaServer ! = null ) {
// 查询流是否存在,不存在则删除缓存数据
JSONObject mediaInfo = zlmresTfulUtils . getMediaInfo ( mediaServer , " rtp " , " rtsp " , streamInfo . getStream ( ) ) ;
if ( mediaInfo ! = null & & mediaInfo . getInteger ( " code " ) = = 0 ) {
Boolean online = mediaInfo . getBoolean ( " online " ) ;
if ( online ! = null & & online ) {
logger . info ( " [1078-点播] 点播已经存在,直接返回, deviceId: {}, channelId: {} " , deviceId , channelId ) ;
for ( GeneralCallback < StreamInfo > errorCallback : errorCallbacks ) {
errorCallback . run ( InviteErrorCode . SUCCESS . getCode ( ) , InviteErrorCode . SUCCESS . getMsg ( ) , streamInfo ) ;
}
return ;
}
}
}
// 清理数据
redisTemplate . delete ( playKey ) ;
throw new ControllerException ( ErrorCode . ERROR100 . getCode ( ) , " 对讲进行中 " ) ;
}
String stream = deviceId + " _ " + channelId ;
MediaServer mediaServer = mediaServerService . getMediaServerForMinimumLoad ( null ) ;
String reciveStream = deviceId + " _ " + channelId ;
MediaServer mediaServer = mediaServerService . getOne ( mediaServerId ) ;
if ( mediaServer = = null ) {
for ( GeneralCallback < StreamInfo > errorCallback : errorCallbacks ) {
errorCallback . run ( InviteErrorCode . FAIL . getCode ( ) , " 未找到可用的媒体节点 " , streamInfo ) ;
}
return ;
}
// 检查待发送的流是否存在,
MediaInfo mediaInfo = mediaServerService . getMediaInfo ( mediaServer , app , stream ) ;
if ( mediaInfo = = null ) {
throw new ControllerException ( ErrorCode . ERROR100 . getCode ( ) , app + " / " + stream + " 流不存在 " ) ;
}
// 开启收流端口, zlm发送1078的rtp流需要将ssrc字段设置为 imei_channel格式
String ssrc = deviceId + " _ " + channelId ;
SendRtpItem sendRtpItem = new SendRtpItem ( ) ;
sendRtpItem . setMediaServerId ( mediaServerId ) ;
sendRtpItem . setPort ( 0 ) ;
sendRtpItem . setSsrc ( ssrc ) ;
sendRtpItem . setDeviceId ( deviceId ) ;
sendRtpItem . setChannelId ( channelId ) ;
sendRtpItem . setRtcp ( false ) ;
sendRtpItem . setApp ( app ) ;
sendRtpItem . setStream ( stream ) ;
sendRtpItem . setTcp ( true ) ;
sendRtpItem . setTcpActive ( true ) ;
sendRtpItem . setUsePs ( false ) ;
sendRtpItem . setOnlyAudio ( true ) ;
sendRtpItem . setReceiveStream ( reciveStream ) ;
sendRtpItem . setPlatformId ( deviceId ) ;
// 设置hook监听
Hook hook = Hook . getInstance ( HookType . on_media_arrival , " rtp " , s tream, mediaServer . getId ( ) ) ;
Hook hook = Hook . getInstance ( HookType . on_media_arrival , " rtp " , reciveS tream, mediaServer . getId ( ) ) ;
subscribe . addSubscribe ( hook , ( hookData ) - > {
dynamicTask . stop ( playKey ) ;
logger . info ( " [1078-点播] 点播 成功, deviceId: {}, channelId: {} " , deviceId , channelId ) ;
logger . info ( " [1078-对讲] 对讲 成功, deviceId: {}, channelId: {} " , deviceId , channelId ) ;
StreamInfo info = onPublishHandler ( mediaServer , hookData , deviceId , channelId ) ;
for ( GeneralCallback < StreamInfo > errorCallback : errorCallbacks ) {
@@ -798,10 +843,12 @@ public class jt1078ServiceImpl implements Ijt1078Service {
}
subscribe . removeSubscribe ( hook ) ;
redisTemplate . opsForValue ( ) . set ( playKey , info ) ;
// 存储发流信息
redisCatchStorage . updateSendRTPSever ( sendRtpItem ) ;
} ) ;
// 设置超时监听
dynamicTask . startDelay ( playKey , ( ) - > {
logger . info ( " [1078-点播 ] 超时, deviceId: {}, channelId: {} " , deviceId , channelId ) ;
logger . info ( " [1078-对讲 ] 超时, deviceId: {}, channelId: {} " , deviceId , channelId ) ;
for ( GeneralCallback < StreamInfo > errorCallback : errorCallbacks ) {
errorCallback . run ( InviteErrorCode . ERROR_FOR_SIGNALLING_TIMEOUT . getCode ( ) ,
InviteErrorCode . ERROR_FOR_SIGNALLING_TIMEOUT . getMsg ( ) , null ) ;
@@ -809,17 +856,46 @@ public class jt1078ServiceImpl implements Ijt1078Service {
} , userSetting . getPlayTimeout ( ) ) ;
// 开启收流端口
SSRCInfo ssrcInfo = mediaServerService . openRTPServer ( mediaServer , stream , null , false , false , 0 , false , false , false , 1 ) ;
logger . info ( " [1078-点播 ] deviceId: {}, channelId: {}, 端口: {} ", deviceId , channelId , ssrcInfo . getPort ( ) ) ;
Integer localPort = mediaServerService . startSendRtpPassive ( mediaServer , sendRtpItem , 15000 ) ;
logger . info ( " [1078-对讲 ] deviceId: {}, channelId: {}, 收发 端口: {}, app: {}, stream: {} " ,
deviceId , channelId , localPort , app , stream ) ;
J9101 j9101 = new J9101 ( ) ;
j9101 . setChannel ( Integer . valueOf ( channelId ) ) ;
j9101 . setIp ( mediaServer . getSdpIp ( ) ) ;
j9101 . setRate ( 1 ) ;
j9101 . setTcpPort ( ssrcInfo . getPort ( ) ) ;
j9101 . setUdpPort ( ssrcInfo . getPort ( ) ) ;
j9101 . setType ( type ) ;
j9101 . setTcpPort ( localPort ) ;
j9101 . setUdpPort ( localPort ) ;
j9101 . setType ( 2 ) ;
Object s = jt1078Template . startLive ( deviceId , j9101 , 6 ) ;
System . out . println ( " ssss=== " + s ) ;
}
@Override
public void stopTalk ( String deviceId , String channelId ) {
String playKey = VideoManagerConstants . INVITE_INFO_1078_TALK + deviceId + " : " + channelId ;
dynamicTask . stop ( playKey ) ;
StreamInfo streamInfo = ( StreamInfo ) redisTemplate . opsForValue ( ) . get ( playKey ) ;
// 发送停止命令
J9102 j9102 = new J9102 ( ) ;
j9102 . setChannel ( Integer . valueOf ( channelId ) ) ;
j9102 . setCommand ( 0 ) ;
j9102 . setCloseType ( 0 ) ;
j9102 . setStreamType ( 1 ) ;
jt1078Template . stopLive ( deviceId , j9102 , 6 ) ;
logger . info ( " [1078-停止对讲] deviceId: {}, channelId: {} " , deviceId , channelId ) ;
// 删除缓存数据
if ( streamInfo ! = null ) {
redisTemplate . delete ( playKey ) ;
// 关闭rtpServer
mediaServerService . closeRTPServer ( streamInfo . getMediaServerId ( ) , streamInfo . getStream ( ) ) ;
}
// 清理回调
List < GeneralCallback < StreamInfo > > generalCallbacks = inviteErrorCallbackMap . get ( playKey ) ;
if ( generalCallbacks ! = null & & ! generalCallbacks . isEmpty ( ) ) {
for ( GeneralCallback < StreamInfo > callback : generalCallbacks ) {
callback . run ( InviteErrorCode . ERROR_FOR_FINISH . getCode ( ) , InviteErrorCode . ERROR_FOR_FINISH . getMsg ( ) , null ) ;
}
}
}
}