将nginx日志放入kafka的最佳选择?

x6h2sr28  于 2021-06-07  发布在  Kafka
关注(0)|答案(5)|浏览(397)

我们正在处理来自几个服务器的大型日志文件,这些服务器是我们在hdfs上添加的。我们目前有一个很好的批处理解决方案(主要是每天移动和写入文件),并希望用kafka实现一个实时解决方案。
基本上,我们需要将nginx的日志放到kafka中,然后编写一个消费者在hdfs上进行编写(这可以通过hdfs消费者完成)https://github.com/kafka-dev/kafka/tree/master/contrib/hadoop-consumer).
您建议用哪种方法将原木移入Kafka?
我们可以编写一个nginx模块,但并不是那么简单。这个https://github.com/demandcube/sparkngin 可以提供一些线索。
读取日志文件(tail…)看起来是个坏主意,因为有一个无用的写操作。logstash还需要写/读操作,然后再推到kafka,这似乎是不必要的。
还有别的主意吗?

unftdfkk

unftdfkk1#

你试过用吗 Logstash 为了这个目的?它在我们的系统中运行良好。 Logstash 是一个组件,您只需要解压缩并配置一些小参数,如 Kafka 端点和端口、主题和要传输消息的格式以及从哪个文件传输消息。

vnzz0bqm

vnzz0bqm2#

试试linux pipes tail+kafkacat。
从stdin读取消息,使用snappy压缩生成“syslog”主题

tail -F /var/log/syslog | kafkacat -b mybroker -t syslog -z snappy

小心使用-f而不是-f,-f能够处理日志旋转问题-f即使文件无法访问或变得无法访问,也要继续尝试打开文件;按名称后跟时很有用。

v2g6jxz6

v2g6jxz63#

我在为nginx logpush to kafka工作时读到了这篇文章。读完后,我发现它很有用。特别是“马雷萨”的评论。确切地说,这应该是使用rsyslog的最佳方法。
但同时,我个人认为,logstash不能很好地适应Kafka被使用的管道。至少我没有找到任何理由在使用Kafka处理日志的地方使用logstash。如果我们使用logstash,那么在这个管道中使用logstash而不使用kafka会更有意义。
无论如何谢谢你。很棒的线。感谢任何人分享更多的想法。
雷格,阿明

kqhtkvqz

kqhtkvqz4#

我知道这是个老问题。但最近,我也需要做同样的事情。
问题 tail -f producer 是在原木上旋转,当尾巴死的时候,你真的不知道哪条线被送到了Kafka。
从nginx1.7.1开始,access\u log指令可以登录到syslog。请看http://nginx.org/en/docs/syslog.html. 我们利用它来记录rsyslog和从rsyslog到kafka。http://www.rsyslog.com/doc/master/configuration/modules/omkafka.html
这是一种有点迂回的方式来做,但这种方式,有没有减少的机会,日志丢失。另外,如果您使用的是centos,rsyslog还是标准的。
简而言之,以下是将nginx log放入kafka的最佳选项:
nginx->rsyslog->Kafka

0ve6wy6x

0ve6wy6x5#

另一种选择是使用nginx clojure java日志处理程序将nginx访问日志实时保存到kafka。在下面的示例中,我们使用java将nginx访问日志实时写入文件。
nginx.conf格式:

location /hello {
  ....
  log_handler_type java;
  log_handler_name mytest.MyLogHandler;
  log_handler_property logUserAgent on;
}

java代码:

public class SimpleLogHandler implements NginxJavaRingHandler, Configurable {

    boolean logUserAgent;

    @Override
    public Object[] invoke(Map<String, Object> request) throws IOException {
        File file = new File("logs/SimpleLogHandler.log");
        NginxJavaRequest r = (NginxJavaRequest) request;
        try (FileOutputStream out = new FileOutputStream(file, true)) {
            String msg = String.format("%s - %s [%s] \"%s\" %s \"%s\" %s %s\n", r.getVariable("remote_addr"),
                    r.getVariable("remote_user", "x"), r.getVariable("time_local"), r.getVariable("request"),
                    r.getVariable("status"), r.getVariable("body_bytes_sent"), r.getVariable("http_referer", "x"),
                    logUserAgent ? r.getVariable("http_user_agent") : "-");
            out.write(msg.getBytes("utf8"));
        }
        return null;
    }

    @Override
    public void config(Map<String, String> properties) {
        logUserAgent = "on".equalsIgnoreCase(properties.get("logUserAgent"));
    }

    @Override
    public String[] variablesNeedPrefetch() {
        return new String[] { "remote_addr", "remote_user", "time_local", "request", "status", "body_bytes_sent",
                "http_referer", "http_user_agent" };
    }
}

相关问题