Thank Zhihao Tao for your hard work. The document spent countless nights and weekends, using his hard work to make it convenient for everyone.
If you have any questions, please send a email to zhihao.tao@outlook.com
1. 流的内存处理
在suricata中跟踪流就需要使用内存。流越多,所需的内存就越多。
因此我们要保持对内存使用的控制,有几个选项:
- 用于设置流引擎将使用的最大字节数的memcap选项
- 用于设置哈希表大小的哈希大小
- 用于以下内容的预分配:
- 对于还不属于流的数据包,Suricata创建了一个新的流。这是一个相对昂贵的行动。
- 由此带来的风险是,攻击者/黑客可以在此部分攻击引擎系统。
- 当它们确保一台计算机获得许多具有不同元组的数据包时,引擎必须生成许多新的流。
- 这样,攻击者就可以淹没系统。
- 为了减轻引擎过载,此选项指示Suricata在内存中保持多个流就绪。这样一来,Suricata就不那么容易受到此类攻击。
流引擎有一个独立于包处理的管理线程。这个线程称为流管理器。该线程确保尽可能在Memcap内。将准备10000个流。
flow:
memcap: 33554432 #The maximum amount of bytes the flow-engine will make use of.
hash_size: 65536 #Flows will be organized in a hash-table. With this option you can set the
#size of the hash-table.
Prealloc: 10000 #The amount of flows Suricata has to keep ready in memory.
emergency_recovery: 30 #Percentage of 1000 prealloc'd flows.
prune_flows: 5 #Amount of flows being terminated during the emergency mode.
1.1 memcap选项
memcap选项用于设置流引擎将使用的最大字节数。
- 默认memcap为32M,即33554432字节。
#define FLOW_DEFAULT_MEMCAP (32 * 1024 * 1024) /* 32 MB */
SC_ATOMIC_SET(flow_config.memcap, FLOW_DEFAULT_MEMCAP);
- 通过
FLOW_CHECK_MEMCAP
来检查内存分配的字节数是否超过了memcap。
/** \brief check if a memory alloc would fit in the memcap
*
* \param size memory allocation size to check
*
* \retval 1 it fits
* \retval 0 no fit
*/
#define FLOW_CHECK_MEMCAP(size) \
((((uint64_t)SC_ATOMIC_GET(flow_memuse) + (uint64_t)(size)) <= SC_ATOMIC_GET(flow_config.memcap)))
1.1.1 流的快速分配
- 当一条新流到达而spare队列中没有剩余的空闲流,进入快速分配流程。(紧急处理参见
emergency_recovery选项
)。
static Flow *FlowGetNew(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p)
{
...
f = FlowDequeue(&flow_spare_q);
if (f == NULL) {
/* If we reached the max memcap, we get a used flow */
if (!(FLOW_CHECK_MEMCAP(sizeof(Flow) + FlowStorageSize()))) {
- 直接分配一条新的流。
} else {
/* now see if we can alloc a new flow */
f = FlowAlloc();
1.2 hash_size选项
hash_size选项用于设置哈希表大小的哈希大小。
- hash种子是一个随机数。
- hash大小默认为65536。
#define FLOW_DEFAULT_HASHSIZE 65536
flow_config.hash_rand = (uint32_t)RandomGet();
flow_config.hash_size = FLOW_DEFAULT_HASHSIZE;
1.3 prealloc选项
prealloc选项用于设置内存中预分配流的数量。
#define FLOW_DEFAULT_PREALLOC 10000
flow_config.prealloc = FLOW_DEFAULT_PREALLOC;
1.3.1 预分配的初始化
/* pre allocate flows */
for (i = 0; i < flow_config.prealloc; i++) {
if (!(FLOW_CHECK_MEMCAP(sizeof(Flow) + FlowStorageSize()))) {
SCLogError(SC_ERR_FLOW_INIT, "preallocating flows failed: "
"max flow memcap reached. Memcap %"PRIu64", "
"Memuse %"PRIu64".", SC_ATOMIC_GET(flow_config.memcap),
((uint64_t)SC_ATOMIC_GET(flow_memuse) + (uint64_t)sizeof(Flow)));
exit(EXIT_FAILURE);
}
Flow *f = FlowAlloc();
if (f == NULL) {
SCLogError(SC_ERR_FLOW_INIT, "preallocating flow failed: %s", strerror(errno));
exit(EXIT_FAILURE);
}
FlowEnqueue(&flow_spare_q,f);
}
1.3.1 预分配的管理
流管理器会定时对于预分配的流的数量进行管理。
- 少则补之
int FlowUpdateSpareFlows(void)
{
SCEnter();
uint32_t toalloc = 0, tofree = 0, len;
FQLOCK_LOCK(&flow_spare_q);
len = flow_spare_q.len;
FQLOCK_UNLOCK(&flow_spare_q);
if (len < flow_config.prealloc) {
toalloc = flow_config.prealloc - len;
uint32_t i;
for (i = 0; i < toalloc; i++) {
Flow *f = FlowAlloc();
if (f == NULL)
return 0;
FlowEnqueue(&flow_spare_q,f);
}
- 多则删之
} else if (len > flow_config.prealloc) {
tofree = len - flow_config.prealloc;
uint32_t i;
for (i = 0; i < tofree; i++) {
/* FlowDequeue locks the queue */
Flow *f = FlowDequeue(&flow_spare_q);
if (f == NULL)
return 1;
FlowFree(f);
}
}
return 1;
}
1.4 emergency_recovery选项
emergency_recovery
选项使得流引擎进入紧急模式。在此模式下,引擎将利用较短的超时时间。其让流利用较短的超时时间,它使流以更积极的方式过期,因此将有更多空间容纳新的流。
- 紧急恢复。紧急恢复设置为30。这是预分配流的百分比,在此百分比之后,流引擎将恢复正常(当10000个流中的30%完成时)。
- 修剪流。如果在紧急模式中,过度超时没有所需的结果,则此选项是最终的解决方案。它结束了一些流,即使他们还没有达到他们的超时时间。
修剪流
选项显示每次设置新流时将终止的流的数量。
#define FLOW_DEFAULT_EMERGENCY_RECOVERY 30
flow_config.emergency_recovery = FLOW_DEFAULT_EMERGENCY_RECOVERY;
1.4.1 紧急模式进入
- 获取新的Flow
static Flow *FlowGetNew(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p)
{
...
f = FlowDequeue(&flow_spare_q);
if (f == NULL) {
/* If we reached the max memcap, we get a used flow */
- 如果达到MEMCAP后,进入紧急模式,超时时间改为紧急超时时间。
if (!(FLOW_CHECK_MEMCAP(sizeof(Flow) + FlowStorageSize()))) {
/* declare state of emergency */
if (!(SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)) {
SC_ATOMIC_OR(flow_flags, FLOW_EMERGENCY);
FlowTimeoutsEmergency();
/* under high load, waking up the flow mgr each time leads
* to high cpu usage. Flows are not timed out much faster if
* we check a 1000 times a second. */
FlowWakeupFlowManagerThread();
}
f = FlowGetUsedFlow(tv, dtv);
- 遍历哈希,直到可以释放流。
- 不要修剪包或流消息在使用的流。
- 输出日志。
-
flow_prune_idx
确保我们不会每次都从顶部开始,因为那样会清除散列的顶部,从而导致在高压下搜索时间越来越长。
static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv)
{
...
if (SC_ATOMIC_GET(f->use_cnt) > 0) {
FBLOCK_UNLOCK(fb);
FLOWLOCK_UNLOCK(f);
continue;
}
- 从hash中删除,设置FORCED和EMERGENCY标志。
/* remove from the hash */
f->flow_end_flags |= FLOW_END_FLAG_FORCED;
if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
f->flow_end_flags |= FLOW_END_FLAG_EMERGENCY;
- log记录,清除旧内存,初始为新状态,增加
flow_prune_idx
。
/* invoke flow log api */
if (dtv && dtv->output_flow_thread_data)
(void)OutputFlowLog(tv, dtv->output_flow_thread_data, f);
FlowClearMemory(f, f->protomap);
FlowUpdateState(f, FLOW_STATE_NEW);
FLOWLOCK_UNLOCK(f);
(void) SC_ATOMIC_ADD(flow_prune_idx, (flow_config.hash_size - cnt));
1.4.1 紧急模式退出
- 获取spare队列中的flow数
static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
{
...
uint32_t len = 0;
FQLOCK_LOCK(&flow_spare_q);
len = flow_spare_q.len;
FQLOCK_UNLOCK(&flow_spare_q);
StatsSetUI64(th_v, ftd->flow_mgr_spare, (uint64_t)len);
- 如果可用flow与预分配流的百分比大于
emergency_recovery
选项的配置。
if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) {
SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
- 恢复正常的超时时间,退出紧急状态。
FlowTimeoutsReset();
2. 流管理器
2.1 流状态
流可以处于不同的状态。Suricata区分TCP流状态和UDP流状态。
enum FlowState {
FLOW_STATE_NEW = 0,
FLOW_STATE_ESTABLISHED,
FLOW_STATE_CLOSED,
FLOW_STATE_LOCAL_BYPASSED,
#ifdef CAPTURE_OFFLOAD
FLOW_STATE_CAPTURE_BYPASSED,
#endif
};
2.1.1 TCP流状态
- New: 三次握手期间的时间段。
- Established: 三次握手期完成后的状态。
- Closed: 关闭状态。有几种方法可以结束流程。这是通过复位或四次FIN挥手进行的。
static void StreamTcpPacketSetState(Packet *p, TcpSession *ssn,
uint8_t state)
{
...
/* update the flow state */
switch(ssn->state) {
case TCP_ESTABLISHED:
case TCP_FIN_WAIT1:
case TCP_FIN_WAIT2:
case TCP_CLOSING:
case TCP_CLOSE_WAIT:
FlowUpdateState(p->flow, FLOW_STATE_ESTABLISHED);
break;
case TCP_LAST_ACK:
case TCP_TIME_WAIT:
case TCP_CLOSED:
FlowUpdateState(p->flow, FLOW_STATE_CLOSED);
break;
}
}
2.1.2 UDP流状态
- New: 流刚刚创建后的状态。
- Established: 数据包从两个方向发送接收。
void FlowHandlePacketUpdate(Flow *f, Packet *p)
{
...
if (SC_ATOMIC_GET(f->flow_state) == FLOW_STATE_ESTABLISHED) {
SCLogDebug("pkt %p FLOW_PKT_ESTABLISHED", p);
p->flowflags |= FLOW_PKT_ESTABLISHED;
} else if ((f->flags & (FLOW_TO_DST_SEEN|FLOW_TO_SRC_SEEN)) ==
(FLOW_TO_DST_SEEN|FLOW_TO_SRC_SEEN)) {
SCLogDebug("pkt %p FLOW_PKT_ESTABLISHED", p);
p->flowflags |= FLOW_PKT_ESTABLISHED;
if (f->proto != IPPROTO_TCP) {
FlowUpdateState(f, FLOW_STATE_ESTABLISHED);
}
}
- Local_bypassed: 仅从一个方向发送数据包的状态。
如果数据包到来的时间超过1/2超时时间,则降级到Local_bypassed状态。
void FlowHandlePacketUpdate(Flow *f, Packet *p)
{
#ifdef CAPTURE_OFFLOAD
int state = SC_ATOMIC_GET(f->flow_state);
if (state != FLOW_STATE_CAPTURE_BYPASSED) {
#endif
/* update the last seen timestamp of this flow */
COPY_TIMESTAMP(&p->ts, &f->lastts);
#ifdef CAPTURE_OFFLOAD
} else {
/* still seeing packet, we downgrade to local bypass */
if (p->ts.tv_sec - f->lastts.tv_sec > FLOW_BYPASSED_TIMEOUT / 2) {
SCLogDebug("Downgrading flow to local bypass");
COPY_TIMESTAMP(&p->ts, &f->lastts);
FlowUpdateState(f, FLOW_STATE_LOCAL_BYPASSED);
}
...
2.2 流超时
Suricata将流保持在内存中的时间由流超时时间
确定。
flow-timeouts:
default:
new: 30 #Time-out in seconds after the last activity in this flow in a New state.
established: 300 #Time-out in seconds after the last activity in this flow in a Established
#state.
emergency_new: 10 #Time-out in seconds after the last activity in this flow in a New state
#during the emergency mode.
emergency_established: 100 #Time-out in seconds after the last activity in this flow in a Established
#state in the emergency mode.
tcp:
new: 60
established: 3600
closed: 120
emergency_new: 10
emergency_established: 300
emergency_closed: 20
udp:
new: 30
established: 300
emergency_new: 10
emergency_established: 100
icmp:
new: 30
established: 300
emergency_new: 10
emergency_established: 100
2.2.1 超时处理
流的超时管理是在流管理器(FlowManager线程)实现的。
static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
...
FlowTimeoutHash(&ts, 0 /* check all */, ftd->min, ftd->max, &counters);
- 遍历整个流的hash桶,对桶中的所有流进行检查。
static uint32_t FlowTimeoutHash(struct timeval *ts,
...
if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
emergency = 1;
for (idx = hash_min; idx < hash_max; idx++) {
FlowBucket *fb = &flow_hash[idx];
counters->rows_checked++;
int32_t check_ts = SC_ATOMIC_GET(fb->next_ts);
if (check_ts > (int32_t)ts->tv_sec) {
counters->rows_skipped++;
continue;
}
- 获取Row Lock(桶的锁)之前,先要确保报文池中至少有
9
个包。
/* before grabbing the row lock, make sure we have at least
* 9 packets in the pool */
PacketPoolWaitForN(9);
if (FBLOCK_TRYLOCK(fb) != 0) {
counters->rows_busy++;
continue;
}
/* flow hash bucket is now locked */
if (fb->tail == NULL) {
SC_ATOMIC_SET(fb->next_ts, INT_MAX);
counters->rows_empty++;
goto next;
}
int32_t next_ts = 0;
/* we have a flow, or more than one */
cnt += FlowManagerHashRowTimeout(fb->tail, ts, emergency, counters, &next_ts);
- 对hash桶中的flow进行检查。
- 根据
lastts
和状态
检查流超时。 - 获取Flow Lock之前,先要确保报文池中至少有
3
个包。
- 根据
static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
int emergency, FlowTimeoutCounters *counters, int32_t *next_ts)
{
...
/* timeout logic goes here */
if (FlowManagerFlowTimeout(f, state, ts, next_ts) == 0) {
...
/* before grabbing the flow lock, make sure we have at least
* 3 packets in the pool */
PacketPoolWaitForN(3);
FLOWLOCK_WRLOCK(f);
- 检查流是否已完全超时,如果超时进行放弃,并把流放入
flow_recycle_q
队列中。
/* check if the flow is fully timed out and
* ready to be discarded. */
if (FlowManagerFlowTimedOut(f, ts, counters) == 1) {
...
f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT;
...
FlowEnqueue(&flow_recycle_q, f);
- 检查
use_cnt
引用计数,进行流的强制重组。
static inline int FlowManagerFlowTimedOut(Flow *f, struct timeval *ts,
...
if (SC_ATOMIC_GET(f->use_cnt) > 0) {
return 0;
}
...
if (!(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) &&
#ifdef CAPTURE_OFFLOAD
SC_ATOMIC_GET(f->flow_state) != FLOW_STATE_CAPTURE_BYPASSED &&
#endif
SC_ATOMIC_GET(f->flow_state) != FLOW_STATE_LOCAL_BYPASSED &&
FlowForceReassemblyNeedReassembly(f, &server, &client) == 1) {
FlowForceReassemblyForFlow(f, server, client);
return 0;
}
...
return 1;
}