优化设备订阅的操作方式

This commit is contained in:
648540858
2024-12-30 15:59:15 +08:00
parent 058909197e
commit 0dd2826b9e
10 changed files with 225 additions and 103 deletions

View File

@@ -125,4 +125,5 @@ public interface IDeviceChannelService {
List<DeviceChannel> queryChaneListByDeviceDbId(Integer deviceDbId);
List<Integer> queryChaneIdListByDeviceDbIds(List<Integer> deviceDbId);
}

View File

@@ -161,4 +161,9 @@ public interface IDeviceService {
Device getDeviceByChannelId(Integer channelId);
Device getDeviceBySourceChannelDeviceId(String requesterId);
void subscribeCatalog(int id, int cycle);
void subscribeMobilePosition(int id, int cycle, int interval);
}

View File

@@ -12,7 +12,6 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
@@ -35,6 +34,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
@@ -263,6 +263,9 @@ public class DeviceServiceImpl implements IDeviceService {
@Override
public boolean removeCatalogSubscribe(Device device, CommonCallback<Boolean> callback) {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
if (callback != null) {
callback.run(false);
}
return false;
}
log.info("[移除目录订阅]: {}", device.getDeviceId());
@@ -272,6 +275,16 @@ public class DeviceServiceImpl implements IDeviceService {
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
subscribeTask.stop(callback);
}else {
log.info("[移除目录订阅]失败,未找到订阅任务 : {}", device.getDeviceId());
if (callback != null) {
callback.run(false);
}
}
}else {
log.info("[移除移动位置订阅]失败,设备已经离线 : {}", device.getDeviceId());
if (callback != null) {
callback.run(false);
}
}
dynamicTask.stop(taskKey);
@@ -297,6 +310,9 @@ public class DeviceServiceImpl implements IDeviceService {
@Override
public boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback) {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
if (callback != null) {
callback.run(false);
}
return false;
}
log.info("[移除移动位置订阅]: {}", device.getDeviceId());
@@ -306,6 +322,16 @@ public class DeviceServiceImpl implements IDeviceService {
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
subscribeTask.stop(callback);
}else {
log.info("[移除移动位置订阅]失败,未找到订阅任务 : {}", device.getDeviceId());
if (callback != null) {
callback.run(false);
}
}
}else {
log.info("[移除移动位置订阅]失败,设备已经离线 : {}", device.getDeviceId());
if (callback != null) {
callback.run(false);
}
}
dynamicTask.stop(taskKey);
@@ -421,64 +447,12 @@ public class DeviceServiceImpl implements IDeviceService {
@Override
public void updateCustomDevice(Device device) {
// 订阅状态的修改使用一个单独方法控制,此处不再进行状态修改
Device deviceInStore = deviceMapper.query(device.getId());
if (deviceInStore == null) {
log.warn("更新设备时未找到设备信息");
return;
}
// 目录订阅相关的信息
if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {
if (device.getSubscribeCycleForCatalog() > 0) {
// 若已开启订阅,但订阅周期不同,则先取消
if (deviceInStore.getSubscribeCycleForCatalog() != 0) {
removeCatalogSubscribe(deviceInStore, result->{
// 开启订阅
deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
addCatalogSubscribe(deviceInStore);
// 因为是异步执行,需要在这里更新下数据
deviceMapper.updateCustom(deviceInStore);
redisCatchStorage.updateDevice(deviceInStore);
});
}else {
// 开启订阅
deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
addCatalogSubscribe(deviceInStore);
}
}else if (device.getSubscribeCycleForCatalog() == 0) {
// 取消订阅
deviceInStore.setSubscribeCycleForCatalog(0);
removeCatalogSubscribe(deviceInStore, null);
}
}
// 移动位置订阅相关的信息
if (deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) {
if (device.getSubscribeCycleForMobilePosition() > 0) {
// 若已开启订阅,但订阅周期不同,则先取消
if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
removeMobilePositionSubscribe(deviceInStore, result->{
// 开启订阅
deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
addMobilePositionSubscribe(deviceInStore);
// 因为是异步执行,需要在这里更新下数据
deviceMapper.updateCustom(deviceInStore);
redisCatchStorage.updateDevice(deviceInStore);
});
}else {
// 开启订阅
deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
addMobilePositionSubscribe(deviceInStore);
}
}else if (device.getSubscribeCycleForMobilePosition() == 0) {
// 取消订阅
deviceInStore.setSubscribeCycleForMobilePosition(0);
deviceInStore.setMobilePositionSubmissionInterval(0);
removeMobilePositionSubscribe(deviceInStore, null);
}
}
if (deviceInStore.getGeoCoordSys() != null) {
// 坐标系变化需要重新计算GCJ02坐标和WGS84坐标
if (!deviceInStore.getGeoCoordSys().equals(device.getGeoCoordSys())) {
@@ -547,4 +521,68 @@ public class DeviceServiceImpl implements IDeviceService {
public Device getDeviceBySourceChannelDeviceId(String channelId) {
return deviceMapper.getDeviceBySourceChannelDeviceId(ChannelDataType.GB28181.value,channelId);
}
@Override
public void subscribeCatalog(int id, int cycle) {
Device device = deviceMapper.query(id);
Assert.notNull(device, "未找到设备");
if (device.getSubscribeCycleForCatalog() == cycle) {
return;
}
// 目录订阅相关的信息
if (device.getSubscribeCycleForCatalog() > 0) {
// 订阅周期不同,则先取消
removeCatalogSubscribe(device, result->{
device.setSubscribeCycleForCatalog(cycle);
if (cycle > 0) {
// 开启订阅
addCatalogSubscribe(device);
}
// 因为是异步执行,需要在这里更新下数据
deviceMapper.updateSubscribeCatalog(device);
redisCatchStorage.updateDevice(device);
});
}else {
// 开启订阅
device.setSubscribeCycleForCatalog(cycle);
addCatalogSubscribe(device);
deviceMapper.updateSubscribeCatalog(device);
redisCatchStorage.updateDevice(device);
}
}
@Override
public void subscribeMobilePosition(int id, int cycle, int interval) {
Device device = deviceMapper.query(id);
Assert.notNull(device, "未找到设备");
if (device.getSubscribeCycleForMobilePosition() == cycle) {
return;
}
// 目录订阅相关的信息
if (device.getSubscribeCycleForMobilePosition() > 0) {
// 订阅周期已经开启,则先取消
removeMobilePositionSubscribe(device, result->{
// 开启订阅
device.setSubscribeCycleForMobilePosition(cycle);
device.setMobilePositionSubmissionInterval(interval);
if (cycle > 0) {
addMobilePositionSubscribe(device);
}
// 因为是异步执行,需要在这里更新下数据
deviceMapper.updateSubscribeMobilePosition(device);
redisCatchStorage.updateDevice(device);
});
}else {
// 订阅未开启
device.setSubscribeCycleForMobilePosition(cycle);
device.setMobilePositionSubmissionInterval(interval);
// 开启订阅
addMobilePositionSubscribe(device);
// 因为是异步执行,需要在这里更新下数据
deviceMapper.updateSubscribeMobilePosition(device);
redisCatchStorage.updateDevice(device);
}
}
}