Suricata-流的处理

Thank Zhihao Tao for your hard work. The document spent countless nights and weekends, using his hard work to make it convenient for everyone.
If you have any questions, please send a email to zhihao.tao@outlook.com


1. 流的内存处理

在suricata中跟踪流就需要使用内存。流越多,所需的内存就越多。
因此我们要保持对内存使用的控制,有几个选项:

  • 用于设置流引擎将使用的最大字节数的memcap选项
  • 用于设置哈希表大小的哈希大小
  • 用于以下内容的预分配:
    • 对于还不属于流的数据包,Suricata创建了一个新的流。这是一个相对昂贵的行动。
    • 由此带来的风险是,攻击者/黑客可以在此部分攻击引擎系统。
    • 当它们确保一台计算机获得许多具有不同元组的数据包时,引擎必须生成许多新的流。
    • 这样,攻击者就可以淹没系统。
    • 为了减轻引擎过载,此选项指示Suricata在内存中保持多个流就绪。这样一来,Suricata就不那么容易受到此类攻击。

流引擎有一个独立于包处理的管理线程。这个线程称为流管理器。该线程确保尽可能在Memcap内。将准备10000个流。

flow:
  memcap: 33554432              #The maximum amount of bytes the flow-engine will make use of.
  hash_size: 65536              #Flows will be organized in a hash-table. With this option you can set the
                                #size of the hash-table.
  Prealloc: 10000               #The amount of flows Suricata has to keep ready in memory.
  emergency_recovery: 30                  #Percentage of 1000 prealloc'd flows.
  prune_flows: 5                          #Amount of flows being terminated during the emergency mode.

1.1 memcap选项

memcap选项用于设置流引擎将使用的最大字节数。

  • 默认memcap为32M,即33554432字节。
#define FLOW_DEFAULT_MEMCAP      (32 * 1024 * 1024) /* 32 MB */
SC_ATOMIC_SET(flow_config.memcap, FLOW_DEFAULT_MEMCAP);
  • 通过FLOW_CHECK_MEMCAP来检查内存分配的字节数是否超过了memcap。
/** \brief check if a memory alloc would fit in the memcap
 *
 *  \param size memory allocation size to check
 *
 *  \retval 1 it fits
 *  \retval 0 no fit
 */
#define FLOW_CHECK_MEMCAP(size) \
    ((((uint64_t)SC_ATOMIC_GET(flow_memuse) + (uint64_t)(size)) <= SC_ATOMIC_GET(flow_config.memcap)))

1.1.1 流的快速分配

  1. 当一条新流到达而spare队列中没有剩余的空闲流,进入快速分配流程。(紧急处理参见emergency_recovery选项)。
static Flow *FlowGetNew(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p)
{
...
    f = FlowDequeue(&flow_spare_q);
    if (f == NULL) {
        /* If we reached the max memcap, we get a used flow */
        if (!(FLOW_CHECK_MEMCAP(sizeof(Flow) + FlowStorageSize()))) {
  1. 直接分配一条新的流。
        } else {
            /* now see if we can alloc a new flow */
            f = FlowAlloc();

1.2 hash_size选项

hash_size选项用于设置哈希表大小的哈希大小。

  • hash种子是一个随机数。
  • hash大小默认为65536。
#define FLOW_DEFAULT_HASHSIZE    65536
flow_config.hash_rand   = (uint32_t)RandomGet();
flow_config.hash_size   = FLOW_DEFAULT_HASHSIZE;

1.3 prealloc选项

prealloc选项用于设置内存中预分配流的数量。

#define FLOW_DEFAULT_PREALLOC    10000
flow_config.prealloc    = FLOW_DEFAULT_PREALLOC;

1.3.1 预分配的初始化

/* pre allocate flows */
for (i = 0; i < flow_config.prealloc; i++) {
    if (!(FLOW_CHECK_MEMCAP(sizeof(Flow) + FlowStorageSize()))) {
        SCLogError(SC_ERR_FLOW_INIT, "preallocating flows failed: "
                "max flow memcap reached. Memcap %"PRIu64", "
                "Memuse %"PRIu64".", SC_ATOMIC_GET(flow_config.memcap),
                ((uint64_t)SC_ATOMIC_GET(flow_memuse) + (uint64_t)sizeof(Flow)));
        exit(EXIT_FAILURE);
    }

    Flow *f = FlowAlloc();
    if (f == NULL) {
        SCLogError(SC_ERR_FLOW_INIT, "preallocating flow failed: %s", strerror(errno));
        exit(EXIT_FAILURE);
    }

    FlowEnqueue(&flow_spare_q,f);
}

1.3.1 预分配的管理

流管理器会定时对于预分配的流的数量进行管理。

  • 少则补之
int FlowUpdateSpareFlows(void)
{
    SCEnter();
    uint32_t toalloc = 0, tofree = 0, len;

    FQLOCK_LOCK(&flow_spare_q);
    len = flow_spare_q.len;
    FQLOCK_UNLOCK(&flow_spare_q);

    if (len < flow_config.prealloc) {
        toalloc = flow_config.prealloc - len;

        uint32_t i;
        for (i = 0; i < toalloc; i++) {
            Flow *f = FlowAlloc();
            if (f == NULL)
                return 0;

            FlowEnqueue(&flow_spare_q,f);
        }
  • 多则删之
    } else if (len > flow_config.prealloc) {
        tofree = len - flow_config.prealloc;

        uint32_t i;
        for (i = 0; i < tofree; i++) {
            /* FlowDequeue locks the queue */
            Flow *f = FlowDequeue(&flow_spare_q);
            if (f == NULL)
                return 1;

            FlowFree(f);
        }
    }

    return 1;
}

1.4 emergency_recovery选项

emergency_recovery选项使得流引擎进入紧急模式。在此模式下,引擎将利用较短的超时时间。其让流利用较短的超时时间,它使流以更积极的方式过期,因此将有更多空间容纳新的流。

  • 紧急恢复。紧急恢复设置为30。这是预分配流的百分比,在此百分比之后,流引擎将恢复正常(当10000个流中的30%完成时)。
  • 修剪流。如果在紧急模式中,过度超时没有所需的结果,则此选项是最终的解决方案。它结束了一些流,即使他们还没有达到他们的超时时间。修剪流选项显示每次设置新流时将终止的流的数量。
#define FLOW_DEFAULT_EMERGENCY_RECOVERY 30
flow_config.emergency_recovery = FLOW_DEFAULT_EMERGENCY_RECOVERY;

1.4.1 紧急模式进入

  1. 获取新的Flow
static Flow *FlowGetNew(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p)
{
...
    f = FlowDequeue(&flow_spare_q);
    if (f == NULL) {
        /* If we reached the max memcap, we get a used flow */
  1. 如果达到MEMCAP后,进入紧急模式,超时时间改为紧急超时时间。
        if (!(FLOW_CHECK_MEMCAP(sizeof(Flow) + FlowStorageSize()))) {
            /* declare state of emergency */
            if (!(SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)) {
                SC_ATOMIC_OR(flow_flags, FLOW_EMERGENCY);

                FlowTimeoutsEmergency();

                /* under high load, waking up the flow mgr each time leads
                 * to high cpu usage. Flows are not timed out much faster if
                 * we check a 1000 times a second. */
                FlowWakeupFlowManagerThread();
            }

            f = FlowGetUsedFlow(tv, dtv);
  1. 遍历哈希,直到可以释放流。
    • 不要修剪包或流消息在使用的流。
    • 输出日志。
    • flow_prune_idx确保我们不会每次都从顶部开始,因为那样会清除散列的顶部,从而导致在高压下搜索时间越来越长。
static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv)
{
...
        if (SC_ATOMIC_GET(f->use_cnt) > 0) {
            FBLOCK_UNLOCK(fb);
            FLOWLOCK_UNLOCK(f);
            continue;
        }
  1. 从hash中删除,设置FORCED和EMERGENCY标志。
        /* remove from the hash */

        f->flow_end_flags |= FLOW_END_FLAG_FORCED;

        if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
            f->flow_end_flags |= FLOW_END_FLAG_EMERGENCY;
  1. log记录,清除旧内存,初始为新状态,增加flow_prune_idx
        /* invoke flow log api */
        if (dtv && dtv->output_flow_thread_data)
            (void)OutputFlowLog(tv, dtv->output_flow_thread_data, f);

        FlowClearMemory(f, f->protomap);

        FlowUpdateState(f, FLOW_STATE_NEW);

        FLOWLOCK_UNLOCK(f);

        (void) SC_ATOMIC_ADD(flow_prune_idx, (flow_config.hash_size - cnt));

1.4.1 紧急模式退出

  1. 获取spare队列中的flow数
static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
{
...
        uint32_t len = 0;
        FQLOCK_LOCK(&flow_spare_q);
        len = flow_spare_q.len;
        FQLOCK_UNLOCK(&flow_spare_q);
        StatsSetUI64(th_v, ftd->flow_mgr_spare, (uint64_t)len);

  1. 如果可用flow与预分配流的百分比大于emergency_recovery选项的配置。
            if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) {
                SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
  1. 恢复正常的超时时间,退出紧急状态。
                FlowTimeoutsReset();

2. 流管理器

2.1 流状态

流可以处于不同的状态。Suricata区分TCP流状态和UDP流状态。

enum FlowState {
    FLOW_STATE_NEW = 0,
    FLOW_STATE_ESTABLISHED,
    FLOW_STATE_CLOSED,
    FLOW_STATE_LOCAL_BYPASSED,
#ifdef CAPTURE_OFFLOAD
    FLOW_STATE_CAPTURE_BYPASSED,
#endif
};

2.1.1 TCP流状态

  • New: 三次握手期间的时间段。
  • Established: 三次握手期完成后的状态。
  • Closed: 关闭状态。有几种方法可以结束流程。这是通过复位或四次FIN挥手进行的。
static void StreamTcpPacketSetState(Packet *p, TcpSession *ssn,
                                           uint8_t state)
{
...
    /* update the flow state */
    switch(ssn->state) {
        case TCP_ESTABLISHED:
        case TCP_FIN_WAIT1:
        case TCP_FIN_WAIT2:
        case TCP_CLOSING:
        case TCP_CLOSE_WAIT:
            FlowUpdateState(p->flow, FLOW_STATE_ESTABLISHED);
            break;
        case TCP_LAST_ACK:
        case TCP_TIME_WAIT:
        case TCP_CLOSED:
            FlowUpdateState(p->flow, FLOW_STATE_CLOSED);
            break;
    }
}

2.1.2 UDP流状态

  • New: 流刚刚创建后的状态。
  • Established: 数据包从两个方向发送接收。
void FlowHandlePacketUpdate(Flow *f, Packet *p)
{
...
    if (SC_ATOMIC_GET(f->flow_state) == FLOW_STATE_ESTABLISHED) {
        SCLogDebug("pkt %p FLOW_PKT_ESTABLISHED", p);
        p->flowflags |= FLOW_PKT_ESTABLISHED;

    } else if ((f->flags & (FLOW_TO_DST_SEEN|FLOW_TO_SRC_SEEN)) ==
            (FLOW_TO_DST_SEEN|FLOW_TO_SRC_SEEN)) {
        SCLogDebug("pkt %p FLOW_PKT_ESTABLISHED", p);
        p->flowflags |= FLOW_PKT_ESTABLISHED;

        if (f->proto != IPPROTO_TCP) {
            FlowUpdateState(f, FLOW_STATE_ESTABLISHED);
        }
    }

  • Local_bypassed: 仅从一个方向发送数据包的状态。
    如果数据包到来的时间超过1/2超时时间,则降级到Local_bypassed状态。
void FlowHandlePacketUpdate(Flow *f, Packet *p)
{
#ifdef CAPTURE_OFFLOAD
    int state = SC_ATOMIC_GET(f->flow_state);

    if (state != FLOW_STATE_CAPTURE_BYPASSED) {
#endif
        /* update the last seen timestamp of this flow */
        COPY_TIMESTAMP(&p->ts, &f->lastts);
#ifdef CAPTURE_OFFLOAD
    } else {
        /* still seeing packet, we downgrade to local bypass */
        if (p->ts.tv_sec - f->lastts.tv_sec > FLOW_BYPASSED_TIMEOUT / 2) {
            SCLogDebug("Downgrading flow to local bypass");
            COPY_TIMESTAMP(&p->ts, &f->lastts);
            FlowUpdateState(f, FLOW_STATE_LOCAL_BYPASSED);
        }
...

2.2 流超时

Suricata将流保持在内存中的时间由流超时时间确定。

flow-timeouts:

  default:
    new: 30                     #Time-out in seconds after the last activity in this flow in a New state.
    established: 300            #Time-out in seconds after the last activity in this flow in a Established
                                #state.
    emergency_new: 10           #Time-out in seconds after the last activity in this flow in a New state
                                #during the emergency mode.
    emergency_established: 100  #Time-out in seconds after the last activity in this flow in a Established
                                #state in the emergency mode.
  tcp:
    new: 60
    established: 3600
    closed: 120
    emergency_new: 10
    emergency_established: 300
    emergency_closed: 20
  udp:
    new: 30
    established: 300
    emergency_new: 10
    emergency_established: 100
  icmp:
    new: 30
    established: 300
    emergency_new: 10
    emergency_established: 100

2.2.1 超时处理

流的超时管理是在流管理器(FlowManager线程)实现的。

static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
...
        FlowTimeoutHash(&ts, 0 /* check all */, ftd->min, ftd->max, &counters);
  1. 遍历整个流的hash桶,对桶中的所有流进行检查。
static uint32_t FlowTimeoutHash(struct timeval *ts, 
...
if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
        emergency = 1;

    for (idx = hash_min; idx < hash_max; idx++) {
        FlowBucket *fb = &flow_hash[idx];

        counters->rows_checked++;

        int32_t check_ts = SC_ATOMIC_GET(fb->next_ts);
        if (check_ts > (int32_t)ts->tv_sec) {
            counters->rows_skipped++;
            continue;
        }
  1. 获取Row Lock(桶的锁)之前,先要确保报文池中至少有9个包。
        /* before grabbing the row lock, make sure we have at least
         * 9 packets in the pool */
        PacketPoolWaitForN(9);

        if (FBLOCK_TRYLOCK(fb) != 0) {
            counters->rows_busy++;
            continue;
        }

        /* flow hash bucket is now locked */

        if (fb->tail == NULL) {
            SC_ATOMIC_SET(fb->next_ts, INT_MAX);
            counters->rows_empty++;
            goto next;
        }

        int32_t next_ts = 0;
        /* we have a flow, or more than one */
        cnt += FlowManagerHashRowTimeout(fb->tail, ts, emergency, counters, &next_ts);
  1. 对hash桶中的flow进行检查。
    • 根据lastts状态检查流超时。
    • 获取Flow Lock之前,先要确保报文池中至少有3个包。
static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
        int emergency, FlowTimeoutCounters *counters, int32_t *next_ts)
{
...
        /* timeout logic goes here */
        if (FlowManagerFlowTimeout(f, state, ts, next_ts) == 0) {
...
        /* before grabbing the flow lock, make sure we have at least
         * 3 packets in the pool */
        PacketPoolWaitForN(3);

        FLOWLOCK_WRLOCK(f);
  1. 检查流是否已完全超时,如果超时进行放弃,并把流放入flow_recycle_q队列中。
        /* check if the flow is fully timed out and
         * ready to be discarded. */
        if (FlowManagerFlowTimedOut(f, ts, counters) == 1) {
...
            f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT;
...
            FlowEnqueue(&flow_recycle_q, f);

  1. 检查use_cnt引用计数,进行流的强制重组。
static inline int FlowManagerFlowTimedOut(Flow *f, struct timeval *ts, 
...
    if (SC_ATOMIC_GET(f->use_cnt) > 0) {
        return 0;
    }
...
    if (!(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) &&
#ifdef CAPTURE_OFFLOAD
            SC_ATOMIC_GET(f->flow_state) != FLOW_STATE_CAPTURE_BYPASSED &&
#endif
            SC_ATOMIC_GET(f->flow_state) != FLOW_STATE_LOCAL_BYPASSED &&
            FlowForceReassemblyNeedReassembly(f, &server, &client) == 1) {
        FlowForceReassemblyForFlow(f, server, client);
        return 0;
    }
...
    return 1;
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,919评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,567评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,316评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,294评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,318评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,245评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,120评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,964评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,376评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,592评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,764评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,460评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,070评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,697评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,846评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,819评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,665评论 2 354