- sample代码架构
- sample代码实现
- 开发注意事项
4.5.1 sample代码架构
远程调试功能只对机场上云生效,远程调试命令可以分为命令(cmd)和任务(job)。命令(cmd)一般指命令下发后,设备能即刻回复的行为,而任务(job)为任务下发后,设备需要持续动作的行为。云端下发任务(job)给机场进行调试,机场会在响应消息后,在事件通知中持续上报云端该调试任务的执行结果。
4.5.2 sample代码实现
1. 机场执行远程调试指令,需要机场先进入远程调试模式,下发debug_mode_open指令。机场正确返回该指令后,云端才可以下发调试指令。
Topic: thing/product/{gateway_sn}/services
Direction: down
Method: debug_mode_open
在前端点击开启调试模式后,调用后端${url.control.prefix}${url.control.version}/devices
/{sn}/jobs/{service_identifier}进行实现,该方法在示例代码中对应的实现为:DockController#createControlJob。示例代码为:
@PostMapping("/{sn}/jobs/{service_identifier}")
public HttpResultResponse createControlJob(@PathVariable String sn,
@PathVariable("service_identifier") String serviceIdentifier,
@Valid @RequestBody(required = false) RemoteDebugParam param) {
// 调用controlService#controlDockDebug实现远程调试的逻辑
return controlService.controlDockDebug(sn, RemoteDebugMethodEnum.find(serviceIdentifier), param);
}
controlService#controlDockDebug方法实现如下:
@Override
public HttpResultResponse controlDockDebug(String sn, RemoteDebugMethodEnum controlMethodEnum, RemoteDebugParam param) {
DebugMethodEnum methodEnum = controlMethodEnum.getDebugMethodEnum();
RemoteDebugHandler data = checkDebugCondition(sn, param, controlMethodEnum);
// 判断机场是否在线,如果不在线,直接报错
boolean isExist = deviceRedisService.checkDeviceOnline(sn);
if (!isExist) {
return HttpResultResponse.error("The dock is offline.");
}
TopicServicesResponse response;
switch (controlMethodEnum) {
case RETURN_HOME:
// 一键返航的调试指令,通过调用sdk包封装好的mqtt消息发送给机场
response = abstractWaylineService.returnHome(SDKManager.getDeviceSDK(sn));
break;
case RETURN_HOME_CANCEL:
// 取消一键返航的调试指令,通过调用sdk包封装好的mqtt消息发送给机场
response = abstractWaylineService.returnHomeCancel(SDKManager.getDeviceSDK(sn));
break;
default:
// 其他情况,如机场重启的远程调试,将指令参数实例化为对应的DTO后下发给机场
response = abstractDebugService.remoteDebug(SDKManager.getDeviceSDK(sn), methodEnum,
Objects.nonNull(methodEnum.getClazz()) ? mapper.convertValue(data, methodEnum.getClazz()) : null);
}
// 如果发起远程调试指令后,机场返回报错,则返回给前端报错信息
ServicesReplyData serviceReply = (ServicesReplyData) response.getData();
if (!serviceReply.getResult().isSuccess()) {
return HttpResultResponse.error(serviceReply.getResult());
}
return HttpResultResponse.success();
}
2. 任务(job)类型的调试指令,如机场重启指令下发给机场后,机场会通过thing/product/{gateway_sn}/events主题持续上报云端。以机场重启的调试指令为例:
所有mqtt消息都会经过InboundMessageRouter#determineTargetChannels方法,在该方法中找到通过DeviceTopicEnum#find方法找到消息对应的主题,如果是 thing/product/{device_sn}/events 主题的消息会被选择到ChannelName.INBOUND_EVENTS通道中。
public class InboundMessageRouter extends AbstractMessageRouter {
@Override
@Router(inputChannel = ChannelName.INBOUND)
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
// 获取消息头
MessageHeaders headers = message.getHeaders();
String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
byte[] payload = (byte[])message.getPayload();
log.debug("received topic :{} \t payload :{}", topic, new String(payload));
// 根据消息topic不同,将消息转发到不同的channel中。
// thing/product/{gateway_sn}/events会被转发到ChannelName.INBOUND_EVENTS通道中。
DeviceTopicEnum topicEnum = DeviceTopicEnum.find(topic);
MessageChannel bean = (MessageChannel) SpringBeanUtils.getBean(topicEnum.getBeanName());
return Collections.singleton(bean);
}
}
消息被转发到ChannelName.INBOUND_EVENTS中后,通过
EventsRouter#stateDataRouterFlow方法中定义的流程处理
@Bean
public IntegrationFlow eventsMethodRouterFlow() {
return IntegrationFlows
// 处理ChannelName.INBOUND_EVENT中的消息
.from(ChannelName.INBOUND_EVENTS)
.transform(Message.class, source -> {
try {
// 将上报的消息序列化为Request实体
TopicEventsRequest data = Common.getObjectMapper().readValue((byte[]) source.getPayload(), TopicEventsRequest.class);
String topic = String.valueOf(source.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
// 设置上报消息的sn
return data.setFrom(topic.substring((THING_MODEL_PRE + PRODUCT).length(), topic.indexOf(EVENTS_SUF)))
.setData(Common.getObjectMapper().convertValue(data.getData(), EventsMethodEnum.find(data.getMethod()).getClassType()));
} catch (IOException e) {
throw new CloudSDKException(e);
}
}, null)
.<TopicEventsRequest, EventsMethodEnum>route(
// 机场重启的消息经过该方法会将消息推送给ChannelName.INBOUND_EVENTS_CONTROL_PROGRESS通道处理
response -> EventsMethodEnum.find(response.getMethod()),
mapping -> Arrays.stream(EventsMethodEnum.values()).forEach(
methodEnum -> mapping.channelMapping(methodEnum, methodEnum.getChannelName())))
.get();
}
ChannelName.INBOUND_EVENTS_CONTROL_PROGRESS的消息会交给AbstractDebugService#remoteDebugProgress处理,在sdk中默认实现为抛出异常信息,开发者需要定义
.AbstractDebugService的实现类,实现remoteDebugProgress方法。示例代码中有默认实现:
SDKRemoteDebug#remoteDebugProgress
@Override
public TopicEventsResponse<MqttReply> remoteDebugProgress(TopicEventsRequest<EventsDataRequest<RemoteDebugProgress>> request, MessageHeaders headers) {
// 消息发送的网关sn
String sn = request.getGateway();
// 将上报的消息转化为另一个实体
EventsReceiver<RemoteDebugProgress> eventsReceiver = new EventsReceiver<RemoteDebugProgress>()
.setOutput(request.getData().getOutput()).setResult(request.getData().getResult());
eventsReceiver.setBid(request.getBid());
eventsReceiver.setSn(sn);
//
log.info("SN: {}, {} ===> Control progress: {}", sn, request.getMethod(), eventsReceiver.getOutput().getProgress());
if (!eventsReceiver.getResult().isSuccess()) {
log.error("SN: {}, {} ===> Error: {}", sn, request.getMethod(), eventsReceiver.getResult());
}
// 查询网关设备是否在线,如果不在线抛出异常
Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(sn);
if (deviceOpt.isEmpty()) {
throw new RuntimeException("The device is offline.");
}
DeviceDTO device = deviceOpt.get();
// 使用websocket连接,通过前端任务执行进度
webSocketMessageService.sendBatch(device.getWorkspaceId(), UserTypeEnum.WEB.getVal(),
request.getMethod(), eventsReceiver);
return new TopicEventsResponse<MqttReply>().setData(MqttReply.success());
}
AbstractDebugService#remoteDebugProgress处理后的消息会交给
ChannelName.OUTBOUND_EVENTS通道处理,最终交给EventsRouter#replySuccessEvents处理并回复机场。
@Bean
public IntegrationFlow replySuccessEvents() {
return IntegrationFlows
.from(ChannelName.OUTBOUND_EVENTS)
.handle(this::publish)
.nullChannel();
}
private TopicEventsResponse publish(TopicEventsResponse request, MessageHeaders headers) {
if (Objects.isNull(request) || Objects.isNull(request.getData())) {
return null;
}
gatewayPublish.publishReply(request, headers);
return request;
}
评论
0 条评论
请登录写评论。