logstash-时间戳转自定义时间格式

关于logstash, 网上的博客资料还算不少, 但是之前在做项目的时候, 碰到个需求, 需要将kafka中读到的数据放到elasticsearch中, 而这个index的值是根据kafka中创建时间来定的。

比如说kafka中传递的创建时间是2021-10-10的时间戳, 而我这条数据需要放入es中index为xxx_2021.10.10的表中, 这其中就需要将时间戳对象转成自定义时间格式数据, 然后再将该数据赋值给index, 但是就是这么个简单的事, 我查了半天资料才发现事情并不简单, 网上基本都是时间格式类型转时间戳, 基本没见到有时间戳转时间格式的, 所以这次就把自己做的发出来给各位小伙伴们作为参考, 以免都跟我一样卡半天。

上代码

filter {

    mutate{

        add_field => {"index_time"=>"error_timestamp"}

    }

    ruby{

        code=>"event.set('index_time',Time.at((event.get('create_time').to_i)/1000).strftime('%Y.%m.%d'))"

    }

}

output {

    elasticsearch {

        hosts => ["xxx:9092"]

        index => "xxxx_%{index_time}"

        document_type=>"log"

    }

}

重要代码基本就这一行:event.set('index_time',Time.at((event.get('create_time').to_i)/1000).strftime('%Y.%m.%d'))

to_i的效果是将String转成int或long类型, 因为这边kafka传递的是字符串的, 所以进行转化

Time.at是秒级时间戳转时间格式的方法, /1000是除以1000, 这边时间戳是毫秒级的

最后的strftime是时间格式转换, 里面的是格式, 更详细的可以参考ruby文档


以上基本就是咋们处理这个小问题的方式, 希望对大家有用

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容