完成向上级联->点播--增加了sdp解析

修复修改平台主键带来的bug
This commit is contained in:
panlinlin
2021-01-15 17:21:02 +08:00
parent cf8a22f50b
commit 503f891c9e
21 changed files with 976 additions and 81 deletions

View File

@@ -119,6 +119,9 @@ public class SIPProcessorFactory {
processor.setRequestEvent(evt);
processor.setTcpSipProvider(getTcpSipProvider());
processor.setUdpSipProvider(getUdpSipProvider());
processor.setCmderFroPlatform(cmderFroPlatform);
processor.setStorager(storager);
return processor;
} else if (Request.REGISTER.equals(method)) {
RegisterRequestProcessor processor = new RegisterRequestProcessor();

View File

@@ -70,7 +70,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
@Override
public boolean unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
parentPlatform.setExpires("0");
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getDeviceGBId());
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
if (parentPlatformCatch != null) {
parentPlatformCatch.setParentPlatform(parentPlatform);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
@@ -86,11 +86,21 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
if (www == null ) {
request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, 1L, null, null);
// 将 callid 写入缓存, 等注册成功可以更新状态
CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
redisCatchStorage.updatePlatformRegisterInfo(callIdHeader.getCallId(), parentPlatform.getServerGBId());
sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (event)->{
redisCatchStorage.delPlatformRegisterInfo(callIdHeader.getCallId());
if (errorEvent != null) {
errorEvent.response(event);
}
});
}else {
request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, null, null, callId, www);
}
transmitRequest(parentPlatform, request, errorEvent, okEvent);
transmitRequest(parentPlatform, request, null, okEvent);
return true;
} catch (ParseException e) {
e.printStackTrace();

View File

@@ -1,71 +1,231 @@
package com.genersoft.iot.vmp.gb28181.transmit.request.impl;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.address.SipURI;
import javax.sip.header.ContentTypeHeader;
import javax.sip.header.FromHeader;
import javax.sip.header.SubjectHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.sdp.Codec;
import com.genersoft.iot.vmp.gb28181.sdp.MediaDescription;
import com.genersoft.iot.vmp.gb28181.sdp.SdpParser;
import com.genersoft.iot.vmp.gb28181.sdp.SessionDescription;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.ParseException;
import java.util.List;
/**
* @Description:处理INVITE请求
* @author: swwheihei
* @date: 2020年5月3日 下午4:43:52
* @author: panll
* @date: 2021年1月14日
*/
public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
private final static Logger logger = LoggerFactory.getLogger(MessageRequestProcessor.class);
private SIPCommanderFroPlatform cmderFroPlatform;
private IVideoManagerStorager storager;
/**
* 处理invite请求
*
* @param request
* @param evt
* 请求消息
*/
@Override
public void process(RequestEvent evt) {
// TODO 优先级99 Invite Request消息实现此消息一般为级联消息上级给下级发送请求视频指令
// Request request = requestEvent.getRequest();
//
// try {
// // 发送100 Trying
// ServerTransaction serverTransaction = getServerTransaction(requestEvent);
// // 查询目标地址
// URI reqUri = request.getRequestURI();
// URI contactURI = currUser.get(reqUri);
//
// System.out.println("processInvite rqStr=" + reqUri + " contact=" + contactURI);
//
// // 根据Request uri来路由后续的响应消息通过VIA来路由
// Request cliReq = messageFactory.createRequest(request.toString());
// cliReq.setRequestURI(contactURI);
//
// HeaderFactory headerFactory = SipFactory.getInstance().createHeaderFactory();
// Via callerVia = (Via) request.getHeader(Via.NAME);
// Via via = (Via) headerFactory.createViaHeader(SIPMain.ip, SIPMain.port, "UDP",
// callerVia.getBranch() + "sipphone");
//
// cliReq.removeHeader(Via.NAME);
// cliReq.addHeader(via);
//
// // 更新contact的地址
// ContactHeader contactHeader = headerFactory.createContactHeader();
// Address address = SipFactory.getInstance().createAddressFactory()
// .createAddress("sip:sipsoft@" + SIPMain.ip + ":" + SIPMain.port);
// contactHeader.setAddress(address);
// contactHeader.setExpires(3600);
// cliReq.setHeader(contactHeader);
//
// clientTransactionId = sipProvider.getNewClientTransaction(cliReq);
// clientTransactionId.sendRequest();
//
// System.out.println("processInvite clientTransactionId=" + clientTransactionId.toString());
//
// System.out.println("send invite to callee: " + cliReq);
// } catch (TransactionUnavailableException e1) {
// e1.printStackTrace();
// } catch (SipException e) {
// e.printStackTrace();
// } catch (ParseException e) {
// e.printStackTrace();
// } catch (Exception e) {
// e.printStackTrace();
// }
// Invite Request消息实现此消息一般为级联消息上级给下级发送请求视频指令
try {
Request request = evt.getRequest();
SipURI sipURI = (SipURI) request.getRequestURI();
String channelId = sipURI.getUser();
String platformId = null;
// SubjectHeader subjectHeader = (SubjectHeader)request.getHeader(SubjectHeader.NAME);
// // 查询通道是否存在 不存在回复404
// if (subjectHeader != null) { // 存在则从subjectHeader 获取平台信息
// String subject = subjectHeader.getSubject();
// if (subject != null) {
// String[] info1 = subject.split(",");
// if (info1 != null && info1 .length == 2) {
// String[] info2 = info1[1].split(":");
// if (info2 != null && info2.length == 2) {
// platformId = info2[0];
// }
// }
// }
// }
FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
AddressImpl address = (AddressImpl) fromHeader.getAddress();
SipUri uri = (SipUri) address.getURI();
platformId = uri.getUser();
// if (platformId == null) { // 不存在则从fromHeader 获取平台信息
// FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
// platformId = fromHeader.getName();
// }
if (platformId == null || channelId == null) {
response400Ack(evt); // 参数不全, 发400请求错误
return;
}
// 查询平台下是否有该通道
DeviceChannel channel = storager.queryChannelInParentPlatform(platformId, channelId);
if (channel == null) {
response404Ack(evt); // 通道不存在发404资源不存在
return;
}else {
response100Ack(evt); // 通道存在发100trying
}
// 解析sdp消息
byte[] sdpByteArray = request.getRawContent();
SdpParser sdpParser = new SdpParser(); // TODO keng
SessionDescription sdp = sdpParser.parse(sdpByteArray);
// 获取支持的格式
List<MediaDescription> mediaDescriptions = sdp.getMediaDescriptions();
// 查看是否支持PS 负载96
String ip = null;
int port = -1;
for (MediaDescription mediaDescription : mediaDescriptions) {
List<Codec> codecs = mediaDescription.getCodecs();
for (Codec codec : codecs) {
if("96".equals(codec.getPayloadType()) || "PS".equals(codec.getName()) || "ps".equals(codec.getName())) {
ip = mediaDescription.getIpAddress().getHostName();
port = mediaDescription.getPort();
break;
}
}
}
if (ip == null || port == -1) { // TODO 没有合适的视频流格式, 可配置是否使用第一个media信息
if (mediaDescriptions.size() > 0) {
ip = mediaDescriptions.get(0).getIpAddress().getHostName();
port = mediaDescriptions.get(0).getPort();
}
}
if (ip == null || port == -1) {
response488Ack(evt);
return;
}
String ssrc = sdp.getSsrc();
// 通知下级推流,
// 查找合适的端口推流,
// 发送 200ok
// 收到ack后调用推流接口
} catch (SipException | InvalidArgumentException | ParseException e) {
e.printStackTrace();
} catch (IOException e) {
logger.warn("sdp解析错误");
e.printStackTrace();
}
}
/***
* 回复100 trying
* @param evt
* @throws SipException
* @throws InvalidArgumentException
* @throws ParseException
*/
private void response100Ack(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(Response.TRYING, evt.getRequest());
getServerTransaction(evt).sendResponse(response);
}
/***
* 回复404
* @param evt
* @throws SipException
* @throws InvalidArgumentException
* @throws ParseException
*/
private void response404Ack(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(Response.NOT_FOUND, evt.getRequest());
getServerTransaction(evt).sendResponse(response);
}
/***
* 回复400
* @param evt
* @throws SipException
* @throws InvalidArgumentException
* @throws ParseException
*/
private void response400Ack(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(Response.BAD_REQUEST, evt.getRequest());
getServerTransaction(evt).sendResponse(response);
}
/***
* 回复488
* @param evt
* @throws SipException
* @throws InvalidArgumentException
* @throws ParseException
*/
private void response488Ack(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(Response.NOT_ACCEPTABLE_HERE, evt.getRequest());
getServerTransaction(evt).sendResponse(response);
}
/***
* 回复200 OK
* @param evt
* @throws SipException
* @throws InvalidArgumentException
* @throws ParseException
*/
private void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
ContentTypeHeader contentTypeHeader = getHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
response.setContent(sdp, contentTypeHeader);
getServerTransaction(evt).sendResponse(response);
}
public SIPCommanderFroPlatform getCmderFroPlatform() {
return cmderFroPlatform;
}
public void setCmderFroPlatform(SIPCommanderFroPlatform cmderFroPlatform) {
this.cmderFroPlatform = cmderFroPlatform;
}
public IVideoManagerStorager getStorager() {
return storager;
}
public void setStorager(IVideoManagerStorager storager) {
this.storager = storager;
}
}

View File

@@ -15,6 +15,8 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.vmanager.platform.bean.ChannelReduce;
import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
@@ -166,10 +168,15 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
Element deviceIdElement = rootElement.element("DeviceID");
String deviceId = deviceIdElement.getText();
Element deviceListElement = rootElement.element("DeviceList");
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
AddressImpl address = (AddressImpl) fromHeader.getAddress();
SipUri uri = (SipUri) address.getURI();
String platformId = uri.getUser();
// if (deviceListElement == null) { // 存在DeviceList则为响应 catalog 不存在DeviceList则为查询请求
if (name == "Query") { // 区分是Response——查询响应还是Query——查询请求
// TODO 后续将代码拆分
ParentPlatform parentPlatform = storager.queryParentPlatById(deviceId);
ParentPlatform parentPlatform = storager.queryParentPlatById(platformId);
if (parentPlatform == null) {
response404Ack(evt);
return;
@@ -179,9 +186,8 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
Element snElement = rootElement.element("SN");
String sn = snElement.getText();
FromHeader fromHeader = (FromHeader)evt.getRequest().getHeader(FromHeader.NAME);
// 准备回复通道信息
List<ChannelReduce> channelReduces = storager.queryChannelListInParentPlatform(parentPlatform.getDeviceGBId());
List<ChannelReduce> channelReduces = storager.queryChannelListInParentPlatform(parentPlatform.getServerGBId());
if (channelReduces.size() >0 ) {
for (ChannelReduce channelReduce : channelReduces) {
DeviceChannel deviceChannel = storager.queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId());
@@ -499,7 +505,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
}
/***
* 回复200 OK
* 回复404
* @param evt
* @throws SipException
* @throws InvalidArgumentException

View File

@@ -61,10 +61,14 @@ public class RegisterResponseProcessor implements ISIPResponseProcessor {
public void process(ResponseEvent evt, SipLayer layer, SipConfig config) {
// TODO Auto-generated method stub
Response response = evt.getResponse();
ToHeader toHeader = (ToHeader) response.getHeader(ToHeader.NAME);
SipUri uri = (SipUri)toHeader.getAddress().getURI();
String platformGBId = uri.getAuthority().getUser();
CallIdHeader callIdHeader = (CallIdHeader) response.getHeader(CallIdHeader.NAME);
String callId = callIdHeader.getCallId();
String platformGBId = redisCatchStorage.queryPlatformRegisterInfo(callId);
if (platformGBId == null) {
logger.info(String.format("未找到callId %s 的注册/注销平台id", callId ));
return;
}
logger.info(String.format("收到 %s 的注册/注销%S响应", platformGBId, response.getStatusCode() ));
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(platformGBId);
@@ -80,18 +84,13 @@ public class RegisterResponseProcessor implements ISIPResponseProcessor {
if (response.getStatusCode() == 401) {
WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME);
CallIdHeader callIdHeader = (CallIdHeader)response.getHeader(CallIdHeader.NAME);
String callId = callIdHeader.getCallId();
sipCommanderForPlatform.register(parentPlatform, callId, www, null, null);
}else if (response.getStatusCode() == 200){
// 注册成功
logger.info(String.format("%s 注册成功", platformGBId ));
redisCatchStorage.delPlatformRegisterInfo(callId);
parentPlatform.setStatus(true);
storager.updateParentPlatform(parentPlatform);
//
redisCatchStorage.updatePlatformRegister(parentPlatform);
redisCatchStorage.updatePlatformKeepalive(parentPlatform);