- sample代码架构
- sample代码实现
- mqtt消息处理
- 业务处理
- 开发注意事项
4.1.1 sample代码架构
sample对设备上云消息(sys/product/{gateway_sn}/status)的处理主要包含两部分处理:
- mqtt消息处理:对mqtt消息管理,将mqtt进行序列化、分派到不同通道(Channel)处理等操作,转化为符合业务处理的Java实体类。经过不同Channel处理后,再流经对应的业务处理中。
- 业务处理:设备上下线后的在线状态等处理
sample对设备上云的整体处理流程如下图所示,在后续章节我们将详细介绍每部分的代码和处理。
4.1.2 sample代码实现
mqtt消息实现
1. 所有mqtt消息都会经过InboundMessageRouter#determineTargetChannels方法,在该方法中找到通过DeviceTopicEnum#find方法找到消息对应的主题,如是上线的消息会被选择到ChannelName.INBOUND_STATUS通道中。
public class InboundMessageRouter extends AbstractMessageRouter {
@Override
@Router(inputChannel = ChannelName.INBOUND)
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
// 获取消息头
MessageHeaders headers = message.getHeaders();
String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
byte[] payload = (byte[])message.getPayload();
log.debug("received topic :{} \t payload :{}", topic, new String(payload));
// 根据消息topic不同,将消息转发到不同的channel中。
// 上线消息sys/product/{gateway_sn}/status会被转发到ChannelName.INBOUND_STATUS通道中。
DeviceTopicEnum topicEnum = DeviceTopicEnum.find(topic);
MessageChannel bean = (MessageChannel) SpringBeanUtils.getBean(topicEnum.getBeanName());
return Collections.singleton(bean);
}
}
2. 上线消息被转发到ChannelName.INBOUND_STATUS中后,通过StatusRouter#statusRouterFlow方法中定义的流程处理,会将上线消息和下线消息交给不同的channel进行处理。
public IntegrationFlow statusRouterFlow() {
return IntegrationFlows
// 处理ChannelName.INBOUND_STATUS中的消息
.from(ChannelName.INBOUND_STATUS)
// 将消息转换为TopicStatusRequest<UpdateTopo>的Java实体消息
.transform(Message.class, source -> {
try {
TopicStatusRequest<UpdateTopo> response = Common.getObjectMapper().readValue((byte[]) source.getPayload(), new TypeReference<TopicStatusRequest<UpdateTopo>>() {});
String topic = String.valueOf(source.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
return response.setFrom(topic.substring((BASIC_PRE + PRODUCT).length(), topic.indexOf(STATUS_SUF)));
} catch (IOException e) {
throw new CloudSDKException(e);
}
}, null)
// 通过消息中的subDevices字段进行判断
// 如果subDevices为空,则将消息交给ChannelName.INBOUND_STATUS_OFFLINE通道处理
// 如果subDevices不为空,则将消息交给ChannelName.INBOUND_STATUS_ONLINE通道处理
.<TopicStatusRequest<UpdateTopo>, Boolean>route(
response -> Optional.ofNullable(response.getData()).map(UpdateTopo::getSubDevices).map(CollectionUtils::isEmpty).orElse(true),
mapping -> mapping.channelMapping(true, ChannelName.INBOUND_STATUS_OFFLINE)
.channelMapping(false, ChannelName.INBOUND_STATUS_ONLINE))
.get();
}
3. 上下线的消息经过业务处理后,会被转发到ChannelName.OUTBOUND_STATUS通道中,在StatusRouter#replySuccessStatus方法中会被定义的流程进行处理。然后回复sys/product/{gateway_sn}/status_reply消息
public IntegrationFlow replySuccessStatus() {
return IntegrationFlows
// 接收ChannelName.OUTBOUND_STATUS通道中的消息
.from(ChannelName.OUTBOUND_STATUS)
// 通过以下的publish方法进行处理
.handle(this::publish)
.nullChannel();
}
private TopicStatusResponse publish(TopicStatusResponse request, MessageHeaders headers) {
if (Objects.isNull(request)) {
return null;
}
// 回复sys/product/{gateway_sn}/status_reply主题消息
gatewayPublish.publishReply(request, headers);
return request;
}
业务处理
设备上线
1. 设备上线的消息通过ChannelName.INBOUND_STATUS_ONLINE通道会传输给AbstractDeviceService#updateTopoOnline方法进行处理。如果开发需要自定义上线逻辑,如在设备上线时,将设备上线状态保存,即可定义业务实现类继承AbstractDeviceService实现updateTopoOnline方法。sample的实现是在SDKDeviceService类中,以下是对sample实现updateTopoOnline方法的说明:
public TopicStatusResponse<MqttReply> updateTopoOnline(TopicStatusRequest<UpdateTopo> request, MessageHeaders headers) {
// 获取上线的子设备(飞行器)
UpdateTopoSubDevice updateTopoSubDevice = request.getData().getSubDevices().get(0);
// 获取飞行器设备的sn号
String deviceSn = updateTopoSubDevice.getSn();
// 在Redis中查找飞行器和网关设备(如果之前已经上线,会将它们存放到Redis中,防止重复上线)
Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(deviceSn);
Optional<DeviceDTO> gatewayOpt = deviceRedisService.getDeviceOnline(request.getFrom());
// 查询网关设备和飞行器的数据信息
GatewayManager gatewayManager = SDKManager.registerDevice(request.getFrom(), deviceSn,
request.getData().getDomain(), request.getData().getType(),
request.getData().getSubType(), request.getData().getThingVersion(), updateTopoSubDevice.getThingVersion());
if (deviceOpt.isPresent() && gatewayOpt.isPresent()) {
// 设备再次上线
deviceOnlineAgain(deviceOpt.get().getWorkspaceId(), request.getFrom(), deviceSn);
return new TopicStatusResponse<MqttReply>().setData(MqttReply.success());
}
// 飞行器可能之前绑定在其他的网关设备上,通过这个方法将飞行器的网关设备绑定到当前网关设备
changeSubDeviceParent(deviceSn, request.getFrom());
// 将网关设备保存到数据库,并在Redis的在线设备中添加上当前网关设备
DeviceDTO gateway = deviceGatewayConvertToDevice(request.getFrom(), request.getData());
Optional<DeviceDTO> gatewayEntityOpt = onlineSaveDevice(gateway, deviceSn, null);
// 如果保存失败,那么云端有bug,返回上线失败
if (gatewayEntityOpt.isEmpty()) {
log.error("Failed to go online, please check the status data or code logic.");
return null;
}
// 将飞行器保存到数据库,并在Redis的在线设备中添加上当前飞行器
DeviceDTO subDevice = subDeviceConvertToDevice(updateTopoSubDevice);
Optional<DeviceDTO> subDeviceEntityOpt = onlineSaveDevice(subDevice, null, gateway.getDeviceSn());
if (subDeviceEntityOpt.isEmpty()) {
log.error("Failed to go online, please check the status data or code logic.");
return null;
}
subDevice = subDeviceEntityOpt.get();
gateway = gatewayEntityOpt.get();
// 如果网关设备是机场,直接将飞行器绑定到机场的工作空间中
dockGoOnline(gateway, subDevice);
// 订阅网关设备上线后的主题,如osd、services、services_reply主题等
deviceService.gatewayOnlineSubscribeTopic(gatewayManager);
// 如果子设备没有绑定工作空间,那么就直接回复设备上线消息。不会走后面的通过websocket连接通知其他设备,设备上线了
if (!StringUtils.hasText(subDevice.getWorkspaceId())) {
return new TopicStatusResponse<MqttReply>().setData(MqttReply.success());
}
// 订阅子设备上线后的主题,如osd、services、services_reply主题等
deviceService.subDeviceOnlineSubscribeTopic(gatewayManager);
// 发布设备上线消息,通过websocket连接通知其他设备更新设备拓扑
deviceService.pushDeviceOnlineTopo(gateway.getWorkspaceId(), gateway.getDeviceSn(), subDevice.getDeviceSn());
deviceRedisService.setDeviceOnline(subDevice);
log.debug("{} online.", subDevice.getDeviceSn());
return new TopicStatusResponse<MqttReply>().setData(MqttReply.success());
}
2. 在AbstractDeviceService#updateTopoOnline方法中的@ServiceActivator注解中的outputChannel = ChannelName.OUTBOUND_STATUS说明,该方法的返回值会作为消息被传送给ChannelName.OUTBOUND_STATUS方法中进行回复设备。
@ServiceActivator(inputChannel = ChannelName.INBOUND_STATUS_ONLINE, outputChannel = ChannelName.OUTBOUND_STATUS)
public TopicStatusResponse<MqttReply> updateTopoOnline(TopicStatusRequest<UpdateTopo> request, MessageHeaders headers) {
throw new UnsupportedOperationException("updateTopoOnline not implemented");
}
设备下线
1. 设备下线的消息通过ChannelName.INBOUND_STATUS_ONLINE通道会传输给AbstractDeviceService#
public TopicStatusResponse<MqttReply> updateTopoOffline(TopicStatusRequest<UpdateTopo> request, MessageHeaders headers) {
// 获取设备信息
GatewayManager gatewayManager = SDKManager.registerDevice(request.getFrom(), null,
request.getData().getDomain(), request.getData().getType(),
request.getData().getSubType(), request.getData().getThingVersion(), null);
// 网关设备在线,子设备不在线。云端也需要订阅网关设备上线的主题,如osd、services、services_reply等主题
deviceService.gatewayOnlineSubscribeTopic(gatewayManager);
// 如果是网关设备已经上线过(在Redis中能查找到设备消息),就不需要重复处理消息。
Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(request.getFrom());
if (deviceOpt.isEmpty()) {
// 将网关设备保存到数据库,已经在线设备添加上网关设备
DeviceDTO gatewayDevice = deviceGatewayConvertToDevice(request.getFrom(), request.getData());
Optional<DeviceDTO> gatewayDeviceOpt = onlineSaveDevice(gatewayDevice, null, null);
// 如果保存设备失败,证明云端有bug,不回复设备上线消息
if (gatewayDeviceOpt.isEmpty()) {
return null;
}
// 向所有websocket连接推送网关设备上线的消息,让它们更新拓扑
deviceService.pushDeviceOnlineTopo(gatewayDeviceOpt.get().getWorkspaceId(), request.getFrom(), null);
return new TopicStatusResponse<MqttReply>().setData(MqttReply.success());
}
// 如果网关设备没有子设备,证明飞行器还没有开机,不是飞行器下线,直接返回
String deviceSn = deviceOpt.get().getChildDeviceSn();
if (!StringUtils.hasText(deviceSn)) {
return new TopicStatusResponse<MqttReply>().setData(MqttReply.success());
}
// 云端处理子设备下线,取消订阅飞行器的主题,如osd、services、services_reply等主题
deviceService.subDeviceOffline(deviceSn);
// 回复设备下线消息
return new TopicStatusResponse<MqttReply>().setData(MqttReply.success());
}
2. 在AbstractDeviceService#updateTopoOffline方法中的@ServiceActivator注解中的outputChannel = ChannelName.OUTBOUND_STATUS说明,该方法的返回值会作为消息被传送给ChannelName.OUTBOUND_STATUS方法中进行回复设备。
@ServiceActivator(inputChannel = ChannelName.INBOUND_STATUS_OFFLINE, outputChannel = ChannelName.OUTBOUND_STATUS)
public TopicStatusResponse<MqttReply> updateTopoOffline(TopicStatusRequest<UpdateTopo> request, MessageHeaders headers) {
throw new UnsupportedOperationException("updateTopoOffline not implemented");
}
4.1.3 开发主题事项
1. sample后端会启动一个进程,定时检查设备的在线状态,如果长时间没有收到设备上报的osd信息,会将设备进行下线处理。具体逻辑在GlobalScheduleService#deviceStatusListen方法中。
// 每30s进行一次检查
@Scheduled(initialDelay = 10, fixedRate = 30, timeUnit = TimeUnit.SECONDS)
private void deviceStatusListen() {
int start = RedisConst.DEVICE_ONLINE_PREFIX.length();
// 获取到所有的在线设备
RedisOpsUtils.getAllKeys(RedisConst.DEVICE_ONLINE_PREFIX + "*").forEach(key -> {
long expire = RedisOpsUtils.getExpire(key);
// 如果过期时间小于等于30s
if (expire <= 30) {
DeviceDTO device = (DeviceDTO) RedisOpsUtils.get(key);
if (null == device) {
return;
}
if (DeviceDomainEnum.DRONE == device.getDomain()) {
// 如果是飞行器,就走飞行器下线的逻辑
deviceService.subDeviceOffline(key.substring(start));
} else {
// 如果是网关设备,就走网关设备的下线逻辑
deviceService.gatewayOffline(key.substring(start));
}
// 删除已下线的设备
RedisOpsUtils.del(key);
}
});
log.info("Subscriptions: {}", Arrays.toString(topicService.getSubscribedTopic()));
}
评论
0 条评论
请登录写评论。