优化sip消息,去除自动dialog创建

This commit is contained in:
648540858
2022-09-21 18:18:37 +08:00
parent 1ee56d50d8
commit 710600db6f
59 changed files with 892 additions and 865 deletions

View File

@@ -7,6 +7,4 @@ import javax.sip.DialogState;
*/
public interface ISubscribeTask extends Runnable{
void stop();
DialogState getDialogState();
}

View File

@@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import gov.nist.javax.sip.message.SIPRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
@@ -12,6 +13,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.ResponseEvent;
import javax.sip.header.ToHeader;
import java.text.ParseException;
import java.util.Timer;
import java.util.TimerTask;
@@ -23,7 +26,7 @@ public class CatalogSubscribeTask implements ISubscribeTask {
private final Logger logger = LoggerFactory.getLogger(CatalogSubscribeTask.class);
private Device device;
private final ISIPCommander sipCommander;
private Dialog dialog;
private SIPRequest request;
private DynamicTask dynamicTask;
@@ -41,24 +44,26 @@ public class CatalogSubscribeTask implements ISubscribeTask {
if (dynamicTask.get(taskKey) != null) {
dynamicTask.stop(taskKey);
}
sipCommander.catalogSubscribe(device, dialog, eventResult -> {
if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) {
dialog = eventResult.dialog;
}
SIPRequest sipRequest = sipCommander.catalogSubscribe(device, request, eventResult -> {
ResponseEvent event = (ResponseEvent) eventResult.event;
if (event.getResponse().getRawContent() != null) {
// 成功
logger.info("[目录订阅]成功: {}", device.getDeviceId());
}else {
// 成功
logger.info("[目录订阅]成功: {}", device.getDeviceId());
// 成功
logger.info("[目录订阅]成功: {}", device.getDeviceId());
ToHeader toHeader = (ToHeader)event.getResponse().getHeader(ToHeader.NAME);
try {
this.request.getToHeader().setTag(toHeader.getTag());
} catch (ParseException e) {
logger.info("[目录订阅]成功: 但为request设置ToTag失败");
this.request = null;
}
},eventResult -> {
dialog = null;
this.request = null;
// 失败
logger.warn("[目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
dynamicTask.startDelay(taskKey, CatalogSubscribeTask.this, 2000);
});
if (sipRequest != null) {
this.request = sipRequest;
}
}
@Override
@@ -74,29 +79,19 @@ public class CatalogSubscribeTask implements ISubscribeTask {
if (dynamicTask.get(taskKey) != null) {
dynamicTask.stop(taskKey);
}
if (dialog != null && dialog.getState().equals(DialogState.CONFIRMED)) {
device.setSubscribeCycleForCatalog(0);
sipCommander.catalogSubscribe(device, dialog, eventResult -> {
ResponseEvent event = (ResponseEvent) eventResult.event;
if (event.getResponse().getRawContent() != null) {
// 成功
logger.info("[取消目录订阅订阅]成功: {}", device.getDeviceId());
}else {
// 成功
logger.info("[取消目录订阅订阅]成功: {}", device.getDeviceId());
}
},eventResult -> {
// 失败
logger.warn("[取消目录订阅订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
});
}
}
@Override
public DialogState getDialogState() {
if (dialog == null) {
return null;
}
return dialog.getState();
device.setSubscribeCycleForCatalog(0);
sipCommander.catalogSubscribe(device, request, eventResult -> {
ResponseEvent event = (ResponseEvent) eventResult.event;
if (event.getResponse().getRawContent() != null) {
// 成功
logger.info("[取消目录订阅订阅]成功: {}", device.getDeviceId());
}else {
// 成功
logger.info("[取消目录订阅订阅]成功: {}", device.getDeviceId());
}
},eventResult -> {
// 失败
logger.warn("[取消目录订阅订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
});
}
}

View File

@@ -4,9 +4,11 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.SpringBeanFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
@@ -20,71 +22,23 @@ import java.util.List;
*/
public class MobilePositionSubscribeHandlerTask implements ISubscribeTask {
private Logger logger = LoggerFactory.getLogger(MobilePositionSubscribeHandlerTask.class);
private IRedisCatchStorage redisCatchStorage;
private IVideoManagerStorage storager;
private ISIPCommanderForPlatform sipCommanderForPlatform;
private SubscribeHolder subscribeHolder;
private ParentPlatform platform;
private IPlatformService platformService;
private String platformId;
private String sn;
private String key;
public MobilePositionSubscribeHandlerTask(IRedisCatchStorage redisCatchStorage,
ISIPCommanderForPlatform sipCommanderForPlatform,
IVideoManagerStorage storager,
String platformId,
String sn,
String key,
SubscribeHolder subscribeInfo,
DynamicTask dynamicTask) {
this.redisCatchStorage = redisCatchStorage;
this.storager = storager;
this.platform = storager.queryParentPlatByServerGBId(platformId);
this.sn = sn;
this.key = key;
this.sipCommanderForPlatform = sipCommanderForPlatform;
this.subscribeHolder = subscribeInfo;
public MobilePositionSubscribeHandlerTask(String platformId) {
this.platformService = SpringBeanFactory.getBean("platformServiceImpl");
this.platformId = platformId;
}
@Override
public void run() {
if (platform == null) {
return;
}
SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId());
if (subscribe != null) {
// TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
List<DeviceChannel> gbStreams = storager.queryGbStreamListInPlatform(platform.getServerGBId());
if (gbStreams.size() == 0) {
return;
}
for (DeviceChannel deviceChannel : gbStreams) {
String gbId = deviceChannel.getChannelId();
GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
// 无最新位置不发送
if (gpsMsgInfo != null) {
// 经纬度都为0不发送
if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) {
continue;
}
// 发送GPS消息
sipCommanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, subscribe);
}
}
}
platformService.sendNotifyMobilePosition(this.platformId);
}
@Override
public void stop() {
}
@Override
public DialogState getDialogState() {
return null;
}
}

View File

@@ -4,6 +4,8 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -12,6 +14,8 @@ import org.springframework.scheduling.annotation.Async;
import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.ResponseEvent;
import javax.sip.header.ToHeader;
import java.text.ParseException;
import java.util.Timer;
import java.util.TimerTask;
@@ -21,9 +25,10 @@ import java.util.TimerTask;
*/
public class MobilePositionSubscribeTask implements ISubscribeTask {
private final Logger logger = LoggerFactory.getLogger(MobilePositionSubscribeTask.class);
private Device device;
private ISIPCommander sipCommander;
private Dialog dialog;
private Device device;
private ISIPCommander sipCommander;
private SIPRequest request;
private DynamicTask dynamicTask;
private String taskKey = "mobile-position-subscribe-timeout";
@@ -38,24 +43,26 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
if (dynamicTask.get(taskKey) != null) {
dynamicTask.stop(taskKey);
}
sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> {
if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) {
dialog = eventResult.dialog;
}
SIPRequest sipRequest = sipCommander.mobilePositionSubscribe(device, request, eventResult -> {
// 成功
logger.info("[移动位置订阅]成功: {}", device.getDeviceId());
ResponseEvent event = (ResponseEvent) eventResult.event;
if (event.getResponse().getRawContent() != null) {
// 成功
logger.info("[移动位置订阅]成功: {}", device.getDeviceId());
}else {
// 成功
logger.info("[移动位置订阅]成功: {}", device.getDeviceId());
ToHeader toHeader = (ToHeader)event.getResponse().getHeader(ToHeader.NAME);
try {
this.request.getToHeader().setTag(toHeader.getTag());
} catch (ParseException e) {
logger.info("[移动位置订阅]成功: 为request设置ToTag失败");
this.request = null;
}
},eventResult -> {
dialog = null;
this.request = null;
// 失败
logger.warn("[移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
dynamicTask.startDelay(taskKey, MobilePositionSubscribeTask.this, 2000);
});
if (sipRequest != null) {
this.request = sipRequest;
}
}
@@ -71,29 +78,19 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
if (dynamicTask.get(taskKey) != null) {
dynamicTask.stop(taskKey);
}
if (dialog != null && dialog.getState().equals(DialogState.CONFIRMED)) {
logger.info("取消移动订阅时dialog状态为{}", dialog.getState());
device.setSubscribeCycleForMobilePosition(0);
sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> {
ResponseEvent event = (ResponseEvent) eventResult.event;
if (event.getResponse().getRawContent() != null) {
// 成功
logger.info("[取消移动位置订阅]成功: {}", device.getDeviceId());
}else {
// 成功
logger.info("[取消移动位置订阅]成功: {}", device.getDeviceId());
}
},eventResult -> {
// 失败
logger.warn("[取消移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
});
}
}
@Override
public DialogState getDialogState() {
if (dialog == null) {
return null;
}
return dialog.getState();
device.setSubscribeCycleForMobilePosition(0);
sipCommander.mobilePositionSubscribe(device, request, eventResult -> {
ResponseEvent event = (ResponseEvent) eventResult.event;
if (event.getResponse().getRawContent() != null) {
// 成功
logger.info("[取消移动位置订阅]成功: {}", device.getDeviceId());
}else {
// 成功
logger.info("[取消移动位置订阅]成功: {}", device.getDeviceId());
}
},eventResult -> {
// 失败
logger.warn("[取消移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
});
}
}