- sample代码架构
- sample代码实现
- 开发注意事项
4.6.1 sample代码架构
远程日志功能只在机场上支持,Pilot2上云不支持。sample对远程日志的功能实现主要包含如下步骤:
- 前端调用接口获取指定设备的日志文件列表,后端收到请求后,下发请求设备日志文件列表的mqtt消息给机场,机场收到后,在回复的消息中返回设备的日志文件列表。
- 前端选中需要上传的远程日志,并将发起日志上传的请求发送到后端,后端收到请求上传日志文件的消息后,调用开启日志文件上传的mqtt消息给机场;机场收到消息后,返回云端调用结果。
- 日志文件上传是一个持续时间比较长的动作,机场会通过 thing/product/{gateway_sn}/events 主题持续上报文件上传进度。
4.6.2 sample代码实现
1. 前端通过 GET ${url.manage.prefix}${url.manage.version}/workspaces/{workspace_id}/devices/{device_sn}/logs请求后端日志文件列表。对应后端代码接口为DeviceLogsController#getLogsBySn:
@GetMapping("/{workspace_id}/devices/{device_sn}/logs")
public HttpResultResponse getLogsBySn(@PathVariable("workspace_id") String workspaceId,
@PathVariable("device_sn") String deviceSn,
DeviceLogsGetParam param) {
// 调用deviceLogsService#getRealTimeLogs获取实时的日志列表
return deviceLogsService.getRealTimeLogs(deviceSn, param.getDomainList());
}
deviceLogsService#getRealTimeLogs实现如下:
@Override
public HttpResultResponse getRealTimeLogs(String deviceSn, List<LogModuleEnum> domainList) {
// 检查设备是否在线,设备不在线无法拉取日志
boolean exist = deviceRedisService.checkDeviceOnline(deviceSn);
if (!exist) {
return HttpResultResponse.error("Device is offline.");
}
// 调用sdk包封装好的获取日志列表接口abstractLogService#fileuploadList。
// 该接口中会发送mqtt消息给对应设备
TopicServicesResponse<ServicesReplyData<FileUploadListResponse>> response = abstractLogService
.fileuploadList(SDKManager.getDeviceSDK(deviceSn), new FileUploadListRequest().setModuleList(domainList));
// 如果获取到的device_sn为空,后端进行填充
for (FileUploadListFile file : response.getData().getOutput().getFiles()) {
if (file.getDeviceSn().isBlank()) {
file.setDeviceSn(deviceSn);
}
}
return HttpResultResponse.success(response.getData().getOutput());
}
abstractLogService#fileuploadList:
public TopicServicesResponse<ServicesReplyData<FileUploadListResponse>> fileuploadList(GatewayManager gateway, FileUploadListRequest request) {
// 发送mqtt消息给对应设备获取日志文件列表
return servicesPublish.publish(
new TypeReference<FileUploadListResponse>() {},
gateway.getGatewaySn(),
LogMethodEnum.FILE_UPLOAD_LIST.getMethod(),
request);
}
2. 前端选择需要上传的日志,通过Post ${url.manage.prefix}${url.manage.version}/workspaces/{workspace_id}/devices/{device_sn}/logs请求后端开启日志文件上传,对应后端的接口为DeviceLogsController#uploadLogs:
@PostMapping("/{workspace_id}/devices/{device_sn}/logs")
public HttpResultResponse uploadLogs(@PathVariable("workspace_id") String workspaceId,
@PathVariable("device_sn") String deviceSn,
HttpServletRequest request, @RequestBody DeviceLogsCreateParam param) {
// 获取当前登录用户信息
CustomClaim customClaim = (CustomClaim)request.getAttribute(TOKEN_CLAIM);
// 调用deviceLogsService#pushFileUpload发起日志上传。
return deviceLogsService.pushFileUpload(customClaim.getUsername(), deviceSn, param);
}
deviceLogsService#pushFileUpload实现如下:
@Override
public HttpResultResponse pushFileUpload(String username, String deviceSn, DeviceLogsCreateParam param) {
// 获取上传临时凭证
StsCredentialsResponse stsCredentials = storageService.getSTSCredentials();
stsCredentials.getCredentials().setExpire(System.currentTimeMillis() + (stsCredentials.getCredentials().getExpire() - 60) * 1000);
LogsUploadCredentialsDTO credentialsDTO = new LogsUploadCredentialsDTO(stsCredentials);
// 设备日志文件存储的名称
List<FileUploadStartFile> files = param.getFiles();
files.forEach(file -> file.setObjectKey(credentialsDTO.getObjectKeyPrefix() + "/" + UUID.randomUUID().toString() + LOGS_FILE_SUFFIX));
credentialsDTO.setParams(new FileUploadStartParam().setFiles(files));
// 调用sdk包中封装好的接口abstractLogService#fileuploadStart,发起日志文件上传
TopicServicesResponse<ServicesReplyData> response = abstractLogService.fileuploadStart(
SDKManager.getDeviceSDK(deviceSn), new FileUploadStartRequest()
.setCredentials(stsCredentials.getCredentials())
.setBucket(stsCredentials.getBucket())
.setEndpoint(stsCredentials.getEndpoint())
.setFileStoreDir(stsCredentials.getObjectKeyPrefix())
.setProvider(stsCredentials.getProvider())
.setRegion(stsCredentials.getRegion())
.setParams(new FileUploadStartParam().setFiles(files)));
// 发起日志文件上传不成功,直接返回给前端报错
if (!response.getData().getResult().isSuccess()) {
return HttpResultResponse.error(response.getData().getResult());
}
// 将发起日志上传记录到数据库中
String id = this.insertDeviceLogs(response.getBid(), username, deviceSn, param);
// Redis中保存日志文件上传的记录
RedisOpsUtils.hashSet(RedisConst.LOGS_FILE_PREFIX + deviceSn, id, LogsOutputProgressDTO.builder().logsId(id).build());
return HttpResultResponse.success();
}
abstractLogService#fileuploadStart:
public TopicServicesResponse<ServicesReplyData> fileuploadStart(GatewayManager gateway, FileUploadStartRequest request) {
// 下发mqtt消息给设备,发起日志文件上传
return servicesPublish.publish(
gateway.getGatewaySn(),
LogMethodEnum.FILE_UPLOAD_START.getMethod(),
request);
}
3.机场会通过thing/product/{gateway_sn}/events主题持续上报云端日志文件上传结果。所有mqtt消息都会经过InboundMessageRouter#determineTargetChannels方法,在该方法中找到通过DeviceTopicEnum#find方法找到消息对应的主题,如果是 thing/product/{device_sn}/events 主题的消息会被选择到ChannelName.INBOUND_EVENTS通道中。
public class InboundMessageRouter extends AbstractMessageRouter {
@Override
@Router(inputChannel = ChannelName.INBOUND)
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
// 获取消息头
MessageHeaders headers = message.getHeaders();
String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
byte[] payload = (byte[])message.getPayload();
log.debug("received topic :{} \t payload :{}", topic, new String(payload));
// 根据消息topic不同,将消息转发到不同的channel中。
// thing/product/{gateway_sn}/events会被转发到ChannelName.INBOUND_EVENTS通道中。
DeviceTopicEnum topicEnum = DeviceTopicEnum.find(topic);
MessageChannel bean = (MessageChannel) SpringBeanUtils.getBean(topicEnum.getBeanName());
return Collections.singleton(bean);
}
}
消息被转发到ChannelName.INBOUND_EVENTS中后,通过
@Bean
public IntegrationFlow eventsMethodRouterFlow() {
return IntegrationFlows
// 处理ChannelName.INBOUND_EVENT中的消息
.from(ChannelName.INBOUND_EVENTS)
.transform(Message.class, source -> {
try {
// 将上报的消息序列化为Request实体
TopicEventsRequest data = Common.getObjectMapper().readValue((byte[]) source.getPayload(), TopicEventsRequest.class);
String topic = String.valueOf(source.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
// 设置上报消息的sn
return data.setFrom(topic.substring((THING_MODEL_PRE + PRODUCT).length(), topic.indexOf(EVENTS_SUF)))
.setData(Common.getObjectMapper().convertValue(data.getData(), EventsMethodEnum.find(data.getMethod()).getClassType()));
} catch (IOException e) {
throw new CloudSDKException(e);
}
}, null)
.<TopicEventsRequest, EventsMethodEnum>route(
// 日志文件上报的消息,经过该方法会将消息推送给ChannelName.INBOUND_EVENTS_FILEUPLOAD_PROGRESS通道处理
response -> EventsMethodEnum.find(response.getMethod()),
mapping -> Arrays.stream(EventsMethodEnum.values()).forEach(
methodEnum -> mapping.channelMapping(methodEnum, methodEnum.getChannelName())))
.get();
}
ChannelName.INBOUND_EVENTS_FILEUPLOAD_PROGRESS的消息会交给AbstractLogService#fileuploadProgress处理,在sdk中默认实现为抛出异常信息,开发者需要定义
.AbstractLogService的实现类,实现fileuploadProgress方法。示例代码中有默认实现:
DeviceLogsServiceImpl#fileuploadProgress
@Override
public TopicEventsResponse<MqttReply> fileuploadProgress(TopicEventsRequest<EventsDataRequest<FileUploadProgress>> request, MessageHeaders headers) {
EventsReceiver<LogsOutputProgressDTO> webSocketData = new EventsReceiver<>();
webSocketData.setBid(request.getBid());
webSocketData.setSn(request.getGateway());
// 设备不在线,直接返回
Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(request.getGateway());
if (deviceOpt.isEmpty()) {
return null;
}
DeviceDTO device = deviceOpt.get();
String key = RedisConst.LOGS_FILE_PREFIX + request.getGateway();
try {
FileUploadProgress output = request.getData().getOutput();
log.info("Logs upload progress: {}", output.toString());
LogsOutputProgressDTO progress;
// 检查Redis中是否保存过之前开启的日志上传
boolean exist = RedisOpsUtils.checkExist(key);
if (!exist && !output.getStatus().isEnd()) {
// 如果Redis中不存在之前开启的记录,并且日志没有上传完。将正在上传日志的任务保存到Redis中
progress = LogsOutputProgressDTO.builder().logsId(request.getBid()).build();
RedisOpsUtils.hashSet(key, request.getBid(), progress);
} else if (exist) {
progress = (LogsOutputProgressDTO) RedisOpsUtils.hashGet(key, request.getBid());
} else {
progress = LogsOutputProgressDTO.builder().build();
}
progress.setStatus(output.getStatus());
// 如果上传日志进度的消息没有,那么将Redis中缓存的任务删除
List<FileUploadProgressFile> fileReceivers = output.getExt().getFiles();
if (CollectionUtils.isEmpty(fileReceivers)) {
RedisOpsUtils.del(key);
}
// 更新缓存数据
List<LogsProgressDTO> fileProgressList = new ArrayList<>();
fileReceivers.forEach(file -> {
LogFileProgress logsProgress = file.getProgress();
if (!StringUtils.hasText(file.getDeviceSn())) {
if (LogModuleEnum.DOCK == file.getModule()) {
file.setDeviceSn(request.getGateway());
} else if (LogModuleEnum.DRONE == file.getModule()) {
file.setDeviceSn(device.getChildDeviceSn());
}
}
fileProgressList.add(LogsProgressDTO.builder()
.deviceSn(file.getDeviceSn())
.deviceModelDomain(file.getModule().getDomain())
.result(logsProgress.getResult())
.status(logsProgress.getStatus().getStatus())
.uploadRate(logsProgress.getUploadRate())
.progress(((logsProgress.getCurrentStep() - 1) * 100 + logsProgress.getProgress()) / logsProgress.getTotalStep())
.build());
});
progress.setFiles(fileProgressList);
webSocketData.setOutput(progress);
RedisOpsUtils.hashSet(RedisConst.LOGS_FILE_PREFIX + request.getGateway(), request.getBid(), progress);
// 如果上传文件任务结束,将Redis缓存清除、数据库状态更新
if (output.getStatus().isEnd()) {
RedisOpsUtils.del(key);
updateLogsStatus(request.getBid(), DeviceLogsStatusEnum.find(output.getStatus()).getVal());
fileReceivers.forEach(file -> logsFileService.updateFile(request.getBid(), file));
}
} catch (NullPointerException e) {
this.updateLogsStatus(request.getBid(), DeviceLogsStatusEnum.FAILED.getVal());
RedisOpsUtils.del(key);
}
// 通过websocket协议将文件上传进度消息推送给前端
webSocketMessageService.sendBatch(device.getWorkspaceId(), UserTypeEnum.WEB.getVal(),
BizCodeEnum.FILE_UPLOAD_PROGRESS.getCode(), webSocketData);
return new TopicEventsResponse<MqttReply>().setData(MqttReply.success());
}
AbstractLogService#fileuploadProgress处理后的消息会交给
@Bean
public IntegrationFlow replySuccessEvents() {
return IntegrationFlows
.from(ChannelName.OUTBOUND_EVENTS)
.handle(this::publish)
.nullChannel();
}
private TopicEventsResponse publish(TopicEventsResponse request, MessageHeaders headers) {
if (Objects.isNull(request) || Objects.isNull(request.getData())) {
return null;
}
gatewayPublish.publishReply(request, headers);
return request;
}
4. 上传文件日志过程中,云端可以随时取消文件上传。前端通过调用Delete
@DeleteMapping("/{workspace_id}/devices/{device_sn}/logs")
public HttpResultResponse cancelUploadedLogs(@PathVariable("workspace_id") String workspaceId,
@PathVariable("device_sn") String deviceSn,
@RequestBody FileUploadUpdateRequest param) {
// 调用deviceLogsService#pushUpdateFile取消日志文件上传
return deviceLogsService.pushUpdateFile(deviceSn, param);
}
deviceLogsService#pushUpdateFile:
@Override
public HttpResultResponse pushUpdateFile(String deviceSn, FileUploadUpdateRequest param) {
// 调用sdk包装好的接口abstractLogService#fileuploadUpdate,取消日志文件上传
// 该方法会发送mqtt给对应设备,取消日志文件上传
TopicServicesResponse<ServicesReplyData> response = abstractLogService.fileuploadUpdate(SDKManager.getDeviceSDK(deviceSn), param);
if (!response.getData().getResult().isSuccess()) {
return HttpResultResponse.error(response.getData().getResult());
}
return HttpResultResponse.success();
}
abstractLogService#fileuploadUpdate:
public TopicServicesResponse<ServicesReplyData> fileuploadUpdate(GatewayManager gateway, FileUploadUpdateRequest request) {
return servicesPublish.publish(
gateway.getGatewaySn(),
LogMethodEnum.FILE_UPLOAD_UPDATE.getMethod(),
request);
}
4.6.3 开发注意事项
1. 取消日志文件上传,不直接更新日志文件上传任务的状态,同一由设备上报进度状态更新。
评论
0 条评论
请登录写评论。