logstash:elasticsearch输出和非结构化数据

wnrlj8wa  于 2021-06-15  发布在  ElasticSearch
关注(0)|答案(1)|浏览(349)

filebeat.yml文件:

filebeat.inputs:
- type: log
  paths:
    - C:\Program Files\Filebeat\test_logs\*\*\*\*.txt
  exclude_lines: ['^Infobase.+']
output.logstash:
  hosts: ["localhost:5044"]
  worker: 1

filebeat从这样的文件夹结构收集日志:

C:\Program Files\Filebeat\test_logs\*\*\*\*.txt

这里有许多文件夹,每个文件夹的末尾至少有几个日志。
日志文件示例(在多个日志文件中,时间可能相同,因为日志来自不同的用户):

"03.08.2020 10:56:38","Event LClick","Type Menu","t=0","beg"
"03.08.2020 10:56:38","Event LClick","Type Menu","Detail SomeDetail","t=109","end"
"03.08.2020 10:56:40","Event LClick","t=1981","beg"
"03.08.2020 10:56:40","Event LClick","t=2090","end"
"03.08.2020 10:56:41","Event LClick","Type ToolBar","t=3026","beg"
"03.08.2020 10:56:43","Event LClick","Type ToolBar","Detail User_Desktop","t=4477","end"
"03.08.2020 10:56:44","Event FormActivate","Name Form_Name:IsaA","t=5444"
"03.08.2020 10:56:51","Event LClick","t=12543","beg"
"03.08.2020 10:56:51","Event LClick","t=12605","end"
"03.08.2020 10:56:52","Event LClick","Form ","Type Label","Name Application.for.training","t=13853","beg"
"03.08.2020 10:57:54","Event LClick","Form Application.for.training","Type Label","Name Application.for.training","t=75442","end"
"03.08.2020 10:57:54","Event FormActivate","Name List.form","t=75785"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85769","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85769","end"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85847","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85847","end"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85879","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85879","end"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85925","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85925","end"
"03.08.2020 10:58:08","Event LClick","Form List.form","Type FormTable","Name Список","t=89373","beg"
"03.08.2020 10:58:08","Event LClick","Form List.form","Type FormTable","Name Список","Detail Data","t=89451","end"
"03.08.2020 10:58:15","Event LClick","Form List.form","Type FormTable","Name Список","t=96580","beg"
"03.08.2020 10:58:15","Event LClick","Form List.form","Type FormTable","Name Список","Detail Data","t=96643","end"

日志存储确认文件:

input {
    beats {
        port => '5044'
    }
}
 filter {
    grok {
        patterns_dir => ['./patterns']
        match => { 'message' => '%{TIME:timestamp}(","Event\s)(?<Event>([^"]+))(","Form\s)?(?<Form>([^"]+))?(","ParentType\s)?(?<parent_type>([^"]+))?(","ParentName\s)?(?<parent_name>([^"]+))?(","Type\s)?(?<type>([^"]+))?(","Name\s)?(?<Name_of_form>([^"]+))?(","Detail\s)?(?<Detail>([^"]+))?(","t=)?(?<t>([\d]+))?(",")?(?<Status>(end|beg))?' }
        add_tag => [ '%{Status}' ]
    }
    dissect {
        mapping => {
            '[log][file][path]' => 'C:\Program Files\Filebeat\test_logs\%{somethingtoo}\%{something}\%{User_Name}\%{filename}.txt'
        }
    }
    date {
        match => [ 'timestamp', 'dd.MM.yyyy HH:mm:ss' ]
    }
    elapsed {
        unique_id_field => 'Event'
        start_tag => 'beg'
        end_tag => 'end'
        new_event_on_match => false
    }

    if 'elapsed' in [tags] {
        aggregate {
            task_id => '%{Event}'
            code => 'map["duration"] = [(event.get("elapsed_time")*1000).to_i]'
            map_action => 'create'
        }
    }
    mutate {
        remove_field => ['timestamp', 'ecs', 'log', 'tags', 'message', '@version', 'something', 'somethingtoo', 'filename', 'input', 'host', 'agent', 't', 'parent_type', 'parent_name', 'type']
        rename => {'elapsed_time' => 'Event_duration'}
    }
}
output {
    elasticsearch {
        hosts => ['localhost:9200']
        index => 'test'
    }
}

在我的logstash.conf中,我使用了聚合过滤器并将worker 1(-w 1)设置为正常工作。
当我只使用一个日志文件进行测试和配置时,我设置了-w1,一切正常。但是当我开始收集每个目录的所有日志时,问题就出现了。数据没有正确地放入elasticsearch(这从基于聚合结果的奇怪数字中可以清楚地看出)
我试着在logstash输出(worker:1)的filebeat.yml中设置它,但仍然没有帮助。
问题:
也许你知道怎么解决这个问题?因为奇怪的是,对于一个日志文件或一个目录末尾的多个日志文件,所有内容都正常工作,当添加更多目录时,所有内容都会突然崩溃。
如果我正确理解了这个理论,那么elasticsearch就有了索引和类型。每个日志都有一个时间和一个用户名,这个用户名是谁的日志,也许我应该按日志时间把数据放在索引中,按用户名键入,这样不同用户的同一时间的日志就不会重叠。我应该如何实现这一点?我试图查找信息,只找到有关文档类型的信息,该类型已被弃用。

f2uvfpb9

f2uvfpb91#

您正在使用 elapsed 以及 aggregate 对于不唯一的字段,可以为 Event 字段,可以使 elapsed 筛选器使用一个文件中的开始事件和另一个文件中的结束事件。
这是因为filebeat harvester并行归档并将其批量发送到logstash。这个 worker 配置中的选项在您的情况下没有任何用处,它与传送数据的工作进程数有关,而不是与收集数据有关。
你可以尝试使用这个选项 harvester_limit: 1 ,以限制并行收割机的数量,但这会降低数据处理速度,并且不能保证不会混淆过滤器。而且,filebeat不保证事件的顺序,只保证至少一次交付。
最好的解决方案是创建一个连接 Eventfilename 字段,这样就不会混淆不同文件中的事件。
您可以通过添加 mutate 在您的 elapsed 过滤器。

mutate {
  add_field => { "uniqueEvent" => "%{Event}_%{filename}" }
}

这将创建一个名为 uniqueEvent 具有如下值 Lclick_filename ,然后您将在 elapsed 以及 aggregate 过滤器。
如果在不同的文件夹中有相同的文件名,则需要使用路径中的另一个字段,直到将 uniqueEvent 独特的价值。

相关问题