起因
之前的博客写过通过inotify 加文件的形式来实现多进程队列的文章。这种方式在通常情况下表现不错,但是这里存在一个问题就是当消费者过慢,会产生大量的击穿内核高速缓冲区io,导致消费者卡在读取数据的瓶颈上,无法使用负载均衡等手段来提高处理能力。
为了解决上述问题,引入了共享内存,众所周知,这是所有ipc中最快的通信方式,从根本上解决这个问题。下面通过实现一个producer 和 consumer 程序,来展示我的设计思路。
producer
由于物理内存有限,生产者会使用一个环形缓冲区来保证热点数据始终在内存中。同时为了保证消费者的接入配置最小化,生产者将配置通过一个固定大小的结构体映射到内存中,消费者首先映射结构体读取配置信息,从结构体中的得知缓冲区大小后执行mremap进行重新调整大小,这样消费者只需要知道共享内存的地址(一个文件名),就可以实现消费。同时采用了消息计数,来标识消费者是否已经处理所有消息,触发等待。当消费者在等待新数据时,唤醒消费者我们选择了通过向指定文件写入一个字节的内容触发inotify,虽然通过信号量也可以实现,但是使用信号量会导致生产者要多开一个线程实现管理,引入额外的复杂度。
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/inotify.h>
#include <functional>
#include <unistd.h>
#include <cstring>
#include <string>
#include <sys/mman.h>
#include <sys/time.h>
#include <iostream>
#include <semaphore.h>
#include <gflags/gflags.h>
DEFINE_int64(shm_size, 6, "shm_size m");
DEFINE_string(inotify_file, "/tmp/writer.txt", "inotify file path");
DEFINE_string(shm_file, "/test", "shm file path");
DEFINE_string(shm_key, "", "shm key");
class Producer
{
public:
Producer(const std::string &inotify_path, const std::string &shm_path) : inotify_path_(inotify_path), shm_path_(shm_path)
{
shm_size_ = FLAGS_shm_size * 1024 * 1024; // 1g; // 1g
}
bool Init(const std::string &key)
{
fd_ = open(inotify_path_.c_str(), O_WRONLY | O_APPEND | O_CREAT | O_TRUNC, 0644);
if (fd_ < 0)
{
printf("open path failed\n");
return false;
}
// 打开共享内存
shm_fd_ = shm_open(shm_path_.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0777);
if (shm_fd_ < 0)
{
printf("shm_open failed\n");
return false;
}
uint64_t size = shm_size_ + sizeof(SHM_Data);
if (ftruncate(shm_fd_, size) == -1)
{
printf("ftruncate failed\n");
return false;
}
shm_data_ = (SHM_Data *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
if (shm_data_ == MAP_FAILED)
{
printf("mmap failed\n");
return false;
}
shm_data_->total = 0;
shm_data_->size = shm_size_;
memcpy(shm_data_->inotify_name, inotify_path_.c_str(), inotify_path_.size());
memcpy(shm_data_->key, key.c_str(), key.size());
return true;
}
void Write(const char *line)
{
for (int i = 0; line[i] != '\0'; i++)
{
if (current_offset_ >= shm_size_)
{
current_offset_ = 0;
}
shm_data_->buffer[current_offset_++] = line[i];
}
if (current_offset_ >= shm_size_)
{
current_offset_ = 0;
}
shm_data_->buffer[current_offset_++] = '\0';
shm_data_->total++;
write(fd_, "8", 1);
private:
struct SHM_Data
{
uint64_t total; // 记录消息总数
char inotify_name[512]; // inotify 文件名
char key[64]; // 当前数据标识
uint64_t size; // 环形缓冲区大小
char buffer[]; // 环形缓冲区
};
SHM_Data *shm_data_ = nullptr; // 共享内存
int fd_;
int shm_fd_;
uint64_t shm_size_ = 0;
uint64_t buffer_size_ = 0;
uint64_t total_read = 0;
uint64_t current_offset_ = 0;
std::string inotify_path_;
std::string shm_path_;
};
int main(int argc, char *argv[])
{
gflags::ParseCommandLineFlags(&argc, &argv, true);
Producer producer(FLAGS_inotify_file, FLAGS_shm_file);
Producer.Init(FLAGS_shm_key);
}
消费者
消费者实现就相对简单一些,读取配置结构体,执行mremap调整大小, 如果机器性能足够,可以选择不等待inotify,类似自旋锁的方式。这种方式测试发现新消息能在10us左右被消费者感知,使用inoitfy新消息感知需要40us左右。
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/inotify.h>
#include <functional>
#include <unistd.h>
#include <cstring>
#include <string>
#include <sys/mman.h>
#include <sys/time.h>
#include <semaphore.h>
#include <gflags/gflags.h>
DEFINE_string(shm_file, "/test", "shm file path");
DEFINE_bool(shm_nowait, false, "shm no wait mode");
#define EVENT_SIZE (sizeof(struct inotify_event))
#define BUF_LEN (10 * (EVENT_SIZE + FILENAME_MAX + 1))
class Consumer
{
public:
Consumer(int tag, const std::string &shm_path)
: tag_(tag), shm_path_(shm_path)
{
line_size_ = 1024;
inotify_buffer_ = new char[BUF_LEN];
line_ = new char[line_size_];
}
~Tail()
{
delete[] line_;
delete[] inotify_buffer_;
}
bool Init(const std::string& key)
{
shm_fd_ = shm_open(shm_path_.c_str(), O_RDWR, 0777);
if (shm_fd_ < 0)
{
printf("shm_open failed\n");
return false;
}
SHM_Data *shm_info_ = (SHM_Data *)mmap(NULL, sizeof(SHM_Data), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
if (shm_info_ == MAP_FAILED)
{
printf("mmap info failed\n");
return false;
}
printf("info size=%ld inotify_name=%s,key=%s\n", shm_info_->size, shm_info_->inotify_name, shm_info_->key);
if (strcasecmp(shm_info_->key, key.c_str()) != 0)
{
printf("key not match \n");
return false;
}
// 开始监听文件变化
inotify_fd_ = inotify_init();
if (inotify_fd_ < 0)
{
printf("inotify_init failed\n");
return false;
}
shm_size_ = shm_info_->size;
uint64_t real_size = shm_info_->size + sizeof(SHM_Data);
inotify_add_watch(inotify_fd_, shm_info_->inotify_name, IN_MODIFY | IN_CREATE | IN_DELETE);
shm_data_ = (SHM_Data *)mremap(shm_info_, sizeof(SHM_Data), real_size, MREMAP_MAYMOVE);
if (shm_data_ == MAP_FAILED)
{
printf("mmap data failed\n");
return false;
}
return true;
}
void Loop()
{
while (true)
{
while (total_read < shm_data_->total)
{
for (int i = 0; i < line_size_; i++)
{
if (current_offset_ >= shm_size_)
{
current_offset_ = 0;
}
line_[i] = shm_data_->buffer[current_offset_++];
if (line_[i] == '\0')
{
break;
}
}
total_read++;
printf("current_offset=%d, total=%d, read=%d, %s", current_offset_, shm_data_->total, total_read, line_);
}
if (!FLAGS_shm_nowait){
read(inotify_fd_, inotify_buffer_, BUF_LEN);
}
}
}
private:
struct SHM_Data
{
uint64_t total; // 记录消息总数
char inotify_name[512]; // inotify 文件名
char key[64]; // 当前数据标识
uint64_t size; // 环形缓冲区大小
char buffer[]; // 环形缓冲区
};
SHM_Data *shm_data_ = nullptr; // 共享对象指针
uint64_t shm_size_ = 0; // 共享内存大小
uint64_t line_size_ = 0; // 每条数据最大值
uint64_t total_read = 0; // 当前读取总记录数
uint64_t current_offset_ = 0; // 当前读取的偏移量
std::string shm_path_;
int inotify_fd_;
int shm_fd_;
int tag_;
char *line_;
char *inotify_buffer_;
};
DEFINE_string(shm_key, "", "shm key");
int main(int argc, char *argv[])
{
gflags::ParseCommandLineFlags(&argc, &argv, true);
Consumer consumer(2, FLAGS_shm_file);
if(!consumer.Init(FLAGS_shm_key)) {
return 1;
}
consumer.Loop();
}