流处理是指在数据不断流入和输出的过程中对这些数据进行分析或查询。换句话说,流处理通常称为实时分析,因为它在数据移动、创建和接收后立即处理。在使用字符串处理(如下图)之前,它主要采用批处理形式,例如将数据存储在大容量存储(如 database 或文件系统)中,然后根据需要应用程序执行查询或分析。
使用流处理之前
但是,随着大数据的繁荣和流处理的出现,这种数据处理模式发生了许多变化,传统的批处理管道能够实时、持续地处理和分析数据。(不是不使用批处理,而是具有流处理系统和批处理系统,提高了实时性和准确性)
在流处理中,当流中事件发生时,流处理应用程序会立即响应这些事件,并以各种形式处理这些事件,例如统计或存储这些数据以供将来使用。
因此,流处理可以实时分析数据,并处理以事件为中心的数据,从而轻松解决开发人员今天需要关注大数据处理的许多问题。流处理的章节如下所示:
流处理使应用程序和分析立即响应。"事件"->"分析"->"行动"的过程几乎没有延迟。因此,分析和操作始终反映有意义的数据。
由于流处理不是在存储数据后进行分析,因此通常比其他静态数据处理系统处理更大的数据容量。
流处理针对实时处理进行了优化,因为它不断地分析传入数据,与定期计算所有数据的布局和静态数据分析形成鲜明对比。
流处理可以减少对大型共享数据库的依赖,因为应用程序与自己的数据保持状态。这种方法对 msa 方法友好。
处理流时,需要引用处理旧流的结果。这种处理方法称为基于状态的处理。要进行基于状态的流处理,需要状态存储来存储应用程序的处理结果。当流处理应用程序管理此存储时,它称为内部状态存储,当使用单独的状态存储(如数据库)时,它称为外部状态存储。
相反,无状态流处理仅基于当前到达应用程序的流,而不考虑以前流的处理结果。
kafka streams 是一个流处理,与 kafka 0.10.0.0 版本一起发布,其主要功能包括:
※ 2016年3月10日,confluent(由最初在linkedin上开发apache kafka的公司)发布的帖子
introducing kafka streams: stream processing made simple - confluent
confluent, founded by the creators of apache kafka, delivers a complete execution of kafka for the enterprise, to help you run your business in real time.
www.confluent.io
卡夫卡流是一个客户端库,用于处理和分析存储在卡夫卡的数据。其他常见的字符串处理是"执行框架",而 Kafka 流是库,因此由用户手动驱动,完全由开发人员决定是否将其装载到特定框架中。这意味着您可以轻松地在现有应用程序中添加和使用它们。
除了 apache kafka 之外,没有对外部 dependencies 的依赖。使用卡夫卡分区模型,并可水平扩展。同时,它有力地保证消息的顺序。
它支持 fault-tolerant 本地状态存储,可实现快速、高效的存储操作。
在流处理期间,即使客户端或 kafka broker 出现故障,它也会只处理 1 次流。
一次只处理一条记录,以确保处理延迟(以 millisecond 为单位)。
支持级别的流处理并行并行(域),以及低级处理 api。
Kafka 流是一个 api,它创建和处理流处理过程,从而创建和处理拓扑。因此,在常规进程节点上处理当前记录时,其他远程系统也可以访问它。处理的结果可以流式传输回 kafka 或写入外部系统。
kafka stream topology 中使用的术语
stream :流是卡夫卡流提供的最重要抽象。它意味着使用卡夫卡流 api 不断传递到生成的拓扑的数据集。写入流的单位以键值形式显示。
流处理应用程序 :表示使用 Kafka 流客户端的应用程序处理一个或多个拓扑逻辑。
流处理器 :表示构成处理器拓扑的一个节点。在这里,节点的作用是接收和转换由处理器几何连接的一个输入流中的数据,然后发送到重新连接的处理器。
source processor :源处理器是指没有向上连接的处理器的处理器。此处理器从一个或多个卡夫卡主题中读取数据记录,并传递给子处理器。
sink processor :表示拓扑底部没有处理器。此处理器将从父处理器接收的数据记录存储在卡夫卡特定主题中。
卡夫卡流提供两种方法:源处理器和 sink processor。其中一个方法提供了处理数据的方法,如运算符映射、筛选器、join 和 aggregations,这些方法在处理数据时使用 kafka stream dsl 处理数据,另一种是低电平处理器 api,允许开发人员定义/连接自定义处理器并处理状态存储。
过程学是流处理代码的抽象概念。实际上,在运行时,拓扑将在应用程序中实例化、复制和并行处理。
然后,让我们编写一个代码,将来自名为 input-stream 的主题的消息实时移动到 output-stream 主题中。
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class Pipe {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-stream").to("output-stream");
final Topology topology = builder.build();
System.out.println(topology.describe());
final KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
stream.close();
}
}
首先,让我们使用 java.util.properties 输入进程所需的设置值。
streamsconfig.application_id_config:它应是唯一用于区分卡夫卡群集中的流应用程序的值。
streamsconfig.bootstrap_servers_config :输入流应用程序将访问的代理的 ip 和 port。
streamsconfig.default_key_serde_class_config :指定主题将涵盖的数据的"key"格式。
streamsconfig.default_value_serde_class_config :指定主题将涵盖的数据的"value"格式。
完成设置后,将生成一个拓扑,该拓扑指定从输入源执行的操作。拓扑使用 streamsbuilder,它首先创建 streamsbuilder,然后设置为从 input-stream 主题生成输入流并将其传递到 output-stream 主题。最后,生成器调用 build() 方法以创建拓扑。(要验证创建的拓扑,可以调用 .describe() 方法。
现在,通过使用您创建的拓扑对象和属性对象创建流对象,可以创建实际操作的 Kafka 流应用程序。然后,我们将调用 start() 方法以启动流操作,并调用 close() 方法以结束它。
刚才的例子是管道过程,它将数据从一个主题移动到另一个主题。这一次,让我们编写一个代码,将从一个主题读取的数据分离到一个空白中,并将其作为值存储在另一个主题中。您有两种主要使用 kstream 方法的方法:
flatmap :创建新流时,可以创建新的键和值。
flatmapvalues :创建新流时,不能更改键,只能更改值。
上述两种方法都称为无状态操作器,因为它们不参考以前的流状态。
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Arrays;
import java.util.Properties;
public class LineSplit {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.split("\\W+"))).to("streams-linesplit-output");
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
}
}
在 source 流中使用 flatmapvalues 方法创建新流,而传递给 flatmapvallues 方法的 valuemapper 应用于源流中的每个数据,以创建新值。
内容来源于网络,如有侵权,请联系作者删除!