使用elasticsearch实时分析事件日志

jw5wzhpr  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(313)

每次更改某个设备的属性时,我都会收集事件日志。为此,我决定使用:
logstash—我的代理iot应用程序以json格式将日志发送到,
elasticsearch-用于存储数据(日志),
kibana-用于数据可视化。
带有日志的json是定期发送的,其形式如下:

{"deviceEventLogs":[{"date":"16:16:39 31-08-2016","locationName":"default","property":"on","device":"Lamp 1","value":"
false","roomName":"LivingRoom"}, ... ,]}

elasticsearch中单个事件条目的示例如下所示:

{
            "_index": "logstash-2016.08.25",
            "_type": "on",
            "_id": "AVbDYQPq54WlAl_UD_yg",
            "_score": 1,
            "_source": {
               "@version": "1",
               "@timestamp": "2016-08-25T20:25:28.750Z",
               "host": "127.0.0.1",
               "headers": {
                  "request_method": "PUT",
                  "request_path": "/deviceEventLogs",
                  "request_uri": "/deviceEventLogs",
                  "http_version": "HTTP/1.1",
                  "content_type": "application/json",
                  "http_user_agent": "Java/1.8.0_91",
                  "http_host": "127.0.0.1:31311",
                  "http_accept": "text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2",
                  "http_connection": "keep-alive",
                  "content_length": "34861"
               },
               "date": "2016-08-08T14:48:11.000Z",
               "device": "Lamp 1",
               "property": "on",
               "locationName": "default",
               "roomName": "LivingRoom",
               "value_boolean": true
            }
 }

我的目标是创建一个具有某种 Jmeter 盘的网站,在合理的时间内显示分析的数据(可以接受几分钟),即:
展示能源消耗历史,预测能源消耗特征
检测能源消耗或其他因素(如照明或取暖)的异常情况
显示基于某种非统计数据的建议,即“您可以将给定设备从位置1移动到位置2,因为那里更需要它(比其他地方使用更密集)”,等等。
最后一点很简单-我可以在elasticsearch中使用简单的查询或聚合,然后将其与某个treshold值进行比较,前两点需要深入的分析,如机器学习或数据挖掘。
目前,该系统平均每10秒就有大约50台设备更新状态。在未来,设备的数量可以增加到50000台。在elasticsearch中,假设一个事件日志有100个字节,每年大约有15 TB的数据。
一般的问题是-什么是合理的解决方案/技术/体系结构?
在elasticsearch中存储所有日志是否合理?
我认为使用elasticsearch和apachespark的es hadoop库能够在spark中使用mlib处理我的数据-这是一个合理的方向吗?
我可以只使用elasticsearch来存储我的所有数据,而只使用spark和mlib来提供深入的分析吗?还是应该考虑实现所谓的“lambda架构”,将elasticsearch视为一个速度层?我对使用kafka和apachestorm的各种配置有过一些了解,但我不确定是否需要它。由于该项目应该在一个月内完成,我是一个初学者,我担心的复杂性,因此这样的实施所需的时间。
如果数据负载减少10倍(大约每年1.5TB),你的答案会是一样的吗?

ar7v8xwq

ar7v8xwq1#

这是一个非常复杂的问题,让我试着把它分解一下:
你应该考虑的问题
您的数据可用于查询的端到端延迟是多少?你需要实时的还是你可以接受延迟?
您愿意容忍什么样的数据丢失?
你所看到的analytics/ml算法的准确度是多少?你需要高准确度的结果,还是你对一些不准确的地方没意见?
你需要的结果只有当他们是完整的还是你需要某种推测性的结果?
这些问题以及诸如空间限制和数据负载增加时的延迟等常规问题应该可以帮助您确定正确的解决方案。
通常,这些问题可以看作是摄取->处理->呈现。
摄取-需要消息总线
一般来说,人们选择像kafka这样的消息总线来处理来自下游用户的反压力,并提供可靠性(通过持久化到磁盘)来防止数据丢失。kafka在集成方面也有很好的社区支持,比如spark streaming、druid firehose支持、es插件等。
处理-需要可扩展的计算层
这是你需要决定的事情,如实时与批处理,适用的数据丢失,准确与推测的结果,等阅读泰勒akidau的文章流https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101 详细解释。
人们选择spark流用于实时用例,而一个简单的m/r作业应该可以实现批处理作业。如果您计划流式处理作业,那么窗口和事件会话会使事情进一步复杂化。
演示-需要交互式查询和快速响应
这就是面向前端的应用程序要集成的地方,选择一个非常适合预期查询类型和所需响应准确性的工具是有意义的。
像es这样的工具在搜索、过滤和切面方面表现得非常好,但是当需要复杂的数学聚合时就失败了。afaik es不像druid那样支持概率结构,比如hyperloglog。
改造
现在您必须将您的需求Map到上面的每个层。
展示能源消耗历史,预测能源消耗特征
检测能源消耗或其他因素(如照明或取暖)的异常情况
正如你提到的,你显然需要机器学习库。spark的mlib支持非常棒。
显示基于某种非统计数据的建议,即“您可以将给定设备从位置1移动到位置2,因为那里更需要它(比其他地方使用更密集)”,等等。
您甚至可以使用spark上的mlib来实现这一点,并将这些建议放在es中的单独索引中,甚至放在kafka主题中,您可以进一步将其放在hdfs或es中。在这里,您应该小心垃圾收集,因为这可能导致数据爆炸,您需要在这里积极保留。此外,在手之前计算推荐还可以帮助你做一些React性的事情,比如警报、推送通知,甚至是来自ui的查询都会更快。
在elasticsearch中,假设一个事件日志有100个字节,每年大约有15 TB的数据。
这些是任何存储系统资源调配的正常问题。您可以在这里通过计算历史数据的具体化视图来进行优化,但是您可以稍后再做决定,因为这可能会导致过早的优化。最好先测量查询的存储和延迟,然后对容量进行追溯分析。
在elasticsearch中存储所有日志是否合理?
考虑到您的用例,情况非常好。但是如果使用spark streaming/mlib或批处理mr作业,那么您甚至可以使用哑数据存储,因为大多数计算都是在手工操作之前进行的。
我认为使用elasticsearch和apachespark的es hadoop库能够在spark中使用mlib处理我的数据-这是一个合理的方向吗?
看起来您已经决定批处理,在这种情况下,您可以使用标准mr或spark batch以及mlib。如果你需要实时,你需要像Kafka和使用Spark流。如果您对数据丢失没有意见,那么在决定窗口/滑动间隔等时,您可能会在保留方面表现出攻击性,甚至在spark中也是如此。如果您对结果不准确没有意见,那么您可以使用概率数据结构(如bloom filter、hyperloglog-druid支持这一点)来表示结果。
我可以只使用elasticsearch来存储我的所有数据,而只使用spark和mlib来提供深入的分析吗?还是应该考虑实现所谓的“lambda架构”,将elasticsearch视为一个速度层?
我不知道你是否可以从es流数据到spark jobs。而且lambda体系结构被过分夸大了,只有当你确信你的实时层是不准确的并且你不能处理数据丢失/不准确的时候,它才有帮助。否则,从kafka读取数据并泵送到es的简单spark流作业就足够了。在决定使用lambda这样的复杂体系结构之前,请考虑测量数据丢失,因为操作成本(如重复代码、需要维护的基础设施等)可能很高。
如果数据负载减少10倍(大约每年1.5TB),你的答案会是一样的吗?
我仍然更喜欢相同的架构-Kafka+Spark流(mlib)+es/druid-这更易于实现和维护。

相关问题