Apache Flink 零基础入门(3):开发环境搭建和应用的配置、部署及运行¶
1 Flink 开发环境部署和配置¶
Flink
是一个以 Java
及 Scala
作为开发语言的开源大数据项目,代码开源在 GitHub
上,并使用 Maven
来编译和构建项目。对于大部分使用 Flink
的同学来说, Java
、 Maven
和 Git
这三个工具是必不可少的,另外一个强大的 IDE
有助于我们更快的阅读代码、开发新功能以及修复 Bug
。因为篇幅所限,我们不会详述每个工具的安装细节,但会给出必要的安装建议。
关于开发测试环境, Mac OS
、 Linux
系统或者 Windows
都可以。如果使用的是 Windows 10
系统,建议使用 Windows 10
系统的 Linux
子系统来编译和运行。
建议选用社区已发布的稳定分支,比如 Release-1.6
或者 Release-1.7
。
1.1 编译 Flink 代码¶
在我们配置好之前的几个工具后,编译 Flink
就非常简单了,执行如下命令即可:
Bash | |
---|---|
1 2 3 |
|
常用编译参数:
Bash | |
---|---|
1 2 3 |
|
当成功编译完成后,能在当前 Flink
代码目录下的 flink-dist/target/子目录
中看到如下文件(不同的 Flink
代码分支编译出的版本号不同,这里的版本号是 Flink 1.5.1
):
其中有三个文件可以留意一下:
注意:国内用户在编译时可能遇到编译失败 Build Failure
(且有 MapR
相关报错),一般都和 MapR
相关依赖的下载失败有关,即使使用了推荐的 settings.xml
配置(其中 Aliyun Maven
源专门为 MapR
相关依赖做了代理),还是可能出现下载失败的情况。问题主要和 MapR
的 Jar
包比较大有关。遇到这些问题时,重试即可。在重试之前,要先根据失败信息删除 Maven local repository
中对应的目录,否则需要等待 Maven
下载的超时时间才能再次出发下载依赖到本地。
1.2 开发环境准备¶
推荐使用 IntelliJ IDEA IDE
作为 Flink
的 IDE
工具。官方不建议使用 Eclipse IDE
,主要原因是 Eclipse
的 Scala IDE
和 Flink
用 Scala
的不兼容。如果你需要做一些 Flink
代码的开发工作,则需要根据 Flink
代码的 tools/maven/目录
下的配置文件来配置 Checkstyle
,因为 Flink
在编译时会强制代码风格的检查,如果代码风格不符合规范,可能会直接编译失败。
2 运行 Flink 应用¶
2.1 基本概念¶
运行 Flink
应用其实非常简单,但是在运行 Flink
应用之前,还是有必要了解 Flink
运行时的各个组件,因为这涉及到 Flink
应用的配置问题。 图 1
所示,这是用户用 DataStream API
写的一个数据处理程序。可以看到,在一个 DAG
图中不能被 Chain
在一起的 Operator
会被分隔到不同的 Task
中,也就是说 Task
是 Flink
中资源调度的最小单位。
如下图 图 2
所示, Flink
实际运行时包括两类进程:
-
JobManager
(又称为JobMaster
):协调Task
的分布式执行,包括调度Task
、协调创Checkpoint
以及当Job failover
时协调各个Task
从Checkpoint
恢复等。 -
TaskManager
(又称为Worker
):执行Dataflow
中的Tasks
,包括内存Buffer
的分配、Data Stream
的传递等。
图 3
所示, Task Slot
是一个 TaskManager
中的最小资源分配单位,一个 TaskManager
中有多少个 Task Slot
就意味着能支持多少并发的 Task
处理。需要注意的是,一个 Task Slot
中可以执行多个 Operator
,一般这些 Operator
是能被 Chain
在一起处理的。
2.2 运行环境准备¶
-
准备
Flink binary
-
直接从
Flink
官网上下载Flink binary
的压缩包 -
或者从
Flink
源码编译而来 -
安装
Java
,并配置JAVA_HOME
环境变量
2.3 单机 Standalone 的方式运行 Flink¶
2.3.1 基本的启动流程¶
最简单的运行 Flink
应用的方法就是以单机 Standalone
的方式运行。
启动集群:
Bash | |
---|---|
1 |
|
打开 http://127.0.0.1:8081/ 就能看到 Flink
的 Web
界面。尝试提交 Word Count
任务:
Bash | |
---|---|
1 |
|
大家可以自行探索 Web
界面中展示的信息,比如,我们可以看看 TaskManager
的 stdout
日志,就可以看到 Word Count
的计算结果。我们还可以尝试通过 -input
参数指定我们自己的本地文件作为输入,然后执行:
Bash | |
---|---|
1 |
|
停止集群:
Bash | |
---|---|
1 |
|
2.3.2 常用配置介绍¶
-
这种方式太过底层,不够直观,我们还需要在此之上实现可视化展示;
-
系统表只记录了
CH
自己的运行指标,有些时候我们需要外部系统的指标进行关联分析,例如ZooKeeper
、服务器CPU
、IO
等等。 -
conf / slaves
conf / slaves
用于配置TaskManager
的部署,默认配置下只会启动一个TaskManager
进程,如果想增加一个TaskManager
进程的,只需要文件中追加一行localhost
。也可以直接通过
./bin/taskmanager.sh start
这个命令来追加一个新的TaskManager
:Bash 1
./bin/taskmanager.sh start|start-foreground|stop|stop-all
-
conf/flink-conf.yaml
conf/flink-conf.yaml
用于配置JM
和TM
的运行参数,常用配置有:Bash 1 2 3 4 5 6 7
## The heap size for the JobManager JVM jobmanager.heap.mb: 1024 ## The heap size for the TaskManager JVM taskmanager.heap.mb: 1024 ## The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline. taskmanager.numberOfTaskSlots: 4 ## the managed memory size for each task manager.
Standalone
集群启动后,我们可以尝试分析一下 Flink
相关进程的运行情况。执行 jps
命令,可以看到 Flink
相关的进程主要有两个,一个是 JobManager
进程,另一个是 TaskManager
进程。我们可以进一步用 ps
命令看看进程的启动参数中 -Xmx
和 -Xms
的配置。然后我们可以尝试修改 flink-conf.yaml
中若干配置,然后重启 Standalone
集群看看发生了什么变化。
需要补充的是,在 Blink
开源分支上, TaskManager
的内存计算上相对于现在的社区版本要更精细化, TaskManager
进程的堆内存限制( -Xmx
)一般的计算方法是:
Bash | |
---|---|
1 |
|
而最新的 Flink
社区版本 Release-1.7
中 JobManager
和 TaskManager
默认内存配置方式为:
Bash | |
---|---|
1 2 3 4 |
|
Flink
社区 Release-1.7
版本中的 taskmanager.heap.size
配置实际上指的不是 Java heap
的内存限制,而是 TaskManager
进程总的内存限制。我们可以同样用上述方法查看 Release-1.7
版本的 Flink binary
启动的 TaskManager
进程的 -Xmx
配置,会发现实际进程上的 -Xmx
要小于配置的 taskmanager.heap.size
的值,原因在于从中扣除了 Network buffer
用的内存,因为 Network buffer
用的内存一定是 Direct memory
,所以不应该算在堆内存限制中。
2.3.3 日志的查看和配置¶
JobManager
和 TaskManager
的启动日志可以在 Flink binary
目录下的 Log
子目录中找到。 Log
目录中以 flink-{id}-${hostname}
为前缀的文件对应的是 JobManager
的输出,其中有三个文件:
-
flink-${user}-standalonesession-${id}-${hostname}.log
:代码中的日志输出 -
flink-${user}-standalonesession-${id}-${hostname}.out
:进程执行时的stdout
输出 -
flink-${user}-standalonesession-${id}-${hostname}-gc.log
:JVM
的GC
的日志
Log
目录中以 flink-{id}-${hostname}
为前缀的文件对应的是 TaskManager
的输出,也包括三个文件,和 JobManager
的输出一致。
日志的配置文件在 Flink binary
目录的 conf
子目录下,其中:
-
log4j-cli.properties
:用Flink
命令行时用的log
配置,比如执行flink run
命令 -
log4j-yarn-session.properties
:用yarn-session.sh
启动时命令行执行时用的log
配置 -
log4j.properties
:无论是Standalone
还是Yarn
模式,JobManager
和TaskManager
上用的log
配置都是log4j.properties
这三个 log4j.*properties
文件分别有三个 logback.*xml
文件与之对应,如果想使用 Logback
的同学,之需要把与之对应的 log4j.*properties
文件删掉即可,对应关系如下:
-
log4j-cli.properties -> logback-console.xml
-
log4j-yarn-session.properties -> logback-yarn.xml
-
log4j.properties -> logback.xml
需要注意的是, flink-{id}-和{user}-taskexecutor-{hostname}
都带有 {id}
表示本进程在本机上该角色( JobManager
或 TaskManager
)的所有进程中的启动顺序,默认从 0
开始。
2.3.4 进一步探索¶
尝试重复执行 ./bin/start-cluster.sh
命令,然后看看 Web
页面(或者执行 jps
命令),看看会发生什么?可以尝试看看启动脚本,分析一下原因。接着可以重复执行 ./bin/stop-cluster.sh
,每次执行完后,看看会发生什么。
2.4 多机部署 Flink Standalone 集群¶
部署前要注意的要点:
-
每台机器上配置好
Java
以及JAVA_HOME
环境变量 -
每台机器上部署的
Flink binary
的目录要保证是同一个目录 -
如果需要用
HDFS
,需要配置HADOOP_CONF_DIR
环境变量配置
根据你的集群信息修改 conf/masters
和 conf/slaves
配置。
修改 conf/flink-conf.yaml
配置,注意要确保和 Masters
文件中的地址一致:
Bash | |
---|---|
1 |
|
确保所有机器的 Flink binary
目录中 conf
中的配置文件相同,特别是以下三个:
Bash | |
---|---|
1 2 3 |
|
然后启动 Flink
集群:
Bash | |
---|---|
1 |
|
提交 WordCount
作业:
Bash | |
---|---|
1 |
|
上传 WordCount
的 Input
文件:
Bash | |
---|---|
1 |
|
提交读写 HDFS
的 WordCount
作业:
Bash | |
---|---|
1 |
|
增加 WordCount
作业的并发度(注意输出文件重名会提交失败):
Bash | |
---|---|
1 |
|
2.5 Standalone 模式的 HighAvailability(HA)部署和配置¶
通过 图 2 Flink Runtime 架构图
,我们可以看到 JobManager
是整个系统中最可能导致系统不可用的角色。如果一个 TaskManager
挂了,在资源足够的情况下,只需要把相关 Task
调度到其他空闲 TaskSlot
上,然后 Job
从 Checkpoint
中恢复即可。而如果当前集群中只配置了一个 JobManager
,则一旦 JobManager
挂了,就必须等待这个 JobManager
重新恢复,如果恢复时间过长,就可能导致整个 Job
失败。
因此如果在生产业务使用 Standalone
模式,则需要部署配置 HighAvailability
,这样同时可以有多个 JobManager
待命,从而使得 JobManager
能够持续服务。
注意:
-
如果想使用
Flink standalone HA
模式,需要确保基于Flink Release-1.6.1
及以上版本,因为这里社区有个bug
会导致这个模式下主JobManager
不能正常工作。 -
接下来的实验中需要用到
HDFS
,所以需要下载带有Hadoop
支持的Flink Binary
包。
2.5.1 (可选)使用 Flink 自带的脚本部署 ZK¶
Flink
目前支持基于 Zookeeper
的 HA
。如果你的集群中没有部署 ZK
, Flink
提供了启动 Zookeeper
集群的脚本。首先修改配置文件 conf/zoo.cfg
,根据你要部署的 Zookeeper Server
的机器数来配置 server.X=addressX:peerPort:leaderPort
,其中 X
是一个 Zookeeper Server
的唯一 ID
,且必须是数字。
Bash | |
---|---|
1 2 3 4 5 |
|
然后启动 Zookeeper
:
Bash | |
---|---|
1 |
|
jps
命令看到 ZK
进程已经启动:
停掉 Zookeeper
集群的命令:
Bash | |
---|---|
1 |
|
2.5.2 修改 Flink Standalone 集群的配置¶
修改 conf/masters
文件,增加一个 JobManager
:
Bash | |
---|---|
1 2 3 |
|
之前修改过的 conf/slaves
文件保持不变:
Bash | |
---|---|
1 2 3 4 |
|
修改 conf/flink-conf.yaml
文件:
Bash | |
---|---|
1 2 3 4 5 6 7 8 9 10 |
|
需要注意的是,在 HA
模式下 conf/flink-conf.yaml
中的这两个配置都失效了(想想为什么)。
Bash | |
---|---|
1 |
|
修改完成后,确保配置同步到其他机器。
启动 Zookeeper
集群:
Bash | |
---|---|
1 |
|
再启动 Standalone
集群(要确保之前的 Standalone
集群已经停掉):
Bash | |
---|---|
1 |
|
分别打开两个 Master
节点上的 JobManager Web
页面:
http://z05f06378.sqa.zth.tbsite.net:8081
http://z05c19426.sqa.zth.tbsite.net:8081
可以看到两个页面最后都转到了同一个地址上,这个地址就是当前主 JobManager
所在机器,另一个就是 Standby JobManager
。以上我们就完成了 Standalone
模式下 HA
的配置。
接下来我们可以测试验证 HA
的有效性。当我们知道主 JobManager
的机器后,我们可以把主 JobManager
进程 Kill
掉,比如当前主 JobManager
在 z05c19426.sqa.zth.tbsite.net
这个机器上,就把这个进程杀掉。
接着,再打开这两个链接: http://z05f06378.sqa.zth.tbsite.net:8081 http://z05c19426.sqa.zth.tbsite.net:8081
可以发现后一个链接已经不能展示了,而前一个链接可以展示,说明发生主备切换。然后我们再重启前一次的主 JobManager
:
Bash | |
---|---|
1 |
|
再打开 http://z05c19426.sqa.zth.tbsite.net:8081 这个链接,会发现现在这个链接可以转到 http://z05f06378.sqa.zth.tbsite.net:8081 这个页面上了。说明这个 JobManager
完成了一个 Failover Recovery
。
2.6 使用 Yarn 模式跑 Flink job¶
相对于 Standalone
模式, Yarn
模式允许 Flink job
的好处有:
-
资源按需使用,提高集群的资源利用率
-
任务有优先级,根据优先级运行作业
-
基于
Yarn
调度系统,能够自动化地处理各个角色的Failover
-
JobManager
进程和TaskManager
进程都由Yarn NodeManager
监控 -
如果
JobManager
进程异常退出,则Yarn ResourceManager
会重新调度JobManager
到其他机器 -
如果
TaskManager
进程异常退出,JobManager
会收到消息并重新向Yarn ResourceManager
申请资源,重新启动TaskManager
2.6.1 在 Yarn 上启动 Long Running 的 Flink 集群(Session Cluster 模式)¶
查看命令参数:
Bash | |
---|---|
1 |
|
创建一个 Yarn
模式的 Flink
集群:
Bash | |
---|---|
1 |
|
其中用到的参数是:
-
-n,-container Number of TaskManagers
-
-jm,-jobManagerMemory Memory for JobManager Container with optional unit (default: MB)
-
-tm,-taskManagerMemory Memory per TaskManager Container with optional unit (default: MB)
-
-qu,-queue Specify YARN queue.
-
-s,-slots Number of slots per TaskManager
-
-t,-ship Ship files in the specified directory (t for transfer)
提交一个 Flink job
到 Flink
集群:
Bash | |
---|---|
1 |
|
这次提交 Flink job
,虽然没有指定对应 Yarn application
的信息,却可以提交到对应的 Flink
集群,原因在于 /tmp/.yarn-properties-${user}
文件中保存了上一次创建 Yarn session
的集群信息。所以如果同一用户在同一机器上再次创建一个 Yarn session
,则这个文件会被覆盖掉。
-
如果删掉
/tmp/.yarn-properties-${user}
或者在另一个机器上提交作业能否提交到预期到yarn session
中呢?可以配置了high-availability.cluster-id
参数,据此从Zookeeper
上获取到JobManager
的地址和端口,从而提交作业。 -
如果
Yarn session
没有配置HA
,又该如何提交呢?
这个时候就必须要在提交 Flink job
的命令中指明 Yarn
上的 Application ID
,通过 -yid
参数传入:
Bash | |
---|---|
1 |
|
我们可以发现,每次跑完任务不久, TaskManager
就被释放了,下次在提交任务的时候, TaskManager
又会重新拉起来。如果希望延长空闲 TaskManager
的超时时间,可以在 conf/flink-conf.yaml
文件中配置下面这个参数,单位是 milliseconds
:
Bash | |
---|---|
1 2 |
|
2.6.2 在 Yarn 上运行单个 Flink job(Job Cluster 模式)¶
如果你只想运行单个 Flink Job
后就退出,那么可以用下面这个命令:
Bash | |
---|---|
1 |
|
常用的配置有:
-
-yn,-yarncontainer Number of Task Managers
-
-yqu,-yarnqueue Specify YARN queue.
-
-ys,-yarnslots Number of slots per TaskManager
-
-yqu,-yarnqueue Specify YARN queue.
可以通过 Help
命令查看 Run
的可用参数:
Bash | |
---|---|
1 |
|
我们可以看到, ./bin/flink run -h
看到的 Options for yarn-cluster mode
中的 -y
和 -yarn
为前缀的参数其实和 ./bin/yarn-session.sh -h
命令是一一对应的,语义上也基本一致。
关于 -n
(在 yarn session
模式下)、 -yn
在( yarn single job
模式下)与 -p
参数的关系:
-
-n
和-yn
在社区版本中(Release-1.5
~Release-1.7
)中没有实际的控制作用,实际的资源是根据-p
参数来申请的,并且TM
使用完后就会归还 -
在
Blink
的开源版本中,-n
(在Yarn Session
模式下)的作用就是一开始启动指定数量的TaskManager
,之后即使Job
需要更多的Slot
,也不会申请新的TaskManager
-
在
Blink
的开源版本中,Yarn single job
模式-yn
表示的是初始TaskManager
的数量,不设置TaskManager
的上限。(需要特别注意的是,只有加上-yd
参数才能用Single job
模式(例如:命令./bin/flink run -yd -m yarn-cluster xxx
)
2.7 Yarn 模式下的 HighAvailability 配置¶
首先要确保启动 Yarn
集群用的 yarn-site.xml
文件中的这个配置,这个是 Yarn
集群级别 AM
重启的上限。
Bash | |
---|---|
1 2 3 4 |
|
然后在 conf/flink-conf.yaml
文件中配置这个 Flink job
的 JobManager
能够重启的次数。
Bash | |
---|---|
1 |
|
最后再在 conf/flink-conf.yaml
文件中配置上 ZK
相关配置,这几个配置的配置方法和 Standalone
的 HA
配置方法基本一致,如下所示。
Bash | |
---|---|
1 2 3 4 5 6 7 8 9 10 |
|
需要特别注意的是: high-availability.cluster-id
这个配置最好去掉,因为在 Yarn
(以及 Mesos
)模式下, cluster-id
如果不配置的话,会配置成 Yarn
上的 Application ID
,从而可以保证唯一性。