ps-lite源码分析: include/ps/internal/message.h

/**
 *  Copyright (c) 2015 by Contributors
 */
#ifndef PS_INTERNAL_MESSAGE_H_
#define PS_INTERNAL_MESSAGE_H_
#include <vector>
#include <limits>
#include <string>
#include <sstream>
#include "ps/sarray.h"
namespace ps {

/** \brief data type */
enum DataType {
  CHAR, INT8, INT16, INT32, INT64,
  UINT8, UINT16, UINT32, UINT64,
  FLOAT, DOUBLE, OTHER
};

/** \brief data type name */
static const char* DataTypeName[] = {
  "CHAR", "INT8", "INT16", "INT32", "INT64",
  "UINT8", "UINT16", "UINT32", "UINT64",
  "FLOAT", "DOUBLE", "OTHER"
};

/**
 * \brief compare if V and W are the same type
 */
template<typename V, typename W>
inline bool SameType() {
  return std::is_same<typename std::remove_cv<V>::type, W>::value;
}

/**
 * \brief return the DataType of V
 */
template<typename V>
DataType GetDataType() {
  if (SameType<V, int8_t>()) {
    return INT8;
  } else if (SameType<V, int16_t>()) {
    return INT16;
  } else if (SameType<V, int32_t>()) {
    return INT32;
  } else if (SameType<V, int64_t>()) {
    return INT64;
  } else if (SameType<V, uint8_t>()) {
    return UINT8;
  } else if (SameType<V, uint16_t>()) {
    return UINT16;
  } else if (SameType<V, uint32_t>()) {
    return UINT32;
  } else if (SameType<V, uint64_t>()) {
    return UINT64;
  } else if (SameType<V, float>()) {
    return FLOAT;
  } else if (SameType<V, double>()) {
    return DOUBLE;
  } else {
    return OTHER;
  }
}

/** 分布式节点类 */
/**
 * \brief information about a node
 */
struct Node {
  /** 空ID默认值,用于判断是否空节点 */
  /** \brief the empty value */
  static const int kEmpty;

  /** \brief default constructor */
  Node() : id(kEmpty), port(kEmpty), is_recovery(false) {}

  /** 节点角色:服务器、工作者、调度者 */
  /** \brief node roles */
  enum Role { SERVER, WORKER, SCHEDULER };
  
  /** 调试信息 */
  /** \brief get debug string */
  std::string DebugString() const {
    std::stringstream ss;
    ss << "role=" << (role == SERVER ? "server" : (role == WORKER ? "worker" : "scheduler"))
       << (id != kEmpty ? ", id=" + std::to_string(id) : "")
       << ", ip=" << hostname << ", port=" << port << ", is_recovery=" << is_recovery;

    return ss.str();
  }

  /** 简洁调试信息 */
  /** \brief get short debug string */
  std::string ShortDebugString() const {
    std::string str = role == SERVER ? "S" : (role == WORKER ? "W" : "H");
    if (id != kEmpty) str += "[" + std::to_string(id) + "]";
    return str;
  }

  /** 节点角色 */
  /** \brief the role of this node */
  Role role;

  /** 节点编号 */
  /** \brief node id */
  int id;

  /** 客户编号 */
  /** \brief customer id */
  int customer_id;

  /** 主机名或IP */
  /** \brief hostname or ip */
  std::string hostname;

  /** 端口号 */
  /** \brief the port this node is binding */
  int port;

  /** 节点失败恢复重建标记 */
  /** \brief whether this node is created by failover */
  bool is_recovery;
};


/** 系统控制消息的元信息类 */
/**
 * \brief meta info of a system control message
 */
struct Control {
  /** \brief empty constructor */
  Control() : cmd(EMPTY) { }

  /** 空命令 */
  /** \brief return true is empty */
  inline bool empty() const { return cmd == EMPTY; }

  /**  调试信息 */
  /** \brief get debug string */
  std::string DebugString() const {
    if (empty()) return "";
    std::vector<std::string> cmds = {
      "EMPTY", "TERMINATE", "ADD_NODE", "BARRIER", "ACK", "HEARTBEAT"};
    std::stringstream ss;
    ss << "cmd=" << cmds[cmd];
    if (node.size()) {
      ss << ", node={";
      for (const Node& n : node) ss << " " << n.DebugString();
      ss << " }";
    }
    if (cmd == BARRIER) ss << ", barrier_group=" << barrier_group;
    if (cmd == ACK) ss << ", msg_sig=" << msg_sig;
    return ss.str();
  }

  /** 支持的所有控制命令 */
  /** \brief all commands */
  enum Command { EMPTY, TERMINATE, ADD_NODE, BARRIER, ACK, HEARTBEAT };
  
  /** 控制命令 */
  /** \brief the command */
  Command cmd;
  
  /** 节点信息 */
  /** \brief node infos */
  std::vector<Node> node;
  
  /** 所属节点组 */
  /** \brief the node group for a barrier, such as kWorkerGroup */
  int barrier_group;
  
 /** 消息签名 */
  /** message signature */
  uint64_t msg_sig;
};

/** 消息的元信息 */
/**
 * \brief meta info of a message
 */
struct Meta {
  /** 空标志 */
  /** \brief the empty value */
  static const int kEmpty;

  /** \brief default constructor */
  Meta() : head(kEmpty), app_id(kEmpty), customer_id(kEmpty),
           timestamp(kEmpty), sender(kEmpty), recver(kEmpty),
           request(false), push(false), pull(false), simple_app(false) {}
  
  /** 调试信息 */
  std::string DebugString() const {
    std::stringstream ss;
    if (sender == Node::kEmpty) {
      ss << "?";
    } else {
      ss << sender;
    }
    ss <<  " => " << recver;
    ss << ". Meta: request=" << request;
    if (timestamp != kEmpty) ss << ", timestamp=" << timestamp;
    if (!control.empty()) {
      ss << ", control={ " << control.DebugString() << " }";
    } else {
      ss << ", app_id=" << app_id
         << ", customer_id=" << customer_id
         << ", simple_app=" << simple_app
         << ", push=" << push;
    }
    if (head != kEmpty) ss << ", head=" << head;
    if (body.size()) ss << ", body=" << body;
    if (data_type.size()) {
      ss << ", data_type={";
      for (auto d : data_type) ss << " " << DataTypeName[static_cast<int>(d)];
      ss << " }";
    }
    return ss.str();
  }
  
 /** 头信息 */
 /** \brief an int head */
  int head;
  
  /** 消息应用方的唯一ID */
  /** \brief the unique id of the application of messsage is for*/
  int app_id;
  
  /** 客户ID */
  /** \brief customer id*/
  int customer_id;
  
  /** 消息的时间戳 */
  /** \brief the timestamp of this message */
  int timestamp;
  
  /** 消息发送方的节点ID */
  /** \brief the node id of the sender of this message */
  int sender;
  
  /** 消息接收方的节点ID */
  /** \brief the node id of the receiver of this message */
  int recver;
  
  /** 是否是请求消息 */
  /** \brief whether or not this is a request message*/
  bool request;
  
  /** 是否属于推送消息 */
  /** \brief whether or not a push message */
  bool push;
  
  /** 是否属于拉取消息 */
  /** \brief whether or not a pull message */
  bool pull;
  
  /** 标志  */
  /** \brief whether or not it's for SimpleApp */
  bool simple_app;
 
  /** 消息内容 */
  /** \brief an string body */
  std::string body;
  
  /**  消息中数据内容的数据类型 */
  /** \brief data type of message.data[i] */
  std::vector<DataType> data_type;
  
  /**  控制信息 */
  /** \brief system control message */
  Control control;
  
  /** 数据内容大小 */
  /** \brief the byte size */
  int data_size = 0;
  
  /** 消息优先级 */
  /** \brief message priority */
  int priority = 0;
};


/** 节点之间通信的消息类 */
/**
 * \brief messages that communicated amaong nodes.
 */
struct Message {
  /** 消息元数据:描述消息数据域的类型、大小信息以及消息自身相关信息 */
  /** \brief the meta info of this message */
  Meta meta;
  
  /** 消息数据域 */
  /** \brief the large chunk of data of this message */
  std::vector<SArray<char> > data;
  
 /** 填充消息:数组字节化后填入数据域,并记录数据类型、大小到消息元数据中 */
 /**
   * \brief push array into data, and add the data type
   */
  template <typename V>
  void AddData(const SArray<V>& val) {
    CHECK_EQ(data.size(), meta.data_type.size());
    meta.data_type.push_back(GetDataType<V>());
    SArray<char> bytes(val);
    meta.data_size += bytes.size();
    data.push_back(bytes);
  }

  /** 调试信息 */
  std::string DebugString() const {
    std::stringstream ss;
    ss << meta.DebugString();
    if (data.size()) {
      ss << " Body:";
      for (const auto& d : data) ss << " data_size=" << d.size();
    }
    return ss.str();
  }
};
}  // namespace ps
#endif  // PS_INTERNAL_MESSAGE_H_
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容