如何在 Flink 中实现数据的实时计算?
Flink实时计算
一、Flink实时计算介绍
Flink 是一个基于分布式计算技术的开源流式计算平台,具备部分容错能力,可以处理实时分析和事件驱动应用程序。Flink 能将定时分析任务转为实时模式,用于对无限流数据进行实时计算。
1 Flink实时计算的特性
1.1 自动优化
Flink有一个自动优化的优势,它可以在运行时自动根据应用程序的性能、可用系统资源,以及数据特点做出合理的优化;
1.2 数据容错
Flink支持容错机制,当有节点发生失效,它可以将任务在健康的节点上重新调度,完成任务;
1.3 实时处理
Flink支持实时处理,可以在接收到数据时立即进行处理,而不是等待批处理任务,可以提供更好的响应性能;
2 实时计算示例
Flink 处理实时流数据要以流形式从源端(Source)读取数据入口,流经算子(Operator)、调整(Transform),经过流处理(Process)出口,最后以 sink的形式将数据写出。基于Flink的实时流处理示例如下:
2.1 实时数据读取
以Kafka为源,Flink可以以实时的方式消费Kafka数据,leftJoin新旧数据做实时的合并:
// 连接kafka val zookeeper: String = “localhost:2181” val groupId: String = “group_flink” val topic: String = “input_topic” val kafkaDataStream: DataStream[String] = env .addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), getKafkaProperties(zookeeper,groupId)))
//LeftJoin
val output:ConnectedStreams[Order,Order] = kafkaDataStream
.keyBy(“productId”)
.connect(order_stream_old.keyBy(“productId”))
.process(new CoProcessFunction[Order,Order,Order] {
// leftJoin连接处理逻辑
})
2.2 实时计算
针对实时的数据,Flink可以智能的对数据流有上下文的处理,做实时的统计:
// 聚合求平均 val avg: DataStream[(String, Double)] = output .map(x => (x.product, x.newPrice.toDouble)) .keyBy(0) .timeWindow(Time.seconds(60 * 60 * 24 * 1)) // 一天窗口 .reduce[(String, Double)] ((x, y) => (x._1, (x._2 + y._2) / 2) )
2.3 实时数据落盘
最后以常用的数据库或者HDFS 为例做实时数据落盘:
// 落盘HDFS avg.writeAsText(outputPath,WriteMode.OVERWRITE) .setParallelism(1)
// 落盘数据库 output.map(x =>(x.productId,x.amount,x.timestamp)) .addSink(new MysqlSink()) .setParallelism(1)
添加sink后Flink会自动将数据流落到HDFS或者数据库中,实现完整的实时数据处理逻辑。