WHCSRL 技术网

Flink On Yarn模式配置

Flink On Yarn模式配置


引言

Flink依靠Yarn来实现高可用,而Yarn依赖于Hadoop,Hadoop又依赖于JDK,因此需要依次安装JDK、Hadoop(及其HA所需的Zookeeper)和Flink。

​ 准备三台机器

​ 1.1.1.1 node1

​ 1.1.1.2 node2

​ 1.1.1.3 node3

一、安装JDK

1. 下载解压
	tar -xvf jdk-8u271-linux-x64.tar.gz -C /usr/local
	mv /usr/local/jdk1.8.0_271 /usr/local/jdk
2. 配置环境变量
export JAVA_HOME=/usr/local/jdk
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

3. 验证 
java -version

二、安装Hadoop

1. 配置hosts,做主机名到ip地址映射,每台机器都要更改
	vi /etc/hosts
	添加如下内容
		1.1.1.1	node1

		1.1.1.2	node2

		1.1.1.3	node3
	
2. 配置ssh免密登录
	ssh-keygen
	ssh-copy-id node2
	ssh-copy-id node3
3. 解压hadoop安装包
	tar -xvf hadoop-2.10.1.tar.gz -C /usr/local
	mv hadoop-2.10.1 hadoop
	
4. 配置环境变量
export HADOOP_HOME=/usr/local/hadoop
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
	
5. 配置HDFS集群
	1. hadoop-env.sh
		添加jdk路径
		export JAVA_HOME=/usr/local/jdk

	2. core-site.xml
<configuration>
        <property>
                <name>hadoop.tmp.dir</name>
                <value>file:/usr/local/hadoop/data/hdfs/tmp</value>
                <description>A base for other temporary directories.</description>
        </property>
    <!-- 读写文件(如SequenceFile)时使用的缓冲区大小,单位为字节,131072即128KB(默认值为4096) -->
    	<property>
                <name>io.file.buffer.size</name>
                <value>131072</value>
        </property>
        <property>
                <name>fs.defaultFS</name>
                <value>hdfs://ns</value>
        </property>
    <!-- 允许root用户在任意主机节点代理任意的用户组 -->
    	<property>
                <name>hadoop.proxyuser.root.hosts</name>
                <value>*</value>
        </property>
        <property>
                <name>hadoop.proxyuser.root.groups</name>
                <value>*</value>
        </property>
        <property>
                <name>dfs.journalnode.edits.dir</name>
                <value>/usr/local/hadoop/data/hdfs/journal</value>
        </property>
        <!-- zookeeper信息 -->
        <property>
                <name>ha.zookeeper.quorum</name>
                <value>node1:2181,node2:2181,node3:2181</value>
        </property>
</configuration>

	3. hdfs-site.xml
<configuration>
	<property>
        <!-- 副本数量 -->
		<name>dfs.replication</name>
		<value>2</value>
	</property>
    <property>
        <!-- 切分的block大小 单位为字节 134217728即128M-->
		<name>dfs.block.size</name>
		<value>134217728</value>
	</property>
	<property>
        <!-- namenode在本地元数据的存储路径 -->
		<name>dfs.namenode.name.dir</name>
		<value>file:///usr/local/hadoop/data/hdfs/namenode</value>
		</property>
	<property>
        <!-- datanode在本地存放block的存储路径 -->
		<name>dfs.datanode.data.dir</name>
		<value>file:///usr/local/hadoop/data/hdfs/datanode</value>
	</property>
    <property>
        <!-- namenode日志文件存储路径 -->
		<name>dfs.namenode.edits.dir</name>
		<value>file:///usr/local/hadoop/data/hdfs/nn/edits</value>
	</property>
    <!-- 集群名 -->
    <property>
		<name>dfs.nameservices</name>
		<value>ns</value>
	</property>
    <!-- 配置两个namenode,另一个为standby模式 -->
    <property>
		<name>dfs.ha.namenodes.ns</name>
		<value>nn1,nn2</value>
	</property>
    <property>
		<name>dfs.namenode.rpc-address.ns.nn1</name>
		<value>node1:9000</value>
	</property>
    <property>
		<name>dfs.namenode.rpc-address.ns.nn2</name>
		<value>node2:9000</value>
	</property>
    <property>
		<name>dfs.namenode.http-address.ns.nn1</name>
		<value>node1:50070</value>
	</property>
    <property>
		<name>dfs.namenode.http-address.ns.nn2</name>
		<value>node2:50070</value>
	</property>
    <property>
		<name>dfs.namenode.shared.edits.dir</name>
		<value>qjournal://node1:8485;node2:8485;node3:8485/ns</value>
	</property>
	<property>
        <!-- secondaryNamenode的网页端口号 -->
		<name>dfs.namenode.secondary.http-address</name>
		<value>node1:9001</value>
	</property>
	<property>
        <!-- 不区分namenode和datanode的端口号,可直接使用namenode的ip端口号进行所有的webhdfs操作 -->
		<name>dfs.webhdfs.enabled</name>
		<value>true</value>
	</property>
    <property>
		<name>dfs.ha.automatic-failover.enabled.ns</name>
		<value>true</value>
	</property>
	<property>
        <!-- 每个用户存取文件时,是否检查权限 -->
		<name>dfs.permissions</name>
		<value>false</value>
	</property>
    <property>
		<name>dfs.ha.fencing.methods</name>
		<value>sshfence</value>
	</property>
    <property>
		<name>dfs.ha.fencing.ssh.private-key-files</name>
		<value>/root/.ssh/id_rsa</value>
	</property>
    <!-- 解决 识别不到集群名ns异常:UnknownHostsException:ns -->
    <property>
		<name>dfs.client.failover.proxy.provider.ns</name>
		<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
	</property>
</configuration>

	4. mapred-site.xml
<configuration>
    <!-- 执行框架为yarn -->
	<property>
		<name>mapreduce.framework.name</name>
		<value>yarn</value>
	</property>
    <!-- jobhistory地址 -->
	<property>
		<name>mapreduce.jobhistory.address</name>
		<value>node1:10200</value>
	</property>
    <!-- jobhistory网页地址 -->
	<property>
		<name>mapreduce.jobhistory.webapp.address</name>
		<value>node1:19888</value>
	</property>
	
</configuration>
6. 配置yarn集群
	yarn-site.xml
<configuration>
    <!-- nodemanager上运行的附属服务,不配置成mapreduce_shuffle则无法运行mapreduce程序 -->
	<property>
		<name>yarn.nodemanager.aux-services</name>
		<value>mapreduce_shuffle</value>
	</property>
	<property>
		<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
		<value>org.apache.hadoop.mapred.ShuffleHandler</value>
	</property>
    <property>
		<name>yarn.resourcemanager.ha.enabled</name>
		<value>true</value>
	</property>
    <property>
		<name>yarn.resourcemanager.cluster-id</name>
		<value>ns</value>
	</property>
    <property>
		<name>yarn.resourcemanager.ha.rm-ids</name>
		<value>rm1,rm2</value>
	</property>
    <property>
		<name>yarn.resourcemanager.hostname.rm1</name>
		<value>node1</value>
	</property>
    <property>
		<name>yarn.resourcemanager.hostname.rm2</name>
		<value>node2</value>
	</property>
    <property>
		<name>yarn.resourcemanager.webapp.address.rm1</name>
		<value>node1:8088</value>
	</property>
    <property>
		<name>yarn.resourcemanager.webapp.address.rm2</name>
		<value>node2:8088</value>
	</property>
    <property>
		<name>yarn.resourcemanager.recovery.enabled</name>
		<value>true</value>
	</property>
    <!-- 基于zookeeper的HA高可用 -->
    <property>
		<name>yarn.resourcemanager.store.class</name>
		<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
	</property>
    <!-- 开启日志聚合功能 -->
	<property>
		<name>yarn.log-aggregation-enable</name>
		<value>true</value>
	</property>
     <!-- 日志保留设置为7天 -->
	<property>
		<name>yarn.log-aggregation-retain-seconds</name>
		<value>604800</value>
	</property>
    <!-- 配置为zookeeper存储时,指定zookeeper集群的地址 -->
	<property>
		<name>yarn.resourcemanager.zk-address</name>
		<value>node1:2181,node2:2181,node3:2181</value>
	</property>
    <!-- nodemanager运行内存,必须大于或等于1024,否则nodemanager启动不成功 -->
	<property>
		<name>yarn.nodemanager.resource.memory-mb</name>
		<value>4096</value>
	</property>
    <!-- 关闭yarn内存检查 -->
    <property>
		<name>yarn.nodemanager.vmem-check-enabled</name>
		<value>false</value>
	</property>
    <property>
		<name>yarn.nodemanager.pmem-check-enabled</name>
		<value>false</value>
	</property>
    <property>
        <name>yarn.client.failover-proxy-provider</name>
        <value>org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider</value>
   </property>
   <property>
        <name>yarn.resourcemanager.ha.automatic-failover.enabled</name>
        <value>true</value>
   </property>
   <!-- application master在重启时,最大的尝试次数 -->
    <property>
        <name>yarn.resourcemanager.am.max-attempts</name>
        <value>10</value>
   </property>
</configuration>

7. 将/usr/local/hadoop文件夹分发给node2和node3
	scp -r /usr/local/hadoop root@node2:/usr/local/
	scp -r /usr/local/hadoop root@node3:/usr/local/

8. 启动集群(启动前需先完成"三、安装Zookeeper"并在三个节点上启动Zookeeper,否则zkfc和ResourceManager无法连接Zookeeper)
	1) 在node1上
		hdfs zkfc -formatZK
	2) 在三个节点分别启动
		hadoop-daemon.sh start journalnode
	3) 在node1
		hdfs namenode -format
		hadoop-daemon.sh start namenode
	4) 在node2上
		hdfs namenode -bootstrapStandby
		hadoop-daemon.sh start namenode
	5) 在node1和node2上
		hadoop-daemon.sh start zkfc
	6) 在三个节点上分别启动
		hadoop-daemon.sh start datanode
	7) 在node1和node2上
		yarn-daemon.sh start resourcemanager
	8) 在三个节点上分别启动
		yarn-daemon.sh start nodemanager
9. 验证
	jps
	
日常启动
	在三个节点分别启动
		hadoop-daemon.sh start journalnode
	在node1和node2启动
		hadoop-daemon.sh start zkfc
	一键启动
		start-dfs.sh
		start-yarn.sh

三、安装Zookeeper

1. 下载解压
	tar -xvf apache-zookeeper-3.5.9-bin.tar.gz -C /usr/local
	mv /usr/local/apache-zookeeper-3.5.9-bin /usr/local/zookeeper
	
2. 修改用户名和用户组权限
	chown -R root:root zookeeper/

3. 配置环境变量
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

4. 修改配置文件
	cp zoo_sample.cfg zoo.cfg
	
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/local/zookeeper/tmp/data/zookeeper
dataLogDir=/usr/local/zookeeper/tmp/log/zookeeper
# the port at which the clients will connect
clientPort=2181
autopurge.purgeInterval=1
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
# 注:server.1中的1为服务器id,需要与myid中的id一致

# 每个节点重复以上步骤

5. 设置服务器id
	mkdir -p /usr/local/zookeeper/tmp/data/zookeeper
	touch /usr/local/zookeeper/tmp/data/zookeeper/myid
	echo 1 > /usr/local/zookeeper/tmp/data/zookeeper/myid
# node2中执行echo 2,node3中执行echo 3(需与zoo.cfg中server.N的编号一致)

6. 启动服务器
zkServer.sh start

7. 连接客户端
zkCli.sh -server node1:2181

四、安装Flink

1. 下载解压
	tar -xvf flink-1.13.2-bin-scala_2.11.tgz -C /usr/local/
	mv /usr/local/flink-1.13.2 /usr/local/flink
    
2. 配置环境变量
	export HADOOP_CLASSPATH=`/usr/local/hadoop/bin/hadoop classpath`
	export FLINK_HOME=/usr/local/flink
	export PATH=$PATH:$FLINK_HOME/bin

3. 编辑配置文件
	vi flink-conf.yaml
# JobManager内存主要分为四部分:JVM Heap、Off-Heap Memory、JVM Metaspace、JVM Overhead
# JobManager总内存设置为2048m,则JVM Overhead可根据0.1的fraction换算得到204.8m,即JVM Overhead内存为205m
# JVM Metaspace默认为256m
# Off-Heap Memory默认为128m
# JVM Heap最终被推断为2048m-205m-256m - 128m = 1459m,即1.42g
# 但gc算法会占用一小部分固定内存作为Non-Heap,占用大小为0.05g
# JVM Heap实际大小为1.42g - 0.05g = 1.38g
jobmanager.rpc.address: node1

jobmanager.rpc.port: 6123
#JobManager jvm堆大小,主要取决于运行的作业数量、作业结构及用户代码的要求
#注意:jobmanager.heap.size自Flink 1.11起已废弃;本文已配置jobmanager.memory.process.size,堆大小按上文方式推导,建议删除下面这一项
jobmanager.heap.size: 1024m
#进程总内存
jobmanager.memory.process.size: 2048m

taskmanager.memory.process.size: 4096m
#每个TaskManager提供的任务Slots数量,建议与cpu核数一致
taskmanager.numberOfTaskSlots: 4

parallelism.default: 1

env.hadoop.conf.dir: /usr/local/hadoop/etc/hadoop

high-availability: zookeeper
# flink在重启时,尝试的最大次数
yarn.application-attempts: 10

high-availability.storageDir: hdfs://ns/flink/recovery

high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181

high-availability.zookeeper.path.root: /flink
#用于存储状态和检查点的状态后端
state.backend: filesystem

state.checkpoints.dir: hdfs://ns/flink/checkpoints

state.savepoints.dir: hdfs://ns/flink/savepoints
#故障转移策略
jobmanager.execution.failover-strategy: region

rest.port: 8081
#是否启动web提交
web.submit.enable: true

io.tmp.dirs: /usr/local/flink/data/tmp

env.log.dir: /usr/local/flink/data/logs

taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
fs.hdfs.hadoopconf: /usr/local/hadoop/etc/hadoop

historyserver.web.address: 0.0.0.0

historyserver.web.port: 8082

historyserver.archive.fs.refresh-interval: 10000

4. 修改masters
	node1:8081
	node2:8081
	
5. 修改workers
	node1
	node2
	node3
6. 修改zoo.cfg
	tickTime=2000
	
	initLimit=10
	
	syncLimit=5
	
	dataDir=/usr/local/flink/data/tmp/zookeeper/dataDir
	dataLogDir=/usr/local/flink/data/tmp/zookeeper/dataLogDir
	
	clientPort=2181
	server.1=node1:2888:3888
	server.2=node2:2888:3888
	server.3=node3:2888:3888

7. 添加jar包(放入/usr/local/flink/lib目录)
	flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
	
8. 启动flink yarn session模式
	yarn-session.sh

推荐阅读