Flink on k8s: Explanation and Hands-on Practice

I. Overview

Apache Flink provides data distribution, data communication, and fault-tolerance mechanisms for distributed computation over data streams.

Flink website:

    https://flink.apache.org/

Documentation for different versions:

    https://nightlies.apache.org/flink

Flink on k8s official documentation:

    https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/

GitHub:

    https://github.com/apache/flink/tree/release-1.14.6/

II. Flink Deployment Modes

Official documentation:

    https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/deployment/overview/

Flink on YARN has three deployment modes:

- yarn-session mode (Session Mode)
- yarn-cluster mode (Per-Job Mode)
- Application mode (Application Mode)

[Note] Per-Job mode is deprecated: it is only supported by YARN and was deprecated in Flink 1.15. It will be dropped in FLINK-26000.
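On Kubernetes, Session mode and Application mode are the ones that matter, and the practical difference is mainly in how the client submits work. As a rough preview of the commands used later in this article (the cluster ids, image, and jar names below are placeholders):

# Session mode: start a long-running cluster once, then submit jobs to it
./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-session
./bin/flink run --target kubernetes-session -Dkubernetes.cluster-id=my-session ./examples/streaming/TopSpeedWindowing.jar

# Application mode: a dedicated cluster is created per application and torn down when it finishes
./bin/flink run-application --target kubernetes-application \
    -Dkubernetes.cluster-id=my-app \
    -Dkubernetes.container.image=<image-with-the-job-jar> \
    local:///opt/flink/usrlib/my-job.jar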

III. Flink on k8s Hands-on

1) Download Flink

Download page:

    https://flink.apache.org/downloads.html

wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz
2) Build the base image

docker pull apache/flink:1.14.6-scala_2.12
docker tag apache/flink:1.14.6-scala_2.12 myharbor.com/bigdata/flink:1.14.6-scala_2.12
docker push myharbor.com/bigdata/flink:1.14.6-scala_2.12
3) Session mode

A Flink Session cluster runs as a long-lived Kubernetes Deployment. You can run multiple Flink jobs on one Session cluster; each job has to be submitted after the cluster has been deployed.

A Flink Session cluster deployment in Kubernetes contains at least three components:

- a Deployment that runs the JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports
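Once a session cluster is up (created either natively or from the Standalone manifests below), the three components can be verified with plain kubectl; this assumes the flink namespace used throughout this article:

# JobManager/TaskManager workloads and the Services that expose the JobManager
kubectl get deployments,pods,svc -n flink -o wide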

1. Native Kubernetes mode

Configuration parameters:

    https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#kubernetes-namespace

[1] Build the image: Dockerfile

FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8

Build the image:

docker build -t myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 . --no-cache

# Push the image
docker push myharbor.com/bigdata/flink-session:1.14.6-scala_2.12

[2] Create the namespace and serviceaccount

# Create the namespace
kubectl create ns flink

# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink

# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
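To confirm the binding actually gives the service account the permissions the JobManager needs (it creates and deletes TaskManager pods and related resources), a standard kubectl check can be used; both commands should print "yes":

kubectl auth can-i create pods --as=system:serviceaccount:flink:flink-service-account -n flink
kubectl auth can-i delete deployments --as=system:serviceaccount:flink:flink-service-account -n flink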

[3] Create the Flink cluster

./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.container.image=myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dkubernetes.rest-service.exposed.type=NodePort
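If the session cluster already exists, the same script can attach to it instead of creating a new one; a minimal sketch following the native Kubernetes documentation:

# Attach to the running session cluster instead of creating a new one
./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dexecution.attached=true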

[4] Submit a job

./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    ./examples/streaming/TopSpeedWindowing.jar

# Optional settings (another example jar and resource-related options that can be added to the command above):
#   ./examples/streaming/WordCount.jar
#   -Dkubernetes.taskmanager.cpu=2000m \
#   -Dexternal-resource.limits.kubernetes.cpu=4000m \
#   -Dexternal-resource.limits.kubernetes.memory=10Gi \
#   -Dexternal-resource.requests.kubernetes.cpu=2000m \
#   -Dexternal-resource.requests.kubernetes.memory=8Gi \

[Note] Pay attention to the JDK version; JDK 8 currently works fine.
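Jobs on the session cluster can also be listed and cancelled from the same client; a sketch based on the native Kubernetes documentation (replace <jobId> with an id printed by the list command):

# List jobs running on the session cluster
./bin/flink list --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink

# Cancel a job by id
./bin/flink cancel --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    <jobId>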

[5] Check

kubectl get pods -n flink
kubectl logs -f my-first-flink-cluster-taskmanager-1-1 -n flink

[6] Delete the Flink cluster

kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force

2. Standalone mode

[1] Build the image

The default user is flink; here I change it to admin (adjust the user to your own needs). The startup script can be copied from a pod started with the image above.

Startup script docker-entrypoint.sh:

复制#!/usr/bin/env bash################################################################################ Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.###############################################################################

COMMAND_STANDALONE="standalone-job"

COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address

JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}

CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {

if [ $(id -u) != 0 ]; then

# Don't need to drop privs if EUID != 0

return

elif [ -x /sbin/su-exec ]; then

# Alpine

echo su-exec admin

else

# Others

echo gosu admin

fi

}

copy_plugins_if_required() {

if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then

return 0

fi

echo "Enabling required built-in plugins"

for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do

echo "Linking ${target_plugin} to plugin directory"

plugin_name=${target_plugin%.jar}

mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"

if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then

echo "Plugin ${target_plugin} does not exist. Exiting."

exit 1

else

ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"

echo "Successfully enabled ${target_plugin}"

fi

done

}

set_config_option() {

local option=$1

local value=$2

# escape periods for usage in regular expressions

local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

# either override an existing entry, or append a new one

if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then

sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"

else

echo "${option}: ${value}" >> "${CONF_FILE}"

fi

}

prepare_configuration() {

set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}

set_config_option blob.server.port 6124

set_config_option query.server.port 6125

if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then

set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}

fi

if [ -n "${FLINK_PROPERTIES}" ]; then

echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"

fi

envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"

}

maybe_enable_jemalloc() {

if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then

JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"

JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"

if [ -f "$JEMALLOC_PATH" ]; then

export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH

elif [ -f "$JEMALLOC_FALLBACK" ]; then

export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK

else

if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then

MSG_PATH=$JEMALLOC_PATH

else

MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"

fi

echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldnt be found. glibc will be used instead."

fi

fi

}

maybe_enable_jemalloc

copy_plugins_if_required

prepare_configuration

args=("$@")

if [ "$1" = "help" ]; then

printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"

printf " Or $(basename "$0") help\n\n

printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the DISABLE_JEMALLOC environment variable to true.\n"

exit 0

elif [ "$1" = "jobmanager" ]; then

args=("${args[@]:1}")

echo "Starting Job Manager"

exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"

elif [ "$1" = ${COMMAND_STANDALONE} ]; then

args=("${args[@]:1}")

echo "Starting Job Manager"

exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"

elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then

args=("${args[@]:1}")

echo "Starting History Server"

exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"

elif [ "$1" = "taskmanager" ]; then

args=("${args[@]:1}")

echo "Starting Task Manager"

exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"

fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"

Compose the Dockerfile

FROM myharbor.com/bigdata/centos:7.9.2009
USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the time zone (the default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Create the directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.sh

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache

# Working directory
WORKDIR $FLINK_HOME

# Expose ports
EXPOSE 6123 8081

# Entrypoint script; it is not executed at build time, only when a container starts
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]

Build the image:

docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache

# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12

# Remove the image (optional)
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
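Before relying on the pushed image it can be worth a quick local smoke test of the entrypoint; a minimal check, assuming Docker is available on the build host:

# With no arguments the CMD defaults to "help" and the entrypoint prints its usage text
docker run --rm myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
# Optionally start a foreground JobManager to check permissions and configuration handling
docker run --rm myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 jobmanager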

[2] Create the namespace and serviceaccount

# Create the namespace
kubectl create ns flink

# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink

# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

[3] Compose the YAML files

flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
jobmanager-service.yaml: optional service, required only in non-HA mode.

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
jobmanager-rest-service.yaml: optional service that exposes the JobManager REST port as a Kubernetes node port (NodePort).

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
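Once this service exists you can either hard-code the NodePort (30081 here) or look it up, and kubectl port-forward is an alternative when node ports are not reachable from your workstation; both commands are standard kubectl:

# Print the NodePort assigned to the REST service
kubectl get svc flink-jobmanager-rest -n flink -o jsonpath='{.spec.ports[?(@.name=="rest")].nodePort}'

# Alternative: forward a local port to the JobManager web UI, then open http://localhost:8081
kubectl port-forward svc/flink-jobmanager 8081:8081 -n flink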
taskmanager-query-state-service.yaml: optional service that exposes the TaskManager queryable-state port as a Kubernetes node port.

apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager

The configuration files above are common; they are reused by the deployments that follow.

jobmanager-session-deployment-non-ha.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
      securityContext:
        runAsUser: 9999  # the admin user (uid 9999) created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
taskmanager-session-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
      securityContext:
        runAsUser: 9999  # the admin user (uid 9999) created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

[4] Create the Flink cluster

kubectl create ns flink

# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink

# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink

# Create the deployments for the cluster
kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl create -f taskmanager-session-deployment.yaml -n flink
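Before opening the web UI it can help to wait until both Deployments have finished rolling out; standard kubectl, nothing Flink-specific:

# Block until the JobManager and TaskManager deployments are ready
kubectl rollout status deployment/flink-jobmanager -n flink
kubectl rollout status deployment/flink-taskmanager -n flink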

Reverse-engineering a Dockerfile from an image:

alias whaler="docker run -t --rm -v /var/run/docker.sock:/var/run/docker.sock:ro pegleg/whaler"
whaler flink:1.14.6-scala_2.12

Check:

kubectl get pods,svc -n flink -o wide

Web UI address:

    http://192.168.182.110:30081/#/overview

[5] Submit a job

./bin/flink run -m local-168-182-110:30081 ./examples/streaming/WordCount.jar

kubectl logs flink-taskmanager-54649bf96c-zjtkh -n flink

[6] Delete the Flink cluster

kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f taskmanager-session-deployment.yaml -n flink
kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl delete ns flink --force

[7] Access the Flink web UI

The port is the NodePort defined in jobmanager-rest-service.yaml:

    http://192.168.182.110:30081/#/overview

4) Application mode (recommended)

A basic Flink Application cluster deployment in Kubernetes contains three components:

- an Application that runs the JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports

1. Native Kubernetes mode (commonly used)

[1] Build the image: Dockerfile

FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8
RUN mkdir -p $FLINK_HOME/usrlib
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/

Build the image:

docker build -t myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 . --no-cache

# Push the image
docker push myharbor.com/bigdata/flink-application:1.14.6-scala_2.12

# Remove the image (optional)
docker rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12

[2] Create the namespace and serviceaccount

# Create the namespace
kubectl create ns flink

# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink

# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

[3] Create the Flink cluster and submit a job

./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 \
    -Dkubernetes.jobmanager.replicas=1 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dexternal-resource.limits.kubernetes.cpu=2000m \
    -Dexternal-resource.limits.kubernetes.memory=2Gi \
    -Dexternal-resource.requests.kubernetes.cpu=1000m \
    -Dexternal-resource.requests.kubernetes.memory=1Gi \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar

[Note] local is the only scheme supported in application mode. Here "local" refers to the environment inside the pod/container, not the host machine.
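The job in the application cluster can likewise be listed and cancelled from the client; a sketch based on the native Kubernetes documentation (cancelling the job also shuts the application cluster down):

# List the job running in the application cluster
./bin/flink list --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.namespace=flink

# Cancel it by id
./bin/flink cancel --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.namespace=flink \
    <jobId>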

Check:

kubectl get pods,svc -n flink

kubectl logs -f my-first-application-cluster-taskmanager-1-1 -n flink

[4] Delete the Flink cluster

kubectl delete deployment/my-first-application-cluster -n flink
kubectl delete ns flink --force

2. Standalone mode

[1] Build the image: Dockerfile

Startup script docker-entrypoint.sh:

复制#!/usr/bin/env bash################################################################################ Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at# http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.###############################################################################

COMMAND_STANDALONE="standalone-job"

COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address

JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}

CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {

if [ $(id -u) != 0 ]; then

# Don't need to drop privs if EUID != 0

return

elif [ -x /sbin/su-exec ]; then

# Alpine

echo su-exec admin

else

# Others

echo gosu admin

fi

}

copy_plugins_if_required() {

if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then

return 0

fi

echo "Enabling required built-in plugins"

for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do

echo "Linking ${target_plugin} to plugin directory"

plugin_name=${target_plugin%.jar}

mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"

if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then

echo "Plugin ${target_plugin} does not exist. Exiting."

exit 1

else

ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"

echo "Successfully enabled ${target_plugin}"

fi

done

}

set_config_option() {

local option=$1

local value=$2

# escape periods for usage in regular expressions

local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

# either override an existing entry, or append a new one

if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then

sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"

else

echo "${option}: ${value}" >> "${CONF_FILE}"

fi

}

prepare_configuration() {

set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}

set_config_option blob.server.port 6124

set_config_option query.server.port 6125

if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then

set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}

fi

if [ -n "${FLINK_PROPERTIES}" ]; then

echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"

fi

envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"

}

maybe_enable_jemalloc() {

if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then

JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"

JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"

if [ -f "$JEMALLOC_PATH" ]; then

export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH

elif [ -f "$JEMALLOC_FALLBACK" ]; then

export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK

else

if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then

MSG_PATH=$JEMALLOC_PATH

else

MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"

fi

echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldnt be found. glibc will be used instead."

fi

fi

}

maybe_enable_jemalloc

copy_plugins_if_required

prepare_configuration

args=("$@")

if [ "$1" = "help" ]; then

printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"

printf " Or $(basename "$0") help\n\n"

printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the DISABLE_JEMALLOC environment variable to true.\n"

exit 0

elif [ "$1" = "jobmanager" ]; then

args=("${args[@]:1}")

echo "Starting Job Manager"

exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"

elif [ "$1" = ${COMMAND_STANDALONE} ]; then

args=("${args[@]:1}")

echo "Starting Job Manager"

exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"

elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then

args=("${args[@]:1}")

echo "Starting History Server"

exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"

elif [ "$1" = "taskmanager" ]; then

args=("${args[@]:1}")

echo "Starting Task Manager"

exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"

fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"

Compose the Dockerfile

FROM myharbor.com/bigdata/centos:7.9.2009
USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the time zone (the default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Create the directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache
RUN chmod +x /opt/apache/docker-entrypoint.sh

# Working directory
WORKDIR $FLINK_HOME

# Expose ports
EXPOSE 6123 8081

# Entrypoint script; it is not executed at build time, only when a container starts
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]

docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache

# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12

# Remove the image (optional)
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12

[2] Create the namespace and serviceaccount

# Create the namespace
kubectl create ns flink

# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink

# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

[3] Compose the YAML files

flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

jobmanager-service.yaml: optional service, required only in non-HA mode.

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

jobmanager-rest-service.yaml: optional service that exposes the JobManager REST port as a Kubernetes node port (NodePort).

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager

taskmanager-query-state-service.yaml: optional service that exposes the TaskManager queryable-state port as a Kubernetes node port.

apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager

jobmanager-application-non-ha.yaml (non-HA)

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["standalone-job", "--job-classname", "org.apache.flink.examples.java.wordcount.WordCount", "--output", "/tmp/result"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
      securityContext:
        runAsUser: 9999  # the admin user (uid 9999) created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts
[Note] Pay attention to the job-artifacts mount (/mnt/nfsdata/flink/application/job-artifacts here, exposed as usrlib in the container); it is best to back it with a shared directory.
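The standalone-job container looks for the job jar under $FLINK_HOME/usrlib, which is backed by the hostPath volume above, so the jar has to be staged there before the Job starts. A hedged example for the WordCount class referenced in the manifest (the location of the extracted Flink distribution is an assumption):

# Stage the example jar on the shared/NFS path that backs the hostPath volume
mkdir -p /mnt/nfsdata/flink/application/job-artifacts
cp flink-1.14.6/examples/batch/WordCount.jar /mnt/nfsdata/flink/application/job-artifacts/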

taskmanager-job-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
      securityContext:
        runAsUser: 9999  # the admin user (uid 9999) created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts

[4] Create the Flink cluster and submit the job

kubectl create ns flink

# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink

# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink

# Create the Job and the TaskManager deployment
kubectl create -f jobmanager-application-non-ha.yaml -n flink
kubectl create -f taskmanager-job-deployment.yaml -n flink

Check:

kubectl get pods,svc -n flink
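Because the Job manifest runs WordCount with --output /tmp/result, the result is written inside the JobManager pod rather than to the console; a quick way to follow the run:

# Follow the JobManager log; the job's progress and final status show up here
kubectl logs -f -l component=jobmanager -n flink
# The WordCount output itself lands under /tmp/result inside the JobManager pod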

[5] Delete the Flink cluster

kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-application-non-ha.yaml -n flink
kubectl delete -f taskmanager-job-deployment.yaml -n flink
kubectl delete ns flink --force

[6] Check

kubectl get pods,svc -n flink
kubectl exec -it flink-taskmanager-54cb7fc57c-g484q -n flink -- bash
