如何在 Flink 中实现数据的实时计算?

Flink实时计算

一、Flink实时计算介绍

Flink 是一个基于分布式计算技术的开源流式计算平台,具备部分容错能力,可以处理实时分析和事件驱动应用程序。Flink 能将定时分析任务转为实时模式,用于对无限流数据进行实时计算。

Flink有一个自动优化的优势,它可以在运行时自动根据应用程序的性能、可用系统资源,以及数据特点做出合理的优化;

Flink支持容错机制,当有节点发生失效,它可以将任务在健康的节点上重新调度,完成任务;

Flink支持实时处理,可以在接收到数据时立即进行处理,而不是等待批处理任务,可以提供更好的响应性能;

Flink 处理实时流数据要以流形式从源端(Source)读取数据入口,流经算子(Operator)、调整(Transform),经过流处理(Process)出口,最后以 sink的形式将数据写出。基于Flink的实时流处理示例如下:

以Kafka为源,Flink可以以实时的方式消费Kafka数据,leftJoin新旧数据做实时的合并:

// 连接kafka val zookeeper: String = “localhost:2181” val groupId: String = “group_flink” val topic: String = “input_topic” val kafkaDataStream: DataStream[String] = env .addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), getKafkaProperties(zookeeper,groupId)))

//LeftJoin val output:ConnectedStreams[Order,Order] = kafkaDataStream .keyBy(“productId”) .connect(order_stream_old.keyBy(“productId”)) .process(new CoProcessFunction[Order,Order,Order] { // leftJoin连接处理逻辑
})

针对实时的数据,Flink可以智能的对数据流有上下文的处理,做实时的统计:

// 聚合求平均 val avg: DataStream[(String, Double)] = output .map(x => (x.product, x.newPrice.toDouble)) .keyBy(0) .timeWindow(Time.seconds(60 * 60 * 24 * 1)) // 一天窗口 .reduce[(String, Double)] ((x, y) => (x._1, (x._2 + y._2) / 2) )

最后以常用的数据库或者HDFS 为例做实时数据落盘:

// 落盘HDFS avg.writeAsText(outputPath,WriteMode.OVERWRITE) .setParallelism(1)

// 落盘数据库 output.map(x =>(x.productId,x.amount,x.timestamp)) .addSink(new MysqlSink()) .setParallelism(1)

添加sink后Flink会自动将数据流落到HDFS或者数据库中,实现完整的实时数据处理逻辑。

随机文章