Logstash filter 的使用

2015-12-14 22:46:43   最后更新: 2015-12-15 21:53:41   访问数量:4360




logstash 之所以强大和流行,与其丰富的过滤器插件是分不开的

过滤器提供的并不单单是过滤的功能,还可以对进入过滤器的原始数据进行复杂的逻辑处理,甚至添加独特的新事件到后续流程中

 

grok 是一个十分强大的 logstash filter 插件,他可以解析任何格式的文本,他是目前 logstash 中解析非结构化日志数据最好的方式

 

基本用法

Grok 的语法规则是:

%{语法 : 语义}

 

 

“语法”指的就是匹配的模式,例如使用 NUMBER 模式可以匹配出数字,IP 则会匹配出 127.0.0.1 这样的 IP 地址:

%{NUMBER:lasttime}%{IP:client}

 

默认情况下,所有“语义”都被保存成字符串,你也可以添加转换到的数据类型

%{NUMBER:lasttime:int}%{IP:client}

 

目前转换类型只支持 int 和 float

 

覆盖 -- overwrite

使用 Grok 的 overwrite 参数也可以覆盖日志中的信息

filter { grok { match => { "message" => "%{SYSLOGBASE} %{DATA:message}" } overwrite => [ "message" ] } }

 

日志中的 message 字段将会被覆盖

 

示例

对于下面的log,事实上是一个 HTTP 请求行:

55.3.244.1 GET /index.html 15824 0.043

 

 

我们可以使用下面的 logstash 配置:

input { file { path => "/var/log/http.log" } } filter { grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } } }

 

 

可以看到收集结果:

client: 55.3.244.1 method: GET request: /index.html bytes: 15824 duration: 0.043

 

将无结构的数据通过这样的方式实现了结构化输出

 

grok 是在正则表达式的基础上实现的(使用 Oniguruma 库),因此他可以解析任何正则表达式

 

创建模式

提取日志字段和正则表达式提取字段的规则一样:

(?<field_name>the pattern here)

 

 

首先,创建一个模式文件,写入你需要的正则表达式:

# contents of ./patterns/postfix: POSTFIX_QUEUEID [0-9A-F]{10,11}

 

 

然后配置你的 Logstash:

filter { grok { patterns_dir => "./patterns" match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" } } }

 

 

针对日志:

Jan  1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>

可以匹配出:

timestamp: Jan 1 06:25:43

logsource: mailserver14

program: postfix/cleanup

pid: 21403

queue_id: BEF25A72965

syslog_message: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>

 

Logstash 1.3.0 以上可以使用 geoip 插件获取 IP 对应的地理位置,对于 accesslog 等的统计来说,IP 来源是非常有用的一个信息

 

使用方法

geoip { source => ... }

 

 

示例

filter { geoip { source => "message" } }

 

运行结果:

{ "message" => "183.60.92.253", "@version" => "1", "@timestamp" => "2014-08-07T10:32:55.610Z", "host" => "raochenlindeMacBook-Air.local", "geoip" => { "ip" => "183.60.92.253", "country_code2" => "CN", "country_code3" => "CHN", "country_name" => "China", "continent_code" => "AS", "region_name" => "30", "city_name" => "Guangzhou", "latitude" => 23.11670000000001, "longitude" => 113.25, "timezone" => "Asia/Chongqing", "real_region_name" => "Guangdong", "location" => [ [0] 113.25, [1] 23.11670000000001 ] } }

 

 

可以看到,logstash 通过提取到的 message 字段中的 IP,解析到了地理位置相关的一系列信息

当然,对于解析出的众多数据,你也可以通过 fields 选项进行筛选

filter { geoip { fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"] } }

 

 

选项

上面我们看到了 source 和 fields 两个选项,geoip 还提供了下列选项:

geoip 提供的可选选项
选项类型是否必须默认值意义
add_fieldhash{}为当前事件增加一个字段
add_tagarray[]为当前事件增加一个用于标识的tag
databasepath位置信息库所在文件
fieldsarray在 geoip 的返回结果中筛选部分字段
lru_cashe_sizeint1000geoip 占用的缓存大小
periodic_flushboolfalse是否定期调用刷新方e
remove_fieldarray[]从结果集中删除字段
remove_tagarray[]从结果集中删除tag
sourcestring需要解析的存有 IP 的字段名称
targetstring"geoip"返回的结果中保存 geoip 解析结果的字段名

 

对于 json 格式的 log,可以通过 codec 的 json 编码进行解析,但是如果记录中只有一部分是 json,这时候就需要在 filter 中使用 json 解码插件

 

示例

filter { json { source => "message" target => "jsoncontent" } }

 

 

运行结果:

{ "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "{\"uid\":3081609001,\"type\":\"signal\"}", "jsoncontent": { "uid": 3081609001, "type": "signal" } }

 

 

上面的例子中,解析结果被放到了 target 所指向的节点下,如果希望将解析结果与 log 中其他字段保持在同一层级输出,那么只需要去掉 target 即可:

{ "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "{\"uid\":3081609001,\"type\":\"signal\"}", "uid": 3081609001, "type": "signal" }

 

 

mutiline 让 logstash 将多行数据变成一个事件,当然了,logstash 同样支持将一行数据变成多个事件

logstash 提供了 split 插件,用来把一行数据拆分成多个事件

 

示例:

filter { split { field => "message" terminator => "#" } }

 

 

运行结果:

对于 "test1#test2",上述 logstash 配置将其变成了下面两个事件:

{ "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "test1" } { "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "test2" }

 

 

需要注意的是,当 split 插件执行结束后,会直接进入 output 阶段,其后的所有 filter 都将不会被执行

 

logstash 还支持在 filter 中对事件中的数据进行修改

 

重命名 -- rename

对于已经存在的字段,重命名其字段名称

 

filter { mutate { rename => ["syslog_host", "host"] } }

 

 

更新字段内容 -- update

更新字段内容,如果字段不存在,不会新建

filter { mutate { update => { "sample" => "My new message" } } }

 

 

替换字段内容 -- replace

与 update 功能相同,区别在于如果字段不存在则会新建字段

filter { mutate { replace => { "message" => "%{source_host}: My new message" } } }

 

 

数据类型转换 -- convert

filter { mutate { convert => ["request_time", "float"] } }

 

 

文本替换 -- gsub

gsub 提供了通过正则表达式实现文本替换的功能

filter { mutate { gsub => [ # replace all forward slashes with underscore "fieldname", "/", "_", # replace backslashes, question marks, hashes, and minuses # with a dot "." "fieldname2", "[\\?#-]", "." ] } }

 

 

大小写转换 -- uppercase、lowercase

filter { mutate { uppercase => [ "fieldname" ] } }

 

 

去除空白字符 -- strip

类似 php 中的 trim,只去除首尾的空白字符

filter { mutate { strip => ["field1", "field2"] } }

 

 

删除字段 -- remove、remove_field

remove 不推荐使用,推荐使用 remove_field

filter { mutate { remove_field => [ "foo_%{somefield}" ] } }

 

 

分割字段 -- split

将提取到的某个字段按照某个字符分割

filter { mutate { split => ["message", "|"] } }

 

 

针对字符串 "123|321|adfd|dfjld*=123",可以看到输出结果:

{ "message" => [ [0] "123", [1] "321", [2] "adfd", [3] "dfjld*=123" ], "@version" => "1", "@timestamp" => "2014-08-20T15:58:23.120Z", "host" => "raochenlindeMacBook-Air.local" }

 

 

聚合数组 -- join

将类型为 array 的字段中的 array 元素使用指定字符为分隔符聚合成一个字符串

 

如我们可以将 split 分割的结果再重新聚合起来:

filter { mutate { split => ["message", "|"] } mutate { join => ["message", ","] } }

 

 

输出:

{ "message" => "123,321,adfd,dfjld*=123", "@version" => "1", "@timestamp" => "2014-08-20T16:01:33.972Z", "host" => "raochenlindeMacBook-Air.local" }

 

 

合并数组 -- merge

对于几个类型为 array 或 hash 或 string 的字段,我们可以使用 merge 合并

filter { mutate { merge => [ "dest_field", "added_field" ] } }

 

 

需要注意的是,array 和 hash 两个字段是不能 merge 的

 






技术帖      龙潭书斋      merge      event      json      filter      logstash      elk      elkstack      multiline      split      mutate      grok      geoip     


京ICP备15018585号