- sample代码架构
- sample代码实现
- 开发注意事项
4.2.1 sample代码架构
sample对于直播功能的实现主要包含:设备直播能力上报、开启直播、前端拉流播放。
- 设备直播能力上报:网关设备会通过thing/product/{device_sn}/state主题上报设备的直播能力(live_capacity),云端获取到直播能力上报的数据后,会将上报的数据保存在缓存Redis中。
- 开启直播:如果是在web端开启直播,前端会首先请求后端接口获取能开启直播的镜头信息,然后再根据“config.ts”文件中配置的直播地址信息,发送开启直播请求。后端收到开启直播请求后,会通过mqtt下发指令给设备开启直播,开启直播成功后,会返回给前端一个webrtc拉流的地址。
- 前端拉流播放:前端通过后端返回的rtc拉流地址,拉取视频流在前端进行播放。
4.2.2 sample代码实现
设备直播能力上报
设备通过mqtt主题 thing/product/{device_sn}/state 上报直播能力信息。本章节主要介绍示例代码后端如何接收设备的直播能力,以及将直播能力进行存储以供前端进行获取。
1. 所有mqtt消息都会经过InboundMessageRouter#determineTargetChannels方法,在该方法中找到通过DeviceTopicEnum#find方法找到消息对应的主题,如果是 thing/product/{device_sn}/state 主题的消息会被选择到ChannelName.INBOUND_STATE通道中。
public class InboundMessageRouter extends AbstractMessageRouter {
@Override
@Router(inputChannel = ChannelName.INBOUND)
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
// 获取消息头
MessageHeaders headers = message.getHeaders();
String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
byte[] payload = (byte[])message.getPayload();
log.debug("received topic :{} \t payload :{}", topic, new String(payload));
// 根据消息topic不同,将消息转发到不同的channel中。
// 上线消息sys/product/{gateway_sn}/status会被转发到ChannelName.INBOUND_STATUS通道中。
DeviceTopicEnum topicEnum = DeviceTopicEnum.find(topic);
MessageChannel bean = (MessageChannel) SpringBeanUtils.getBean(topicEnum.getBeanName());
return Collections.singleton(bean);
}
}
2. 上报直播能力的消息被转发到ChannelName.INBOUND_STATE中后,通过
public IntegrationFlow stateDataRouterFlow() {
return IntegrationFlows
.from(ChannelName.INBOUND_STATE)
.transform(Message.class, source -> {
// 将上报的state消息根据消息内容转换为Java DTO。
// 直播的state转换为DockLivestreamAbilityUpdate
try {
TopicStateRequest response = Common.getObjectMapper().readValue((byte[]) source.getPayload(), new TypeReference<TopicStateRequest>() {});
String topic = String.valueOf(source.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
String from = topic.substring((THING_MODEL_PRE + PRODUCT).length(), topic.indexOf(STATE_SUF));
return response.setFrom(from)
// 根据消息的gateway_sn和消息内容将直播消息转换为对应的Java DTO。
.setData(Common.getObjectMapper().convertValue(response.getData(), getTypeReference(response.getGateway(), response.getData())));
} catch (IOException e) {
throw new CloudSDKException(e);
}
}, null)
// 根据消息内容序列化后的DTO class类型将消息提交给不同的channel进行处理
// 直播能力上报的消息交给ChannelName.INBOUND_STATE_DOCK_LIVESTREAM_ABILITY_UPDATE通道处理
.<TopicStateRequest, StateDataKeyEnum>route(response -> StateDataKeyEnum.find(response.getData().getClass()),
mapping -> Arrays.stream(StateDataKeyEnum.values()).forEach(key -> mapping.channelMapping(key, key.getChannelName())))
.get();
}
private Class getTypeReference(String gatewaySn, Object data) {
Set<String> keys = ((Map<String, Object>) data).keySet();
// 根据gateway_sn查找上线时的缓存,从缓存中获取当前上报state消息的设备是RC还是Dock
switch (SDKManager.getDeviceSDK(gatewaySn).getType()) {
case RC:
return RcStateDataKeyEnum.find(keys).getClassType();
case DOCK:
return DockStateDataKeyEnum.find(keys).getClassType();
default:
throw new CloudSDKException(CloudSDKErrorEnum.WRONG_DATA, "Unexpected value: " + SDKManager.getDeviceSDK(gatewaySn).getType());
}
}
3. 直播能力更新的消息通过ChannelName.INBOUND_STATE_DOCK_LIVESTREAM_ABILITY_UPDATE通道后,会被AbstractLivestreamService#dockLivestreamAbilityUpdate方法处理,在官方提供的示例代码中会被其实现类SDKLivestreamService#dockLivestreamAbilityUpdate处理。
@Override
public void dockLivestreamAbilityUpdate(TopicStateRequest<DockLivestreamAbilityUpdate> request, MessageHeaders headers) {
// 调用本类的saveLiveCapacity方法处理
saveLiveCapacity(request.getData().getLiveCapacity().getDeviceList());
}
private void saveLiveCapacity(Object data) {
// 将直播能力上报的消息序列化为CapacityDeviceReceiver类
List<CapacityDeviceReceiver> devices = objectMapper.convertValue(
data, new TypeReference<List<CapacityDeviceReceiver>>() {});
// 上报的消息会包含多个设备的直播能力(网关设备机场、飞行器设备)
// 每个设备都调用capacityCameraService#saveCapacityCameraReceiverList方法进行处理
for (CapacityDeviceReceiver capacityDeviceReceiver : devices) {
capacityCameraService.saveCapacityCameraReceiverList(
capacityDeviceReceiver.getCameraList(), capacityDeviceReceiver.getSn());
}
}
4. capacityCameraService#saveCapacityCameraReceiverList方法会将直播能力转换为CapacityCameraDTO类,然后保存在Redis中。保存时的key与设备sn有关联。
@Override
public void saveCapacityCameraReceiverList(List<CapacityCameraReceiver> capacityCameraReceivers, String deviceSn) {
// 将直播能力转换为CapacityCameraDTO列表
List<CapacityCameraDTO> capacity = capacityCameraReceivers.stream()
.map(this::receiver2Dto).collect(Collectors.toList());
// 将直播能力与设备sn关联,并保存在Redis中。
RedisOpsUtils.hashSet(RedisConst.LIVE_CAPACITY, deviceSn, capacity);
}
开启直播
1. 前端通过 Get ${url.manage.prefix}${url.manage.version}/live/capacity 请求后端,获取可以开启直播的设备以及设备对应的镜头。对应后端接口为:LiveStreamController#getLiveCapacity
// 获取当前用户所在的工作空间下的所有设备的直播镜头
@GetMapping("/capacity")
public HttpResultResponse<List<CapacityDeviceDTO>> getLiveCapacity(HttpServletRequest request) {
// 获取当前登录用户信息
CustomClaim customClaim = (CustomClaim)request.getAttribute(TOKEN_CLAIM);
// 通过当前登录用户的workspaceId获取所有可直播设备的镜头
List<CapacityDeviceDTO> liveCapacity = liveStreamService.getLiveCapacity(customClaim.getWorkspaceId());
return HttpResultResponse.success(liveCapacity);
}
2. LiveStreamController#getLiveCapacity方法:首先会获取当前工作空间下所有的无人机设备和机场设备。过滤已经离线的设备,通过capacityCameraService#getCapacityCameraByDeviceSn将在线设备的直播能力获取并返回。
@Override
public List<CapacityDeviceDTO> getLiveCapacity(String workspaceId) {
// 查找当前工作空间下所用的无人机设备和机场设备
List<DeviceDTO> devicesList = deviceService.getDevicesByParams(
DeviceQueryParam.builder()
.workspaceId(workspaceId)
.domains(List.of(DeviceDomainEnum.DRONE.getDomain(), DeviceDomainEnum.DOCK.getDomain()))
.build());
return devicesList.stream()
// 只保留在线的设备,离线设备进行排除
.filter(device -> deviceRedisService.checkDeviceOnline(device.getDeviceSn()))
.map(device -> CapacityDeviceDTO.builder()
.name(Objects.requireNonNullElse(device.getNickname(), device.getDeviceName()))
.sn(device.getDeviceSn())
// 通过capacityCameraService#getCapacityCameraByDeviceSn获取设备直播能力信息
.camerasList(capacityCameraService.getCapacityCameraByDeviceSn(device.getDeviceSn()))
.build())
.collect(Collectors.toList());
}
3. capacityCameraService#getCapacityCameraByDeviceSn方法:在缓存Redis中通过设备sn获取到设备的直播能力。
@Override
public List<CapacityCameraDTO> getCapacityCameraByDeviceSn(String deviceSn) {
// 在缓存Redis中获取到设备的直播能力
return (List<CapacityCameraDTO>) RedisOpsUtils.hashGet(RedisConst.LIVE_CAPACITY, deviceSn);
}
4. 前端获取到设备直播能力后,用户可以在前端选择一个设备进行开启直播。通过Post ${url.manage.prefix}${url.manage.version}/live/streams/start请求开启直播。
@PostMapping("/streams/start")
public HttpResultResponse liveStart(@RequestBody LiveTypeDTO liveParam) {
// 调用liveStreamService#liveStart开启直播
return liveStreamService.liveStart(liveParam);
}
5. liveStreamService#liveStart方法:先检查镜头能否开启直播,如果不能开启直播(如无人机或者网关设备已经离线等情况),直接返回。通过sdk包封装好的abstractLivestreamService#liveStartPush发送消息给网关设备开启直播。如果直播协议是:rtmp、GB28181,则需要在流媒体服务器端将视频流转换webrtc流;声网Agora直播,前端直接通过声网SDK播放视频,不需要进行转换;rtsp也不需要转换,可以直接播放。
@Override
public HttpResultResponse liveStart(LiveTypeDTO liveParam) {
// 检查直播镜头是否可以开启直播
HttpResultResponse<DeviceDTO> responseResult = this.checkBeforeLive(liveParam.getVideoId());
if (HttpResultResponse.CODE_SUCCESS != responseResult.getCode()) {
// 当前镜头不能开启直播,直接返回
return responseResult;
}
// 通过abstractLivestreamService#liveStartPush发送开启直播的mqtt消息,开启直播。
TopicServicesResponse<ServicesReplyData<String>> response = abstractLivestreamService.liveStartPush(
SDKManager.getDeviceSDK(responseResult.getData().getDeviceSn()), new LiveStartPushRequest()
.setUrl(liveParam.getUrl())
.setUrlType(liveParam.getUrlType())
.setVideoId(new VideoId(liveParam.getVideoId()))
.setVideoQuality(liveParam.getVideoQuality()));
// 如果开启直播失败,将开启直播失败的报错信息返回给前端
if (!response.getData().getResult().isSuccess()) {
return HttpResultResponse.error(response.getData().getResult());
}
// 程序运行到这,证明开启直播成功
LiveDTO live = new LiveDTO();
switch (liveParam.getUrlType()) {
case AGORA:
// 声网Agora直播,前端调用声网SDK拉流,不需要转换拉流地址
break;
case RTMP:
// rtmp协议开启直播,前端无法直接播放rtmp视频流,需要流媒体服务器将rtmp视频流转换为webrtc流
// 需要开发者自己部署流媒体服务器,并在流媒体服务器中转换视频流
live.setUrl(liveParam.getUrl().replace("rtmp", "webrtc"));
break;
case GB28181:
// 直播协议为GB28181,需要流媒体处将视频流进行转换
LiveUrlGB28181DTO gb28181 = urlToGB28181(liveParam.getUrl());
live.setUrl(new StringBuilder()
.append("webrtc://")
.append(gb28181.getServerIP())
.append("/live/")
.append(gb28181.getAgentID())
.append("@")
.append(gb28181.getChannel())
.toString());
break;
case RTSP:
// rtsp协议开启直播,会在返回的消息中包含拉流的url地址,需要将该地址返回给前端
String url = response.getData().getOutput();
this.resolveUrlUser(url, live);
break;
default:
return HttpResultResponse.error(LiveErrorCodeEnum.URL_TYPE_NOT_SUPPORTED);
}
return HttpResultResponse.success(live);
}
前端拉流播放
前端通过Post ${url.manage.prefix}${url.manage.version}/live/streams/start开启直播后,后端会返回拉流的地址,前端根据后端返回的url进行拉流播放。
4.2.3 开发注意事项
1. 前端无法直接播放rtmp、GB28181视频流,需要在流媒体服务器端将视频流转换为webrtc流,前端播放webrtc视频流。
评论
1 条评论
你好,GB28181有对接没?
请登录写评论。