如何写一个转发日志的fluentd插件?

如何写一个转发日志的fluentd插件?

上篇介绍了logging-operator依赖于自定义的fluentd插件,实现了根据指定的namespaceslabels转发日志,本篇将从以下几个方面介绍如何编写一个具有该功能集成的fluentd插件:

  • 确定要扩展的插件类型

  • 相关语法词法介绍

  • 学习如何编写一个fluentd插件

确定要扩展的插件类型

根据我们的需求, 需要按照namespaceslabels来完成日志的转发,这依赖于kubernetes元数据。kubernetes元数据的获取并不在fluentd阶段配置,而是在转发给fluentd之前,依赖于fluent-bit的配置。

https://docs.fluentbit.io/manual/pipeline/filters/kubernetes#workflow-of-tail-kubernetes-filter

$ kubectl get secrets defaultlogging-fluentbit  -o json | jq '.data."fluent-bit.conf"' | xargs echo | base64 --decode
[SERVICE]
    Flush        1
    Grace        5
    Daemon       Off
    Log_Level    info
    Parsers_File parsers.conf
    Coro_Stack_Size    24576
    storage.path  /buffers

[INPUT]
    Name         tail
    DB  /tail-db/tail-containers-state.db
    Mem_Buf_Limit  5MB
    Parser  docker
    Path  /var/log/containers/*.log
    Refresh_Interval  5
    Skip_Long_Lines  On
    Tag  kubernetes.*
[FILTER]
    Name        kubernetes
    Buffer_Size  0
    Kube_CA_File  /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    Kube_Tag_Prefix  kubernetes.var.log.containers
    Kube_Token_File  /var/run/secrets/kubernetes.io/serviceaccount/token
    Kube_URL  https://kubernetes.default.svc:443
    Match  kubernetes.*
    Merge_Log  On

在确定好该配置后激活后,我们来到fluentd这一层,需要编写一个output插件来完成过滤、转发功能。

相关语法词法介绍

详细样例参考:https://docs.fluentd.org/plugin-development/api-plugin-output

上面链接中搬运过来就是这样的:

require 'fluent/plugin/output'

module Fluent::Plugin
  class SomeOutput < Output
    # First, register the plugin. 'NAME' is the name of this plugin
    # and identifies the plugin in the configuration file.
    Fluent::Plugin.register_output('NAME', self)

    # Enable threads if you are writing an async buffered plugin.
    helpers :thread

    # Define parameters for your plugin.
    config_param :path, :string

    #### Non-Buffered Output #############################
    # Implement `process()` if your plugin is non-buffered.
    # Read "Non-Buffered output" for details.
    ######################################################
    def process(tag, es)
      es.each do |time, record|
        # output events to ...
      end
    end

    #### Sync Buffered Output ##############################
    # Implement `write()` if your plugin uses normal buffer.
    # Read "Sync Buffered Output" for details.
    ########################################################
    def write(chunk)
      real_path = extract_placeholders(@path, chunk)

      log.debug 'writing data to file', chunk_id: dump_unique_id_hex(chunk.unique_id)

      # For standard chunk format (without `#format()` method)
      chunk.each do |time, record|
        # output events to ...
      end

      # For custom format (when `#format()` implemented)
      # File.open(real_path, 'w+')

      # or `#write_to(io)` is available
      # File.open(real_path, 'w+') do |file|
      #   chunk.write_to(file)
      # end
    end

    #### Async Buffered Output #############################
    # Implement `try_write()` if you want to defer committing
    # chunks. Read "Async Buffered Output" for details.
    ########################################################
    def try_write(chunk)
      real_path = extract_placeholders(@path, chunk)

      log.debug 'sending data to server', chunk_id: dump_unique_id_hex(chunk.unique_id)

      send_data_to_server(@host, real_path, chunk.read)

      chunk_id = chunk.unique_id

      # Create a thread for deferred commit.
      thread_create(:check_send_result) do
        while thread_current_running?
          sleep SENDDATA_CHECK_INTERVAL # == 5

          if check_data_on_server(real_path, chunk_id)
            # commit chunk
            # chunk will be deleted and not be retried anymore by this call
            commit_write(chunk_id)
            break
          end
        end
      end
    end

    # Override `#format` if you want to customize how Fluentd stores
    # events. Read the section "How to Customize the Serialization
    # Format for Chunks" for details.
    def format(tag, time, record)
      [tag, time, record].to_json
    end
  end
end

我将一个插件的编写规范整理为两类,一类是骨架定义,一类是子类逻辑实现:

  • 骨架定义部分包括requiremoduleclass definition
  • 子类逻辑实现又包括插件注册、参数定义、激活配置等前置逻辑和具体接口实现和内置方法调用的逻辑。
require

根据需要编写的插件类型导入依赖:

require 'fluent/plugin/output' # input, filter, output, parser, formatter, storage or buffer
subclass

所有的插件都是Fluent::Plugin::Base的子类。

class definition
module Fluent::Plugin
    class SomeOutput < Output
        ...
    end
end
register

注册插件的名称类别,需要根据这个来识别该插件,这里我们注册了一个名为NAME类别的output插件

Fluent::Plugin.register_output('NAME', self)
helpers

https://docs.fluentd.org/plugin-helper-overview

以下的语法激活了线程helper, 可以调用 thread_create(:check_send_result)thread_current_running?

# Load thread helper
helpers :thread
----
 thread_create(:check_send_result) do
   while thread_current_running?
     sleep SENDDATA_CHECK_INTERVAL # == 5
     if check_data_on_server(real_path, chunk_id)
        # commit chunk
        # chunk will be deleted and not be retried anymore by this call
        commit_write(chunk_id)
        break
     end
   end
 end
----
config_param && desc

config_param定义插件的参数, desc定义描述:

desc 'The port number'
# `config_param` Defines a parameter. You can refer the following parameter via @port instance variable.
# Without `:default`, a parameter is required.
config_param :port, :integer
config_section

定义一个可以嵌套的参数结构:

name: 名称.

options:

  • root: 是否激活为root配置区域,内部使用;
  • param_name: 子区域的名称;
  • final: 激活后子类无法修改, buffer配置区域就是通过这种方法实现。
  • init:激活后,必须要有初始默认值;
  • required: 激活后,整个配置区域会被设为必须配置项, 否则会报错;
  • multi: 激活后可以多次配置该配置区域;
  • alias: Alias for this section.

参考:

config_section :user, param_name: :users, multi: true, required: false do
  desc 'Username for authentication'
  config_param :username, :string
  desc 'Password for authentication'
  config_param :password, :string, secret: true
end
接口实现和内置方法调用

如果output没有使用buffer就需要实现process(tag, es)方法,反之,则需要实现write(同步)和try_write方法(异步)。

#### Non-Buffered Output #############################
# Implement `process()` if your plugin is non-buffered.
# Read "Non-Buffered output" for details.
######################################################
def process(tag, es)

#### Sync Buffered Output ##############################
# Implement `write()` if your plugin uses normal buffer.
# Read "Sync Buffered Output" for details.
########################################################
def write(chunk)

#### Async Buffered Output #############################
# Implement `try_write()` if you want to defer committing
# chunks. Read "Async Buffered Output" for details.
########################################################
def try_write(chunk)

# Override `#format` if you want to customize how Fluentd stores
# events. Read the section "How to Customize the Serialization
# Format for Chunks" for details.
def format(tag, time, record)

更多接口实现和内置方法可以访问上文提到的链接。

补充介绍下configure(conf)方法, confFluent::Config::Element的一个实例,实例变量和可访问的方法需要super调用之后才能可用。

def configure(conf)
  super

  # cache_default_value is created/configured by config_param
  @in_memory_cache = Hash.new(@cache_default_value)
end

学习如何编写一个fluentd插件

掌握相关语法后,我们试着分析下上篇文章提到的fluentd插件如何实现根据namespaceslabels转发日志的功能。

https://github.com/banzaicloud/fluent-plugin-label-router/blob/master/lib/fluent/plugin/out_label_router.rb#L22:11

require
require "fluent/plugin/output"
require 'prometheus/client'
class定义

按照官方的说法, 这里继承Output即可,如果不是做了巨大的改变,一般不推荐直接继承BareOutput

class LabelRouterOutput < BareOutput
register

注册了一个名为label_routertype

Fluent::Plugin.register_output("label_router", self)
helpers

激活event_emitterrecord_accessor两个helper api

helpers :event_emitter, :record_accessor
---
# event_emitter
# 1. emit event
router.emit(tag, time, record)
# 2. emit event stream
router.emit_stream(tag, es)
---
# record_accessor
# 1. Call `record_accessor_create` to create object
 @accessor = record_accessor_create('$.user.name')
# 2. Call `call` method to get value
value = @accessor.call(record) # With `$.user.name`, access to record["user"]["name"]
---
config_param

emit_mode: list类型,可选值为batch或者record;

sticky_tags: bool类型,默认为true, 相同的tag使用相同的方法;

default_routestring类型,默认为空,无法匹配时使用默认标签;

default_tagstring类型,默认为空, 无法匹配时使用默认tag

metrics: bool类型,默认为false,是否激活监控;

config_section

定义了两层嵌套配置区域。

image-20210703114719912

第一层,子嵌套配置区域名称为routes,可以配置多个routeroute详细参数如下:

@label: 类型为string,默认为nil,如果子区域的选择器命中匹配到,则会新建一个名为@label值的label给该record;

tag: 类型为string, 如果子区域匹配到,则会新建一个名为tag值的tag给给该record,前提是这个新tag不为空;

metrics_labels: 类型为string, 配置额外的metrics labels;

第二层子嵌套配置区域名称为matches,可以配置多个matchmatch详细参数如下:

labels : hash 类型, 例如app:nginx

namespaces: array类型,默认是[], 需要过滤的命名空间在这里定义;

hostsarray类型,默认是[], 需要过滤的hosts在这里定义;

container_names: array类型,默认是[], 需要过滤的container_names在这里定义;

negate: bool类型,用来标记为反选,默认为false;

接口实现和内置方法

首先,定义了一个Route类共给初始化配置时调用,具体的逻辑可以不用看,只需要注意它实现了两个方法,分别用于逐个处理和批处理,处理完毕后将计数器增加size个计数:

image-20210703134141141

下面直接看configure(conf)部分:

def configure(conf)
    super
    @registry = (::Prometheus::Client.registry if @metrics)
    @route_map = Hash.new { |h, k| h[k] = Set.new }
    @mutex = Mutex.new
    @routers = []
    @default_router = nil
    @routes.each do |rule|
       route_router = event_emitter_router(rule['@label'])
       @routers << Route.new(rule, route_router, @registry)
    end

    if @default_route != '' or @default_tag != ''
       default_rule = { 'matches' => nil, 'tag' => @default_tag, '@label' => @default_route}
       @default_router = Route.new(default_rule, event_emitter_router(@default_route), @registry)
    end

    @access_to_labels = record_accessor_create("$.kubernetes.labels")
    @access_to_namespace = record_accessor_create("$.kubernetes.namespace_name")
    @access_to_host = record_accessor_create("$.kubernetes.host")
    @access_to_container_name = record_accessor_create("$.kubernetes.container_name")

    @batch = @emit_mode == :batch
end

这里定义了一些初始化默认值和实例变量,需要注意的是routers这个数组的值,存放的是定义的Route实例, 其中, event_emitter_routerhelpers api导入的函数。

https://github.com/fluent/fluentd/blob/5844f7209fec154a4e6807eb1bee6989d3f3297f/lib/fluent/plugin_helper/event_emitter.rb#L71

 @routes.each do |rule|
    route_router = event_emitter_router(rule['@label'])
    @routers << Route.new(rule, route_router, @registry)
 end

参考上文,由于没有定义buffer组件,只需要实现process方法即可:

image-20210703133736264

上面这个函数基本上囊括了整个处理逻辑,无非是做一些匹配以及根据参数做一些控制流,来触发router实例中emitemit_es方法。

整个逻辑很简单的。如果开启了强制匹配tag的模式,会在route_map中寻找该tag,做一次快速处理,否则会拿着组装的input_metadata去做匹配,如果匹配到则触发上面的两个emit方法,没有一个批次全部没匹配到就会判断有没有默认router来触发,最后,会触发一次批量emit_es

至此,我们探讨了一下如果编写fluentd插件的流程,希望对你有所帮助!

PS: 码字不易,欢迎点赞收藏~

本文由博客一文多发平台 OpenWrite 发布!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容