(13)udp服务与客户端(Reactor部分)-【Lars-基于C++负载均衡远程服务器调度系统教程】

【Lars教程目录】

Lars源代码
https://github.com/aceld/Lars


【Lars系统概述】
第1章-概述
第2章-项目目录构建


【Lars系统之Reactor模型服务器框架模块】
第1章-项目结构与V0.1雏形
第2章-内存管理与Buffer封装
第3章-事件触发EventLoop
第4章-链接与消息封装
第5章-Client客户端模型
第6章-连接管理及限制
第7章-消息业务路由分发机制
第8章-链接创建/销毁Hook机制
第9章-消息任务队列与线程池
第10章-配置文件读写功能
第11章-udp服务与客户端
第12章-数据传输协议protocol buffer
第13章-QPS性能测试
第14章-异步消息任务机制
第15章-链接属性设置功能


【Lars系统之DNSService模块】
第1章-Lars-dns简介
第2章-数据库创建
第3章-项目目录结构及环境构建
第4章-Route结构的定义
第5章-获取Route信息
第6章-Route订阅模式
第7章-Backend Thread实时监控


【Lars系统之Report Service模块】
第1章-项目概述-数据表及proto3协议定义
第2章-获取report上报数据
第3章-存储线程池及消息队列


【Lars系统之LoadBalance Agent模块】
第1章-项目概述及构建
第2章-主模块业务结构搭建
第3章-Report与Dns Client设计与实现
第4章-负载均衡模块基础设计
第5章-负载均衡获取Host主机信息API
第6章-负载均衡上报Host主机信息API
第7章-过期窗口清理与过载超时(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-负载均衡获取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能测试工具
第12章- Lars启动工具脚本


12) udp服务与客户端

​ 接下来为了让Reactor框架功能更加丰富,结合之前的功能,再加上udpserver的服务接口。udp我们暂时不考虑加线程池实现,只是单线程的处理方式。

12.1 udp_server服务端功能实现

lars_reactor/include/udp_server.h

#pragma  once

#include <netinet/in.h>
#include "event_loop.h"
#include "net_connection.h"
#include "message.h"

class udp_server :public net_connection 
{
public:
    udp_server(event_loop *loop, const char *ip, uint16_t port);

    virtual int send_message(const char *data, int msglen, int msgid);

    //注册消息路由回调函数
    void add_msg_router(int msgid, msg_callback* cb, void *user_data = NULL);

    ~udp_server();

    //处理消息业务
    void do_read();
    
private:
    int _sockfd;

    char _read_buf[MESSAGE_LENGTH_LIMIT];
    char _write_buf[MESSAGE_LENGTH_LIMIT];

    //事件触发
    event_loop* _loop;

    //服务端ip
    struct sockaddr_in _client_addr;
    socklen_t _client_addrlen;
    
    //消息路由分发
    msg_router _router;
};

​ 对应的方法实现方式如下:

lars_reactor/src/udp_server.cpp

#include <signal.h>
#include <unistd.h>
#include <strings.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include "udp_server.h"


void read_callback(event_loop *loop, int fd, void *args)
{
    udp_server *server = (udp_server*)args;

    //处理业务函数
    server->do_read();
}

void udp_server::do_read()
{
    while (true) {
        int pkg_len = recvfrom(_sockfd, _read_buf, sizeof(_read_buf), 0, (struct sockaddr *)&_client_addr, &_client_addrlen);

        if (pkg_len == -1) {
            if (errno == EINTR) {
                continue;
            }
            else if (errno == EAGAIN) {
                break;
            }
            else {
                perror("recvfrom\n");
                break;
            }
        }

        //处理数据
        msg_head head; 
        memcpy(&head, _read_buf, MESSAGE_HEAD_LEN);
        if (head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0 || head.msglen + MESSAGE_HEAD_LEN != pkg_len) {
            //报文格式有问题
            fprintf(stderr, "do_read, data error, msgid = %d, msglen = %d, pkg_len = %d\n", head.msgid, head.msglen, pkg_len);
            continue;
        }

        //调用注册的路由业务
        _router.call(head.msgid, head.msglen, _read_buf+MESSAGE_HEAD_LEN, this);
    }
}


udp_server::udp_server(event_loop *loop, const char *ip, uint16_t port)
{
    //1 忽略一些信号
    if (signal(SIGHUP, SIG_IGN) == SIG_ERR) {
        perror("signal ignore SIGHUB");
        exit(1);
    }
    if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
        perror("signal ignore SIGPIPE");
        exit(1);
    }
    
    //2 创建套接字
    //SOCK_CLOEXEC在execl中使用该socket则关闭,在fork中使用该socket不关闭
    _sockfd = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_UDP);
    if (_sockfd == -1) {
        perror("create udp socket");
        exit(1);
    }

    //3 设置服务ip+port
    struct sockaddr_in servaddr;
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    inet_aton(ip, &servaddr.sin_addr);//设置ip
    servaddr.sin_port = htons(port);//设置端口

    //4 绑定
    bind(_sockfd, (const struct sockaddr*)&servaddr, sizeof(servaddr));
    
    //3 添加读业务事件
    _loop = loop;

    bzero(&_client_addr, sizeof(_client_addr));
    _client_addrlen = sizeof(_client_addr);
    

    printf("server on %s:%u is running...\n", ip, port);

    _loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
    
}

int udp_server::send_message(const char *data, int msglen, int msgid)
{
    if (msglen > MESSAGE_LENGTH_LIMIT) {
        fprintf(stderr, "too large message to send\n");
        return -1;
    }

    msg_head head;
    head.msglen = msglen;
    head.msgid = msgid;

    memcpy(_write_buf,  &head, MESSAGE_HEAD_LEN);
    memcpy(_write_buf + MESSAGE_HEAD_LEN, data, msglen);

    int ret = sendto(_sockfd, _write_buf, msglen + MESSAGE_HEAD_LEN, 0, (struct sockaddr*)&_client_addr, _client_addrlen);
    if (ret == -1) {
        perror("sendto()..");
        return -1;
    }

    return ret;
}

//注册消息路由回调函数
void udp_server::add_msg_router(int msgid, msg_callback* cb, void *user_data)
{
    _router.register_msg_router(msgid, cb, user_data);
}

udp_server::~udp_server()
{
    _loop->del_io_event(_sockfd);
    close(_sockfd);
}

​ 这里面实现的方式和tcp_server的实现方式几乎一样,需要注意的是,udp的socket编程是不需要listen的,而且也不需要accept。所以recvfrom就能够得知每个包的对应客户端是谁,然后回执消息给对应的客户端就可以。因为没有连接,所以都是以包为单位来处理的,一个包一个包处理。可能相邻的两个包来自不同的客户端。

12.2 udp_client客户端功能实现

lars_reactor/include/udp_client.h

#pragma once

#include "net_connection.h"
#include "message.h"
#include "event_loop.h"

class udp_client: public net_connection
{
public:
    udp_client(event_loop *loop, const char *ip, uint16_t port);
    ~udp_client();

    void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL);

    virtual int send_message(const char *data, int msglen, int msgid);

    //处理消息
    void do_read();

private:

    int _sockfd;

    char _read_buf[MESSAGE_LENGTH_LIMIT];
    char _write_buf[MESSAGE_LENGTH_LIMIT];

    //事件触发
    event_loop *_loop;

    //消息路由分发
    msg_router _router;
};

lars_reactor/src/udp_client.cpp

#include "udp_client.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <strings.h>
#include <string.h>
#include <stdio.h>


void read_callback(event_loop *loop, int fd, void *args)
{
    udp_client *client = (udp_client*)args;
    client->do_read();
}



udp_client::udp_client(event_loop *loop, const char *ip, uint16_t port)
{
    //1 创建套接字
    _sockfd = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_UDP);
    if (_sockfd == -1) {
        perror("create socket error");
        exit(1);
    }

    struct sockaddr_in servaddr;
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    inet_aton(ip, &servaddr.sin_addr);
    servaddr.sin_port = htons(port);

    //2 链接
    int ret = connect(_sockfd, (const struct sockaddr*)&servaddr, sizeof(servaddr));
    if (ret  == -1) {
        perror("connect");
        exit(1);
    }


    //3 添加读事件
    _loop = loop; 
    _loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
}

udp_client::~udp_client()
{
    _loop->del_io_event(_sockfd);
    close(_sockfd);
}

//处理消息
void udp_client::do_read()
{
    while (true) {
        int pkt_len = recvfrom(_sockfd, _read_buf, sizeof(_read_buf), 0, NULL, NULL);
        if (pkt_len == -1) {
            if (errno == EINTR) {
                continue;
            }
            else if (errno == EAGAIN) {
                break;
            }
            else {
                perror("recvfrom()");
                break;
            }
        }

        //处理客户端包
        msg_head head; 
        memcpy(&head, _read_buf, MESSAGE_HEAD_LEN);
        if (head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0 || head.msglen + MESSAGE_HEAD_LEN != pkt_len) {
            //报文格式有问题
            fprintf(stderr, "do_read, data error, msgid = %d, msglen = %d, pkt_len = %d\n", head.msgid, head.msglen, pkt_len);
            continue;
        }

        //调用注册的路由业务
        _router.call(head.msgid, head.msglen, _read_buf+MESSAGE_HEAD_LEN, this);
    }
}
    
void udp_client::add_msg_router(int msgid, msg_callback *cb, void *user_data)
{
    _router.register_msg_router(msgid, cb, user_data);
}

int udp_client::send_message(const char *data, int msglen, int msgid)
{
    if (msglen > MESSAGE_LENGTH_LIMIT) {
        fprintf(stderr, "too large message to send\n");
        return -1;
    }

    msg_head head;
    head.msglen = msglen;
    head.msgid = msgid;

    memcpy(_write_buf,  &head, MESSAGE_HEAD_LEN);
    memcpy(_write_buf + MESSAGE_HEAD_LEN, data, msglen);

    int ret = sendto(_sockfd, _write_buf, msglen + MESSAGE_HEAD_LEN, 0, NULL, 0);
    if (ret == -1) {
        perror("sendto()..");
        return -1;
    }

    return ret;
}

​ 客户端和服务端代码除了构造函数不同,其他基本差不多。接下来我们可以测试一下udp的通信功能

12.3 完成Lars Reactor V0.10开发

服务端

server.cpp

#include <string>
#include <string.h>
#include "config_file.h"
#include "udp_server.h"

//回显业务的回调函数
void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
    printf("callback_busi ...\n");
    //直接回显
    conn->send_message(data, len, msgid);
}

int main() 
{
    event_loop loop;

    //加载配置文件
    config_file::setPath("./serv.conf");
    std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
    short port = config_file::instance()->GetNumber("reactor", "port", 8888);

    printf("ip = %s, port = %d\n", ip.c_str(), port);

    udp_server server(&loop, ip.c_str(), port);

    //注册消息业务路由
    server.add_msg_router(1, callback_busi);

    loop.event_process();

    return 0;
}

客户端

client.cpp

#include <stdio.h>
#include <string.h>

#include "udp_client.h"


//客户端业务
void busi(const char *data, uint32_t len, int msgid, net_connection  *conn, void *user_data)
{
    //得到服务端回执的数据 
    char *str = NULL;
    
    str = (char*)malloc(len+1);
    memset(str, 0, len+1);
    memcpy(str, data, len);
    printf("recv server: [%s]\n", str);
    printf("msgid: [%d]\n", msgid);
    printf("len: [%d]\n", len);
}


int main() 
{
    event_loop loop;

    //创建udp客户端
    udp_client client(&loop, "127.0.0.1", 7777);


    //注册消息路由业务
    client.add_msg_router(1, busi);

    //发消息
    int msgid = 1; 
    const char *msg = "Hello Lars!";

    client.send_message(msg, strlen(msg), msgid);

    //开启事件监听
    loop.event_process();

    return 0;
}

启动服务端和客户端并允许,结果如下:

server

$ ./server 
ip = 127.0.0.1, port = 7777
msg_router init...
server on 127.0.0.1:7777 is running...
add msg cb msgid = 1
call msgid = 1
call data = Hello Lars!
call msglen = 11
callback_busi ...
=======

client

$ ./client 
msg_router init...
add msg cb msgid = 1
call msgid = 1
call data = Hello Lars!
call msglen = 11
recv server: [Hello Lars!]
msgid: [1]
len: [11]
=======

关于作者:

作者:Aceld(刘丹冰)

mail: danbing.at@gmail.com
github: https://github.com/aceld
原创书籍gitbook: http://legacy.gitbook.com/@aceld

原创声明:未经作者允许请勿转载, 如果转载请注明出处

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,816评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,729评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,300评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,780评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,890评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,084评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,151评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,912评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,355评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,666评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,809评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,504评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,150评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,121评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,628评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,724评论 2 351

推荐阅读更多精彩内容