streetscape.gl学习笔记(四)

这篇笔记想写很久了,但是作者最近人生大事不少,耽搁了一阵子。本篇不续接上篇streetscape.gl学习笔记(三)
内容,上篇未完内容将在原篇上进行更新。本篇主要介绍streetscape.gl Loaders。
为何突然转到该主题上,皆因作者最近倒腾的一个小案例。使用React+TypeScript,搭建一个捕鱼船舶看板。这是一个非常典型的物联网展示应用,自己以前也的项目重构记中也介绍了如何通过Kafka+WebSocket实现后台消息实时推送到前端展示。于是作者就想看看streetscape.gl其作为车辆信号展示看板它是如何实现的。
首先从我们最常见的场景,实时显示小车定位的XVIZLiveLoader看起。官网文档是这么讲的:

XVIZLiveLoader

Connects to a live XVIZ system using a WebSocket. Implements XVIZLoaderInterface.
XVIZLiveLoader是通过WebSocket来连接一个实时传输XVIZ协议数据的类,其继承自XVIZLoaderInterface.
A live XVIZ system describes a running system does not have a start and end time available in the metadata and will send XVIZ data immediately upon connection.
一个实时的XVIZ系统描述了一个正在运行的系统,其元数据中没有可以获得的起止时间,并且一旦连接成功就会发送XVIZ数据。
The XVIZLiveLoader will also immediately update the scene to the timestamp of the latest received XVIZ message.
当接收到最新的XVIZ消息时,XVIZLiveLoader也将立即更新场景。

Constructor

import {XVIZLiveLoader} from 'streetscape.gl';

new XVIZLiveLoader({
  serverConfig: {
    serverUrl: 'ws://localhost:8081'
  }
});
Options

serverConfig (Object)
    serverConfig.serverUrl (String) - url of the WebSocket server
    serverConfig.queryParams (Object, optional) - additional query parameters to use when connecting to the server
    serverConfig.defaultLogLength (Number, optional) - fallback value if the duration option is not specified.
    serverConfig.retryAttempts (Number, optional) - number of retries if a connection error is encountered. Default 3.
logProfile (String, optional) - Name of the profile to load the log with
bufferLength (Number, optional) - the length of the buffer to keep in memory. Uses the same unit as timestamp. If specified, older frames may be discarded during playback, to avoid crashes due to excessive memory usage. Default 30 seconds.
worker (String|Boolean, optional) - Use a worker for message processing. Default true.
Type Boolean: enable/disable default worker
Type String: the worker URL to use
maxConcurrency (Number, optional) - the maximum number of worker threads to spawn. Default 3.

如何使用

在examples/get-started中看看
app.js中

// __IS_STREAMING__ and __IS_LIVE__ are defined in webpack.config.js
const exampleLog = require(__IS_STREAMING__
  ? './log-from-stream'
  : __IS_LIVE__
    ? './log-from-live'
    : './log-from-file').default;

通过webpack.config.js中的IS_STREAMINGIS_LIVE配置来判断该次使用的Loader,这里我们看看log-from-live。
log-from-live.js

import {XVIZLiveLoader} from 'streetscape.gl';

export default new XVIZLiveLoader({
  logGuid: 'mock',
  bufferLength: 10,
  serverConfig: {
    defaultLogLength: 30,
    serverUrl: 'ws://localhost:8081'
  },
  worker: true,
  maxConcurrency: 4
});

从这个示例中可以看出XVIZLiveLoader如何使用,需要配置一些参数。
logGuid:该数据源的guid号
bufferLength:在内存存放多长buffer的数据,和时间戳有相同的时间单位。如果指定的话,当回放时旧帧将会被抛弃以避免浪费内存。默认值是30s。
serverConfig.defaultLogLength:当duration参数没被指定时,设定回退的值。
serverConfig.serverUrl:WebSocket server的url地址
worker:是否使用worker线程来进行消息处理
maxConcurrency:最大worker线程数


exampleLog获得一个XVIZLiveLoader类型的实例后,如何使用的呢。在app.js中我们可以看到

class Example extends PureComponent {
  state = {
    log: exampleLog,
    settings: {
      viewMode: 'PERSPECTIVE',
      showTooltip: false
    }
  };

  componentDidMount() {
    this.state.log.on('error', console.error).connect();
  }

 ...
  render() {
    const {log, settings} = this.state;

    return (
      <div id="container">
        <div id="control-panel">
          <XVIZPanel log={log} name="Metrics" />
          <hr />
          <XVIZPanel log={log} name="Camera" />
          <hr />
          <Form
            data={APP_SETTINGS}
            values={this.state.settings}
            onChange={this._onSettingsChange}
          />
          <StreamSettingsPanel log={log} />
        </div>
    ...
    </div>
  )}
}

它被传入state中,当组件挂载完成时,log开始尝试连接并监听error消息。
随后,在render中又作为属性传递给其他组件。那其他组件拿到log后做了些什么呢。我们可以看一下核心组件LogViewer是怎么对待log的。
在modules/core/src/components/log-viewer/index.js中我们可以看到

const getLogState = log => ({
  frame: log.getCurrentFrame(),
  metadata: log.getMetadata(),
  streamsMetadata: log.getStreamsMetadata()
});

export default connectToLog({getLogState, Component: LogViewer});

其从log中获取当前帧、元数据及数据流的元数据,并将其与组件进行绑定,生成一个高阶组件。

如何实现

看完如何使用,我们不禁想问如何实现的。我们从获取当前帧开始,在这之前我们要了解新建一个XVIZLiveLoader实例、组件挂载后实现connect连接做了什么

XVIZLiveLoader实例

new XVIZLiveLoader({
  logGuid: 'mock',
  bufferLength: 10,
  serverConfig: {
    defaultLogLength: 30,
    serverUrl: 'ws://localhost:8081'
  },
  worker: true,
  maxConcurrency: 4
});

定位到xviz-live-loader.js文件中

/*
 * Handle connecting to XVIZ socket and negotiation of the XVIZ protocol version
 *
 * This loader is used when connecting to a "live" XVIZ websocket.
 * This implies that the metadata does not have a start or end time
 * and that we want to display the latest message as soon as it arrives.
 */
export default class XVIZLiveLoader extends XVIZWebsocketLoader {
  /**
   * constructor
   * @params serverConfig {object}
   *   - serverConfig.serverUrl {string}
   *   - serverConfig.defaultLogLength {number, optional} - default 30
   *   - serverConfig.queryParams {object, optional}
   *   - serverConfig.retryAttempts {number, optional} - default 3
   * @params worker {string|function, optional}
   * @params maxConcurrency {number, optional} - default 3
   * @params logProfile {string, optional}
   * @params bufferLength {number, optional}
   */
  constructor(options = {}) {
    super(options);

    // Construct websocket connection details from parameters
    this.requestParams = getSocketRequestParams(options);
    assert(this.requestParams.bufferLength, 'bufferLength must be provided');

    this.retrySettings = {
      retries: this.requestParams.retryAttempts,
      minTimeout: 500,
      randomize: true
    };

    // Setup relative stream buffer storage by splitting bufferLength 1/3 : 2/3
    const bufferChunk = this.requestParams.bufferLength / 3;

    // Replace base class object
    this.streamBuffer = new XVIZStreamBuffer({
      startOffset: -2 * bufferChunk,
      endOffset: bufferChunk
    });
  }
...
}

可以看到XVIZLiveLoader继承自XVIZWebsocketLoader

  • 其构造函数首先调用父类的构造函数
  • 通过getSocketRequestParams构造websocket连接参数
  • 设置websocket尝试连接次数、时长
  • 设置相关流缓冲存储,讲缓冲长度切割成1/3 : 2/3
  • 新建一个XVIZStreamBuffer以代替基类对象
    这里可以看看XVIZStreamBuffer,其定义是

XVIZStreamBuffer

The XVIZStreamBuffer class manages loaded XVIZ timeslices in memory for easy access.

Constructor

const streamBuffer = new XVIZStreamBuffer();

Parameters:

  • options (Object)
    • startOffset (Number) - offset in seconds. if provided, will not keep timeslices earlier than
      currentTime - startOffset. Default null.
    • endOffset (Number) - offset in seconds. if provided, will not keep timeslices later than
      currentTime + endOffset. Default null.
    • maxLength (Number) - length in seconds. if provided, the buffer will be forced to be no
      bigger than the specified length. Default null.

There are three types of buffer: unlimited, offset, and fixed. Use the constructor options to set
an offset buffer (relative to playhead). To set a fixed buffer with absolute timestamps, see
setFixedBuffer.

可以看到XVIZStreamBuffer是用以加载XVIZ数据时间片到内存中以方便访问,而这里的startOffset、endOffset用以控制在内存中驻留的时间片长度。
再定位到xviz-websocket-loader.js看看XVIZWebsocketLoader的定义

/**
 * Connect to XVIZ 2 websocket manage storage of XVIZ data into a XVIZStreamBuffer
 *
 * This class is a Websocket base class and is expected to be subclassed with
 * the following methods overridden:
 *
 * - _onOpen()
 */
export default class XVIZWebsocketLoader extends XVIZLoaderInterface {
  /**
   * constructor
   * @params serverConfig {object}
   *   - serverConfig.serverUrl {string}
   *   - serverConfig.defaultLogLength {number, optional} - default 30
   *   - serverConfig.queryParams {object, optional}
   *   - serverConfig.retryAttempts {number, optional} - default 3
   * @params worker {string|function, optional}
   * @params maxConcurrency {number, optional} - default 3
   * @params debug {function} - Debug callback for the XVIZ parser.
   * @params logGuid {string}
   * @params logProfile {string, optional}
   * @params duration {number, optional}
   * @params timestamp {number, optional}
   * @params bufferLength {number, optional}
   */
  constructor(options = {}) {
    super(options);

    this.socket = null;

    this.retrySettings = {
      retries: 3,
      minTimeout: 500,
      randomize: true
    };

    this.streamBuffer = new XVIZStreamBuffer();

    // Handler object for the websocket events
    // Note: needs to be last due to member dependencies
    this.WebSocketClass = options.WebSocketClass || WebSocket;
  }
...
}

可以看到XVIZWebsocketLoader 继承自XVIZLoaderInterface

  • 其构造函数首先调用父类的构造函数
  • 设置websocket尝试连接次数、时长
  • 定义处理websocket事件的类
    再定位到xviz-loader-interface.js看看XVIZLoaderInterface的定义
export default class XVIZLoaderInterface {
  constructor(options = {}) {
    this.options = options;
    this._debug = options.debug || (() => {});
    this.callbacks = {};

    this.listeners = [];
    this.state = {};
    this._updates = 0;
    this._version = 0;
    this._updateTimer = null;
  }
...
}

这个构造函数相对简单,主要是初始化一些参数,如:事件回调、监听数组、数据版本等

connect

在组件挂载函数中,调用了connect和on函数

  componentDidMount() {
    this.state.log.on('error', console.error).connect();
  }

这里我们定位到xviz-websocket-loader.js中connect函数定义

  /**
   * Open an XVIZ socket connection with automatic retry
   *
   * @returns {Promise} WebSocket connection
   */
  connect() {
    assert(this.socket === null, 'Socket Manager still connected');

    this._debug('stream_start');
    const {url} = this.requestParams;

    // Wrap retry logic around connection
    return PromiseRetry(retry => {
      return new Promise((resolve, reject) => {
        try {
          const ws = new this.WebSocketClass(url);
          ws.binaryType = 'arraybuffer';

          ws.onmessage = message => {
            const hasMetadata = Boolean(this.getMetadata());

            return parseStreamMessage({
              message: message.data,
              onResult: this.onXVIZMessage,
              onError: this.onError,
              debug: this._debug.bind(this, 'parse_message'),
              worker: hasMetadata && this.options.worker,
              maxConcurrency: this.options.maxConcurrency
            });
          };

          ws.onerror = this.onError;
          ws.onclose = event => {
            this._onWSClose(event);
            reject(event);
          };

          // On success, resolve the promise with the now ready socket
          ws.onopen = () => {
            this.socket = ws;
            this._onWSOpen();
            resolve(ws);
          };
        } catch (err) {
          reject(err);
        }
      }).catch(event => {
        this._onWSError(event);
        const isAbnormalClosure = event.code > 1000 && event.code !== 1005;

        // Retry if abnormal or connection never established
        if (isAbnormalClosure || !this.socket) {
          retry();
        }
      });
    }, this.retrySettings).catch(this._onWSError);
  }

函数头上的解释是:自动尝试打开一个XVIZ socket连接,返回Promise类型的WebSocket连接
其中用到promise-retry这个包,该包的介绍是:
Retries a function that returns a promise, leveraging the power of the retry module to the promises world.
这是一个非常好的实践。因为物联网应用会偶发网络断联,通过设置一定容差的连接尝试,实现续联。
其中WebSocket的使用可以参考WebSocket

          ws.onmessage = message => {
            const hasMetadata = Boolean(this.getMetadata());

            return parseStreamMessage({
              message: message.data,
              onResult: this.onXVIZMessage,
              onError: this.onError,
              debug: this._debug.bind(this, 'parse_message'),
              worker: hasMetadata && this.options.worker,
              maxConcurrency: this.options.maxConcurrency
            });
          };

websocket当收到消息时,首先判断是否有元数据。然后调用parseStreamMessage来处理StreamMessage。
其定义如下:
parseStreamMessage will parse the data and handle GLB encoded
XVIZ as well as other formats of the data.

XVIZ parsing functions will decode the binary container, parse the JSON and resolve binary
references. The application will get a "patched" JSON structure, with the difference from the basic
JSON protocol format being that certain arrays will be compact typed arrays instead of classic
JavaScript arrays.

If an attribute has been hydrated from binary then it will be transformed into the corresponding
TypeArray. Typed arrays do not support nesting so all numbers will be laid out flat and the
application needs to know how many values represent one element, for instance 3 values represent the
x, y, z coordinates of a point.

parseXVIZMessage

import {parseXVIZMessage, XVIZ_MESSAGE} from '@xviz/parser';

parseXVIZMessage({
  message,
  onResult: data => {
    switch (data.type) {
      case XVIZ_MESSAGE.METADATA: // do something
      case XVIZ_MESSAGE.TIMESLICE: // do something
      case XVIZ_MESSAGE.INCOMPLETE: // do something
    }
  },
  onError: console.error,
  worker: true
  maxConcurrency: 4
});

Parameters:

  • opts (Object)
    • message (Object|String|ArrayBuffer) - XVIZ message to decode.
    • onResult (Function) - callback if the message is parsed successfully. Receives a single
      argument data. data.type is one of XVIZ_MESSAGE.
    • onError (Function) - callback if the parser encouters an error.
    • debug (Function) - callback to log debug info.
    • worker (Boolean|String) - use Web Wroker to parse the message. Enabling worker is recommended
      to improve loading performance in production. Default false.
      • boolean: whether to use the default worker. Note that callbacks in XVIZ config are ignored by
        the default worker. If you need to inject custom hooks into the parsing process, create a
        custom worker using streamDataWorker.
      • string: a custom worker URL to use.
    • maxConcurrency (Number) - the max number of workers to use. Has no effect if worker is set
      to false. Default 4.
    • capacity (Number) - the limit on the number of messages to queue for the workers to process,
      has no effect if set ot null. Default null.
XVIZ_MESSAGE

Enum of stream message types.

  • METADATA
  • TIMESLICE
  • ERROR
  • INCOMPLETE
  • DONE
    这里this.onXVIZMessage在xviz-loader-interface.js定义如下
  onXVIZMessage = message => {
    switch (message.type) {
      case LOG_STREAM_MESSAGE.METADATA:
        this._onXVIZMetadata(message);
        this.emit('ready', message);
        break;

      case LOG_STREAM_MESSAGE.TIMESLICE:
        this._onXVIZTimeslice(message);
        this.emit('update', message);
        break;

      case LOG_STREAM_MESSAGE.DONE:
        this.emit('finish', message);
        break;

      default:
        this.emit('error', message);
    }
  };

当解析出的消息类型是METADATA则调用内部函数_onXVIZMetadata,并发出ready消息

  _onXVIZMetadata(metadata) {
    this.set('metadata', metadata);
    if (metadata.streams && Object.keys(metadata.streams).length > 0) {
      this.set('streamSettings', metadata.streams);
    }

    if (!this.streamBuffer) {
      throw new Error('streamBuffer is missing');
    }
    this.logSynchronizer = this.logSynchronizer || new StreamSynchronizer(this.streamBuffer);

    const timestamp = this.get('timestamp');
    const newTimestamp = Number.isFinite(timestamp) ? timestamp : metadata.start_time;
    if (Number.isFinite(newTimestamp)) {
      this.seek(newTimestamp);
    }
  }

这里主要设置一些参数,并定义了一个logSynchronizer其为StreamSynchronizer的一个实例。用以获取实际的stream数据。

StreamSynchronizer

The StreamSynchronizer class looks into a XVIZStreamBuffer and retrieves the most relevant datum from each stream that "matches" the current timestamp.

当解析出的消息类型是TIMESLICE则调用内部函数_onXVIZTimeslice,并发出update消息

  _onXVIZTimeslice(timeslice) {
    const oldStreamCount = this.streamBuffer.streamCount;
    const bufferUpdated = this.streamBuffer.insert(timeslice);
    if (bufferUpdated) {
      this._bumpDataVersion();
    }

    if (getXVIZConfig().DYNAMIC_STREAM_METADATA && this.streamBuffer.streamCount > oldStreamCount) {
      const streamsMetadata = {};
      const streamSettings = this.get('streamSettings');

      for (const streamName in timeslice.streams) {
        streamsMetadata[streamName] = timeslice.streams[streamName].__metadata;

        // Add new stream name to stream settings (default on)
        if (!(streamName in streamSettings)) {
          streamSettings[streamName] = true;
        }
      }
      this.set('streamsMetadata', streamsMetadata);
    }

    return bufferUpdated;
  }

其主要作用是将timeslice加入到当前的streamBuffer中,并当timeslice有新的stream且其中的metadata是DYNAMIC_STREAM_METADATA时在streamsMetadata添加该metadata,并更新streamsMetadata

自此一个XVIZLiveLoader实例化完毕并实现connect连接

getCurrentFrame

接下来看看如何获取当前帧的,getCurrentFrame的源码可以定位到xviz-loader-interface.js文件

  getCurrentFrame = createSelector(
    this,
    [this.getStreamSettings, this.getCurrentTime, this.getLookAhead, this._getDataVersion],
    // `dataVersion` is only needed to trigger recomputation.
    // The logSynchronizer has access to the timeslices.
    (streamSettings, timestamp, lookAhead) => {
      const {logSynchronizer} = this;
      if (logSynchronizer && Number.isFinite(timestamp)) {
        logSynchronizer.setTime(timestamp);
        logSynchronizer.setLookAheadTimeOffset(lookAhead);
        return logSynchronizer.getCurrentFrame(streamSettings);
      }
      return null;
    }
  );

这里有个很好的实践——createSelector,其在/modules/src/utils/create-selector.js中定义

import {createSelector} from 'reselect';

// reselect selectors do not update if called with the same arguments
// to support calling them without arguments, pass logLoader version
export default function createLogSelector(logLoader, ...args) {
  const selector = createSelector(...args);
  return () => selector(logLoader._version);
}

这里使用了一个reselect的库,

Reselect

Simple “selector” library for Redux (and others) inspired by getters in NuclearJS, subscriptions in re-frame and this proposal from speedskater.

  • Selectors can compute derived data, allowing Redux to store the minimal possible state.
  • Selectors are efficient. A selector is not recomputed unless one of its arguments changes.
  • Selectors are composable. They can be used as input to other selectors.
    通过介绍可以看出reselect库可以精简state结构,并且抑制不必要的计算。
    作者为了防止因参数相同不触发reselect的重新计算,通过dataVersion来进行重新计算的控制,以达到减少state的更改、节约前端的绘制。
    而获取当前数据帧则通过logSynchronizer.getCurrentFrame(streamSettings)得到。
    至此一个完整的获取当前数据帧的流程讲完了,其中有很多细节还需要消化,有一同研究的小伙伴可以交流,欢迎留言!
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343