去除对redis key过期事件的使用;重构国标级联的注册保活

This commit is contained in:
648540858
2022-08-31 11:29:13 +08:00
parent d47902bdca
commit 2de4c322f6
45 changed files with 513 additions and 693 deletions

View File

@@ -15,7 +15,7 @@ public interface ISIPCommanderForPlatform {
* @return
*/
boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent);
boolean register(ParentPlatform parentPlatform, String callId, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain);
boolean register(ParentPlatform parentPlatform, String callId, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister);
/**
* 向上级平台注销
@@ -30,7 +30,7 @@ public interface ISIPCommanderForPlatform {
* @param parentPlatform
* @return callId(作为接受回复的判定)
*/
String keepalive(ParentPlatform parentPlatform);
String keepalive(ParentPlatform parentPlatform,SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent);
/**

View File

@@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.utils.HeaderUtils;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import gov.nist.javax.sip.message.MessageFactoryImpl;
import org.springframework.beans.factory.annotation.Autowired;
@@ -75,7 +76,7 @@ public class SIPRequestHeaderPlarformProvider {
}
public Request createRegisterRequest(@NotNull ParentPlatform platform, long CSeq, String fromTag, String viaTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
public Request createRegisterRequest(@NotNull ParentPlatform platform, long CSeq, String fromTag, String viaTag, CallIdHeader callIdHeader, boolean isRegister) throws ParseException, InvalidArgumentException, PeerUnavailableException {
Request request = null;
String sipAddress = sipConfig.getIp() + ":" + sipConfig.getPort();
//请求行
@@ -109,18 +110,20 @@ public class SIPRequestHeaderPlarformProvider {
.createSipURI(platform.getDeviceGBId(), sipAddress));
request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
ExpiresHeader expires = sipFactory.createHeaderFactory().createExpiresHeader(Integer.parseInt(platform.getExpires()));
ExpiresHeader expires = sipFactory.createHeaderFactory().createExpiresHeader(isRegister ? platform.getExpires() : 0);
request.addHeader(expires);
UserAgentHeader userAgentHeader = HeaderUtils.createUserAgentHeader(sipFactory);
request.addHeader(userAgentHeader);
return request;
}
public Request createRegisterRequest(@NotNull ParentPlatform parentPlatform, String fromTag, String viaTag,
String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader) throws ParseException, PeerUnavailableException, InvalidArgumentException {
String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader, boolean isRegister) throws ParseException, PeerUnavailableException, InvalidArgumentException {
Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, viaTag, callIdHeader);
Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, viaTag, callIdHeader, isRegister);
SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort());
if (www == null) {
AuthorizationHeader authorizationHeader = sipFactory.createHeaderFactory().createAuthorizationHeader("Digest");

View File

@@ -12,6 +12,7 @@ import javax.sip.message.Request;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.utils.HeaderUtils;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl;
@@ -266,15 +267,7 @@ public class SIPRequestHeaderProvider {
Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory()
.createSipURI(sipConfig.getId(), sipConfig.getIp() + ":" + sipConfig.getPort()));
infoRequest.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
List<String> agentParam = new ArrayList<>();
agentParam.add("wvp-pro");
// TODO 添加版本信息以及日期
UserAgentHeader userAgentHeader = null;
try {
userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
} catch (ParseException e) {
throw new RuntimeException(e);
}
UserAgentHeader userAgentHeader = HeaderUtils.createUserAgentHeader(sipFactory);
infoRequest.addHeader(userAgentHeader);
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application",

View File

@@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider;
import com.genersoft.iot.vmp.gb28181.utils.HeaderUtils;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
@@ -740,15 +741,7 @@ public class SIPCommander implements ISIPCommander {
// 增加Contact header
Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort()));
byeRequest.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
List<String> agentParam = new ArrayList<>();
agentParam.add("wvp-pro");
// TODO 添加版本信息以及日期
UserAgentHeader userAgentHeader = null;
try {
userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
} catch (ParseException e) {
throw new RuntimeException(e);
}
UserAgentHeader userAgentHeader = HeaderUtils.createUserAgentHeader(sipFactory);
byeRequest.addHeader(userAgentHeader);
ClientTransaction clientTransaction = null;
if("TCP".equals(protocol)) {
@@ -1677,14 +1670,11 @@ public class SIPCommander implements ISIPCommander {
clientTransaction = udpSipProvider.getNewClientTransaction(request);
}
if (request.getHeader(UserAgentHeader.NAME) == null) {
List<String> agentParam = new ArrayList<>();
agentParam.add("wvp-pro");
// TODO 添加版本信息以及日期
UserAgentHeader userAgentHeader = null;
try {
userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
userAgentHeader = HeaderUtils.createUserAgentHeader(sipFactory);
} catch (ParseException e) {
throw new RuntimeException(e);
logger.error("添加UserAgentHeader失败", e);
}
request.addHeader(userAgentHeader);
}

View File

@@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@@ -75,28 +76,21 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
@Override
public boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
return register(parentPlatform, null, null, errorEvent, okEvent, false);
return register(parentPlatform, null, null, errorEvent, okEvent, false, true);
}
@Override
public boolean unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
parentPlatform.setExpires("0");
if (parentPlatformCatch != null) {
parentPlatformCatch.setParentPlatform(parentPlatform);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
}
return register(parentPlatform, null, null, errorEvent, okEvent, false);
return register(parentPlatform, null, null, errorEvent, okEvent, false, false);
}
@Override
public boolean register(ParentPlatform parentPlatform, @Nullable String callId, @Nullable WWWAuthenticateHeader www,
SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain) {
SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) {
try {
Request request;
String tm = Long.toString(System.currentTimeMillis());
if (!registerAgain ) {
// //callid
CallIdHeader callIdHeader = null;
if(parentPlatform.getTransport().equals("TCP")) {
callIdHeader = tcpSipProvider.getNewCallId();
@@ -107,10 +101,10 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform,
redisCatchStorage.getCSEQ(), "FromRegister" + tm,
"z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), callIdHeader);
"z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), callIdHeader, isRegister);
// 将 callid 写入缓存, 等注册成功可以更新状态
String callIdFromHeader = callIdHeader.getCallId();
redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, parentPlatform.getServerGBId());
redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, PlatformRegisterInfo.getInstance(parentPlatform.getServerGBId(), isRegister));
sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (event)->{
if (event != null) {
@@ -127,7 +121,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
}else {
CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, "FromRegister" + tm, null, callId, www, callIdHeader);
request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, "FromRegister" + tm, null, callId, www, callIdHeader, isRegister);
}
transmitRequest(parentPlatform, request, null, okEvent);
@@ -145,7 +139,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
}
@Override
public String keepalive(ParentPlatform parentPlatform) {
public String keepalive(ParentPlatform parentPlatform,SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
String callId = null;
try {
String characterSet = parentPlatform.getCharacterSet();
@@ -168,7 +162,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
UUID.randomUUID().toString().replace("-", ""),
null,
callIdHeader);
transmitRequest(parentPlatform, request);
transmitRequest(parentPlatform, request, errorEvent, okEvent);
callId = callIdHeader.getCallId();
} catch (ParseException | InvalidArgumentException | SipException e) {
e.printStackTrace();

View File

@@ -59,6 +59,9 @@ public abstract class SIPRequestProcessorParent {
public ServerTransaction getServerTransaction(RequestEvent evt) {
Request request = evt.getRequest();
ServerTransaction serverTransaction = evt.getServerTransaction();
if (serverTransaction != null) {
System.out.println(serverTransaction.getState().toString());
}
// 判断TCP还是UDP
boolean isTcp = false;
ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME);
@@ -86,6 +89,8 @@ public abstract class SIPRequestProcessorParent {
logger.error(e.getMessage());
} catch (TransactionUnavailableException e) {
logger.error(e.getMessage());
}finally {
}
}
return serverTransaction;
@@ -182,6 +187,10 @@ public abstract class SIPRequestProcessorParent {
sipFactory.createAddressFactory().createSipURI(sipURI.getUser(), sipURI.getHost()+":"+sipURI.getPort()
));
response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
ServerTransaction serverTransaction = getServerTransaction(evt);
if (serverTransaction == null) {
}
getServerTransaction(evt).sendResponse(response);
}

View File

@@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
import com.genersoft.iot.vmp.gb28181.utils.HeaderUtils;
import gov.nist.javax.sip.ResponseEventExt;
import gov.nist.javax.sip.message.SIPResponse;
import gov.nist.javax.sip.stack.SIPDialog;
@@ -103,15 +104,7 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
}
requestURI.setPort(event.getRemotePort());
reqAck.setRequestURI(requestURI);
List<String> agentParam = new ArrayList<>();
agentParam.add("wvp-pro");
// TODO 添加版本信息以及日期
UserAgentHeader userAgentHeader = null;
try {
userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
} catch (ParseException e) {
throw new RuntimeException(e);
}
UserAgentHeader userAgentHeader = HeaderUtils.createUserAgentHeader(sipFactory);
reqAck.addHeader(userAgentHeader);
Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort()));
reqAck.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));

View File

@@ -6,8 +6,10 @@ import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -44,6 +46,9 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
@Autowired
private SubscribeHolder subscribeHolder;
@Autowired
private IPlatformService platformService;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
@@ -60,48 +65,39 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
Response response = evt.getResponse();
CallIdHeader callIdHeader = (CallIdHeader) response.getHeader(CallIdHeader.NAME);
String callId = callIdHeader.getCallId();
String platformGBId = redisCatchStorage.queryPlatformRegisterInfo(callId);
if (platformGBId == null) {
logger.info(String.format("未找到callId %s 的注册/注销平台id", callId ));
PlatformRegisterInfo platformRegisterInfo = redisCatchStorage.queryPlatformRegisterInfo(callId);
if (platformRegisterInfo == null) {
logger.info(String.format("[国标级联]未找到callId %s 的注册/注销平台id", callId ));
return;
}
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(platformGBId);
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(platformRegisterInfo.getPlatformId());
if (parentPlatformCatch == null) {
logger.warn(String.format("[收到注册/注销%S请求]平台:%s但是平台缓存信息未查询到!!!", response.getStatusCode(),platformGBId));
logger.warn(String.format("[国标级联]收到注册/注销%S请求平台:%s但是平台缓存信息未查询到!!!", response.getStatusCode(),platformRegisterInfo.getPlatformId()));
return;
}
String action = parentPlatformCatch.getParentPlatform().getExpires().equals("0") ? "注销" : "注册";
logger.info(String.format("[%s %S响应]%s ", action, response.getStatusCode(), platformGBId ));
String action = platformRegisterInfo.isRegister() ? "注册" : "注销";
logger.info(String.format("[国标级联]%s %S响应,%s ", action, response.getStatusCode(), platformRegisterInfo.getPlatformId() ));
ParentPlatform parentPlatform = parentPlatformCatch.getParentPlatform();
if (parentPlatform == null) {
logger.warn(String.format("收到 %s %s的%S请求, 但是平台信息未查询到!!!", platformGBId, action, response.getStatusCode()));
logger.warn(String.format("[国标级联]收到 %s %s的%S请求, 但是平台信息未查询到!!!", platformRegisterInfo.getPlatformId(), action, response.getStatusCode()));
return;
}
if (response.getStatusCode() == 401) {
if (response.getStatusCode() == Response.UNAUTHORIZED) {
WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME);
sipCommanderForPlatform.register(parentPlatform, callId, www, null, null, true);
}else if (response.getStatusCode() == 200){
// 注册/注销成功
logger.info(String.format("%s %s成功", platformGBId, action));
sipCommanderForPlatform.register(parentPlatform, callId, www, null, null, true, platformRegisterInfo.isRegister());
}else if (response.getStatusCode() == Response.OK){
if (platformRegisterInfo.isRegister()) {
platformService.online(parentPlatform);
}else {
platformService.offline(parentPlatform);
}
// 注册/注销成功移除缓存的信息
redisCatchStorage.delPlatformRegisterInfo(callId);
redisCatchStorage.delPlatformCatchInfo(platformGBId);
// 取回Expires设置避免注销过程中被置为0
ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId);
if (parentPlatformTmp != null) {
parentPlatformTmp.setStatus("注册".equals(action));
redisCatchStorage.updatePlatformRegister(parentPlatformTmp);
redisCatchStorage.updatePlatformKeepalive(parentPlatformTmp);
parentPlatformCatch.setParentPlatform(parentPlatformTmp);
}
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
storager.updateParentPlatformStatus(platformGBId, "注册".equals(action));
if ("注销".equals(action)) {
subscribeHolder.removeCatalogSubscribe(platformGBId);
subscribeHolder.removeMobilePositionSubscribe(platformGBId);
}
}
}