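I have wanted to write this note for a long time, but several major life events got in the way recently. This post does not continue from the previous one, streetscape.gl Study Notes (3);
the unfinished parts of that post will be updated in place. This post focuses on streetscape.gl Loaders.
Why the sudden change of topic? It comes from a small project I have been tinkering with lately: a fishing-vessel dashboard built with React + TypeScript. This is a very typical IoT display application, and in an earlier post about a project refactoring I also described how to push backend messages to the front end in real time with Kafka + WebSocket. So I wanted to see how streetscape.gl, which is itself a dashboard for vehicle signals, implements this.
Let's start with the most common scenario, XVIZLiveLoader, which displays a vehicle's position in real time. The official documentation says: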
XVIZLiveLoader
Connects to a live XVIZ system using a WebSocket. Implements XVIZLoaderInterface.
That is, XVIZLiveLoader is a class that connects over a WebSocket to a system streaming XVIZ data in real time, and it implements XVIZLoaderInterface.
A live XVIZ system describes a running system does not have a start and end time available in the metadata and will send XVIZ data immediately upon connection.
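That is, a live XVIZ system is a running system whose metadata has no start or end time available, and which starts sending XVIZ data as soon as the connection is established.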
The XVIZLiveLoader will also immediately update the scene to the timestamp of the latest received XVIZ message.
That is, XVIZLiveLoader also immediately moves the scene to the timestamp of the most recently received XVIZ message.
Constructor
import {XVIZLiveLoader} from 'streetscape.gl';
new XVIZLiveLoader({
  serverConfig: {
    serverUrl: 'ws://localhost:8081'
  }
});
Options
serverConfig (Object)
serverConfig.serverUrl (String) - url of the WebSocket server
serverConfig.queryParams (Object, optional) - additional query parameters to use when connecting to the server
serverConfig.defaultLogLength (Number, optional) - fallback value if the duration option is not specified.
serverConfig.retryAttempts (Number, optional) - number of retries if a connection error is encountered. Default 3.
logProfile (String, optional) - Name of the profile to load the log with
bufferLength (Number, optional) - the length of the buffer to keep in memory. Uses the same unit as timestamp. If specified, older frames may be discarded during playback, to avoid crashes due to excessive memory usage. Default 30 seconds.
worker (String|Boolean, optional) - Use a worker for message processing. Default true.
Type Boolean: enable/disable default worker
Type String: the worker URL to use
maxConcurrency (Number, optional) - the maximum number of worker threads to spawn. Default 3.
How to use it
Take a look at examples/get-started.
In app.js:
// __IS_STREAMING__ and __IS_LIVE__ are defined in webpack.config.js
const exampleLog = require(__IS_STREAMING__
  ? './log-from-stream'
  : __IS_LIVE__
  ? './log-from-live'
  : './log-from-file').default;
The __IS_STREAMING__ and __IS_LIVE__ flags defined in webpack.config.js decide which Loader a given build uses. Here we look at log-from-live.
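The nested ternary in app.js can be hard to read at a glance. Here is a minimal sketch of the same selection logic as a plain function, assuming the two webpack flags are passed in as ordinary booleans. `pickLogModule` is a hypothetical helper for illustration and is not part of the example app.

```javascript
// Hypothetical helper mirroring the nested ternary in app.js.
// In the real example, __IS_STREAMING__ and __IS_LIVE__ are injected by webpack.
function pickLogModule(isStreaming, isLive) {
  if (isStreaming) {
    return './log-from-stream';
  }
  return isLive ? './log-from-live' : './log-from-file';
}

console.log(pickLogModule(false, true)); // './log-from-live'
```

Note that the streaming flag takes precedence: when both flags are set, the streaming loader is chosen.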
log-from-live.js
import {XVIZLiveLoader} from 'streetscape.gl';
export default new XVIZLiveLoader({
  logGuid: 'mock',
  bufferLength: 10,
  serverConfig: {
    defaultLogLength: 30,
    serverUrl: 'ws://localhost:8081'
  },
  worker: true,
  maxConcurrency: 4
});
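This example shows how XVIZLiveLoader is used; a few parameters need to be configured:
logGuid: the GUID of the data source
bufferLength: how much data to keep buffered in memory, in the same unit as the timestamp. If specified, older frames may be discarded during playback to avoid excessive memory usage. The default is 30 seconds.
serverConfig.defaultLogLength: the fallback value used when the duration option is not specified
serverConfig.serverUrl: the URL of the WebSocket server
worker: whether to use a worker thread for message processing
maxConcurrency: the maximum number of worker threads
Once exampleLog holds an XVIZLiveLoader instance, how is it used? In app.js we can see: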
class Example extends PureComponent {
  state = {
    log: exampleLog,
    settings: {
      viewMode: 'PERSPECTIVE',
      showTooltip: false
    }
  };
  componentDidMount() {
    this.state.log.on('error', console.error).connect();
  }
  ...
  render() {
    const {log, settings} = this.state;
    return (
      <div id="container">
        <div id="control-panel">
          <XVIZPanel log={log} name="Metrics" />
          <hr />
          <XVIZPanel log={log} name="Camera" />
          <hr />
          <Form
            data={APP_SETTINGS}
            values={this.state.settings}
            onChange={this._onSettingsChange}
          />
          <StreamSettingsPanel log={log} />
        </div>
        ...
      </div>
    );
  }
}
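It is stored in the component's state. Once the component has mounted, the log registers an error listener and then tries to connect.
After that, render passes it as a prop to other components. What do those components do with the log? Let's look at how the core component LogViewer treats it.
In modules/core/src/components/log-viewer/index.js we can see:
const getLogState = log => ({
  frame: log.getCurrentFrame(),
  metadata: log.getMetadata(),
  streamsMetadata: log.getStreamsMetadata()
});
export default connectToLog({getLogState, Component: LogViewer});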
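It reads the current frame, the metadata and the streams metadata from the log and binds them to the component, producing a higher-order component.
How it is implemented
Having seen how it is used, the natural next question is how it is implemented. We will start from getting the current frame, but first we need to understand what happens when an XVIZLiveLoader instance is created and what connect does after the component mounts.
The XVIZLiveLoader instance
new XVIZLiveLoader({
  logGuid: 'mock',
  bufferLength: 10,
  serverConfig: {
    defaultLogLength: 30,
    serverUrl: 'ws://localhost:8081'
  },
  worker: true,
  maxConcurrency: 4
});
Now open xviz-live-loader.js: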
/*
 * Handle connecting to XVIZ socket and negotiation of the XVIZ protocol version
 *
 * This loader is used when connecting to a "live" XVIZ websocket.
 * This implies that the metadata does not have a start or end time
 * and that we want to display the latest message as soon as it arrives.
 */
export default class XVIZLiveLoader extends XVIZWebsocketLoader {
  /**
   * constructor
   * @params serverConfig {object}
   *   - serverConfig.serverUrl {string}
   *   - serverConfig.defaultLogLength {number, optional} - default 30
   *   - serverConfig.queryParams {object, optional}
   *   - serverConfig.retryAttempts {number, optional} - default 3
   * @params worker {string|function, optional}
   * @params maxConcurrency {number, optional} - default 3
   * @params logProfile {string, optional}
   * @params bufferLength {number, optional}
   */
  constructor(options = {}) {
    super(options);
    // Construct websocket connection details from parameters
    this.requestParams = getSocketRequestParams(options);
    assert(this.requestParams.bufferLength, 'bufferLength must be provided');
    this.retrySettings = {
      retries: this.requestParams.retryAttempts,
      minTimeout: 500,
      randomize: true
    };
    // Setup relative stream buffer storage by splitting bufferLength 1/3 : 2/3
    const bufferChunk = this.requestParams.bufferLength / 3;
    // Replace base class object
    this.streamBuffer = new XVIZStreamBuffer({
      startOffset: -2 * bufferChunk,
      endOffset: bufferChunk
    });
  }
  ...
}
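As we can see, XVIZLiveLoader extends XVIZWebsocketLoader. Its constructor:
- first calls the parent class constructor
- builds the WebSocket connection parameters with getSocketRequestParams
- sets the WebSocket retry settings (number of retries and minimum timeout)
- sets up the relative stream buffer storage by splitting the buffer length 1/3 : 2/3
- creates a new XVIZStreamBuffer to replace the one created by the base class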
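To make the 1/3 : 2/3 split concrete, here is a minimal sketch that repeats the arithmetic from the constructor in isolation. `splitBufferLength` is a hypothetical helper name; the formula itself is taken from the source quoted above.

```javascript
// Hypothetical helper: reproduces the 1/3 : 2/3 split done in the constructor.
function splitBufferLength(bufferLength) {
  const bufferChunk = bufferLength / 3;
  return {
    startOffset: -2 * bufferChunk, // two thirds of the buffer lies behind the playhead
    endOffset: bufferChunk // one third of the buffer lies ahead of the playhead
  };
}

console.log(splitBufferLength(30)); // { startOffset: -20, endOffset: 10 }
```

With a bufferLength of 30 (in timestamp units), the loader would therefore keep roughly 20 units of history and 10 units of look-ahead around the current time.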
Let's take a look at XVIZStreamBuffer here. Its documentation reads:
XVIZStreamBuffer
The XVIZStreamBuffer class manages loaded XVIZ timeslices in memory for easy access.
Constructor
const streamBuffer = new XVIZStreamBuffer();
Parameters:
options (Object)
options.startOffset (Number) - offset in seconds. if provided, will not keep timeslices earlier than currentTime - startOffset. Default null.
options.endOffset (Number) - offset in seconds. if provided, will not keep timeslices later than currentTime + endOffset. Default null.
options.maxLength (Number) - length in seconds. if provided, the buffer will be forced to be no bigger than the specified length. Default null.
There are three types of buffer: unlimited, offset, and fixed. Use the constructor options to set an offset buffer (relative to playhead). To set a fixed buffer with absolute timestamps, see setFixedBuffer.
So XVIZStreamBuffer keeps loaded XVIZ timeslices in memory for easy access, and startOffset and endOffset control how long a span of timeslices stays resident in memory.
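The idea of an offset buffer can be illustrated with a small, self-contained sketch. This is not the XVIZStreamBuffer source: `OffsetBuffer` is a hypothetical class, and it assumes the retained window is [currentTime + startOffset, currentTime + endOffset], which matches the negative startOffset that XVIZLiveLoader passes in.

```javascript
// Hypothetical, simplified stand-in for an offset buffer; not the XVIZStreamBuffer source.
class OffsetBuffer {
  constructor({startOffset, endOffset}) {
    this.startOffset = startOffset;
    this.endOffset = endOffset;
    this.timeslices = []; // kept sorted by timestamp
  }

  // Insert a timeslice, keeping the list ordered by timestamp.
  insert(timeslice) {
    this.timeslices.push(timeslice);
    this.timeslices.sort((a, b) => a.timestamp - b.timestamp);
  }

  // Move the playhead and drop everything outside the window
  // [currentTime + startOffset, currentTime + endOffset] (assumed convention).
  setCurrentTime(currentTime) {
    const start = currentTime + this.startOffset;
    const end = currentTime + this.endOffset;
    this.timeslices = this.timeslices.filter(
      t => t.timestamp >= start && t.timestamp <= end
    );
  }
}

const buffer = new OffsetBuffer({startOffset: -20, endOffset: 10});
[0, 10, 25, 30, 45].forEach(timestamp => buffer.insert({timestamp}));
buffer.setCurrentTime(30);
console.log(buffer.timeslices.map(t => t.timestamp)); // [ 10, 25, 30 ]
```

Pruning relative to the playhead is what keeps memory bounded for a live stream that has no end time.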
Next, open xviz-websocket-loader.js to see the definition of XVIZWebsocketLoader:
/**
 * Connect to XVIZ 2 websocket manage storage of XVIZ data into a XVIZStreamBuffer
 *
 * This class is a Websocket base class and is expected to be subclassed with
 * the following methods overridden:
 *
 * - _onOpen()
 */
export default class XVIZWebsocketLoader extends XVIZLoaderInterface {
  /**
   * constructor
   * @params serverConfig {object}
   *   - serverConfig.serverUrl {string}
   *   - serverConfig.defaultLogLength {number, optional} - default 30
   *   - serverConfig.queryParams {object, optional}
   *   - serverConfig.retryAttempts {number, optional} - default 3
   * @params worker {string|function, optional}
   * @params maxConcurrency {number, optional} - default 3
   * @params debug {function} - Debug callback for the XVIZ parser.
   * @params logGuid {string}
   * @params logProfile {string, optional}
   * @params duration {number, optional}
   * @params timestamp {number, optional}
   * @params bufferLength {number, optional}
   */
  constructor(options = {}) {
    super(options);
    this.socket = null;
    this.retrySettings = {
      retries: 3,
      minTimeout: 500,
      randomize: true
    };
    this.streamBuffer = new XVIZStreamBuffer();
    // Handler object for the websocket events
    // Note: needs to be last due to member dependencies
    this.WebSocketClass = options.WebSocketClass || WebSocket;
  }
  ...
}
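As we can see, XVIZWebsocketLoader extends XVIZLoaderInterface. Its constructor:
- first calls the parent class constructor
- sets the default WebSocket retry settings (number of retries and minimum timeout)
- creates a default XVIZStreamBuffer
- chooses the WebSocket class to use (options.WebSocketClass, falling back to the global WebSocket)
Next, open xviz-loader-interface.js to see the definition of XVIZLoaderInterface:
export default class XVIZLoaderInterface {
  constructor(options = {}) {
    this.options = options;
    this._debug = options.debug || (() => {});
    this.callbacks = {};
    this.listeners = [];
    this.state = {};
    this._updates = 0;
    this._version = 0;
    this._updateTimer = null;
  }
  ...
}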
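This constructor is relatively simple. It mainly initializes a few members, such as the event callbacks, the listener array and the data version.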
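The callbacks member suggests a small event registry behind calls such as log.on('error', console.error).connect(). The following is a minimal sketch of that pattern, not the XVIZLoaderInterface source: `MiniLoader` and the exact handler signature are assumptions made for illustration.

```javascript
// Hypothetical, minimal event registry; not the XVIZLoaderInterface source.
class MiniLoader {
  constructor() {
    this.callbacks = {}; // event name -> array of handlers
  }

  // Register a handler and return `this` so that calls can be chained.
  on(eventType, cb) {
    this.callbacks[eventType] = this.callbacks[eventType] || [];
    this.callbacks[eventType].push(cb);
    return this;
  }

  // Invoke every handler registered for the event, in registration order.
  emit(eventType, eventArgs) {
    const handlers = this.callbacks[eventType] || [];
    handlers.forEach(cb => cb(eventArgs));
  }
}

const received = [];
new MiniLoader().on('error', err => received.push(err)).emit('error', 'boom');
console.log(received); // [ 'boom' ]
```

Returning `this` from `on` is what makes the one-line "register, then connect" style in componentDidMount possible.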
connect
In the component's mount hook, the on and connect functions are called:
componentDidMount() {
  this.state.log.on('error', console.error).connect();
}
Now we locate the definition of connect in xviz-websocket-loader.js:
/**
 * Open an XVIZ socket connection with automatic retry
 *
 * @returns {Promise} WebSocket connection
 */
connect() {
  assert(this.socket === null, 'Socket Manager still connected');
  this._debug('stream_start');
  const {url} = this.requestParams;
  // Wrap retry logic around connection
  return PromiseRetry(retry => {
    return new Promise((resolve, reject) => {
      try {
        const ws = new this.WebSocketClass(url);
        ws.binaryType = 'arraybuffer';
        ws.onmessage = message => {
          const hasMetadata = Boolean(this.getMetadata());
          return parseStreamMessage({
            message: message.data,
            onResult: this.onXVIZMessage,
            onError: this.onError,
            debug: this._debug.bind(this, 'parse_message'),
            worker: hasMetadata && this.options.worker,
            maxConcurrency: this.options.maxConcurrency
          });
        };
        ws.onerror = this.onError;
        ws.onclose = event => {
          this._onWSClose(event);
          reject(event);
        };
        // On success, resolve the promise with the now ready socket
        ws.onopen = () => {
          this.socket = ws;
          this._onWSOpen();
          resolve(ws);
        };
      } catch (err) {
        reject(err);
      }
    }).catch(event => {
      this._onWSError(event);
      const isAbnormalClosure = event.code > 1000 && event.code !== 1005;
      // Retry if abnormal or connection never established
      if (isAbnormalClosure || !this.socket) {
        retry();
      }
    });
  }, this.retrySettings).catch(this._onWSError);
}
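The comment above the function explains that it opens an XVIZ socket connection with automatic retry and returns a Promise for the WebSocket connection.
It uses the promise-retry package, which describes itself as: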
Retries a function that returns a promise, leveraging the power of the retry module to the promises world.
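This is a very good practice. IoT applications lose their network connection from time to time, and allowing a bounded number of reconnection attempts lets the connection resume.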
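The decision of whether to retry is the interesting part of that catch handler. Below is a minimal sketch that isolates it as a pure function. `shouldRetry` is a hypothetical helper; the condition is copied from connect(), and the comments rely on the standard WebSocket close codes (1000 for a normal closure, 1005 for no status code present).

```javascript
// Hypothetical helper extracted from the catch handler in connect().
// `event` is a CloseEvent-like object; `socket` is the loader's current socket (or null).
function shouldRetry(event, socket) {
  // 1000 is a normal closure; 1005 means no status code was present.
  const isAbnormalClosure = event.code > 1000 && event.code !== 1005;
  // Retry if the closure was abnormal or the connection was never established.
  return isAbnormalClosure || !socket;
}

console.log(shouldRetry({code: 1006}, {})); // true: abnormal closure
console.log(shouldRetry({code: 1000}, {})); // false: normal closure on an open socket
console.log(shouldRetry({code: 1000}, null)); // true: never connected
```

Keeping this check separate from the socket wiring makes the reconnect policy easy to unit test without a real server.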
For how WebSocket itself is used, refer to the WebSocket documentation.
ws.onmessage = message => {
  const hasMetadata = Boolean(this.getMetadata());
  return parseStreamMessage({
    message: message.data,
    onResult: this.onXVIZMessage,
    onError: this.onError,
    debug: this._debug.bind(this, 'parse_message'),
    worker: hasMetadata && this.options.worker,
    maxConcurrency: this.options.maxConcurrency
  });
};
When the WebSocket receives a message, it first checks whether metadata is already available and then calls parseStreamMessage to handle the stream message. Because worker is set to hasMetadata && this.options.worker, messages that arrive before the metadata are parsed without a worker.
The documentation for parseStreamMessage reads:
parseStreamMessage will parse the data and handle GLB encoded XVIZ as well as other formats of the data.
XVIZ parsing functions will decode the binary container, parse the JSON and resolve binary references. The application will get a "patched" JSON structure, with the difference from the basic JSON protocol format being that certain arrays will be compact typed arrays instead of classic JavaScript arrays.
If an attribute has been hydrated from binary then it will be transformed into the corresponding TypeArray. Typed arrays do not support nesting so all numbers will be laid out flat and the application needs to know how many values represent one element, for instance 3 values represent the x, y, z coordinates of a point.
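The flat layout described above can be read back with a fixed stride. Here is a minimal sketch using a plain Float32Array; `getPoint` is a hypothetical helper and the sample values are made up for illustration.

```javascript
// Read the i-th element from a flat typed array laid out as x0, y0, z0, x1, y1, z1, ...
// `size` is the number of values per element (3 for x, y, z points).
function getPoint(flatPoints, index, size = 3) {
  const offset = index * size;
  return Array.from(flatPoints.subarray(offset, offset + size));
}

const points = new Float32Array([1, 2, 3, 4, 5, 6]);
console.log(points.length / 3); // 2
console.log(getPoint(points, 1)); // [ 4, 5, 6 ]
```

`subarray` creates a view on the same memory rather than a copy, so walking a large point cloud this way stays cheap until `Array.from` materializes a single point.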
parseXVIZMessage
import {parseXVIZMessage, XVIZ_MESSAGE} from '@xviz/parser';
parseXVIZMessage({
  message,
  onResult: data => {
    switch (data.type) {
      case XVIZ_MESSAGE.METADATA: // do something
      case XVIZ_MESSAGE.TIMESLICE: // do something
      case XVIZ_MESSAGE.INCOMPLETE: // do something
    }
  },
  onError: console.error,
  worker: true,
  maxConcurrency: 4
});
Parameters:
opts (Object)
opts.message (Object|String|ArrayBuffer) - XVIZ message to decode.
opts.onResult (Function) - callback if the message is parsed successfully. Receives a single argument data. data.type is one of XVIZ_MESSAGE.
opts.onError (Function) - callback if the parser encounters an error.
opts.debug (Function) - callback to log debug info.
opts.worker (Boolean|String) - use Web Worker to parse the message. Enabling worker is recommended to improve loading performance in production. Default false.
Type Boolean: whether to use the default worker. Note that callbacks in XVIZ config are ignored by the default worker. If you need to inject custom hooks into the parsing process, create a custom worker using streamDataWorker.
Type String: a custom worker URL to use.
opts.maxConcurrency (Number) - the max number of workers to use. Has no effect if worker is set to false. Default 4.
opts.capacity (Number) - the limit on the number of messages to queue for the workers to process, has no effect if set to null. Default null.
XVIZ_MESSAGE
Enum of stream message types.
METADATA
TIMESLICE
ERROR
INCOMPLETE
DONE
Here this.onXVIZMessage is defined in xviz-loader-interface.js as follows:
onXVIZMessage = message => {
  switch (message.type) {
    case LOG_STREAM_MESSAGE.METADATA:
      this._onXVIZMetadata(message);
      this.emit('ready', message);
      break;
    case LOG_STREAM_MESSAGE.TIMESLICE:
      this._onXVIZTimeslice(message);
      this.emit('update', message);
      break;
    case LOG_STREAM_MESSAGE.DONE:
      this.emit('finish', message);
      break;
    default:
      this.emit('error', message);
  }
};
When the parsed message type is METADATA, the internal function _onXVIZMetadata is called and a ready event is emitted:
_onXVIZMetadata(metadata) {
  this.set('metadata', metadata);
  if (metadata.streams && Object.keys(metadata.streams).length > 0) {
    this.set('streamSettings', metadata.streams);
  }
  if (!this.streamBuffer) {
    throw new Error('streamBuffer is missing');
  }
  this.logSynchronizer = this.logSynchronizer || new StreamSynchronizer(this.streamBuffer);
  const timestamp = this.get('timestamp');
  const newTimestamp = Number.isFinite(timestamp) ? timestamp : metadata.start_time;
  if (Number.isFinite(newTimestamp)) {
    this.seek(newTimestamp);
  }
}
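This mainly sets a few state values and creates logSynchronizer, an instance of StreamSynchronizer, which is used to retrieve the actual stream data. If neither the current timestamp nor metadata.start_time is a finite number, no seek is performed.
StreamSynchronizer
The StreamSynchronizer class looks into a XVIZStreamBuffer and retrieves the most relevant datum from each stream that "matches" the current timestamp.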
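To give a feel for what "the most relevant datum that matches the current timestamp" could mean, here is a minimal sketch. It is not the StreamSynchronizer source: `getFrameAt` is a hypothetical function, and it assumes that the matching datum is the latest one at or before the requested time.

```javascript
// Hypothetical sketch; assumes "most relevant" means the latest datum at or before `time`.
// `streams` maps a stream name to an array of {timestamp, value}, sorted by timestamp.
function getFrameAt(streams, time) {
  const frame = {};
  for (const [name, data] of Object.entries(streams)) {
    let match = null;
    for (const datum of data) {
      if (datum.timestamp > time) {
        break; // data is sorted, so later entries cannot match
      }
      match = datum;
    }
    if (match) {
      frame[name] = match.value;
    }
  }
  return frame;
}

const streams = {
  '/vehicle_pose': [{timestamp: 1, value: 'pose@1'}, {timestamp: 2, value: 'pose@2'}],
  '/camera': [{timestamp: 3, value: 'image@3'}]
};
console.log(getFrameAt(streams, 2.5)); // { '/vehicle_pose': 'pose@2' }
```

Streams that have no datum yet at the requested time are simply absent from the frame, which is why consumers of a frame should not assume every stream is present.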
When the parsed message type is TIMESLICE, the internal function _onXVIZTimeslice is called and an update event is emitted:
_onXVIZTimeslice(timeslice) {
  const oldStreamCount = this.streamBuffer.streamCount;
  const bufferUpdated = this.streamBuffer.insert(timeslice);
  if (bufferUpdated) {
    this._bumpDataVersion();
  }
  if (getXVIZConfig().DYNAMIC_STREAM_METADATA && this.streamBuffer.streamCount > oldStreamCount) {
    const streamsMetadata = {};
    const streamSettings = this.get('streamSettings');
    for (const streamName in timeslice.streams) {
      streamsMetadata[streamName] = timeslice.streams[streamName].__metadata;
      // Add new stream name to stream settings (default on)
      if (!(streamName in streamSettings)) {
        streamSettings[streamName] = true;
      }
    }
    this.set('streamsMetadata', streamsMetadata);
  }
  return bufferUpdated;
}
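Its main job is to insert the timeslice into the current streamBuffer and bump the data version if the buffer changed. In addition, when the DYNAMIC_STREAM_METADATA option in the XVIZ config is enabled and the timeslice introduces new streams, it collects each stream's __metadata into streamsMetadata, turns newly seen streams on in streamSettings, and updates streamsMetadata.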
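The "default on" rule for newly discovered streams is easy to isolate. Here is a minimal sketch: `addNewStreams` is a hypothetical helper, and unlike _onXVIZTimeslice, which mutates the existing settings object, it returns a copy so the example has no side effects.

```javascript
// Hypothetical helper isolating the "default on" rule from _onXVIZTimeslice.
// Returns a new settings object instead of mutating the input.
function addNewStreams(streamSettings, timeslice) {
  const result = {...streamSettings};
  for (const streamName in timeslice.streams) {
    if (!(streamName in result)) {
      result[streamName] = true; // newly seen streams are enabled by default
    }
  }
  return result;
}

const settings = addNewStreams({'/lidar': false}, {streams: {'/lidar': {}, '/radar': {}}});
console.log(settings); // { '/lidar': false, '/radar': true }
```

Existing choices are preserved, so a stream that the user switched off stays off even if it keeps appearing in later timeslices.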
At this point an XVIZLiveLoader has been instantiated and connected.
getCurrentFrame
Next, let's see how the current frame is obtained. The source of getCurrentFrame is in xviz-loader-interface.js:
getCurrentFrame = createSelector(
  this,
  [this.getStreamSettings, this.getCurrentTime, this.getLookAhead, this._getDataVersion],
  // `dataVersion` is only needed to trigger recomputation.
  // The logSynchronizer has access to the timeslices.
  (streamSettings, timestamp, lookAhead) => {
    const {logSynchronizer} = this;
    if (logSynchronizer && Number.isFinite(timestamp)) {
      logSynchronizer.setTime(timestamp);
      logSynchronizer.setLookAheadTimeOffset(lookAhead);
      return logSynchronizer.getCurrentFrame(streamSettings);
    }
    return null;
  }
);
There is another good practice here, createSelector, which is defined in /modules/src/utils/create-selector.js:
import {createSelector} from 'reselect';
// reselect selectors do not update if called with the same arguments
// to support calling them without arguments, pass logLoader version
export default function createLogSelector(logLoader, ...args) {
  const selector = createSelector(...args);
  return () => selector(logLoader._version);
}
This uses a library called reselect:
Reselect
Simple “selector” library for Redux (and others) inspired by getters in NuclearJS, subscriptions in re-frame and this proposal from speedskater.
- Selectors can compute derived data, allowing Redux to store the minimal possible state.
- Selectors are efficient. A selector is not recomputed unless one of its arguments changes.
- Selectors are composable. They can be used as input to other selectors.
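From this description we can see that reselect helps keep the state structure minimal and suppresses unnecessary computation.
Because a reselect selector does not recompute when it is called with the same arguments, the streetscape.gl authors pass the loader's _version to the selector and include dataVersion among its inputs. This lets them control exactly when recomputation happens, which reduces state changes and saves rendering work on the front end.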
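The version trick can be shown without reselect. The following is a minimal, hand-rolled sketch of a selector that recomputes only when a version counter changes. `createVersionedSelector` is a hypothetical function written for illustration; reselect's createSelector is more general and is not reimplemented here.

```javascript
// Hypothetical, hand-rolled memoization keyed on a version counter.
// This illustrates the idea only; it is not how reselect is implemented.
function createVersionedSelector(owner, compute) {
  let lastVersion;
  let lastResult;
  let hasResult = false;
  return () => {
    if (!hasResult || owner._version !== lastVersion) {
      lastVersion = owner._version;
      lastResult = compute(owner);
      hasResult = true;
    }
    return lastResult;
  };
}

let computations = 0;
const loader = {_version: 0, timestamp: 10};
const getTimestampLabel = createVersionedSelector(loader, l => {
  computations++;
  return `t=${l.timestamp}`;
});

getTimestampLabel();
getTimestampLabel(); // same version, so the cached result is returned
loader.timestamp = 11;
loader._version++; // bump the version to invalidate the cache
console.log(getTimestampLabel(), computations); // t=11 2
```

The trade-off is that every state change must remember to bump the version; a change made without a bump would leave consumers looking at a stale result.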
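The current data frame itself is obtained through logSynchronizer.getCurrentFrame(streamSettings).
That completes the walk through how the current data frame is obtained. There are still many details to digest. If you are studying this as well, feel free to leave a comment and discuss!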