过早客
  • 首页
  • 节点
  • 成员
  • 广告投放
  • 登录
  • 注册

从一份定义文件详解ELK中Logstash插件结构

IT技术 • CodeSheep • 发表于 6 年前 • 最后回复来自 dyuanw • 6 年前

Profile



概述

当下分布式系统的 日志收集、日志分析、日志处理、可视化 的热门技术栈方案当然非 ELK(ElasticSearch、Logstash、Kibana)莫属,从 L → E → K 构成了一条数据的 Pipeline管道:

  • Logstash:与数据源对接,用于收集、过滤处理你的日志、事务或其他数据
  • ElasticSearch: 是一个开源的,分布式 RESTful 搜索引擎,在 ELK中可以初略理解为数据存储的地方
  • Kibana:将 Elasticsearch 的数据分析并渲染为可视化的报表,便于高效分析

而且在我的前文《利用 ELK搭建 Docker容器化应用日志中心》之中,曾利用 ELK 搭建了一条数据管道,用作 Docker容器化应用的日志中心。

注: 本文首发于 My 公众号 CodeSheep ,可 长按 或 扫描 下面的 小心心 来订阅 ↓ ↓ ↓

CodeSheep · 程序羊



为什么先讲Logstash

作为与数据源 “直接对接” 的 Logstash,位置处于 ELK 数据管道的 最前端,其主要作用是 收集、过滤分析、输出 各种结构化或者非结构化的原始数据(典型的如日志数据),原始数据从 “无序变有序” 的重担就落在了Logstash的肩上了,因此其作用举足轻重。

说到Logstash,不得不说其中的 插件机制,其几乎所有的功能都是靠插件来实现的,因此灵活易用:

  • 关于 数据收集,Logstash 提供了输入插件来支持各种不同的数据源
  • 关于 数据分析,Logstash 则提供了过滤器插件来支持对输入原始数据的花式处理
  • 关于 数据输出,Logstash 也提供了各种输出插件,从而支持将结果数据输出到各种地方,比如标准控制台,文件,各种数据库包括 ElasticSearch 等


Logstash的插件管理

Logstash 插件是使用 Ruby开发的,Logstash 从很早的1.5.0+版开始,其插件模块和核心模块便分开维护,其插件使用的是 RubyGems包管理器来管理维护。所以 Logstash插件本质上就是自包含的RubyGems。

RubyGems(简称 gems)是一个用于对 Ruby组件进行打包的 Ruby 打包系统。 它提供一个分发 Ruby 程序和库的标准格式,还提供一个管理程序包安装的工具。

可以在网址 rubygems.org上搜索所有Logstash插件:

rubygems.org

关于插件的常用操作如下:

  • 安装插件

可以在线安装:

bin/plugin install [插件名称]

当然也可以将插件提前下载到本地,然后本地安装:

bin/plugin install path/logstash-xxx-x.x.x.gem
  • 卸载插件
bin/plugin uninstall [插件名称]
  • 更新插件
bin/plugin update [插件名称]

其会将插件更新到最新的版本



Logstash的插件定义语法结构

Logstash 插件的定义其实使用的就是一套其自定义的 DSL语法,我还是习惯用图来说明吧:

Logstash的插件结构

从图中可以看出主要包含以下几大部分内容:

1. 需要的依赖

该部分一般会用require语法引入如下依赖:

require "logstash/XXX/base"
require "logstash/namespace"
  • 前者引入 特定类型插件的依赖
  • 后者引入 模块命名空间

2. 类定义

需要用 class语法给每一个插件定义一个类,后面我会用实际代码说明

3. 配置插件名字

通过 config_name 语法来给插件取一个名字,这个名字将会用到 Logstash.conf 配置文件的插件配置之中

4. 配置选项设置

可以使用 config 语法来按需定义任意个配置项。可以设置配置选项的名字、数据类型、默认值以及是否为必选项:

举例:

config :percentage, :validate => :number, :default =>100
  • :percentage:定义配置项的名字
  • :validate:配置指定参数的数据类型,如此处为 number类型
  • :default:指定配置项的默认值
  • :required:用于指定配置项是否必选

5. 插件方法

每一种类型的插件都需要实现一些方法,如下表所示:

插件类型 插件方法
输入插件 register、 run
过滤器插件 register、 filter
输出插件 register、 receive
编解码插件 register、 encode、 decode

Logstash 插件所具备的业务处理功能就来源于上述插件方法业务逻辑实现!

好了,理论部分总结到这,下面结合一份Logstash插件定义的源码来例析一下!



一份Logstash插件定义文件例析

我们以 Logstash 插件的官网给出的一个 Logstash 过滤器插件 logstash-filter-example 的源码为例来进行分析,麻雀虽小,五脏俱全!代码解析已经标注于图中,不再赘述。

logstash-filter-example插件源码

当然此处的实例给出的是一个入门实例,毕竟不可能在一篇篇幅有限的文章里给出一个太过复杂的 Logstash的插件源码。对照该源码和上一节的内容,我想应该不难理解Logstash的插件源码结构了吧。

计划后续展示一个 根据具体数据需求 来自定义开发一个满足特定需求的 Logstash插件的实例。



后记

  • 作者更多的原创文章在此,欢迎观赏

  • My Personal Blog

作者更多的SpringBt实践文章在此:

  • Spring Boot Admin2.0开箱体验
  • Spring Boot应用监控实战
  • SpringBoot应用部署于外置Tomcat容器
  • ElasticSearch搜索引擎在SpringBt中的实践
  • 初探Kotlin+SpringBoot联合编程
  • Spring Boot日志框架实践
  • SpringBoot优雅编码之:Lombok加持

如果有兴趣,也可以抽点时间看看作者一些关于容器化、微服务化方面的文章:

  • 利用K8S技术栈打造个人私有云 连载文章
  • 从一份配置清单详解Nginx服务器配置
  • Docker容器可视化监控中心搭建
  • 利用ELK搭建Docker容器化应用日志中心
  • RPC框架实践之:Apache Thrift
  • RPC框架实践之:Google gRPC
  • 微服务调用链追踪中心搭建
  • Docker容器跨主机通信
  • Docker Swarm集群初探
  • 高效编写Dockerfile的几条准则

可 长按 或 扫描 下面的 小心心 来订阅 CodeSheep,获取更多 务实、能看懂、可复现的 原创文 ↓↓↓

CodeSheep · 程序羊


加入收藏 新浪微博 分享到微信 ❤赞 1379 次点击 0 人赞 2 人收藏

打开微信“扫一扫”,打开网页后点击屏幕右上角分享按钮

共收到3条回复
hello_world_65 6 年前 #1 赞 0

Mark

zhangergou 6 年前 #2 赞 0

kafka -> logstash -> es

一个请求响应 两条数据进了 kafka logstash这边就两条 进了 es 根据 uuid做数据更新 楼主有了解过么 , 多层json 目前内层json还是字符串 才开始玩 能指点迷津不咯 谢啦.

ConsumerRecord(topic = apiTest, partition = 2, offset = 135676, CreateTime = 1532496191672, checksum = 775077821, serialized key size = -1, serialized value size = 629, key = null, value = {"logType":"apilog","logId":"c627625ee7054191bb3febf637c13440","hostIp":"192.168.5.34","hostName":"Dell-PC","dealIp":"0:0:0:0:0:0:0:1","externalIp":null,"domainName":null,"userloginId":null,"createTime":"2018-07-25 13:23:01","url":"http://localhost:9191/tracking/v1/importbill/subStage","datatKey":null,"dataId":null,"method":"qurey","status":"begin","request":"{ \"voyage\": \"1811\", \"vslName\": \"ARCTIC APIRIT\", \"blNo\": \"0418WHG04ASP35\", \"subStage\": \"CUS_DCLR_IM_RLS\"}","response":null,"spendTime":null,"httpMethod":"POST","classMethod":"com.easipass.api.controller.TrackingV1Controller.getImportbillInfo"})

ConsumerRecord(topic = apiTest, partition = 1, offset = 135679, CreateTime = 1532496204494, checksum = 2573605207, serialized key size = -1, serialized value size = 637, key = null, value = {"logType":"apilog","logId":"c627625ee7054191bb3febf637c13440","hostIp":"192.168.5.34","hostName":"Dell-PC","dealIp":null,"externalIp":null,"domainName":null,"userloginId":null,"createTime":null,"url":null,"datatKey":null,"dataId":null,"method":"qurey","status":"end","request":null,"response":"{\"statusCode\":200,\"msg\":\"\",\"dataList\":[{\"stage\":\"CUS_DCLR_IM\",\"cusLetpasStatusCode\":\"1\",\"cusLetpasStatus\":\"\u5DF2\u653E\u884C\",\"vslName\":\"ARCTIC APIRIT\",\"subStage\":\"CUS_DCLR_IM_RLS\",\"cusLetpasTime\":\"2018-06-04T09:14:00+08:00\",\"voyage\":\"1811\"}]}","spendTime":"19883","httpMethod":null,"classMethod":null})



kafka-logstash - es conf

input {
kafka {
bootstrap_servers => "192.168.129.122:9092,192.168.129.123:9092"
group_id => "elasticconsumer"
topics => "apiTest"
codec => json{
charset =>"UTF-8"
}
}
}
filter{
json {
source => "message"
remove_field => ["message"]
}
}

output {
elasticsearch {
hosts => ["192.168.129.171:9200"]
index => "daaslog-%{+YYYY-MM}"
}

}
dyuanw 6 年前 #3 赞 0

@zhangergou 保证es的_id是你的uuid即可,es根据_id判断是否是存在,不存在就插入,存在就更新。

请绑定手机号后,再发言,点击此处
Guozaoke.com—源自武汉的高端交流分享社区
相关主题
现在工作越来越难做了
最近时间少了, 隔了这么久, 才做了一个网站
求推荐一个AI智能体客服
请问大家都是通过哪些好用的链接访问GPT
请教机器学习人工智能的一个技术问题
搞了个AI 生图的网站,不需要登录,永久免费
[第二波送码]动动嘴皮,AI秒出图!2025最炸裂图标神器:免费生成+手机实时预览
开发了一个管理 Linux 服务器的桌面可视化管理工具,求蹂躏
服务器可视化采用纯前端渲染,这个技术实现难度如何
做了一款摸鱼软件,有人要试用吗?

过早客微信公众号:guozaoke • 过早客新浪微博:@过早客 • 广告投放合作微信:fullygroup50 鄂ICP备2021016276号-2 • 鄂公网安备42018502001446号