Merge remote-tracking branch 'origin/wvp-28181-2.0' into commercial
# Conflicts: # src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
This commit is contained in:
@@ -1,76 +0,0 @@
|
||||
package com.genersoft.iot.vmp.gb28181.transmit.callback;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.RecordItem;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.RecordInfoResponseMessageHandler;
|
||||
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public class CheckForAllRecordsThread extends Thread {
|
||||
|
||||
private String key;
|
||||
|
||||
private RecordInfo recordInfo;
|
||||
|
||||
private RedisUtil redis;
|
||||
|
||||
private Logger logger;
|
||||
|
||||
private DeferredResultHolder deferredResultHolder;
|
||||
|
||||
public CheckForAllRecordsThread(String key, RecordInfo recordInfo) {
|
||||
this.key = key;
|
||||
this.recordInfo = recordInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
String cacheKey = this.key;
|
||||
|
||||
for (long stop = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); stop > System.nanoTime();) {
|
||||
List<Object> cacheKeys = redis.scan(cacheKey + "_*");
|
||||
List<RecordItem> totalRecordList = new ArrayList<RecordItem>();
|
||||
for (int i = 0; i < cacheKeys.size(); i++) {
|
||||
totalRecordList.addAll((List<RecordItem>) redis.get(cacheKeys.get(i).toString()));
|
||||
}
|
||||
if (totalRecordList.size() < this.recordInfo.getSumNum()) {
|
||||
logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + this.recordInfo.getSumNum() + "项");
|
||||
} else {
|
||||
logger.info("录像数据已全部获取,共 {} 项", this.recordInfo.getSumNum());
|
||||
this.recordInfo.setRecordList(totalRecordList);
|
||||
for (int i = 0; i < cacheKeys.size(); i++) {
|
||||
redis.del(cacheKeys.get(i).toString());
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
// 自然顺序排序, 元素进行升序排列
|
||||
this.recordInfo.getRecordList().sort(Comparator.naturalOrder());
|
||||
RequestMessage msg = new RequestMessage();
|
||||
msg.setKey(DeferredResultHolder.CALLBACK_CMD_RECORDINFO + recordInfo.getDeviceId() + recordInfo.getSn());
|
||||
msg.setData(recordInfo);
|
||||
deferredResultHolder.invokeAllResult(msg);
|
||||
logger.info("处理完成,返回结果");
|
||||
RecordInfoResponseMessageHandler.threadNameList.remove(cacheKey);
|
||||
}
|
||||
|
||||
public void setRedis(RedisUtil redis) {
|
||||
this.redis = redis;
|
||||
}
|
||||
|
||||
public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) {
|
||||
this.deferredResultHolder = deferredResultHolder;
|
||||
}
|
||||
|
||||
public void setLogger(Logger logger) {
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -370,7 +370,7 @@ public class SIPCommander implements ISIPCommander {
|
||||
//
|
||||
StringBuffer content = new StringBuffer(200);
|
||||
content.append("v=0\r\n");
|
||||
content.append("o="+ sipConfig.getId()+" 0 0 IN IP4 "+ mediaServerItem.getSdpIp() +"\r\n");
|
||||
content.append("o="+ channelId+" 0 0 IN IP4 "+ mediaServerItem.getSdpIp() +"\r\n");
|
||||
content.append("s=Play\r\n");
|
||||
content.append("c=IN IP4 "+ mediaServerItem.getSdpIp() +"\r\n");
|
||||
content.append("t=0 0\r\n");
|
||||
@@ -389,8 +389,7 @@ public class SIPCommander implements ISIPCommander {
|
||||
content.append("a=rtpmap:126 H264/90000\r\n");
|
||||
content.append("a=rtpmap:125 H264S/90000\r\n");
|
||||
content.append("a=fmtp:125 profile-level-id=42e01e\r\n");
|
||||
content.append("a=rtpmap:99 MP4V-ES/90000\r\n");
|
||||
content.append("a=fmtp:99 profile-level-id=3\r\n");
|
||||
content.append("a=rtpmap:99 H265/90000\r\n");
|
||||
content.append("a=rtpmap:98 H264/90000\r\n");
|
||||
content.append("a=rtpmap:97 MPEG4/90000\r\n");
|
||||
if("TCP-PASSIVE".equals(streamMode)){ // tcp被动模式
|
||||
@@ -402,16 +401,17 @@ public class SIPCommander implements ISIPCommander {
|
||||
}
|
||||
}else {
|
||||
if("TCP-PASSIVE".equals(streamMode)) {
|
||||
content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 98 97\r\n");
|
||||
content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 97 98 99\r\n");
|
||||
}else if ("TCP-ACTIVE".equals(streamMode)) {
|
||||
content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 98 97\r\n");
|
||||
content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 97 98 99\r\n");
|
||||
}else if("UDP".equals(streamMode)) {
|
||||
content.append("m=video "+ ssrcInfo.getPort() +" RTP/AVP 96 98 97\r\n");
|
||||
content.append("m=video "+ ssrcInfo.getPort() +" RTP/AVP 96 97 98 99\r\n");
|
||||
}
|
||||
content.append("a=recvonly\r\n");
|
||||
content.append("a=rtpmap:96 PS/90000\r\n");
|
||||
content.append("a=rtpmap:98 H264/90000\r\n");
|
||||
content.append("a=rtpmap:97 MPEG4/90000\r\n");
|
||||
content.append("a=rtpmap:99 H265/90000\r\n");
|
||||
if ("TCP-PASSIVE".equals(streamMode)) { // tcp被动模式
|
||||
content.append("a=setup:passive\r\n");
|
||||
content.append("a=connection:new\r\n");
|
||||
@@ -467,7 +467,7 @@ public class SIPCommander implements ISIPCommander {
|
||||
|
||||
StringBuffer content = new StringBuffer(200);
|
||||
content.append("v=0\r\n");
|
||||
content.append("o="+sipConfig.getId()+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
|
||||
content.append("o="+channelId+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
|
||||
content.append("s=Playback\r\n");
|
||||
content.append("u="+channelId+":0\r\n");
|
||||
content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
|
||||
@@ -490,8 +490,7 @@ public class SIPCommander implements ISIPCommander {
|
||||
content.append("a=rtpmap:126 H264/90000\r\n");
|
||||
content.append("a=rtpmap:125 H264S/90000\r\n");
|
||||
content.append("a=fmtp:125 profile-level-id=42e01e\r\n");
|
||||
content.append("a=rtpmap:99 MP4V-ES/90000\r\n");
|
||||
content.append("a=fmtp:99 profile-level-id=3\r\n");
|
||||
content.append("a=rtpmap:99 H265/90000\r\n");
|
||||
content.append("a=rtpmap:98 H264/90000\r\n");
|
||||
content.append("a=rtpmap:97 MPEG4/90000\r\n");
|
||||
if("TCP-PASSIVE".equals(streamMode)){ // tcp被动模式
|
||||
@@ -503,16 +502,17 @@ public class SIPCommander implements ISIPCommander {
|
||||
}
|
||||
}else {
|
||||
if("TCP-PASSIVE".equals(streamMode)) {
|
||||
content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 98 97\r\n");
|
||||
content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 97 98 99\r\n");
|
||||
}else if ("TCP-ACTIVE".equals(streamMode)) {
|
||||
content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 98 97\r\n");
|
||||
content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 97 98 99\r\n");
|
||||
}else if("UDP".equals(streamMode)) {
|
||||
content.append("m=video "+ ssrcInfo.getPort() +" RTP/AVP 96 98 97\r\n");
|
||||
content.append("m=video "+ ssrcInfo.getPort() +" RTP/AVP 96 97 98 99\r\n");
|
||||
}
|
||||
content.append("a=recvonly\r\n");
|
||||
content.append("a=rtpmap:96 PS/90000\r\n");
|
||||
content.append("a=rtpmap:98 H264/90000\r\n");
|
||||
content.append("a=rtpmap:97 MPEG4/90000\r\n");
|
||||
content.append("a=rtpmap:98 H264/90000\r\n");
|
||||
content.append("a=rtpmap:99 H265/90000\r\n");
|
||||
if("TCP-PASSIVE".equals(streamMode)){ // tcp被动模式
|
||||
content.append("a=setup:passive\r\n");
|
||||
content.append("a=connection:new\r\n");
|
||||
@@ -577,7 +577,7 @@ public class SIPCommander implements ISIPCommander {
|
||||
|
||||
StringBuffer content = new StringBuffer(200);
|
||||
content.append("v=0\r\n");
|
||||
content.append("o="+sipConfig.getId()+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
|
||||
content.append("o="+channelId+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
|
||||
content.append("s=Download\r\n");
|
||||
content.append("u="+channelId+":0\r\n");
|
||||
content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
|
||||
@@ -613,16 +613,17 @@ public class SIPCommander implements ISIPCommander {
|
||||
}
|
||||
}else {
|
||||
if("TCP-PASSIVE".equals(streamMode)) {
|
||||
content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 98 97\r\n");
|
||||
content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 97 98 99\r\n");
|
||||
}else if ("TCP-ACTIVE".equals(streamMode)) {
|
||||
content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 98 97\r\n");
|
||||
content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 97 98 99\r\n");
|
||||
}else if("UDP".equals(streamMode)) {
|
||||
content.append("m=video "+ ssrcInfo.getPort() +" RTP/AVP 96 98 97\r\n");
|
||||
content.append("m=video "+ ssrcInfo.getPort() +" RTP/AVP 96 97 98 99\r\n");
|
||||
}
|
||||
content.append("a=recvonly\r\n");
|
||||
content.append("a=rtpmap:96 PS/90000\r\n");
|
||||
content.append("a=rtpmap:98 H264/90000\r\n");
|
||||
content.append("a=rtpmap:97 MPEG4/90000\r\n");
|
||||
content.append("a=rtpmap:98 H264/90000\r\n");
|
||||
content.append("a=rtpmap:99 H265/90000\r\n");
|
||||
if("TCP-PASSIVE".equals(streamMode)){ // tcp被动模式
|
||||
content.append("a=setup:passive\r\n");
|
||||
content.append("a=connection:new\r\n");
|
||||
@@ -651,6 +652,17 @@ public class SIPCommander implements ISIPCommander {
|
||||
(MediaServerItem mediaServerItemInUse, JSONObject json)->{
|
||||
hookEvent.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));
|
||||
subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
|
||||
subscribeKey.put("regist", false);
|
||||
subscribeKey.put("schema", "rtmp");
|
||||
// 添加流注销的订阅,注销了后向设备发送bye
|
||||
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
|
||||
(MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd)->{
|
||||
ClientTransaction transaction = streamSession.getTransaction(device.getDeviceId(), channelId, ssrcInfo.getStream(), callIdHeader.getCallId());
|
||||
if (transaction != null) {
|
||||
logger.info("[录像]下载结束, 发送BYE");
|
||||
streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), callIdHeader.getCallId());
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc());
|
||||
@@ -683,10 +695,10 @@ public class SIPCommander implements ISIPCommander {
|
||||
@Override
|
||||
public void streamByeCmd(String deviceId, String channelId, String stream, String callId, SipSubscribe.Event okEvent) {
|
||||
try {
|
||||
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream);
|
||||
ClientTransaction transaction = streamSession.getTransactionByStream(deviceId, channelId, stream);
|
||||
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, callId, stream);
|
||||
ClientTransaction transaction = streamSession.getTransaction(deviceId, channelId, stream, callId);
|
||||
|
||||
if (transaction == null) {
|
||||
if (transaction == null ) {
|
||||
logger.warn("[ {} -> {}]停止视频流的时候发现事务已丢失", deviceId, channelId);
|
||||
SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>();
|
||||
if (okEvent != null) {
|
||||
@@ -1630,6 +1642,7 @@ public class SIPCommander implements ISIPCommander {
|
||||
sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (eventResult -> {
|
||||
errorEvent.response(eventResult);
|
||||
sipSubscribe.removeErrorSubscribe(eventResult.callId);
|
||||
sipSubscribe.removeOkSubscribe(eventResult.callId);
|
||||
}));
|
||||
}
|
||||
// 添加订阅
|
||||
@@ -1637,6 +1650,7 @@ public class SIPCommander implements ISIPCommander {
|
||||
sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), eventResult ->{
|
||||
okEvent.response(eventResult);
|
||||
sipSubscribe.removeOkSubscribe(eventResult.callId);
|
||||
sipSubscribe.removeErrorSubscribe(eventResult.callId);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -50,6 +50,7 @@ import javax.sip.header.CallIdHeader;
|
||||
import javax.sip.message.Request;
|
||||
import javax.sip.message.Response;
|
||||
import java.text.ParseException;
|
||||
import java.time.Instant;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Vector;
|
||||
@@ -201,16 +202,16 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
||||
|
||||
Long startTime = null;
|
||||
Long stopTime = null;
|
||||
Date start = null;
|
||||
Date end = null;
|
||||
Instant start = null;
|
||||
Instant end = null;
|
||||
if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) {
|
||||
TimeDescriptionImpl timeDescription = (TimeDescriptionImpl)(sdp.getTimeDescriptions(false).get(0));
|
||||
TimeField startTimeFiled = (TimeField)timeDescription.getTime();
|
||||
startTime = startTimeFiled.getStartTime();
|
||||
stopTime = startTimeFiled.getStopTime();
|
||||
|
||||
start = new Date(startTime*1000);
|
||||
end = new Date(stopTime*1000);
|
||||
start = Instant.ofEpochMilli(startTime*1000);
|
||||
end = Instant.ofEpochMilli(stopTime*1000);
|
||||
}
|
||||
// 获取支持的格式
|
||||
Vector mediaDescriptions = sdp.getMediaDescriptions(true);
|
||||
@@ -352,12 +353,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
||||
sendRtpItem.setApp("rtp");
|
||||
if ("Playback".equals(sessionName)) {
|
||||
sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
|
||||
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true);
|
||||
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true, true);
|
||||
sendRtpItem.setStreamId(ssrcInfo.getStream());
|
||||
// 写入redis, 超时时回复
|
||||
redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
||||
playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.format.format(start),
|
||||
DateUtil.format.format(end), null, result -> {
|
||||
playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
|
||||
DateUtil.formatter.format(end), null, result -> {
|
||||
if (result.getCode() != 0){
|
||||
logger.warn("录像回放失败");
|
||||
if (result.getEvent() != null) {
|
||||
@@ -393,7 +394,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
||||
if (mediaServerItem.isRtpEnable()) {
|
||||
streamId = String.format("%s_%s", device.getDeviceId(), channelId);
|
||||
}
|
||||
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, true);
|
||||
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, true, false);
|
||||
sendRtpItem.setStreamId(ssrcInfo.getStream());
|
||||
// 写入redis, 超时时回复
|
||||
redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
||||
|
||||
@@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP
|
||||
import com.genersoft.iot.vmp.service.IDeviceService;
|
||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||
import gov.nist.javax.sip.RequestEventExt;
|
||||
import gov.nist.javax.sip.address.AddressImpl;
|
||||
import gov.nist.javax.sip.address.SipUri;
|
||||
@@ -80,7 +81,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
|
||||
try {
|
||||
RequestEventExt evtExt = (RequestEventExt) evt;
|
||||
String requestAddress = evtExt.getRemoteIpAddress() + ":" + evtExt.getRemotePort();
|
||||
logger.info("[{}] 收到注册请求,开始处理", requestAddress);
|
||||
logger.info("[注册请求] 开始处理: {}", requestAddress);
|
||||
Request request = evt.getRequest();
|
||||
ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME);
|
||||
Response response = null;
|
||||
@@ -94,7 +95,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
|
||||
|
||||
AuthorizationHeader authHead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME);
|
||||
if (authHead == null) {
|
||||
logger.info("[{}] 未携带授权头 回复401", requestAddress);
|
||||
logger.info("[注册请求] 未携带授权头 回复401: {}", requestAddress);
|
||||
response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request);
|
||||
new DigestServerAuthenticationHelper().generateChallenge(getHeaderFactory(), response, sipConfig.getDomain());
|
||||
sendResponse(evt, response);
|
||||
@@ -110,7 +111,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
|
||||
// 注册失败
|
||||
response = getMessageFactory().createResponse(Response.FORBIDDEN, request);
|
||||
response.setReasonPhrase("wrong password");
|
||||
logger.info("[{}] 密码/SIP服务器ID错误, 回复403", requestAddress);
|
||||
logger.info("[注册请求] 密码/SIP服务器ID错误, 回复403: {}", requestAddress);
|
||||
sendResponse(evt, response);
|
||||
return;
|
||||
}
|
||||
@@ -175,10 +176,11 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
|
||||
// 注册成功
|
||||
// 保存到redis
|
||||
if (registerFlag) {
|
||||
logger.info("[{}] 注册成功! deviceId:" + deviceId, requestAddress);
|
||||
logger.info("[注册成功] deviceId: {}->{}", deviceId, requestAddress);
|
||||
device.setRegisterTime(DateUtil.getNow());
|
||||
deviceService.online(device);
|
||||
} else {
|
||||
logger.info("[{}] 注销成功! deviceId:" + deviceId, requestAddress);
|
||||
logger.info("[注销成功] deviceId: {}->{}" ,deviceId, requestAddress);
|
||||
deviceService.offline(deviceId);
|
||||
}
|
||||
} catch (SipException | InvalidArgumentException | NoSuchAlgorithmException | ParseException e) {
|
||||
@@ -190,7 +192,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
|
||||
private void sendResponse(RequestEvent evt, Response response) throws InvalidArgumentException, SipException {
|
||||
ServerTransaction serverTransaction = getServerTransaction(evt);
|
||||
if (serverTransaction == null) {
|
||||
logger.warn("回复失败:{}", response);
|
||||
logger.warn("[回复失败]:{}", response);
|
||||
return;
|
||||
}
|
||||
serverTransaction.sendResponse(response);
|
||||
|
||||
@@ -24,8 +24,6 @@ import javax.sip.SipException;
|
||||
import javax.sip.header.ViaHeader;
|
||||
import javax.sip.message.Response;
|
||||
import java.text.ParseException;
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
|
||||
@Component
|
||||
public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
|
||||
@@ -72,6 +70,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
|
||||
device.setPort(rPort);
|
||||
device.setHostAddress(received.concat(":").concat(String.valueOf(rPort)));
|
||||
}
|
||||
device.setKeepaliveTime(DateUtil.getNow());
|
||||
deviceService.online(device);
|
||||
// 回复200 OK
|
||||
responseAck(evt, Response.OK);
|
||||
|
||||
@@ -60,10 +60,9 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
|
||||
CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
|
||||
String NotifyType =getText(rootElement, "NotifyType");
|
||||
if (NotifyType.equals("121")){
|
||||
logger.info("媒体播放完毕,通知关流");
|
||||
logger.info("[录像流]推送完毕,收到关流通知");
|
||||
String channelId =getText(rootElement, "DeviceID");
|
||||
// redisCatchStorage.stopPlayback(device.getDeviceId(), channelId, null, callIdHeader.getCallId());
|
||||
// redisCatchStorage.stopDownload(device.getDeviceId(), channelId, null, callIdHeader.getCallId());
|
||||
// 查询是设备
|
||||
StreamInfo streamInfo = redisCatchStorage.queryDownload(device.getDeviceId(), channelId, null, callIdHeader.getCallId());
|
||||
// 设置进度100%
|
||||
streamInfo.setProgress(1);
|
||||
|
||||
@@ -5,14 +5,14 @@ import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.RecordItem;
|
||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.callback.CheckForAllRecordsThread;
|
||||
import com.genersoft.iot.vmp.gb28181.session.RecordDataCatch;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
|
||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
|
||||
import org.dom4j.DocumentException;
|
||||
import org.dom4j.Element;
|
||||
import org.slf4j.Logger;
|
||||
@@ -20,19 +20,20 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import javax.sip.InvalidArgumentException;
|
||||
import javax.sip.RequestEvent;
|
||||
import javax.sip.SipException;
|
||||
import javax.sip.message.Response;
|
||||
import java.text.ParseException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.*;
|
||||
|
||||
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
|
||||
|
||||
/**
|
||||
* @author lin
|
||||
*/
|
||||
@Component
|
||||
public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
|
||||
|
||||
@@ -45,11 +46,13 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
|
||||
private ResponseMessageHandler responseMessageHandler;
|
||||
|
||||
@Autowired
|
||||
private RedisUtil redis;
|
||||
private RecordDataCatch recordDataCatch;
|
||||
|
||||
@Autowired
|
||||
private DeferredResultHolder deferredResultHolder;
|
||||
|
||||
|
||||
|
||||
@Autowired
|
||||
private EventPublisher eventPublisher;
|
||||
|
||||
@@ -66,32 +69,22 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
|
||||
responseAck(evt, Response.OK);
|
||||
|
||||
rootElement = getRootElement(evt, device.getCharset());
|
||||
String uuid = UUID.randomUUID().toString().replace("-", "");
|
||||
RecordInfo recordInfo = new RecordInfo();
|
||||
String sn = getText(rootElement, "SN");
|
||||
String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + device.getDeviceId() + sn;
|
||||
recordInfo.setDeviceId(device.getDeviceId());
|
||||
recordInfo.setSn(sn);
|
||||
recordInfo.setName(getText(rootElement, "Name"));
|
||||
if (getText(rootElement, "SumNum") == null || getText(rootElement, "SumNum") == "") {
|
||||
recordInfo.setSumNum(0);
|
||||
} else {
|
||||
recordInfo.setSumNum(Integer.parseInt(getText(rootElement, "SumNum")));
|
||||
|
||||
String sumNumStr = getText(rootElement, "SumNum");
|
||||
int sumNum = 0;
|
||||
if (!StringUtils.isEmpty(sumNumStr)) {
|
||||
sumNum = Integer.parseInt(sumNumStr);
|
||||
}
|
||||
Element recordListElement = rootElement.element("RecordList");
|
||||
if (recordListElement == null || recordInfo.getSumNum() == 0) {
|
||||
if (recordListElement == null || sumNum == 0) {
|
||||
logger.info("无录像数据");
|
||||
eventPublisher.recordEndEventPush(recordInfo);
|
||||
RequestMessage msg = new RequestMessage();
|
||||
msg.setKey(key);
|
||||
msg.setData(recordInfo);
|
||||
deferredResultHolder.invokeAllResult(msg);
|
||||
recordDataCatch.put(device.getDeviceId(), sn, sumNum, new ArrayList<>());
|
||||
releaseRequest(device.getDeviceId(), sn);
|
||||
} else {
|
||||
Iterator<Element> recordListIterator = recordListElement.elementIterator();
|
||||
List<RecordItem> recordList = new ArrayList<RecordItem>();
|
||||
if (recordListIterator != null) {
|
||||
RecordItem record = new RecordItem();
|
||||
logger.info("处理录像列表数据...");
|
||||
List<RecordItem> recordList = new ArrayList<>();
|
||||
// 遍历DeviceList
|
||||
while (recordListIterator.hasNext()) {
|
||||
Element itemRecord = recordListIterator.next();
|
||||
@@ -100,43 +93,31 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
|
||||
logger.info("记录为空,下一个...");
|
||||
continue;
|
||||
}
|
||||
record = new RecordItem();
|
||||
RecordItem record = new RecordItem();
|
||||
record.setDeviceId(getText(itemRecord, "DeviceID"));
|
||||
record.setName(getText(itemRecord, "Name"));
|
||||
record.setFilePath(getText(itemRecord, "FilePath"));
|
||||
record.setFileSize(getText(itemRecord, "FileSize"));
|
||||
record.setAddress(getText(itemRecord, "Address"));
|
||||
record.setStartTime(
|
||||
DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(getText(itemRecord, "StartTime")));
|
||||
record.setEndTime(
|
||||
DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(getText(itemRecord, "EndTime")));
|
||||
|
||||
String startTimeStr = getText(itemRecord, "StartTime");
|
||||
record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr));
|
||||
|
||||
String endTimeStr = getText(itemRecord, "EndTime");
|
||||
record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr));
|
||||
|
||||
record.setSecrecy(itemRecord.element("Secrecy") == null ? 0
|
||||
: Integer.parseInt(getText(itemRecord, "Secrecy")));
|
||||
record.setType(getText(itemRecord, "Type"));
|
||||
record.setRecorderId(getText(itemRecord, "RecorderID"));
|
||||
recordList.add(record);
|
||||
}
|
||||
recordInfo.setRecordList(recordList);
|
||||
int count = recordDataCatch.put(device.getDeviceId(), sn, sumNum, recordList);
|
||||
logger.info("[国标录像], {}->{}: {}/{}", device.getDeviceId(), sn, count, sumNum);
|
||||
}
|
||||
eventPublisher.recordEndEventPush(recordInfo);
|
||||
// 改用单独线程统计已获取录像文件数量,避免多包并行分别统计不完整的问题
|
||||
String cacheKey = CACHE_RECORDINFO_KEY + device.getDeviceId() + sn;
|
||||
redis.set(cacheKey + "_" + uuid, recordList, 90);
|
||||
if (!threadNameList.contains(cacheKey)) {
|
||||
threadNameList.add(cacheKey);
|
||||
CheckForAllRecordsThread chk = new CheckForAllRecordsThread(cacheKey, recordInfo);
|
||||
chk.setName(cacheKey);
|
||||
chk.setDeferredResultHolder(deferredResultHolder);
|
||||
chk.setRedis(redis);
|
||||
chk.setLogger(logger);
|
||||
chk.start();
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Start Thread " + cacheKey + ".");
|
||||
}
|
||||
} else {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Thread " + cacheKey + " already started.");
|
||||
}
|
||||
|
||||
if (recordDataCatch.isComplete(device.getDeviceId(), sn)){
|
||||
releaseRequest(device.getDeviceId(), sn);
|
||||
}
|
||||
}
|
||||
} catch (SipException e) {
|
||||
@@ -154,4 +135,20 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
|
||||
public void handForPlatform(RequestEvent evt, ParentPlatform parentPlatform, Element element) {
|
||||
|
||||
}
|
||||
|
||||
public void releaseRequest(String deviceId, String sn){
|
||||
String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn;
|
||||
WVPResult<RecordInfo> wvpResult = new WVPResult<>();
|
||||
wvpResult.setCode(0);
|
||||
wvpResult.setMsg("success");
|
||||
// 对数据进行排序
|
||||
Collections.sort(recordDataCatch.getRecordInfo(deviceId, sn).getRecordList());
|
||||
wvpResult.setData(recordDataCatch.getRecordInfo(deviceId, sn));
|
||||
|
||||
RequestMessage msg = new RequestMessage();
|
||||
msg.setKey(key);
|
||||
msg.setData(wvpResult);
|
||||
deferredResultHolder.invokeAllResult(msg);
|
||||
recordDataCatch.remove(deviceId, sn);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user