ElasticSearch日志时间

x33g5p2x  于2021-03-14 发布在 ElasticSearch  
字(1.6k)|赞(0)|评价(0)|浏览(688)

kibana默认是按照客户端的采集时间(@timestamp)进行排序,这往往不是我们所需要的,我们需要的是对日志实际时间进行排序,要解决这个问题,可以通logstash的过滤插件将日志时间转换成默认@timestamp进行排序;

1、我们要解析的时间格式是标准的java日期格式:

如何取日志中的时间作为kibana展示时排序用的时间

日志主机是 java 的sprint boot 应用程序的日志, 每行日志的最前边都是时间信息 如 2018-09-11 22:07:42.346 这样, 默认kiban 展示那里的时间 好像都是 es 读到这条日志的时间, 有时候会与日志中的时间差别较大。 所以我在想, 怎么样在 filebeat 这里从日志中取出时间 , 作为kibana 那里的时间。

我的问题是“kibana上分析的时间是系统当前时间,而不是日志产生的时间,当然我是追求解决问题的方式,就是让日志产生时间,变成kibana分析时间(在分析旧日志相当有效,不然你丫开个10线程,一天分析十天的旧日志,统计一些数据就会出现很大偏差,你会发现,挖槽,日pv几十亿,不得了拉)

其实也就是在filter中,和grok并列多加一行date,来处理即可

input {
    file {
        type => "nginx_access"
        path => ["/data/logs/*.log"]
        start_position => beginning
        ignore_older => 0
    }
}
filter {
    grok {
        match => {
            "message" => "%{IPORHOST:client_ip} (%{USER:ident}|-) (%{USER:auth}|-) \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} (%{NOTSPACE:request}|-)(?: HTTP/%{NUMBER:http_version})?|-)\" (%{NUMBER:response}|-) (?:%{NUMBER:bytes}|-) \"(%{NOTSPACE:referrer}|-)\" (%{QS:agent}|-) \"(%{WORD:x_forword}|-)\"\"(?<upstream_cache_status>\S+)\""
        }
    }
    date {
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
    geoip {
        source => "client_ip"
    }
}
output {
    elasticsearch {
        hosts => ["172.16.105.153:9200"]
        index => "logstash-nginx-access"
       workers => 2
       flush_size => 1
       idle_flush_time => 1
       template_overwrite => true
   }
}

继续跑起来,你会发现,一切都是你想要的

es ingest node

grok {
    match => [ "message", "%{DATESTAMP:timestamp}" ]
}
date {
    locale => "en"
    timezone => "Europe/Paris"
    match => [ "timestamp", "yyyy-MM-dd HH:mm:ss,SSS" ]
    target => ["tstamp"]
    remove_field => ["timestamp"]
}

相关文章

微信公众号

最新文章

更多