logstash input插件开发

logstash input插件开发

logstash作为一个数据管道中间件,支持对各种类型数据的采集与转换,并将数据发送到各种类型的存储库,比如实现消费kafka数据并且写入到Elasticsearch, 日志文件同步到对象存储S3等,mysql数据同步到Elasticsearch等。

logstash内部主要包含三个模块:

input: 从数据源获取数据
filter: 过滤、转换数据
output: 输出数据
image

不同类型的数据都可以通过对应的input-plugin, output-plugin完成数据的输入与输出。如需要消费kafka中的数据并写入到Elasticsearch中,则需要使用logstash的kafka-input-plugin完成数据输入,logstash-output-elasticsearch完成数据输出。如果需要对输入数据进行过滤或者转换,比如根据关键词过滤掉不需要的内容,或者时间字段的格式转换,就需要又filter-plugin完成了。

logstash的input插件目前已经有几十种了,支持大多数比较通用或开源的数据源的输入。但如果公司内部开发的数据库或其它存储类的服务不能和开源产品在接口协议上兼容,比如腾讯自研的消息队列服务CMQ不依赖于其它的开源消息队列产品,所以不能直接使用logstash的logstash-input-kafka或logstash-input-rabbitmq同步CMQ中的数据;腾讯云对象存储服务COS, 在鉴权方式上和AWS的S3存在差异,也不能直接使用logstash-input-s3插件从COS中读取数据,对于这种情况,就需要自己开发logstash的input插件了。

本文以开发logstash的cos input插件为例,介绍如何开发logstash的input插件。

logstash官方提供了有个简单的input plugin example可供参考:
https://github.com/logstash-plugins/logstash-input-example/

环境准备

logstash使用jruby开发,首先要配置jruby环境:

  1. 安装rvm:

    rvm是一个ruby管理器,可以安装并管理ruby环境,也可以通过命令行切换到不同的ruby版本。

    gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 7D2BAF1CF37B13E2069D6956105BD0E739499BDB
    
    \curl -sSL https://get.rvm.io | bash -s stable
    
    source /etc/profile.d/rvm.sh
    
  2. 安装jruby

    rvm install jruby
    
    rvm use jruby
    
  3. 安装包管理工具bundle和测试工具rspec

    gem install bundle
    gem install rspec
    

从example开始

  1. clone logstash-input-example

    git clone https://github.com/logstash-plugins/logstash-input-example.git
    
  2. 将clone出来的logstash-input-example源码copy到logstash-input-cos目录,并删除.git文件夹,目的是以logstash-input-example的源码为参考进行开发,同时把需要改动名称的地方修改一下:

     mv logstash-input-example.gemspec logstash-input-cos.gemspec
     mv lib/logstash/inputs/example.rb lib/logstash/inputs/cos.rb
     mv spec/inputs/example_spec.rb spec/inputs/cos_spec.rb
    
  3. 建立的源码目录结构如图所示:

image

其中,重要文件的作用说明如下:

  • cos.rb: 主文件,在该文件中编写logstash配置文件的读写与源数据获取的代码,需要继承LogStash::Inputs::Base基类
  • cos_spec.rb: 单元测试文件,通过rspec可以对cos.rb中的代码进行测试
  • logstash-input-cos.gemspec: 类似于maven中的pom.xml文件,配置工程的版本、名称、licene,包依赖等,通过bundle命令可以下载依赖包

配置并下载依赖

因为腾讯云COS服务没有ruby sdk, 因为只能依赖其Java sdk进行开发,首先添加对cos java sdk的依赖。在logstash-input-cos.gemspec中Gem dependencies配置栏中增加以下内容:

# Gem dependencies
  s.requirements << "jar 'com.qcloud:cos_api', '5.4.4'"
  s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
  s.add_runtime_dependency 'logstash-codec-plain'
  s.add_runtime_dependency 'stud', '>= 0.0.22'
  s.add_runtime_dependency 'jar-dependencies'
  s.add_development_dependency 'logstash-devutils', '1.3.6'

相比logstash-input-example.gemspec,增加了对com.qcloud:cos_api包以及jar-dependencies包的依赖,jar-dependencies用于在ruby环境中管理jar包,并且可以跟踪jar包的加载状态。

然后,在logstash-input-cos.gemspec中增加配置:

s.platform = 'java'

这样可以成功下载java依赖包,并且可以在ruby代码中直接调用java代码。

最后,执行以下命令下载依赖:

bundle install

编写代码

logstash-input-cos的代码逻辑其实比较简单,主要是通过执行定时任务,调用cos java sdk中的listObjects方法,获取到指定bucket里的数据,并在每次定时任务执行结束后设置marker保存在本地,再次执行时从marker位置获取数据,以实现数据的增量同步。

jar包的引用

因为要调用cos java sdk中的代码,先引用该jar包:

require 'cos_api-5.4.4.jar'
java_import com.qcloud.cos.COSClient;
java_import com.qcloud.cos.ClientConfig;
java_import com.qcloud.cos.auth.BasicCOSCredentials;
java_import com.qcloud.cos.auth.COSCredentials;
java_import com.qcloud.cos.exception.CosClientException;
java_import com.qcloud.cos.exception.CosServiceException;
java_import com.qcloud.cos.model.COSObjectSummary;
java_import com.qcloud.cos.model.ListObjectsRequest;
java_import com.qcloud.cos.model.ObjectListing;
java_import com.qcloud.cos.region.Region;

读取配置文件

logstash配置文件读取的代码如图所示:


image

config_name为cos,其它的配置项读取代码按照ruby的代码规范编写,添加类型校验与默认值,就可以从以下配置文件中读取配置项:

input {
    cos {
        "endpoint" => "cos.ap-guangzhou.myqcloud.com"
        "access_key_id" => "*****"
        "access_key_secret" => "****"
        "bucket" => "******"
        "region" => "ap-guangzhou"
        "appId" => "**********"
        "interval" => 60
    }
}

output {
    stdout {
        codec=>rubydebug
    }
}

实现register方法

logstash input插件必须实现另个方法:register 和run

register方法类似于初始化方法,在该方法中可以直接使用从配置文件读取并赋值的变量,完成cos client的初始化,代码如下:

    # 1 初始化用户身份信息(appid, secretId, secretKey)
    cred = com.qcloud.cos.auth.BasicCOSCredentials.new(@access_key_id, @access_key_secret)
    # 2 设置bucket的区域, COS地域的简称请参照 https://www.qcloud.com/document/product/436/6224
    clientConfig = com.qcloud.cos.ClientConfig.new(com.qcloud.cos.region.Region.new(@region))
    # 3 生成cos客户端
    @cosclient = com.qcloud.cos.COSClient.new(cred, clientConfig)
    # bucket名称, 需包含appid
    bucketName = @bucket + "-"+ @appId
    @bucketName = bucketName

    @listObjectsRequest = com.qcloud.cos.model.ListObjectsRequest.new()
    # 设置bucket名称
    @listObjectsRequest.setBucketName(bucketName)
    # prefix表示列出的object的key以prefix开始
    @listObjectsRequest.setPrefix(@prefix)
    # 设置最大遍历出多少个对象, 一次listobject最大支持1000
    @listObjectsRequest.setMaxKeys(1000)
    @listObjectsRequest.setMarker(@markerConfig.getMarker)

示例代码中设置了@cosclient和@listObjectRequest为全局变量, 因为在run方法中会用到这两个变量。

注意在ruby中调用java代码的方式:没有变量描述符;不能直接new Object(),而只能Object.new().

实现run方法

run方法获取数据并将数据流转换成event事件

最简单的run方法为:

def run(queue)
    Stud.interval(@interval) do
      event = LogStash::Event.new("message" => @message, "host" => @host)
      decorate(event)
      queue << event
    end # loop
  end # def run

代码说明:

  • 通过Stud ruby模块执行定时任务,interval可自定义,从配置文件中读取
  • 生成event, 示例代码生成了一个包含两个字段数据的event
  • 调用decorate()方法, 给该event打上tag,如果配置的话
  • queue<<event, 将event插入到数据管道中,发送给filter处理

logstash-input-cos的run方法实现为:

def run(queue)
    @current_thread = Thread.current
    Stud.interval(@interval) do
      process(queue)
    end
end
 
def process(queue)
    @logger.info('Marker from: ' + @markerConfig.getMarker)
    
    objectListing = @cosclient.listObjects(@listObjectsRequest)
    nextMarker = objectListing.getNextMarker()
    cosObjectSummaries = objectListing.getObjectSummaries()
    cosObjectSummaries.each do |obj|
       # 文件的路径key
       key = obj.getKey()
    
       if stop?
         @logger.info("stop while attempting to read log file")
         break
       end
       # 根据key获取内容
       getObject(key) { |log|
         # 发送消息
         @codec.decode(log) do |event|
           decorate(event)
           queue << event
         end
       }

       #记录 marker
       @markerConfig.setMarker(key)
       @logger.info('Marker end: ' + @markerConfig.getMarker)
    end
  end


  # 获取下载输入流
 def getObject(key, &block)
    getObjectRequest = com.qcloud.cos.model.GetObjectRequest.new(@bucketName, key)
    cosObject = @cosclient.getObject(getObjectRequest)
    cosObjectInput = cosObject.getObjectContent()
    buffered =BufferedReader.new(InputStreamReader.new(cosObjectInput))
    while (line = buffered.readLine())
      block.call(line)
    end
  end

测试代码

在spec/inputs/cos_spec.rb中增加如下测试代码:

# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/cos"

describe LogStash::Inputs::Cos do

  it_behaves_like "an interruptible input plugin" do
    let(:config) { {
        "endpoint" => 'cos.ap-guangzhou.myqcloud.com',
        "access_key_id" => '*',
        "access_key_secret" => '*',
        "bucket" => '*',
         "region" => 'ap-guangzhou',
         "appId" => '*',
        "interval" => 60 } }
  end
end

rspec是一个ruby测试库,通过bundle命令执行rspec:

bundle exec rspec

如果cos.rb中的代码没有语法或运行时错误,则会出现如果信息表明测试成功:

Finished in 0.8022 seconds (files took 3.45 seconds to load)
1 example, 0 failures

构建并测试input-plugin-cos

build

使用gem对input-plugin-cos插件源码进行build:

gem build logstash-input-cos.gemspec

构建完成后会生成一个名为logstash-input-cos-0.0.1-java.gem的文件

test

在logstash的解压目录下,执行一下命令安装logstash-input-cos plugin:

./bin/logstash-plugin install /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem

执行结果为:

Validating /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem
Installing logstash-input-cos
Installation successful

另外,可以通过./bin/logstash-plugin list命令查看logstash已经安装的所有input/output/filter/codec插件。

生成配置文件cos.logstash.conf,内容为:

input {
    cos {
        "endpoint" => "cos.ap-guangzhou.myqcloud.com"
        "access_key_id" => "*****"
        "access_key_secret" => "****"
        "bucket" => "******"
        "region" => "ap-guangzhou"
        "appId" => "**********"
        "interval" => 60
    }
}

output {
    stdout {
        codec=>rubydebug
    }
}

该配置文件使用腾讯云官网账号的secret_id和secret_key进行权限验证,拉取指定bucket里的数据,为了测试,将output设置为标准输出。

执行logstash:

./bin/logstash -f cos.logstash.conf

输出结果为:

Sending Logstash's logs to /root/logstash-5.6.4/logs which is now configured via log4j2.properties
[2018-07-30T19:26:17,039][WARN ][logstash.runner          ] --config.debug was specified, but log.level was not set to 'debug'! No config info will be logged.
[2018-07-30T19:26:17,048][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.4/modules/netflow/configuration"}
[2018-07-30T19:26:17,049][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.4/modules/fb_apache/configuration"}
[2018-07-30T19:26:17,252][INFO ][logstash.inputs.cos      ] Using version 0.1.x input plugin 'cos'. This plugin isn't well supported by the community and likely has no maintainer.
[2018-07-30T19:26:17,341][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2018-07-30T19:26:17,362][INFO ][logstash.inputs.cos      ] Registering cos input {:bucket=>"bellengao", :region=>"ap-guangzhou"}
[2018-07-30T19:26:17,528][INFO ][logstash.pipeline        ] Pipeline main started
[2018-07-30T19:26:17,530][INFO ][logstash.inputs.cos      ] Marker from:
log4j:WARN No appenders could be found for logger (org.apache.http.client.protocol.RequestAddCookies).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[2018-07-30T19:26:17,574][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-07-30T19:26:17,714][INFO ][logstash.inputs.cos      ] Marker end: access.log
{
       "message" => "77.179.66.156 - - [25/Oct/2016:14:49:33 +0200] \"GET / HTTP/1.1\" 200 612 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
      "@version" => "1",
    "@timestamp" => 2018-07-30T11:26:17.710Z
}
{
       "message" => "77.179.66.156 - - [25/Oct/2016:14:49:34 +0200] \"GET /favicon.ico HTTP/1.1\" 404 571 \"http://localhost:8080/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
      "@version" => "1",
    "@timestamp" => 2018-07-30T11:26:17.711Z
}

在cos中的bucket里上传了名为access.log的nginx日志,上述输出结果中最后打印出来的每个json结构体构成一个event, 其中message消息即为access.log中每一条日志。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • Logstash是一个具有实时管线能力的开源数据收集引擎。在ELK Stack中,通常选择更轻量级的Filebea...
    steanxy阅读 3,522评论 1 6
  • 概述 监控预警平台, eagle + eye (鹰眼)的合体词, 寓意可以快速发现问题, 并及时作出响应,Eagl...
    Kungfu猫熊阅读 7,381评论 0 52
  • 0.摘要 这是开智学堂「信息分析」课程第1周基础任务,包含分析背景、思路与分析步骤、主要结论、进一步讨论等内容,完...
    空灵一月阅读 353评论 0 1
  • 《吕氏春秋》里有一段,讲孔子周游列国,曾因兵荒马乱,旅途困顿,三餐以野菜果腹,大家已七日没吃下一粒米饭。 一天,颜...
    刘现辉民俗画阅读 1,864评论 0 1