本例使用boost::asio模拟jmeter做分布式压测网络部分的原理,虽然比较简单,但基本可以实现分布式压测的效果。
原理图如下,
为了实现方便,master只做总控,本身不执行性能测试。
还有一点妥协是,master必须等所有node全部ready(即都已连接到server)之后才能发送起测命令。其实再多发送一对消息就可以去掉这个限制,但为了少发几个消息,这里没有刻意实现。
所以启动的顺序是,先启动server,
接着启动所有node, node1, node2, node3..
最后启动master。
另外,recent_msgs在本例中并不需要保存,去掉后可以简化一些代码;因为时间关系,这里没有改。
整体代码如下,
CMakeLists.txt
# Build script: every *.cpp in this directory except parse_msg.cpp becomes its
# own executable, and each executable is compiled together with parse_msg.cpp.
cmake_minimum_required(VERSION 2.6)
project(perf_tool)
add_definitions(-std=c++14)
find_package(Boost REQUIRED COMPONENTS
    system
    filesystem
    serialization
    program_options
)
include_directories(${Boost_INCLUDE_DIRS} ${CMAKE_CURRENT_SOURCE_DIR}/../../include)
file( GLOB APP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
foreach( sourcefile ${APP_SOURCES} )
    file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${sourcefile})
    if( NOT ${filename} MATCHES "parse_msg.cpp" )
        string(REPLACE ".cpp" "" file ${filename})
        add_executable(${file} ${sourcefile} "parse_msg.cpp")
        target_link_libraries(${file} ${Boost_LIBRARIES})
        target_link_libraries(${file} pthread)
    # The closing clause takes no arguments: a condition here must repeat the
    # if() verbatim, and the previous "struct_header.cpp" text did not.
    endif()
endforeach( sourcefile ${APP_SOURCES} )
server.cpp
#include "chat_message.h"
#include <boost/asio.hpp>
#include <deque>
#include <iostream>
#include <memory>
#include <set>
#include <list>
#include <utility>
#include <cassert>
#include <cstdlib>
using boost::asio::ip::tcp;
using chat_message_queue = std::deque<chat_message>;
class chat_session;
using chat_session_ptr = std::shared_ptr<chat_session>;
std::string master_name = "";
// Declaration of the room: the set of currently joined sessions plus a bounded
// history of the messages that were routed through it.
class chat_room {
public:
// Adds a session to the participant set.
void join(chat_session_ptr);
// Removes a session from the participant set.
void leave(chat_session_ptr);
// Records the message in the history and hands it to every participant.
void deliver(const chat_message&);
// Records the message in the history and hands it only to participants whose
// client name equals paticipant_name.
void deliver_to(const chat_message&, const std::string& paticipant_name);
private:
// Sessions currently joined to the room.
std::set<chat_session_ptr> participants_;
// Upper bound on the number of entries kept in recent_msgs_.
enum { max_recent_msgs = 100 };
// Most recent messages, oldest first; trimmed to max_recent_msgs on delivery.
chat_message_queue recent_msgs_;
};
// One client connection (master or slave) as seen by the server.
//
// The session joins the room on start(), then loops reading a fixed-size
// header followed by a body. Each complete message is dispatched by
// handleMessage(). Outgoing messages are queued in write_msgs_ and written one
// at a time. The session keeps itself alive through the shared_ptr captured by
// every pending handler; the room must outlive the session.
class chat_session: public std::enable_shared_from_this<chat_session> {
public:
    chat_session(tcp::socket socket, chat_room& room)
        : socket_(std::move(socket)), room_(room) {}

    // Registers this session with the room and starts reading the first header.
    void start() {
        room_.join(shared_from_this());
        do_read_header();
    }

    // Queues msg for sending. A write is started only when the queue was empty,
    // because otherwise the completion handler of the write already in flight
    // picks up the next queued message.
    void deliver(const chat_message& msg) {
        const bool write_in_progress = !write_msgs_.empty();
        write_msgs_.push_back(msg);
        if (!write_in_progress) {
            do_write();
        }
    }

    // Name registered by the client through MT_BIND_NAME (empty until then).
    std::string& get_client_name() {
        return m_name;
    }

private:
    // Reads one message header; on success continues with the body, otherwise
    // the session leaves the room.
    void do_read_header() {
        auto self(shared_from_this());
        boost::asio::async_read(
            socket_,
            boost::asio::buffer(read_msg_.data(), chat_message::header_length),
            [this, self](boost::system::error_code ec, std::size_t /*length*/) {
                if (!ec && read_msg_.decode_header()) {
                    do_read_body();
                } else {
                    room_.leave(self);
                }
            });
    }

    // Reads the body announced by the header, dispatches the message and then
    // goes back to reading the next header.
    void do_read_body() {
        auto self(shared_from_this());
        boost::asio::async_read(
            socket_,
            boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
            [this, self](boost::system::error_code ec, std::size_t /*length*/) {
                if (!ec) {
                    handleMessage();
                    do_read_header();
                } else {
                    room_.leave(self);
                }
            });
    }

    // Parses the body of the current message as JSON. On a parse error the
    // failure is logged and a null JSON value is returned.
    json to_json() {
        const std::string buffer(read_msg_.body(), read_msg_.body() + read_msg_.body_length());
        std::cout << "raw message server: " << buffer << std::endl;
        json json_obj;
        try {
            json_obj = json::parse(buffer);
        } catch (const std::exception& ex) {
            std::cerr << "解析 json对象 失败!!" << std::endl;
            std::cerr << ex.what() << std::endl;
        }
        return json_obj;
    }

    // Entry point for a fully received message. A malformed payload from one
    // client must not take the whole server down, so any exception raised
    // while decoding or re-encoding it is logged and the message is dropped.
    void handleMessage() {
        try {
            dispatchMessage();
        } catch (const std::exception& ex) {
            std::cerr << "Dropping message of type " << read_msg_.type()
                      << ": " << ex.what() << std::endl;
        }
    }

    // Acts on the current message according to its type. Throws when a
    // required JSON field is missing or is not a string.
    void dispatchMessage() {
        const int type = read_msg_.type();
        if (type == MT_BIND_NAME) {
            // Sent by both master and slaves to register their name.
            m_name = to_json().at("name").get<std::string>();
        } else if (type == MT_LAUNCH_TASK_MSG) {
            // Only the master sends this; the sender becomes the master and the
            // launch order is broadcast to every participant. The payload is
            // decoded first so that a bad message changes no state.
            std::string information = to_json().at("information").get<std::string>();
            master_name = m_name;
            std::cerr << "Master name: " << master_name << std::endl;
            m_chatInformation = std::move(information);
            chat_message msg;
            if (buildOutgoing(MT_LAUNCH_TASK_MSG, msg)) {
                room_.deliver(msg);
            }
        } else if (type == MT_SEND_TASK_INFO_MSG) {
            // Sent after a test run finished; forwarded to the master only.
            std::cerr << "send task info" << std::endl;
            std::cerr << "Master name in task info: " << master_name << std::endl;
            m_chatInformation = to_json().at("information").get<std::string>();
            if (master_name.empty()) {
                // Without a known master the name comparison would match every
                // session that has not bound a name yet.
                std::cerr << "No master registered yet; task info dropped" << std::endl;
                return;
            }
            chat_message msg;
            if (buildOutgoing(MT_SEND_TASK_INFO_MSG, msg)) {
                room_.deliver_to(msg, master_name);
            }
        } else {
            // Unknown message type: ignored.
        }
    }

    // Fills out with the room-info payload for this session. Returns false
    // (after logging) when the payload does not fit into a chat_message.
    bool buildOutgoing(int type, chat_message& out) const {
        const std::string rinfo = buildRoomInfo();
        if (rinfo.size() >= static_cast<std::size_t>(chat_message::max_body_length)) {
            std::cerr << "Room info of " << rinfo.size()
                      << " bytes does not fit into a message; dropped" << std::endl;
            return false;
        }
        out.setMessage(type, rinfo);
        return true;
    }

    // Serializes {"name": ..., "information": ...} for this session. The JSON
    // library does the escaping, so quotes or backslashes in either value
    // cannot break the document.
    std::string buildRoomInfo() const {
        json info;
        info["name"] = m_name;
        info["information"] = m_chatInformation;
        std::string text = info.dump();
        std::cout << "Room info: " << text << std::endl;
        return text;
    }

    // Writes the message at the front of the queue and chains to the next one
    // until the queue is empty. On error the session leaves the room.
    void do_write() {
        auto self(shared_from_this());
        boost::asio::async_write(
            socket_,
            boost::asio::buffer(write_msgs_.front().data(), write_msgs_.front().length()),
            [this, self](boost::system::error_code ec, std::size_t /*length*/) {
                if (!ec) {
                    write_msgs_.pop_front();
                    if (!write_msgs_.empty()) {
                        do_write();
                    }
                } else {
                    room_.leave(self);
                }
            });
    }

    tcp::socket socket_;
    // The room must outlive the session, otherwise this reference dangles.
    chat_room& room_;
    chat_message read_msg_;          // message currently being received
    chat_message_queue write_msgs_;  // messages waiting to be sent, oldest first
    std::string m_name;              // client name
    std::string m_chatInformation;   // latest "information" payload from the client
};
// Adds the session to the participant set. No history is replayed to it.
void chat_room::join(chat_session_ptr participant) {
    participants_.emplace(std::move(participant));
}
// Removes the session from the participant set; does nothing if it is absent.
void chat_room::leave(chat_session_ptr participant) {
    const auto position = participants_.find(participant);
    if (position != participants_.end()) {
        participants_.erase(position);
    }
}
// 消息分发函数
void chat_room::deliver(const chat_message& msg) {
recent_msgs_.push_back(msg);
// recent_msgs_调整到最大值
while(recent_msgs_.size() > max_recent_msgs) {
recent_msgs_.pop_front();
}
// 给每个群聊参与者群发消息
for(auto & participant: participants_) {
participant->deliver(msg);
}
}
void chat_room::deliver_to(const chat_message& msg, const std::string& paticipant_name) {
recent_msgs_.push_back(msg);
// recent_msgs_调整到最大值
while(recent_msgs_.size() > max_recent_msgs) {
recent_msgs_.pop_front();
}
// 给每个群聊参与者群发消息
for(auto & participant: participants_) {
if(participant->get_client_name() == paticipant_name) {
participant->deliver(msg);
}
}
}
// Listens on one endpoint and turns every accepted connection into a
// chat_session that shares this server's room.
class chat_server {
public:
    chat_server(boost::asio::io_service& io_service, const tcp::endpoint& endpoint)
        : acceptor_(io_service, endpoint), socket_(io_service) {
        do_accept();
    }

    // Waits for the next incoming connection. The accept is re-armed after
    // every completion, whether it succeeded or failed.
    void do_accept() {
        auto on_accept = [this](boost::system::error_code ec) {
            if (!ec) {
                // The accepted socket is moved into the new session.
                std::make_shared<chat_session>(std::move(socket_), room_)->start();
            }
            do_accept();
        };
        acceptor_.async_accept(socket_, on_accept);
    }

private:
    tcp::acceptor acceptor_;
    tcp::socket socket_;  // receives the next accepted connection
    chat_room room_;      // shared by all sessions of this server
};
// Server entry point: starts one chat_server per port given on the command
// line and runs the event loop.
//
// Returns 0 when the event loop ends normally, 1 on a usage error, an invalid
// port argument or an exception.
int main(int argc, char* argv[]) {
    try {
        if (argc < 2) {
            std::cerr << "Usage: chat_server <port> [<port> ...]" << std::endl;
            return 1;
        }
        boost::asio::io_service io_service;
        std::list<chat_server> servers;
        for (int i = 1; i < argc; ++i) {
            // strtol lets us reject text that is not a number at all and values
            // outside the 16-bit port range; atoi would silently give 0 or a
            // truncated port.
            char* end = nullptr;
            const long port = std::strtol(argv[i], &end, 10);
            if (end == argv[i] || *end != '\0' || port < 0 || port > 65535) {
                std::cerr << "Invalid port: " << argv[i] << std::endl;
                return 1;
            }
            tcp::endpoint endpoint(tcp::v4(), static_cast<unsigned short>(port));
            servers.emplace_back(io_service, endpoint);
        }
        io_service.run();
    } catch (const std::exception& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}
master.cpp
#include "chat_message.h"
#include <boost/asio.hpp>
#include <deque>
#include <iostream>
#include <thread>
#include <cstdlib>
using boost::asio::ip::tcp;
using chat_message_queue = std::deque<chat_message>;
int slave_count = 0;
class chat_client {
public:
chat_client(boost::asio::io_service& io_service,
tcp::resolver::iterator endpoint_iterator
): io_service_(io_service), socket_(io_service) {
do_connect(endpoint_iterator);
}
void write(const chat_message& msg) {
// write是由主线程往子线程写东西
// 所以需要使用post提交到子线程运行
// 使得所有io操作都由io_service的子线程掌握
io_service_.post(
[this, msg] () {
bool write_in_progress = !write_msgs_.empty();
write_msgs_.push_back(msg);
if(!write_in_progress) {
do_write();
}
}
);
}
void close() {
io_service_.post(
[this] () {
socket_.close();
}
);
}
private:
void do_connect(tcp::resolver::iterator endpoint_iterator) {
boost::asio::async_connect(
socket_,
endpoint_iterator,
[this] (boost::system::error_code ec, tcp::resolver::iterator it) {
if(!ec) {
// 如果连接成功,读取消息头
do_read_header();
}
}
);
}
void do_read_header() {
boost::asio::async_read(
socket_,
boost::asio::buffer(read_msg_.data(), chat_message::header_length),
[this] (boost::system::error_code ec, std::size_t length) {
if(!ec && read_msg_.decode_header()) {
// 如果没有错误,并且Decode_header成功,成功读取到body_length
do_read_body();
}else {
// 读取失败时关闭与服务端的连接,退出事件循环
socket_.close();
}
}
);
}
json to_json() {
std::string buffer(read_msg_.body(), read_msg_.body() + read_msg_.body_length());
std::stringstream ss(buffer);
auto json_obj = json::parse(ss.str());
return json_obj;
}
void do_read_body() {
boost::asio::async_read(
socket_,
boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
[this] (boost::system::error_code ec, std::size_t length) {
if(!ec) {
// 校验一下消息长度和消息类型,
// 证明确实发过来的是RomInformation消息
if(read_msg_.type() == MT_SEND_TASK_INFO_MSG) {
auto json_obj = to_json();
std::cout << "client ";
std::cout << json_obj["name"].get<std::string>();
std::cout << " says: ";
std::cout << json_obj["information"].get<std::string>();
std::cout << "\n";
++receive_slave_cout;
if(receive_slave_cout == slave_count) {
//TODO: 汇总计算结果
std::cerr << "开始汇总计算性能测试结果" << std::endl;
close();
}
}
// 调用do_read_header函数串联起事件链,接着读
do_read_header();
}else {
socket_.close();
}
}
);
}
// 向服务端真正发送消息的函数
void do_write() {
boost::asio::async_write(
socket_,
boost::asio::buffer(
write_msgs_.front().data(),
write_msgs_.front().length()
),
[this] (boost::system::error_code ec, std::size_t length) {
if(!ec) {
// 一直写直到写完
write_msgs_.pop_front();
if(!write_msgs_.empty()) {
do_write();
}
}else {
socket_.close();
}
}
);
}
// 注意使用了引用类型,
// io_service对象的生命周期必须要大于chat_client对象的生命周期
// 否则会出现引用失效,导致异常
boost::asio::io_service& io_service_;
tcp::socket socket_;
chat_message read_msg_;
chat_message_queue write_msgs_;
int receive_slave_cout {0};
};
int main(int argc, char* argv[]) {
try {
if(argc != 3) {
std::cerr << "Usage: chat_client <host> <port>" << std::endl;
return 1;
}
//TODO: 读配置文件或者命令行参数,获取SLAVE_COUNT
slave_count = 2;
boost::asio::io_service io_service;
tcp::resolver resolver(io_service);
auto endpoint_iterator = resolver.resolve({argv[1], argv[2]});
chat_client c(io_service, endpoint_iterator);
std::thread t([&io_service]() {io_service.run(); });
char line[chat_message::max_body_length + 1];
chat_message msg;
auto type = 0;
std::string input = "BindName master";
std::string output;
if(parseMessage(input, &type, output)) {
msg.setMessage(type, output.data(), output.size());
c.write(msg);
}
input = "LaunchTask task1";
if(parseMessage(input, &type, output)) {
msg.setMessage(type, output.data(), output.size());
c.write(msg);
}
t.join();
}catch(std::exception& ex) {
std::cerr << "Exception: " << ex.what() << std::endl;
}
return 0;
}
slave.cpp
#include "chat_message.h"
#include <boost/asio.hpp>
#include <deque>
#include <iostream>
#include <thread>
#include <cstdlib>
using boost::asio::ip::tcp;
using chat_message_queue = std::deque<chat_message>;
class chat_client {
public:
chat_client(boost::asio::io_service& io_service,
tcp::resolver::iterator endpoint_iterator
): io_service_(io_service), socket_(io_service) {
do_connect(endpoint_iterator);
}
void write(const chat_message& msg) {
// write是由主线程往子线程写东西
// 所以需要使用post提交到子线程运行
// 使得所有io操作都由io_service的子线程掌握
io_service_.post(
[this, msg] () {
bool write_in_progress = !write_msgs_.empty();
write_msgs_.push_back(msg);
if(!write_in_progress) {
do_write();
}
}
);
}
void close() {
io_service_.post(
[this] () {
socket_.close();
}
);
}
private:
void do_connect(tcp::resolver::iterator endpoint_iterator) {
boost::asio::async_connect(
socket_,
endpoint_iterator,
[this] (boost::system::error_code ec, tcp::resolver::iterator it) {
if(!ec) {
// 如果连接成功,读取消息头
do_read_header();
}
}
);
}
void do_read_header() {
boost::asio::async_read(
socket_,
boost::asio::buffer(read_msg_.data(), chat_message::header_length),
[this] (boost::system::error_code ec, std::size_t length) {
if(!ec && read_msg_.decode_header()) {
// 如果没有错误,并且Decode_header成功,成功读取到body_length
do_read_body();
}else {
// 读取失败时关闭与服务端的连接,退出事件循环
socket_.close();
}
}
);
}
json to_json() {
std::string buffer(read_msg_.body(), read_msg_.body() + read_msg_.body_length());
std::stringstream ss(buffer);
auto json_obj = json::parse(ss.str());
return json_obj;
}
void do_read_body() {
boost::asio::async_read(
socket_,
boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
[this] (boost::system::error_code ec, std::size_t length) {
if(!ec) {
// 校验一下消息长度和消息类型,
// 证明确实发过来的是RomInformation消息
if(read_msg_.type() == MT_LAUNCH_TASK_MSG) {
// TODO: 启动性能测试,完事以后发送
// send_task_info_msg
auto json_obj = to_json();
std::cout << "client ";
std::cout << json_obj["name"].get<std::string>();
std::cout << " says: ";
std::cout << json_obj["information"].get<std::string>();
std::cout << "\n";
std::cerr << "开始做性能测试..." << std::endl;
std::cerr << "结束做性能测试..." << std::endl;
chat_message msg;
auto type = 0;
std::string input("SendTaskInfo TaskSuccess");
std::string output;
if(parseMessage(input, &type, output)) {
msg.setMessage(type, output.data(), output.size());
write(msg);
}
close();
}
// 调用do_read_header函数串联起事件链,接着读
do_read_header();
}else {
socket_.close();
}
}
);
}
// 向服务端真正发送消息的函数
void do_write() {
boost::asio::async_write(
socket_,
boost::asio::buffer(
write_msgs_.front().data(),
write_msgs_.front().length()
),
[this] (boost::system::error_code ec, std::size_t length) {
if(!ec) {
// 一直写直到写完
write_msgs_.pop_front();
if(!write_msgs_.empty()) {
do_write();
}
}else {
socket_.close();
}
}
);
}
// 注意使用了引用类型,
// io_service对象的生命周期必须要大于chat_client对象的生命周期
// 否则会出现引用失效,导致异常
boost::asio::io_service& io_service_;
tcp::socket socket_;
chat_message read_msg_;
chat_message_queue write_msgs_;
};
int main(int argc, char* argv[]) {
try {
if(argc != 3) {
std::cerr << "Usage: chat_client <host> <port>" << std::endl;
return 1;
}
boost::asio::io_service io_service;
tcp::resolver resolver(io_service);
auto endpoint_iterator = resolver.resolve({argv[1], argv[2]});
chat_client c(io_service, endpoint_iterator);
chat_message msg;
auto type = 0;
std::string slave_name {};
std::cout << "Pls input name: " << std::endl;
std::cin >> slave_name;
std::string input = "BindName " + slave_name;
std::string output;
if(parseMessage(input, &type, output)) {
msg.setMessage(type, output.data(), output.size());
c.write(msg);
}
std::thread t([&io_service]() {io_service.run(); });
t.join();
}catch(std::exception& ex) {
std::cerr << "Exception: " << ex.what() << std::endl;
}
return 0;
}
chat_message.h
#ifndef _CHAT_MESSAGE_H_
#define _CHAT_MESSAGE_H_
#include "parse_msg.h"

#include <cassert>
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <stdexcept>
// One wire message: a fixed-size Header followed by at most max_body_length
// bytes of body, stored in a single contiguous buffer.
//
// The header is copied to and from the buffer as raw bytes in host layout, so
// both peers are presumably expected to share the same int size and byte order.
class chat_message {
public:
    // Size of the header on the wire, taken from the Header struct.
    enum { header_length = sizeof(Header) };
    // Largest body that fits into the buffer.
    enum { max_body_length = 512 };

    // Creates an empty message: zero body size, type 0, zeroed buffer.
    chat_message() = default;

    // Start of the whole message (header followed by body).
    const char* data() const { return data_; }
    char* data() { return data_; }

    // Total number of bytes to send: header plus the body size in the header.
    std::size_t length() const { return header_length + body_length(); }

    // Start of the body, header_length bytes into the buffer.
    const char* body() const { return data_ + header_length; }
    char* body() { return data_ + header_length; }

    // Message type stored in the header.
    int type() const { return m_header.type; }

    // Body size stored in the header.
    std::size_t body_length() const { return static_cast<std::size_t>(m_header.bodySize); }

    // Fills the message with the given type and a copy of the body bytes.
    //
    // @param messageType value stored in the header's type field
    // @param buffer      body bytes to copy; may be null when bufferSize is 0
    // @param bufferSize  number of body bytes, at most max_body_length
    // @throws std::length_error when bufferSize exceeds max_body_length. The
    //         check is made in every build because a disabled assert would let
    //         the copy run past the end of the buffer.
    void setMessage(int messageType, const void* buffer, size_t bufferSize) {
        if (bufferSize > static_cast<std::size_t>(max_body_length)) {
            throw std::length_error("chat_message: body exceeds max_body_length");
        }
        m_header.bodySize = static_cast<int>(bufferSize);
        m_header.type = messageType;
        if (bufferSize != 0) {
            std::memcpy(body(), buffer, bufferSize);
        }
        std::memcpy(data(), &m_header, sizeof(m_header));
    }

    // Convenience overload taking the body as a string.
    void setMessage(int messageType, const std::string& buffer) {
        setMessage(messageType, buffer.data(), buffer.size());
    }

    // Loads the header from the first header_length bytes of the buffer.
    //
    // @return true when the announced body size lies in [0, max_body_length].
    //         Otherwise the rejected header is logged, the stored body size is
    //         reset to 0 and false is returned. A negative size must be
    //         rejected too: converted to std::size_t it would become a huge
    //         read length.
    bool decode_header() {
        std::memcpy(&m_header, data(), header_length);
        if (m_header.bodySize < 0 || m_header.bodySize > max_body_length) {
            std::cout <<"body size: " << m_header.bodySize << " header type:" << m_header.type << std::endl;
            m_header.bodySize = 0;
            return false;
        }
        return true;
    }

private:
    char data_[header_length + max_body_length] = {};  // header bytes, then body bytes
    Header m_header = {};                              // decoded copy of the header
};
#endif
parse_msg.h
#ifndef _PARSE_MSG_H_
#define _PARSE_MSG_H_
#include "json/json.hpp"
#include <sstream>
#include <cstdlib>
#include <string>
#include <iostream>
#include <cstring>
using json = nlohmann::json;
// Fixed-size header that precedes the body of every message.
struct Header {
int bodySize; // size of the message body
int type; // message type
};
// Message kinds; parseMessage reports one of these through its type argument.
enum MessageType {
// Produced for the "BindName" command; the body is {"name": ...}.
MT_BIND_NAME = 1,
// Produced for the "LaunchTask" command; the body is {"information": ...}.
MT_LAUNCH_TASK_MSG = 2,
// Produced for the "SendTaskInfo" command; the body is {"information": ...}.
MT_SEND_TASK_INFO_MSG = 3,
};
bool parseMessage(const std::string& input, int* type, std::string& outbuffer);
#endif
parse_msg.cpp
#include "parse_msg.h"
#include <sstream>
// 消息解析函数
// input 输入的消息字符串
// type 传出的消息类型指针
// outbuffer 输出的用于发送的消息内容字符串
bool parseMessage(const std::string& input, int* type, std::string& outbuffer) {
auto pos = input.find_first_of(" ");
// 消息中没找到空格
if(pos == std::string::npos) {
return false;
}
if(pos == 0) {
return false;
}
auto command = input.substr(0, pos);
// Bind姓名消息
if(command == "BindName") {
std::string name = input.substr(pos+1);
if(name.size()>32) {
std::cerr << "姓名的长度大于32个字节!" << std::endl;
return false;
}
if(type) {
*type = MT_BIND_NAME;
}
std::ostringstream oss;
oss << R"({"name": )";
oss << R"(")" << name << R"(")" << "}";
auto json_obj = json::parse(oss.str());
outbuffer = json_obj.dump();
return true;
// 聊天消息
}else if(command == "LaunchTask") {
std::string task = input.substr(pos+1);
if(task.size() > 1000000) {
std::cerr << "消息的长度大于1000000个字节!" << std::endl;
return false;
}
std::ostringstream oss;
oss << R"({"information" :)";
oss << R"(")" << task << R"(")" << "}";
auto json_obj = json::parse(oss.str());
outbuffer = json_obj.dump();
if(type) {
*type = MT_LAUNCH_TASK_MSG;
}
return true;
} else if(command == "SendTaskInfo") {
std::string task_res = input.substr(pos+1);
if(task_res.size() > 1000000) {
std::cerr << "消息的长度大于1000000个字节!" << std::endl;
return false;
}
std::ostringstream oss;
oss << R"({"information" :)";
oss << R"(")" << task_res << R"(")" << "}";
auto json_obj = json::parse(oss.str());
outbuffer = json_obj.dump();
if(type) {
*type = MT_SEND_TASK_INFO_MSG;
}
return true;
}
// 不支持的消息类型,返回false
return false;
}
程序输出如下,