![深入理解Flink:实时大数据处理实践](https://wfqqreader-1252317822.image.myqcloud.com/cover/828/25449828/b_25449828.jpg)
2.3 编程模型
2.3.1 分层组件栈
Flink的组件分为4层,各个模块之间的层次关系如图2-5所示。
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/68_1.jpg?sign=1739296005-yAftSferhNZhtQKjOKq2R9VKlpcA4fP3-0-a06bb4c3e89827baef7af59a475d057b)
图2-5 Flink各个模块之间的层次关系
(1)Deploy层:Flink支持多种部署模式,如本地(Local)单机版、Standalone集群、YARN集群及云(Cloud)部署模式。
(2)Core 层:本层是 Flink 分布式数据处理引擎的核心实现层,包括计算图的所有底层实现,例如时间与窗口机制、一致性语义、任务管理与调度、物理执行计划。应用程序通常不需要调用本层 API,而是调用流处理 API、批处理API或构建在这两层API基础之上的Library API。
(3)API层:该层包括流处理API和批处理API,Flink的批处理是建立在流式架构上的,而不是用批处理模拟流处理,这种技术基因决定了 Flink 更适用于流处理的场合。
(4)Library层:该层是Flink的应用框架层,构建在流处理API和批处理API之上,因此同一应用框架库有两种版本选择,如流式关系型 API(Table/SQL)。此外,本层还包括CEP、FlinkML和Gelly。
2.3.2 流式计算模型
一个典型的流处理应用程序(命名为Programm 2.1)如下:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/69_1.jpg?sign=1739296005-mxxTSZfAd9amqYZaDJkRvt1C1FklKhPz-0-f0c7f9aff5c433cec09d25cf5ed788f3)
这段程序的逻辑计算图形式如图2-6所示。
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/69_2.jpg?sign=1739296005-EM9cTinghlKJfnF9P0nMgu3R7sYlkCcU-0-a251d9544b20a71988ee788ef0acf39c)
图2-6 Programm 2.1的逻辑计算图形式
图 2-6 中 Stream 为传输通道中的数据,Operator 为计算图的节点
,Streaming Dataflow为计算图
。
计算图的物理形式由计算节点的多个并行实例组成,其中并行实例的含义是:在分布式环境中,同一计算节点有多个功能相同的物理部署实例,如图2-7中逻辑形式中的map()节点会有两个部署实例map()[1]和map()[2]。
在并行模式时:
(1)每个Operator的实例数为并行度,任意两个Operator的并行度之间是独立的。例如,图2-7中Source Operator的并行度为2,而Sink Operator的并行度为1;每个Operator称为一个任务,Operator的每个实例称为子任务(subtask),子任务这个概念来自其和JVM线程之间的关系。
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/70_1.jpg?sign=1739296005-SJjoMFx0ObzwwzbulnvqrLtoriaGkKi2-0-9de5da7205655a2778a596c16d347612)
图2-7 Programm 2.1的物理计算图形式
(2)Stream有一个或多个分区(partition)。Stream有两种模式:
● 直连(One-to-One)模式,即一个实例的输出是另一个实例的输入。在Programm 2.1 的物理计算图形式中,Source 的 subtask[1](即 Source[1])和map的subtask[1](即map [1])直接相连,Source[1]的输出全部传输给map [1],没有被拆分成多个分区。
● 分区(Redistribution)模式,即一个实例的输出被拆分成多个部分传输给多个下级实例。在Programm 2.1的物理计算图形式中,map [1]被拆分成两部分,分别输入给不同的下级实例。
2.3.3 流处理编程
1.DataStream与DataSet
Flink用DataStream表示无界数据集,用DataSet表示有界数据集,前者用于流处理应用程序,后者用于批处理程序。根据所处理事件数据结构类型的不同,应用程序可以定义不同类型的 DataStream对象和 DataSet对象。以下程序定义事件类型为String的DataStream对象和事件类型为LabeledVector(带标签的训练样本,每个样本用向量表示)的DataSet对象:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/71_1.jpg?sign=1739296005-IwtvhhaEEdghCltVhwfomDJBjiaJphPe-0-f99f1155c0559aabbef009554c83838d)
从操作形式上看,DataStream 和 DataSet 与集合(Collection)有些相似,但是两者有着本质不同:
(1)DataStream 和 DataSet 是不可变的数据集合,因此不可以像操纵集合那样增加或删除 DataStream和 DataSet中的元素,也不可以通过诸如下标等方式访问某个元素。这里重申之前定义的概念,事件、元素、数据等都是用于指代流处理或批处理所处理的数据对象的,具体使用哪个称呼依赖语境。
(2)Flink应用程序通过Source创建DataStream对象和DataSet对象,通过转换操作产生新的DataStream对象和DataSet对象。
2.程序结构
Flink按照数据处理流程编写应用程序,共分为5个步骤。
1)获取运行时
运行时分为两类:StreamingExecutionEnvironment和ExecutionEnvironment,分别对应流处理和批处理程序:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/72_1.jpg?sign=1739296005-3KshF81L1NReL9Kbklzp06BE51XxZ8Cg-0-d795411181af00cbdfa35f4c34cff4a8)
运行时是应用程序被调度执行时的上下文环境,上述方法根据当前环境自动选择本地或集群运行时环境。以流处理为例,创建方法如下:
(1)通过createLocalEnvironment方法创建运行时,基于这种运行时的应用程序会运行在同一个 JVM 进程中,本地调试时通常采用这种运行时。createLocalEnvironment有三种接口形式:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/72_2.jpg?sign=1739296005-sT3itCGoxvfAiWVjayPguIUEMlB2uNBN-0-2713b064fdd6b7cb5e824da508db48f2)
从上面的接口可以看出,通过 createLocalEnvironment 方法创建的运行仍是StreamingExecutionEnvironment。
(2)通过 createRemoteEnvironment 创建运行时,基于这种运行时的应用程序会被提交到集群中运行,连接集群调试通常用这种运行时。createRemoteEnvironment有两种接口形式:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/72_3.jpg?sign=1739296005-Ey6wBhdrQdl5n8RWgfzF1C2jJe22f1Ou-0-42a05c39b759b5fbefbc983c5d66ddcd)
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/73_1.jpg?sign=1739296005-IB5J4GmJbjJOlCme2z8Q6es8m0B8mx4F-0-efdad6560d58163e8832fc2d42347613)
2)添加外部数据源
可以添加外部数据源,如 Kafka和文件,也可以由应用创建 DataStream或DataSet,后一种方法常用于测试环境。
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/73_2.jpg?sign=1739296005-sRe1XoYpAaflb1whgCvNKZxTKsgw6mbB-0-32c62d4e9c18867b78f76726147e2c7a)
3)定义算子转换函数
下面的代码将input元素值转换成整型,转换后得到DataStream[Int]:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/74_1.jpg?sign=1739296005-O42RK79V9Xwa7G0eesW7UI9iImhjy1ez-0-42d68ccc1eb018941acb41132ab85991)
4)定义Sink
Sink的功能是将数据处理结果写入外部系统:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/74_2.jpg?sign=1739296005-hBMjDZYhD0mAw5aS10ETSprtfUdc0T2H-0-c18d5f5635a2160467ae51fecc7f4f53)
除了上述两种常用的Sink,应用程序还可以将处理结果写入Kafka:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/74_3.jpg?sign=1739296005-BaUVZew0ohlmEM1m9HRXRUaVsqaM56TR-0-8531c42cb11091feb2b6fc41c4989e06)
5)启动程序
调用运行时的execute()方法:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/74_4.jpg?sign=1739296005-DE7lmaI8yzvfKwTJiDSYrzLokO2TojoB-0-b7b44124e8d886b56f7ebe7043375f42)
3.指定键(key)
可以通过Scala Case类(或Java元组)的位置索引、对象属性名称、key的选择器(selector)三种方式指定key,定义如下:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/74_5.jpg?sign=1739296005-Fb65Cd5i2tV8Qfp6jnX9ekem7JzUH63Z-0-683799661b300994bb56158ae0f9ef96)
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/75_1.jpg?sign=1739296005-nyj4HGemZtK3axczBubJwge55trlK1FG-0-028bf2ac879e3b2fbfce9e76ffd3736e)
4.并行度设置
有4种设置Flink并行度的方式。
(1)通过紧跟在Operator之后的setParallelism方法设置并行度,这种并行度只影响对应的Operator:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/75_2.jpg?sign=1739296005-vCsrKCfLTNUp1tpLkXBAKPmBRcvubwgc-0-5b4ee52fa314b964bc8412a134ebf7e5)
(2)通过运行时设置作业级并行度:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/75_3.jpg?sign=1739296005-GTsaRmvYaqIGrPLZCy9kjd18vfVX0nEr-0-4f6dd9cbc15e1719499b9bd425dba2e5)
(3)通过客户端设置并行度,这种并行度也是作业级的:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/75_4.jpg?sign=1739296005-YrmauGV2HH1lVP6Ho4vcLuRudnxIeXxX-0-55982ecc6c98a6a6d4d9c820a99361eb)
(4)通过 Flink 的配置文件设置系统级并行度,这种并行度对集群上的所有作业都起作用:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/76_1.jpg?sign=1739296005-BwZTpvyuFMONlclRbtscvPtOXKH0mT8E-0-0cb2b139f23e097ea1f91206d5f55066)