Flink on k8s 讲解与实战操作
一、概述
其针对数据流的分布式计算提供了数据分布,数据通信及容错机制等功能。
Flink 官网:
https://flink.apache.org/
不同版本的文档:
https://nightlies.apache.org/flink
flink on k8s 官方文档:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/
GitHub 地址:
https://github.com/apache/flink/tree/release-1.14.6/
二、Flink 运行模式
官方文档:
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/deployment/overview/
Flink on yarn 有三种运行模式:
yarn-session 模式(Session Mode)
yarn-cluster 模式(Per-Job Mode)
Application 模式(Application Mode)
三、Flink on k8s 实战操作
1)flink 下载下载地址:
https://flink.apache.org/downloads.html 复制docker pull apache/flink:1.14.6-scala_2.12
docker tag apache/flink:1.14.6-scala_2.12 myharbor.com/bigdata/flink:1.14.6-scala_2.12
docker push myharbor.com/bigdata/flink:1.14.6-scala_2.121.2.3.Flink Session 集群作为长时间运行的 Kubernetes Deployment 执行。你可以在一个 Session 集群上运行多个 Flink 作业。每个作业都需要在集群部署完成后提交到集群。
Kubernetes 中的 Flink Session 集群部署至少包含三个组件:
运行 JobManager 的部署
TaskManagers 池的部署
暴露 JobManager 的 REST 和 UI 端口的服务
1、Native Kubernetes 模式
参数配置:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#kubernetes-namespace 【1】构建镜像 Dockerfile
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12

# Set the container timezone to Asia/Shanghai (the base image defaults to UTC).
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

# Use ENV instead of `RUN export ...`: export only lives for that single build
# step and is lost at runtime, so the original line had no effect.
ENV LANG zh_CN.UTF-8
开始构建镜像
# Build the session image.
docker build -t myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 . --no-cache
# Push the image to the registry.
docker push myharbor.com/bigdata/flink-session:1.14.6-scala_2.12
【2】创建命名空间和 serviceaccount
kubectl create ns flink
# Create the serviceaccount.
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions (bind the built-in "edit" clusterrole).
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
【3】创建 flink 集群
# Start a Flink Session cluster natively on Kubernetes.
./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.container.image=myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dkubernetes.rest-service.exposed.type=NodePort
【4】提交任务
# Submit a job to the running session cluster.
./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    ./examples/streaming/TopSpeedWindowing.jar

# 参数配置: submit with explicit resource settings.
# NOTE(review): the extracted original fused two commands and repeated
# -Dkubernetes.taskmanager.cpu twice; the duplicate was dropped here.
./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dkubernetes.taskmanager.cpu=2000m \
    -Dexternal-resource.limits.kubernetes.cpu=4000m \
    -Dexternal-resource.limits.kubernetes.memory=10Gi \
    -Dexternal-resource.requests.kubernetes.cpu=2000m \
    -Dexternal-resource.requests.kubernetes.memory=8Gi \
    ./examples/streaming/WordCount.jar
【温馨提示】注意 jdk 版本,目前 jdk8 是正常的。
【5】查看
kubectl get pods -n flink
# -n flink is required: the taskmanager pod lives in the flink namespace.
kubectl logs -f my-first-flink-cluster-taskmanager-1-1 -n flink
【6】删除 flink 集群
kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force
2、Standalone 模式
【1】构建镜像
默认用户是 flink 用户,这里我换成 admin,根据企业需要更换用户,脚本可以通过上面运行的 pod 拿到。
启动脚本 docker-entrypoint.sh
# Sub-command names accepted by the entrypoint dispatcher below.
COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"
# Print the command prefix used to drop root privileges to the "admin" user.
# Prints nothing when already running as non-root.
drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}
# Link the plugin jars listed in $ENABLE_BUILT_IN_PLUGINS (';'-separated jar
# file names) from $FLINK_HOME/opt into $FLINK_HOME/plugins/<name>/.
# Exits the script with status 1 if a requested plugin jar does not exist.
copy_plugins_if_required() {
    if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
        return 0
    fi
    echo "Enabling required built-in plugins"
    # BUG FIX: the separator arguments of tr were lost in the original
    # ("tr ;"); restore the ';' -> ' ' translation.
    for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
        echo "Linking ${target_plugin} to plugin directory"
        plugin_name=${target_plugin%.jar}
        mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
        if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
            echo "Plugin ${target_plugin} does not exist. Exiting."
            exit 1
        else
            ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
            echo "Successfully enabled ${target_plugin}"
        fi
    done
}
# Set one key in flink-conf.yaml ($CONF_FILE): overwrite an existing entry
# in place, or append a new one. $1 = option name, $2 = value.
set_config_option() {
    local option=$1
    local value=$2

    # escape periods for usage in regular expressions
    # (declaration split from assignment so the command's status isn't masked)
    local escaped_option
    escaped_option=$(echo "${option}" | sed -e "s/\./\\\./g")

    # either override an existing entry, or append a new one
    if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
        sed -i -e "s/${escaped_option}:.*/${option}: ${value}/g" "${CONF_FILE}"
    else
        echo "${option}: ${value}" >> "${CONF_FILE}"
    fi
}
# Populate flink-conf.yaml: RPC address, fixed blob/query ports, optional
# slot count, extra properties from $FLINK_PROPERTIES, then expand ${VAR}
# references inside the file with envsubst.
prepare_configuration() {
    set_config_option jobmanager.rpc.address "${JOB_MANAGER_RPC_ADDRESS}"
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}"
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    # envsubst rewrites the file in place via a temp copy
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}
# Preload jemalloc as the memory allocator unless DISABLE_JEMALLOC=true.
# Tries the arch-specific path first, then the x86_64 fallback; warns and
# falls back to glibc when neither library exists.
maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}
maybe_enable_jemalloc
copy_plugins_if_required
prepare_configuration

# Dispatch on the first argument; anything unrecognized is executed verbatim.
args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    # BUG FIX: the closing double quote of this printf was missing in the
    # original, which made the whole script a syntax error.
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the DISABLE_JEMALLOC environment variable to true.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = "${COMMAND_STANDALONE}" ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = "${COMMAND_HISTORY_SERVER}" ]; then
    args=("${args[@]:1}")
    echo "Starting History Server"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Task Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")
# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
编排 Dockerfile
FROM myharbor.com/bigdata/centos:7.9.2009
USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the timezone (the base image defaults to UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

# ADD auto-extracts local tar archives
ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.sh

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache

# Working directory
WORKDIR $FLINK_HOME

# Expose JobManager RPC and web UI ports
EXPOSE 6123 8081
# The entrypoint runs when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
开始构建镜像
docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
# Remove the local image
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
【2】创建命名空间和 serviceaccount
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions (bind the built-in "edit" clusterrole)
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
【3】编排 yaml 文件
flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
jobmanager-rest-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
taskmanager-query-state-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
以上几个配置文件是公共的
jobmanager-session-deployment-non-ha.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
taskmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
【4】创建 flink 集群
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the deployments for the cluster
kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl create -f taskmanager-session-deployment.yaml -n flink
镜像逆向解析 dockerfile
复制alias whaler="docker run -t --rm -v /var/run/docker.sock:/var/run/docker.sock:ro pegleg/whaler"
whaler flink:1.14.6-scala_2.121.2.查看
Web UI 地址:
http://192.168.182.110:30081/#/overview 【5】提交任务
【6】删除 flink 集群
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f taskmanager-session-deployment.yaml -n flink
kubectl delete -f jobmanager-session-deployment.yaml -n flink
kubectl delete ns flink --force
【7】访问 flink web
端口就是jobmanager-rest-service.yaml文件中的 NodePort
http://192.168.182.110:30081/#/overview Kubernetes 中一个基本的 Flink Application 集群部署包含三个组件:
运行 JobManager 的应用程序
TaskManagers 池的部署
暴露 JobManager 的 REST 和 UI 端口的服务
1、Native Kubernetes 模式(常用)
【1】构建镜像 Dockerfile
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12

# Set the timezone (the base image defaults to UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

# ENV persists into the runtime image; `RUN export` is lost after the build step
ENV LANG zh_CN.UTF-8

# Bake the application jar into the image for application mode
RUN mkdir -p $FLINK_HOME/usrlib
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/
开始构建镜像
docker build -t myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
# Remove the local image
docker rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
【2】创建命名空间和 serviceaccount
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions (bind the built-in "edit" clusterrole)
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
【3】创建 flink 集群并提交任务
# Deploy an Application-mode cluster; the jar is read from inside the image.
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 \
    -Dkubernetes.jobmanager.replicas=1 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dexternal-resource.limits.kubernetes.cpu=2000m \
    -Dexternal-resource.limits.kubernetes.memory=2Gi \
    -Dexternal-resource.requests.kubernetes.cpu=1000m \
    -Dexternal-resource.requests.kubernetes.memory=1Gi \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar
查看
【4】删除 flink 集群
kubectl delete deployment/my-first-application-cluster -n flink
kubectl delete ns flink --force
2、Standalone 模式
【1】构建镜像 Dockerfile
启动脚本 docker-entrypoint.sh
# Sub-command names accepted by the dispatcher below.
COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

# Print the command prefix used to drop root privileges to the "admin" user;
# prints nothing when already non-root.
drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}

# Link plugins listed in $ENABLE_BUILT_IN_PLUGINS (';'-separated jar names)
# from $FLINK_HOME/opt into $FLINK_HOME/plugins/<name>/.
copy_plugins_if_required() {
    if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
        return 0
    fi
    echo "Enabling required built-in plugins"
    # BUG FIX: tr lost its separator arguments in the original ("tr ;")
    for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
        echo "Linking ${target_plugin} to plugin directory"
        plugin_name=${target_plugin%.jar}
        mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
        if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
            echo "Plugin ${target_plugin} does not exist. Exiting."
            exit 1
        else
            ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
            echo "Successfully enabled ${target_plugin}"
        fi
    done
}

# Overwrite or append a key in flink-conf.yaml. $1 = option, $2 = value.
set_config_option() {
    local option=$1
    local value=$2

    # escape periods for usage in regular expressions
    local escaped_option
    escaped_option=$(echo "${option}" | sed -e "s/\./\\\./g")

    # either override an existing entry, or append a new one
    if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
        sed -i -e "s/${escaped_option}:.*/${option}: ${value}/g" "${CONF_FILE}"
    else
        echo "${option}: ${value}" >> "${CONF_FILE}"
    fi
}

# Populate flink-conf.yaml and expand env var references inside it.
prepare_configuration() {
    set_config_option jobmanager.rpc.address "${JOB_MANAGER_RPC_ADDRESS}"
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}"
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}

# Preload jemalloc unless DISABLE_JEMALLOC=true; warn and fall back to glibc.
maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc
copy_plugins_if_required
prepare_configuration

args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the DISABLE_JEMALLOC environment variable to true.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = "${COMMAND_STANDALONE}" ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = "${COMMAND_HISTORY_SERVER}" ]; then
    args=("${args[@]:1}")
    echo "Starting History Server"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Task Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")
# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
编排Dockerfile
FROM myharbor.com/bigdata/centos:7.9.2009
USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the timezone (the base image defaults to UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/
RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache
# BUG FIX: the entrypoint is copied to /opt/apache/, not into $FLINK_HOME,
# so chmod must target /opt/apache/docker-entrypoint.sh (the original
# chmod'ed a path that does not exist).
RUN chmod +x /opt/apache/docker-entrypoint.sh

# Working directory
WORKDIR $FLINK_HOME

# Expose JobManager RPC and web UI ports
EXPOSE 6123 8081
# The entrypoint runs when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
# Remove the local image
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
【2】创建命名空间和 serviceaccount
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions (bind the built-in "edit" clusterrole)
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
【3】编排 yaml 文件
flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
jobmanager-service.yaml 可选服务,仅非 HA 模式需要。
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
jobmanager-rest-service.yaml 可选服务,将 jobmanager rest 端口公开为公共 Kubernetes 节点的端口。
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
taskmanager-query-state-service.yaml 可选服务,公开 TaskManager 端口以访问可查询状态作为公共 Kubernetes 节点的端口。
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
jobmanager-application-non-ha.yaml ,非高可用
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        # NOTE(review): the original had a bare `env:` key with no entries;
        # removed, since an empty mapping value serializes to null and adds nothing.
        args: ["standalone-job", "--job-classname", "org.apache.flink.examples.java.wordcount.WordCount", "--output", "/tmp/result"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts
taskmanager-job-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts
【4】创建 flink 集群并提交任务
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the job and the deployment for the cluster
kubectl create -f jobmanager-application-non-ha.yaml -n flink
kubectl create -f taskmanager-job-deployment.yaml -n flink
查看
【5】删除 flink 集群
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-application-non-ha.yaml -n flink
kubectl delete -f taskmanager-job-deployment.yaml -n flink
kubectl delete ns flink --force
【6】查看
kubectl get pods,svc -n flink
kubectl exec -it flink-taskmanager-54cb7fc57c-g484q -n flink -- bash