apisix-1.5支持4层kafka日志采集

修订:
2022.12.01 初始版本
2022.12.13 增加了apisix/router.lua文件的修改方法

输出格式

[{
    "upstream_bytes_received": "132",
    "upstream_connect_time": "0.002",
    "protocol": "TCP",
    "service_id": "",
    "status": "200",
    "session_time": "0.007",
    "bytes_sent": "132",
    "client_ip": "127.0.0.1",
    "route_id": "790206901602246656",
    "upstream_addr": "10.209.151.28:8099",
    "upstream_bytes_sent": "79",
    "bytes_received": "79"
}]

代码修改

apisix/init.lua

原代码

-- Stream log-phase entry point as shipped: writes an info log line and runs
-- the "log" phase of plugins through run_plugin, passing only the phase name.
function _M.stream_log_phase()
    core.log.info("enter stream_log_phase")
    -- core.ctx.release_vars(api_ctx)
    run_plugin("log")
end

修改后代码

-- Stream log-phase entry point.
--
-- Runs the "log" phase for the plugins of every global rule first, then runs
-- the "log" phase again without an explicit plugin list.
-- NOTE(review): run_plugin is not shown here; the final call presumably falls
-- back to api_ctx.plugins, which is why that field is restored below - confirm.
function _M.stream_log_phase()
    local ngx_ctx = ngx.ctx
    local api_ctx = ngx_ctx.api_ctx
    if not api_ctx then
        -- no per-connection context was created, so there is nothing to log
        return
    end

    core.ctx.set_vars_meta(api_ctx)

    local global_rules = router.global_rules
    if global_rules and global_rules.values
       and #global_rules.values > 0 then
        -- the loop below overwrites these fields; keep the connection's own
        -- values so they can be put back afterwards
        local saved_plugins = api_ctx.plugins
        local saved_conf_type = api_ctx.conf_type
        local saved_conf_version = api_ctx.conf_version
        local saved_conf_id = api_ctx.conf_id

        local plugins = core.tablepool.fetch("plugins", 32, 0)
        local values = global_rules.values
        for _, global_rule in config_util.iterate_values(values) do
            api_ctx.conf_type = "global_rule"
            api_ctx.conf_version = global_rule.modifiedIndex
            api_ctx.conf_id = global_rule.value.id

            -- the table is reused for every rule: empty it first so plugins
            -- of a previous rule are not executed a second time
            core.table.clear(plugins)
            api_ctx.plugins = plugin.stream_filter(global_rule, plugins)

            run_plugin("log", plugins, api_ctx)
        end

        -- hand the table back to the same pool it was fetched from
        core.tablepool.release("plugins", plugins)

        -- api_ctx.plugins must not keep pointing at the released table
        api_ctx.plugins = saved_plugins
        api_ctx.conf_type = saved_conf_type
        api_ctx.conf_version = saved_conf_version
        api_ctx.conf_id = saved_conf_id

        api_ctx.global_rules = global_rules
    end

    run_plugin("log")
end

apisix/router.lua

原代码

-- Stream worker initialisation as shipped: loads the ip_port stream router,
-- calls its stream_init_worker with `filter`, and stores the router module on
-- _M.router_stream.
function _M.stream_init_worker()
    local router_stream = require("apisix.stream.router.ip_port")
    router_stream.stream_init_worker(filter)
    _M.router_stream = router_stream
end

修改后代码

-- Stream worker initialisation.
--
-- Sets up the ip_port stream router (stored on _M.router_stream) and creates a
-- config object for "/global_rules" (stored on _M.global_rules). Raises an
-- error when the config object cannot be created.
function _M.stream_init_worker()
    local stream_router = require("apisix.stream.router.ip_port")
    stream_router.stream_init_worker(filter)
    _M.router_stream = stream_router

    local rule_opts = {
        automatic = true,
        item_schema = core.schema.global_rule,
    }
    local rules, err = core.config.new("/global_rules", rule_opts)
    if rules then
        _M.global_rules = rules
        return
    end

    error("failed to create etcd instance for fetching /global_rules : "
          .. err)
end

apisix/utils/log-util.lua

增加函数及导出

-- Builds the log entry table for one stream session.
--
-- @param ngx   object whose ctx.api_ctx holds `var`, and optionally
--              `matched_route` and `consumer`
-- @param conf  plugin configuration (not read by this function)
-- @return table with the session fields listed below
local function get_stream_full_log(ngx, conf)
    local api_ctx = ngx.ctx.api_ctx
    local vars = api_ctx.var
    local route = api_ctx.matched_route and api_ctx.matched_route.value

    local service_id, route_id
    if not route then
        -- no matched route: route_id stays nil and service_id comes from var.host
        service_id = vars.host
    else
        service_id = route.service_id or ""
        route_id = route.id
    end

    return {
        -- identification
        route_id = route_id,
        service_id = service_id,
        consumer = api_ctx.consumer,
        client_ip = vars.remote_addr,
        -- session summary
        protocol = vars.protocol,
        status = vars.status,
        session_time = vars.session_time,
        bytes_sent = vars.bytes_sent,
        bytes_received = vars.bytes_received,
        -- upstream side
        upstream_addr = vars.upstream_addr,
        upstream_connect_time = vars.upstream_connect_time,
        upstream_bytes_sent = vars.upstream_bytes_sent,
        upstream_bytes_received = vars.upstream_bytes_received,
    }
end


-- Expose the stream log builder on the module table.
_M.get_stream_full_log = get_stream_full_log

apisix/stream/plugins/kafka-logger.lua(新增)

--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements.  See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License.  You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core     = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local producer = require ("resty.kafka.producer")
local batch_processor = require("apisix.utils.batch-processor")
local pairs    = pairs
local type     = type
local table    = table
local ipairs   = ipairs
local plugin_name = "kafka-logger"
local stale_timer_running = false;
local timer_at = ngx.timer.at
local tostring = tostring
local ngx = ngx
local buffers = {}

-- Configuration schema of the plugin, checked by _M.check_schema.
local schema = {
    type = "object",
    properties = {
        -- iterated as host -> port pairs; only string hosts with numeric
        -- ports are passed on to the Kafka producer
        broker_list = {
            type = "object"
        },
        -- topic and key handed to the producer's send call
        kafka_topic = {type = "string"},
        key = {type = "string"},
        -- multiplied by 1000 and passed to the producer as request_timeout
        timeout = {type = "integer", minimum = 1, default = 3},
        -- the following six options are forwarded unchanged to the batch
        -- processor created in _M.log
        name = {type = "string", default = "kafka logger"},
        max_retry_count = {type = "integer", minimum = 0, default = 0},
        retry_delay = {type = "integer", minimum = 0, default = 1},
        buffer_duration = {type = "integer", minimum = 1, default = 60},
        inactive_timeout = {type = "integer", minimum = 1, default = 5},
        batch_max_size = {type = "integer", minimum = 1, default = 1000},
        -- NOTE(review): accepted by the schema but not read anywhere in this
        -- file
        include_req_body = {type = "boolean", default = false}
    },
    required = {"broker_list", "kafka_topic", "key"}
}

-- Plugin module table returned by this file: version, priority, name and the
-- configuration schema defined above.
local _M = {
    version = 0.1,
    priority = 403,
    name = plugin_name,
    schema = schema,
}


-- Validates a plugin configuration against `schema`.
-- @param conf  configuration table to validate
-- @return whatever core.schema.check returns for (schema, conf)
function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end


-- Sends one encoded log payload to Kafka.
--
-- @param conf         plugin configuration; reads broker_list, timeout,
--                     kafka_topic and key
-- @param log_message  payload handed to the producer as the message value
-- @return true on success; nil plus an error message on failure
local function send_kafka_data(conf, log_message)
    local broker_list = {}

    -- conf.broker_list maps host -> port; skip entries of any other shape
    for host, port in pairs(conf.broker_list) do
        if type(host) == 'string'
            and type(port) == 'number' then

            table.insert(broker_list, {host = host, port = port})
        end
    end

    -- without a usable broker there is no point in creating a producer
    if #broker_list == 0 then
        core.log.error("failed to identify the broker specified")
        return nil, "failed to identify the broker specified"
    end

    local broker_config = {
        request_timeout = conf.timeout * 1000,
    }

    local prod, err = producer:new(broker_list, broker_config)
    if not prod or err then
        return nil, "failed to identify the broker specified: "
                    .. tostring(err)
    end

    local ok, send_err = prod:send(conf.kafka_topic, conf.key, log_message)
    if not ok then
        return nil, "failed to send data to Kafka topic: "
                    .. tostring(send_err)
    end

    -- report success explicitly: _M.log returns this result as the outcome of
    -- its batch callback, and falling off the end would yield nil even when
    -- the send succeeded
    return true
end

-- remove stale objects from the memory after timer expires
-- Timer callback that drops batch processors holding no pending data, then
-- clears stale_timer_running so that _M.log can schedule the next run.
--
-- @param premature  premature-expiry flag supplied by the timer; when true the
--                   callback returns without doing anything
local function remove_stale_objects(premature)
    if premature then
        return
    end

    -- buffers is keyed by route id, not by 1..n, so it has to be walked with
    -- pairs; clearing existing keys during the traversal is permitted
    for key, batch in pairs(buffers) do
        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
            core.log.debug("removing batch processor stale object, route id:", tostring(key))
            buffers[key] = nil
        end
    end

    stale_timer_running = false
end


-- Log-phase handler: builds the log entry for the current stream session and
-- pushes it into the batch processor of the entry's route, creating that
-- processor on first use.
--
-- @param conf  plugin configuration (see `schema`)
function _M.log(conf)
    local entry = log_util.get_stream_full_log(ngx, conf)

    if not entry.route_id then
        core.log.error("failed to obtain the route id for kafka logger")
        return
    end

    local log_buffer = buffers[entry.route_id]

    if not stale_timer_running then
        -- schedule the cleanup 30 minutes (1800 s) from now; only mark it as
        -- running when the timer was really created, otherwise a single
        -- failure would prevent any later attempt
        local ok, timer_err = timer_at(1800, remove_stale_objects)
        if ok then
            stale_timer_running = true
        else
            core.log.error("failed to create timer for removing stale ",
                           "batch processors: ", timer_err)
        end
    end

    if log_buffer then
        log_buffer:push(entry)
        return
    end

    -- Function executed by the batch processor for every batch.
    -- NOTE(review): it captures the `conf` of the first call for this route;
    -- later configuration changes are presumably not picked up until the
    -- processor is removed as stale - confirm.
    local func = function(entries, batch_max_size)
        local data, err
        if batch_max_size == 1 then
            data, err = core.json.encode(entries[1]) -- encode as single {}
        else
            data, err = core.json.encode(entries) -- encode as array [{}]
        end

        if not data then
            return false, 'error occurred while encoding the data: ' .. err
        end

        return send_kafka_data(conf, data)
    end

    local config = {
        name = conf.name,
        retry_delay = conf.retry_delay,
        batch_max_size = conf.batch_max_size,
        max_retry_count = conf.max_retry_count,
        buffer_duration = conf.buffer_duration,
        inactive_timeout = conf.inactive_timeout,
    }

    local err
    log_buffer, err = batch_processor:new(func, config)

    if not log_buffer then
        core.log.error("error when creating the batch processor: ", err)
        return
    end

    -- keep the processor so later sessions on the same route reuse it
    buffers[entry.route_id] = log_buffer
    log_buffer:push(entry)
end

return _M

配置文件修改

config.yaml

stream_plugins:
  - mqtt-proxy

修改为

stream_plugins:
  - mqtt-proxy
  - kafka-logger
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
禁止转载,如需转载请通过简信或评论联系作者。

相关阅读更多精彩内容

  • 2022.12.13 每次都这样 一定是自己出现了问题 一定是这样 多多自我反思 多多考虑自己哪里出现了问题 客户...
    心纯见真阅读 73评论 0 0
  • 大荔县心理咨询协会郭亚婵坚持分享的743天: 学习打卡第4天: 了解问题与受督者的互动,并对焦之 通常,督...
    快乐有我_c00f阅读 99评论 0 0
  • 今日请假在家,准备赛课,可也没闲着。一早醒来督促家长上传核酸检测阴性证明,回复各种各样的问题。总算孩子们都...
    11文青青阅读 80评论 0 0
  • 今天开始绿色行程码就失效了,可以自由地移动,仿佛可以在辽阔的非洲大草原奔跑。此时头脑是松些,身体还是紧的,才下眉头...
    自在的小孩阅读 396评论 0 0
  • 综合来看,11月的金融数据,并不乐观。或者说远没有达到央行希望的宽信用效果,甚至还有点背道而驰,或者说央行把事情搞...
    唐关阅读 98评论 0 0

友情链接更多精彩内容