Logstash 的优雅的filter—-处理大堆的if的方案

  • admin 
  • DEV

logstash的filter需要大量的规则来对输入的数据进行大量if操作,这篇文章来写写,怎么把冗余的配置变得稍稍优雅一点。

缘起

为什么想到优化配置呢?因为业务的上报的需要,每个上报的ID其对应的字段都不同。
所以根据官方的给出的示例配置(如下),

 filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
    geoip {
        source => "clientip"
    }
}

就推导出来这样看起来合理的配置:

    if "xxxx," in [message] {
        ruby {
                init => "@kname = ['logid', 'protocol_version'.....']"
                code => "
                    v = Hash[@kname.zip(event.get('message').split(','))]
                    new_event = LogStash::Event.new(v)
                    new_event.remove('@timestamp')
                    event.append(new_event)
                    "
            }
        date {
                 match => ["reported_time","yyyy-MM-dd HH:mm:ss,SSS","UNIX"]
                 target => "@timestamp_app"
                 }
    }
    ... * N

在这种情况下,一个ID需要一个这样的配置段,那么如果有几百甚至上千个ID,那么配置文件的规模将会极为庞大。

关键是,在这种fliter 的配置下,整个的logstash的启动速度是非常的缓慢。如果到达来几百个if的话,启动时间,将会耗费40分钟甚至启动失败


所以,就开始寻找问题,并进行解决。

一次尝试 Grok

Grok作为logstash里面的一个正则组件,使用的是比较多的。为什么这里想到grok呢?
因为主要的目标,是将这种冗余的配置给整体化,也就是变成一整块,这样的话家在和启动速度将会有很大的提高。


在grok里面有一个功能PatternDir可以在一个文件里写好正则的别名,然后使用它的别名来控制将使用的正则表达式。内容差不多如下:

    filter {
      grok {
        match => {
          patterns_dir => ["/opt/logstash/patterns"]
          "message" => "Duration: %{NUMBER:duration}"
        }
      }
    }

这里的NUMBER就是一个定义好了的正则。当执行这句的时候,会取回这个NUMBER对应的正则,对当前的内容进行匹配。

所以,根据这个思路,就有了这样的思路:

match => {
    patterns_dir => ["/opt/logstash/patterns"]
    "message" => "%{type}"
}

希望使用type这个字段来自动的到文件中去寻找对应的正则表达式。然而问题在于:在这个message 的匹配项中,根本就不支持变量这个东西,也就是说,他会去寻找别名为%{type}的正则表达式,而不会去找其值对应的。


所以就继续想办法,由于类似的情景比较少,所以最终只能求助(外部势力)??,在stackover flow 上找到了答案:

filter { 
    grok {
        match => { "message" => [ 
                "TYPE1,%{WORD:a1},%{WORD:a2},%{WORD:a3},%{POSINT:a4}",
                "TYPE2,%{WORD:b1},%{WORD:b2},%{WORD:b3},%{WORD:b4}",
                "TYPE3,%{POSINT:c1},%{WORD:c2},%{POSINT:c3},%{WORD:c4}"  ]
            }
    }
} 

对,虽然这种方式没有放在文件夹里面,但是一样可以使用type来进行统一的管理。
所以就跟就这样的方式来写好来一个正则的array。

然后,试着直接使用filebeat提交了几个数据,发现好使!于是就满怀激动的就接上了日志的输入。
然后问题就来了,CPU直接跑满,而且出现大量的正则超时。

查看了原因,grok的match默认是从第一项开始匹配,如果只有几项这种方法完全可以,但是一旦数目多了,就极度的耗费资源,因为正则本身也蛮累的。所以又得改。。

最终方案

没得办法,最后只能自己想办法。从需求出发,需要达到字段匹配的目的。那么使用id来找到对应的字段这个映射模式第一下就想到了字典。

logstash 本身对ruby 语声称是100%的兼容,所以,就给了很大的空间。
ls里面的 ruby段内有两层:initcode分别是 初始化时候执行一次,和每次事件都执行一遍。

 filter {
        ruby {
            init => "  H = Hash[
                'id1' => ['logid', 'protocol_version',xxxx],
                'id2' => ['logid', 'protocol_version',xxxx'],
            "
            }
            ...
            ruby {
               code => "
                v = Hash[H[event.get('message').split(',').at(0)].zip(event.get('message').split(','))]
                new_event = LogStash::Event.new(v)
                new_event.remove('@timestamp')
                event.append(new_event)
            "
            }
            ...
        }

用了map的方式,把几百次的if操作,变成了查表,使得启动时间有了从40分钟到40秒的优化。

现在发现,很多东西不想以前一样了,他有,你拿来用就行了,真正到了专业的领域,
变成了他没有,你得实现。所以得靠自己的一点点的经验积累才行。

留下点什么吧