![深入理解Flink:实时大数据处理实践](https://wfqqreader-1252317822.image.myqcloud.com/cover/828/25449828/b_25449828.jpg)
2.2 让轮子转起来
2.2.1 本书约定
(1)本书中的例子以Scala语言的编程进行讲解,Flink的API也只讲述Scala语言形式。本书中的例子不会过多地运用 Scala 编程技巧,因此读者只需要具备基本的Scala语言知识即可。
(2)例子的开发环境为Java 8(1.8.0_73)、Maven(3.0.4)、SBT(1.2.6)和Scala(IDEA Scala plugin 2.11.12)。
(3)IDE选用IntelliJ IDEA,使用社区版(Community Edition 2018.2.6 x64)。由于存在版本兼容性问题,作者不推荐使用Eclipse。
(4)开发环境为Windows 7,Flink部署环境为Linux(CentOS)。
(5)Flink的版本为1.6.1。
2.2.2 搭建单机版环境
1.搭建一个单机版的运行环境
(1)下载不带Hadoop组件的Flink程序包:flink-1.6.1-bin-scala_2.11.tgz。
(2)部署在Linux服务器上,然后启动单机版Flink:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/62_1.jpg?sign=1738861408-t14fUYfIhRr8b1C0hxF1y1p6oJVbxGMa-0-1945b6ae1f451ccf37a9fa2ddb7dd7ac)
为了访问方便,设置开发环境机器hosts文件,以以下域名映射Linux服务器IP地址:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/62_2.jpg?sign=1738861408-1d4w1Qk4MO1vwGv4azeecOzHtxQbLQ76-0-83312b3ce4d7dfb37062aa0fbfd50e23)
(3)启动成功后,在浏览器地址栏中输入以下地址,访问 Flink 的 Web Dashboard:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/62_3.jpg?sign=1738861408-z80Dzs2b7zbfqczGLO4NZV9BDt4of3O4-0-b9f3e0fdc4bf854bb5f7d5e96efbab96)
Web Dashboard展示当前Job Manager和Task Manager的状态,如图2-1所示。
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/62_4.jpg?sign=1738861408-z4aQAnalavq5EZ6aoRy1kmzBAaep0R29-0-30e441417930cdb092f1b1a2fa2c0c98)
图2-1 Flink的Web Dashboard
2.运行SocketWindowWordCount程序
(1)启动一个端口号为9000的Socket server:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/63_1.jpg?sign=1738861408-xvueSNKP1sLZlDYNrFh47aIeCptuCvfp-0-a025bcbb1a35026dc462fd3ef432fd8e)
(2)运行SocketWindowWordCount应用程序:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/63_2.jpg?sign=1738861408-dFrP1jhz728TSfiwSulZPpjzTx17xmDK-0-9b9ebcc4af3ef6987159a6a7586e5706)
(3)在Socket server端手动输入单词,如果一行有多个单词,就在两个单词之间输入空格。输入及对应的聚合结果如图2-2所示。
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/63_3.jpg?sign=1738861408-krGSMZny3GNYN7pH0WDZrWsNRK4Qudgs-0-a73c384af5ddd43150505d204bec8f70)
图2-2 输入及对应的聚合结果
图2-2中同一种颜色的输入和输出是对应的,其中“:1”是Socket server端换行的聚合结果。
SocketWindowWordCount 应用程序根据处理时间开滚动窗口,每秒计算一次窗口接收单词的次数,代码如下:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/63_4.jpg?sign=1738861408-2sztD6gf2XytsYj9uNnzwSrjJoAnk7dD-0-96f1c773b18df60c24a75e38e99bed75)
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/64_1.jpg?sign=1738861408-yDU47npZ5J3mcfqCz2vU4mQhqAFvnFWc-0-746c0992922820c2cff2217305572902)
以上代码从socket(9000端口)按行读入字符,切割成单词(w=> w.split ("\\s"))后转换成 case 对象(WordWithCount),该对象有两个属性,其中 String类型属性代表单词本身;Long类型属性代表单词出现的次数。
其中 timeWindow 为开窗机制,如果应用程序的时间特征为事件时间,则开长度为5秒的事件时间窗口,否则开长度为1秒的处理时间窗口。Flink流处理环境(StreamExecutionEnvironment)默认的时间特征为处理时间,因此本例中的开窗机制为长度为1秒的处理时间窗口。
2.2.3 配置IDEA
使用 Maven从 Flink官网下载应用程序工程模板。为了避免输入错误,我们设置 Maven为 Batch模式,在命令行中设定 groupId、artifactId和 version,需要注意版本号的值用双引号包裹起来,代码如下:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/65_1.jpg?sign=1738861408-MffDdn9Mqhe0EJFuD9KE0n8ak2zp6JbH-0-f2970470fe6b5f359021522fea857eb6)
应用程序模板Maven的构建过程,如图2-3所示。
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/65_2.jpg?sign=1738861408-E7PXpNHUitAL0ll8KxqaK6HR8sPrDT6r-0-ecff1eedae933a30932a0fcef0c7a4c2)
图2-3 应用程序模板Maven的构建过程
然后,将下载的应用程序工程导入 IDEA。该工程有两个样例程序,分别为批处理应用程序(BatchJob)和流处理应用程序(StreamingJob)。该工程的pom.xml文件的主要内容如下:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/66_1.jpg?sign=1738861408-47ReLt4k9KNogYaJnAg2HmjMhqPCuSUl-0-becb76d3e716951c2562b82e95841d3f)
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/67_1.jpg?sign=1738861408-UNV2WSCXQdkNPdhjpYjqBz2nr0IihnpF-0-e9cd22d64b87edac2f45d46cbc484306)
Flink应用程序模板如图2-4所示。
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/67_2.jpg?sign=1738861408-dNgehcOsdgZAE8Lku0qDBNzMkthKtKoF-0-a26c25a952cf41dc9269afc1b8c632d5)
图2-4 Flink应用程序模板
此外,为了让开发工具自动检查代码规范,IDEA 开启了 Scala 语言对应的Checkstyle功能。