- sample代码架构
- sample代码实现
- 获取上传临时凭证
- 上传媒体文件
- 上擦混回调云端
- 开发注意事项
4.3.1 sample代码架构
媒体管理功能Pilot2和机场的处理稍有不同,大致实现都是相同的。步骤如下:
步骤一:Pilot2或者机场请求云端获取上传临时凭证信息(STS)。
步骤二:Pilot2或者机场根据获取的临时凭证信息上传媒体文件。
步骤三:Pilot2或者机场将上传媒体文件的结果上报给云端。
4.3.2 sample代码实现
获取上传临时凭证
获取上传临时凭证Pilot2和机场的处理方式不同,在示例代码中的实现也有一些区别。
1.Pilot2获取上传临时凭证
Pilot2通过 POST /storage/api/v1/workspaces/{workspace_id}/sts 请求后端获取临时上传凭证,对应后端接口方法:IHttpStorageService#getTemporaryCredential。
@PostMapping(PREFIX + "/workspaces/{workspace_id}/sts")
HttpResultResponse<StsCredentialsResponse> getTemporaryCredential(
@PathVariable(name = "workspace_id") String workspaceId,
HttpServletRequest req, HttpServletResponse rsp);
IHttpStorageService#getTemporaryCredential方法在示例代码中的实现为StorageController#getTemporaryCredential,具体代码如下:
@Override
public HttpResultResponse<StsCredentialsResponse> getTemporaryCredential(String workspaceId, HttpServletRequest req, HttpServletResponse rsp) {
// 调用storageService#getSTSCredentials获取临时凭证
StsCredentialsResponse stsCredentials = storageService.getSTSCredentials();
return HttpResultResponse.success(stsCredentials);
}
storageService#getSTSCredentials方法如下:将配置文件中配置好的endpoint、bucket、provider、objectKeyPrefix、region信息填充到临时凭证信息中。配置文件中配置的对象存储不同,会示例化不同的对象存储类,通过调用对应的对象存储实现获取到临时凭证信息。
@Override
public StsCredentialsResponse getSTSCredentials() {
return new StsCredentialsResponse()
.setEndpoint(OssConfiguration.endpoint)
.setBucket(OssConfiguration.bucket)
// 调用具体的对象存储类获取临时凭证信息
.setCredentials(ossService.getCredentials())
.setProvider(OssConfiguration.provider)
.setObjectKeyPrefix(OssConfiguration.objectDirPrefix)
.setRegion(OssConfiguration.region);
}
2.机场获取上传临时凭证
机场是通过 thing/product/{gateway_sn}/requests 主题,method:storage_config_get的mqtt消息,请求云端获取上传临时凭证。所有mqtt消息都会经过InboundMessageRouter#determineTargetChannels方法,在该方法中找到通过DeviceTopicEnum#find方法找到消息对应的主题,如是上线的消息会被选择到
public class InboundMessageRouter extends AbstractMessageRouter {
@Override
@Router(inputChannel = ChannelName.INBOUND)
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
// 获取消息头
MessageHeaders headers = message.getHeaders();
String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
byte[] payload = (byte[])message.getPayload();
log.debug("received topic :{} \t payload :{}", topic, new String(payload));
// 根据消息topic不同,将消息转发到不同的channel中。
// 上线消息sys/product/{gateway_sn}/status会被转发到ChannelName.INBOUND_STATUS通道中。
DeviceTopicEnum topicEnum = DeviceTopicEnum.find(topic);
MessageChannel bean = (MessageChannel) SpringBeanUtils.getBean(topicEnum.getBeanName());
return Collections.singleton(bean);
}
}
请求获取上传临时凭证的消息被转发到ChannelName.INBOUND_REQUESTS中后,通过RequestsRouter#requestsMethodRouterFlow方法中定义的流程处理。通过消息中携带的method属性,将消息序列化为不同DTO类的示例,method:storage_config_get的mqtt消息对应的实体类为:StorageConfigGet。将消息序列化为实体类后,再根据消息中的method属性,将requests请求消息路由到不同的管道中,method:storage_config_get的mqtt消息路由的管道为ChannelName.INBOUND_REQUESTS_STORAGE_CONFIG_GET
@Bean
public IntegrationFlow requestsMethodRouterFlow() {
return IntegrationFlows
.from(ChannelName.INBOUND_REQUESTS)
.<byte[], TopicRequestsRequest>transform(payload -> {
try {
TopicRequestsRequest response = Common.getObjectMapper().readValue(payload, TopicRequestsRequest.class);
// 根据消息method属性,将requests消息序列化为不同的Java实体类
// method:storage_config_get的mqtt消息对应的实体类为:StorageConfigGet
return response.setData(Common.getObjectMapper().convertValue(response.getData(), RequestsMethodEnum.find(response.getMethod()).getClassType()));
} catch (IOException e) {
throw new CloudSDKException(e);
}
})
.<TopicRequestsRequest, RequestsMethodEnum>route(
receiver -> RequestsMethodEnum.find(receiver.getMethod()),
mapping -> Arrays.stream(RequestsMethodEnum.values()).forEach(
// 根据消息中的method属性,将requests请求消息路由到不同的管道中
// method:storage_config_get的mqtt消息路由的管道为ChannelName.INBOUND_REQUESTS_STORAGE_CONFIG_GET
methodEnum -> mapping.channelMapping(methodEnum, methodEnum.getChannelName())))
.get();
}
ChannelName.INBOUND_REQUESTS_STORAGE_CONFIG_GET管道中的消息会被AbstractMediaService#storageConfigGet方法进行处理,开发者可以定义AbstractMediaService类的实现类,实现storageConfigGet方法进行自己的业务处理。示例代码中,该实现类为StorageServiceImpl。此方法返回的消息会被传输到ChannelName.OUTBOUND_REQUESTS管道中,并最终发送给请求的设备。
@ServiceActivator(inputChannel = ChannelName.INBOUND_REQUESTS_STORAGE_CONFIG_GET, outputChannel = ChannelName.OUTBOUND_REQUESTS)
public TopicRequestsResponse<MqttReply<StsCredentialsResponse>> storageConfigGet(TopicRequestsRequest<StorageConfigGet> request, MessageHeaders headers) {
throw new UnsupportedOperationException("storageConfigGet not implemented");
}
StorageServiceImpl#storageConfigGet方法处理如下:调用本类getSTSCredentials()方法获得临时凭证,包装为TopicRequestsResponse<MqttReply<StsCredentialsResponse>>类型进行返回
@Override
public TopicRequestsResponse<MqttReply<StsCredentialsResponse>> storageConfigGet(TopicRequestsRequest<StorageConfigGet> response, MessageHeaders headers) {
return new TopicRequestsResponse<MqttReply<StsCredentialsResponse>>().setData(MqttReply.success(getSTSCredentials()));
}
@Override
public StsCredentialsResponse getSTSCredentials() {
return new StsCredentialsResponse()
.setEndpoint(OssConfiguration.endpoint)
.setBucket(OssConfiguration.bucket)
.setCredentials(ossService.getCredentials())
.setProvider(OssConfiguration.provider)
.setObjectKeyPrefix(OssConfiguration.objectDirPrefix)
.setRegion(OssConfiguration.region);
}
AbstractMediaService#storageConfigGet方法返回的消息会被传输到ChannelName.OUTBOUND_REQUESTS管道中,并通过RequestsRouter#replyRequestsMethod方法进行处理。
@Bean
public IntegrationFlow replyRequestsMethod() {
return IntegrationFlows
// 处理ChannelName.OUTBOUND_REQUESTS管道中的数据
.from(ChannelName.OUTBOUND_REQUESTS)
// 将数据交给本类publish方法进行处理
.handle(this::publish)
.nullChannel();
}
private TopicRequestsResponse publish(TopicRequestsResponse request, MessageHeaders headers) {
// 检查返回的数据是否为空
if (Objects.isNull(request)) {
throw new CloudSDKException(CloudSDKErrorEnum.INVALID_PARAMETER, "The return value cannot be null.");
}
// 调用封装好的消息发送方法,将requests_reply消息发送给设备端
gatewayPublish.publishReply(request, headers);
return request;
}
上传媒体文件
Pilot2或者机场获取到临时凭证信息后,满足不同条件下,会自动将媒体文件上传到指定的对象存储中。
Pilot2需要满足以下条件之一:
- Pilot2设置了媒体文件自动上传,Pilot2中检测到媒体文件后即自动上传
- Pilot2设置媒体文件手动上传,需要用户在Pilot2的媒体模块手动将媒体文件上传到对象存储中。
机场需要在执行完任务后,且无人机生成了媒体文件后,机场自动将媒体文件上传到云端。
上传结果回调云端
Pilot2通过 POST /media/api/v1/workspaces/{workspace_id}/upload-callback 请求上报云端媒体文件上传结果。对应后端请求为IHttpMediaService#mediaUploadCallback。
HttpResultResponse<String> mediaUploadCallback(
@PathVariable(name = "workspace_id") String workspaceId,
@Valid @RequestBody MediaUploadCallbackRequest request,
HttpServletRequest req, HttpServletResponse rsp);
如果对于上传结果回调云端有自定义的业务需求,可以编写IHttpMediaService的实现类,实现mediaUploadCallback方法。示例代码中的实现为MediaController#mediaUploadCallback,实现代码如下:
@Override
public HttpResultResponse<String> mediaUploadCallback(String workspaceId, @Valid MediaUploadCallbackRequest request, HttpServletRequest req, HttpServletResponse rsp) {
// 调用mediaService#saveMediaFile实现业务逻辑
mediaService.saveMediaFile(workspaceId, request);
return HttpResultResponse.success(request.getObjectKey());
}
mediaService#saveMediaFile实现如下:
评论
1 条评论
storageConfigGet 函数实现还需要修改吗 我这里报错storageConfigGet not implemented 看demo明明已经实现了啊
请登录写评论。