logstash的filter需要大量的规则来对输入的数据进行大量if操作,这篇文章来写写,怎么把冗余的配置变得稍稍优雅一点。
缘起
为什么想到优化配置呢?因为业务的上报的需要,每个上报的ID其对应的字段都不同。
所以根据官方的给出的示例配置(如下),
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}
}
就推导出来这样看起来合理的配置:
if "xxxx," in [message] {
ruby {
init => "@kname = ['logid', 'protocol_version'.....']"
code => "
v = Hash[@kname.zip(event.get('message').split(','))]
new_event = LogStash::Event.new(v)
new_event.remove('@timestamp')
event.append(new_event)
"
}
date {
match => ["reported_time","yyyy-MM-dd HH:mm:ss,SSS","UNIX"]
target => "@timestamp_app"
}
}
... * N
在这种情况下,一个ID需要一个这样的配置段,那么如果有几百甚至上千个ID,那么配置文件的规模将会极为庞大。
关键是,在这种fliter 的配置下,整个的logstash的启动速度是非常的缓慢。如果到达来几百个if的话,启动时间,将会耗费40分钟甚至启动失败。
所以,就开始寻找问题,并进行解决。
一次尝试 Grok
Grok作为logstash里面的一个正则组件,使用的是比较多的。为什么这里想到grok呢?
因为主要的目标,是将这种冗余的配置给整体化,也就是变成一整块,这样的话家在和启动速度将会有很大的提高。
在grok里面有一个功能PatternDir
可以在一个文件里写好正则的别名,然后使用它的别名来控制将使用的正则表达式。内容差不多如下:
filter {
grok {
match => {
patterns_dir => ["/opt/logstash/patterns"]
"message" => "Duration: %{NUMBER:duration}"
}
}
}
这里的NUMBER就是一个定义好了的正则。当执行这句的时候,会取回这个NUMBER对应的正则,对当前的内容进行匹配。
所以,根据这个思路,就有了这样的思路:
match => {
patterns_dir => ["/opt/logstash/patterns"]
"message" => "%{type}"
}
希望使用type这个字段来自动的到文件中去寻找对应的正则表达式。然而问题在于:在这个message 的匹配项中,根本就不支持变量这个东西,也就是说,他会去寻找别名为%{type}的正则表达式,而不会去找其值对应的。
所以就继续想办法,由于类似的情景比较少,所以最终只能求助(外部势力)??,在stackover flow 上找到了答案:
filter {
grok {
match => { "message" => [
"TYPE1,%{WORD:a1},%{WORD:a2},%{WORD:a3},%{POSINT:a4}",
"TYPE2,%{WORD:b1},%{WORD:b2},%{WORD:b3},%{WORD:b4}",
"TYPE3,%{POSINT:c1},%{WORD:c2},%{POSINT:c3},%{WORD:c4}" ]
}
}
}
对,虽然这种方式没有放在文件夹里面,但是一样可以使用type来进行统一的管理。
所以就跟就这样的方式来写好来一个正则的array。
然后,试着直接使用filebeat提交了几个数据,发现好使!于是就满怀激动的就接上了日志的输入。
然后问题就来了,CPU直接跑满,而且出现大量的正则超时。
查看了原因,grok的match默认是从第一项开始匹配,如果只有几项这种方法完全可以,但是一旦数目多了,就极度的耗费资源,因为正则本身也蛮累的。所以又得改。。
最终方案
没得办法,最后只能自己想办法。从需求出发,需要达到字段匹配的目的。那么使用id来找到对应的字段这个映射模式第一下就想到了字典。
logstash 本身对ruby 语声称是100%的兼容,所以,就给了很大的空间。
ls里面的 ruby段内有两层:init
, code
分别是 初始化时候执行一次,和每次事件都执行一遍。
filter {
ruby {
init => " H = Hash[
'id1' => ['logid', 'protocol_version',xxxx],
'id2' => ['logid', 'protocol_version',xxxx'],
"
}
...
ruby {
code => "
v = Hash[H[event.get('message').split(',').at(0)].zip(event.get('message').split(','))]
new_event = LogStash::Event.new(v)
new_event.remove('@timestamp')
event.append(new_event)
"
}
...
}
用了map的方式,把几百次的if操作,变成了查表,使得启动时间有了从40分钟到40秒的优化。
后
现在发现,很多东西不想以前一样了,他有,你拿来用就行了,真正到了专业的领域,
变成了他没有,你得实现。所以得靠自己的一点点的经验积累才行。