TalkingData的Spark On Kubernetes实践

众所周知,Spark是一个快速、通用的大规模数据处理平台,和Hadoop的MapReduce计算框架类似。但是相对于MapReduce,Spark凭借其可伸缩、基于内存计算等特点,以及可以直接读写Hadoop上任何格式数据的优势,使批处理更加高效,并有更低的延迟。实际上,Spark已经成为轻量级大数据快速处理的统一平台。

Spark官方支持四种ClusterManager:Sparkstandaloneclustermanager、Mesos、YARN和Kubernetes。由于我们TalkingData是使用Kubernetes作为资源的调度和管理平台,所以SparkOnKubernetes对于我们是最好的解决方案。

如何搭建生产可用的Kubernetes集群

部署

目前市面上有很多搭建Kubernetes的方法,比如Scratch、Kubeadm、Minikube或者各种托管方案。因为我们需要简单快速地搭建功能验证集群,所以选择了Kubeadm作为集群的部署工具。部署步骤很简单,在master上执行:

kubeadminit

在node上执行:

kubeadmjoin--token:--discovery-token-ca-cert-hashsha256:

具体配置可见官方文档:。

需要注意的是由于国内网络限制,很多镜像无法从获取,我们需要将之替换为第三方提供的镜像,比如:。

网络

Kubernetes网络默认是通过CNI实现,主流的CNIplugin有:LinuxBridge、MACVLAN、Flannel、Calico、Kube-router、WeaveNet等。Flannel主要是使用VXLANtunnel来解决pod间的网络通信,Calico和Kube-router则是使用BGP。由于软VXLAN对宿主机的性能和网络有不小的损耗,BGP则对硬件交换机有一定的要求,且我们的基础网络是VXLAN实现的大二层,所以我们最终选择了MACVLAN。

CNIMACVLAN的配置示例如下:

{"name":"mynet","type":"macvlan","master":"eth0","ipam":{"type":"host-local","subnet":"10.0.0.0/17","rangeStart":"10.0.64.1","range":"10.0.64.126","gateway":"10.0.127.254","routes":[{"dst":"0.0.0.0/0"},{"dst":"10.0.80.0/24","gw":"10.0.0.61"}]}}

Podsubnet是10.0.0.0/17,实际podippool是10.0.64.0/20。clustercidr是10.0.80.0/24。我们使用的IPAM是host-local,规则是在每个Kubernetesnode上建立/25的子网,可以提供126个IP。我们还配置了一条到clustercidr的静态路由10.0.80.0/24,网关是宿主机。这是因为容器在macvlan配置下egress并不会通过宿主机的iptables,这点和LinuxBridge有较大区别。在LinuxBridge模式下,只要指定内核参数=1,所有进入bridge的流量都会通过宿主机的iptables。经过分析kube-proxy,我们发现可以使用KUBE-FORWARD这个chain来进行pod到service的网络转发:

-AFORWARD-mcomment--comment"kubernetesforwardrules"-jKUBE-FORWARD-AKUBE-FORWARD-mcomment--comment"kubernetesforwardingrules"-mmark--mark0x4000/0/17-mcomment--comment"kubernetesforwardingconntrackpodsourcerule"-mconntrack--ctstateRELATED,/17-mcomment--comment"kubernetesforwardingconntrackpoddestinationrule"-mconntrack--ctstateRELATED,ESTABLISHED-jACCEPT

最后通过KUBE-SERVICES使用DNAT到后端的pod。pod访问其他网段的话,就通过物理网关10.0.127.254。

还有一个需要注意的地方是出于kernelsecurity的考虑,link物理接口的macvlan是无法直接和物理接口通信的,这就导致容器并不能将宿主机作为网关。我们采用了一个小技巧,避开了这个限制。我们从物理接口又创建了一个macvlan,将物理IP移到了这个接口上,物理接口只作为网络入口:

$cat/etc/sysconfig/network-scripts/ifcfg-eth0DEVICE=eth0IPV6INIT=noBOOTPROTO=none$cat/etc/sysconfig/network-scripts/ifcfg-macvlanDEVICE=macvlanNAME=macvlanBOOTPROTO=noneONBOOT=yesTYPE=macvlanDEVICETYPE=macvlanDEFROUTE=yesPEERDNS=yesPEERROUTES=yesIPV4_FAILURE_FATAL=noIPADDR=10.0.0.61PREFIX=17GATEWAY=10.0.127.254MACVLAN_PARENT=eth0MACVLAN_MODE=bridge

这样两个macvlan是可以互相通信的。

Kube-dns

默认配置下,Kubernetes使用kube-dns进行DNS解析和服务发现。但在实际使用时,我们发现在pod上通过servicedomain访问service总是有5秒的延迟。使用tcpdump抓包,发现延迟出现在DNSAAAA。进一步排查,发现问题是由于netfilter在conntrack和SNAT时的RaceCondition导致。简言之,DNSA和AAAA记录请求报文是并行发出的,这会导致netfilter在_nf_conntrack_confirm时认为第二个包是重复的(因为有相同的五元组),从而丢包。具体可看我提的issue:。一个简单的解决方案是在/etc/中增加optionssingle-request-reopen,使DNSA和AAAA记录请求报文使用不同的源端口。我提的PR在:,大家可以参考。我们的解决方法是不使用Kubernetesservice,设置hostNetwork=true使用宿主机网络提供DNS服务。因为我们的基础网络是大二层,所以pod和node可以直接通信,这就避免了conntrack和SNAT。

Spark与Kubernetes集成

由于Spark的抽象设计,我们可以使用第三方资源管理平台调度和管理Spark作业,比如Yarn、Mesos和Kubernetes。目前官方有一个experimental项目,可以将Spark运行在Kubernetes之上:。

基本原理

当我们通过spark-submit将Spark作业提交到Kubernetes集群时,会执行以下流程:

Spark在Kubernetespod中创建Sparkdriver

Driver调用KubernetesAPI创建executorpods,executorpods执行作业代码

计算作业结束,executorpods回收并清理

driverpod处于completed状态,保留日志,直到KubernetesGC或者手动清理

先决条件

+

+

具有Kubernetespods的list,create,edit和delete权限

Kubernetes集群必须正确配置KubernetesDNS[1]

如何集成

Docker镜像

由于Sparkdriver和executor都运行在Kubernetespod中,并且我们使用Docker作为containerruntimeenviroment,所以首先我们需要建立Spark的Docker镜像。

在Sparkdistribution中已包含相应脚本和Dockerfile,可以通过以下命令构建镜像:

$./bin/$./bin/

提交作业

在构建Spark镜像后,我们可以通过以下命令提交作业:

$bin/spark-submit\--masterk8s://https://:\--deploy-modecluster\--namespark-pi\--\--jarshttps://path/to/,https://path/to/://host:port/path/to/file1,hdfs://host:port/path/to/=5\--=\https://path/to/

其中,Sparkmaster是Kubernetesapiserver的地址,可以通过以下命令获取:

$kubectlcluster-infoKubernetesmasterisrunningat

Spark的作业代码和依赖,我们可以在--jars、--files和最后位置指定,协议支持http、https和HDFS。

执行提交命令后,会有以下输出:

任务结束,会输出:

访问SparkDriverUI

我们可以在本地使用kubectlport-forward访问DriverUI:

$kubectlport-forwarddriver-pod-name4040:4040

执行完后通过http://localhost:4040访问。

访问日志

Spark的所有日志都可以通过KubernetesAPI和kubectlCLI进行访问:

$kubectl-n=namespacelogs-fdriver-pod-name
如何实现租户和资源隔离KubernetesNamespace

在Kubernetes中,我们可以使用namespace在多用户间实现资源分配、隔离和配额。SparkOnKubernetes同样支持配置namespace创建Spark作业。

首先,创建一个Kubernetesnamespace:

$kubectlcreatenamespacespark

由于我们的Kubernetes集群使用了RBAC,所以还需创建serviceaccount和绑定role:

$kubectlcreateserviceaccountspark-nspark$kubectlcreateclusterrolebindingspark-role--clusterrole=edit--serviceaccount=spark:spark--namespace=spark

并在spark-submit中新增以下配置:

$bin/spark-submit\--=spark\--=spark\

资源隔离

考虑到我们Spark作业的一些特点和计算资源隔离,前期我们还是选择了较稳妥的物理隔离方案。具体做法是为每个组提供单独的Kubernetesnamespace,计算任务都在各自namespace里提交。计算资源以物理机为单位,折算成cpu和内存,纳入Kubernetes统一管理。在Kubernetes集群里,通过nodelabel和PodNodeSelector将计算资源和namespace关联。从而实现在提交Spark作业时,计算资源总是选择namespace关联的node。

具体做法如下:

1、创建nodelabel

$kubectllabelnodesnode_namespark:spark

2、开启Kubernetesadmissioncontroller

我们是使用kubeadm安装Kubernetes集群,所以修改/etc/kubernetes/manifests/,在--admission-control后添加PodNodeSelector。

$cat/etc/kubernetes/manifests/:v1kind:Podmetadata:annotations:/critical-pod:""creationTimestamp:nulllabels:component:kube-apiservertier:control-planename:kube-apiservernamespace:kube-systemspec:containers:-command:-kube-apiserver---secure-port=6443---proxy-client-cert-file=/etc/kubernetes/pki/=Initializers,NamespaceLifecycle,LimitRanger,ServiceAccount,DefaultStorageClass,DefaultTolerationSeconds,NodeRestriction,ResourceQuota,MutatingAdmissionWebhook,ValidatingAdmissionWebhook,PodNodeSelector

3、配置PodNodeSelector

在namespace的annotations中添加/node-selector:spark=spark。

apiVersion:v1kind:Namespacemetadata:annotations:/node-selector:spark=sparkname:spark

完成以上配置后,可以通过spark-submit测试结果:

$==spark--masterk8s://https://xxxx:6443--=5--=xxxx/library/spark:://xxxx:81//examples/jars/spark-examples_2.11-2.3.0.jar

我们可以看到,Spark作业全分配到了关联的hadooptest-001到003三个node上。

待解决问题

KubernetesHA

Kubernetes的集群状态基本都保存在etcd中,所以etcd是HA的关键所在。由于我们目前还处在半生产状态,HA这方面未过多考虑。有兴趣的同学可以查看:。

日志

在SparkOnYarn下,可以开启将日志收集聚合到HDFS中,以供查看。但是在SparkOnKubernetes中,则缺少这种日志收集机制,我们只能通过Kubernetespod的日志输出,来查看Spark的日志:

$kubectl-n=namespacelogs-fdriver-pod-name

收集和聚合日志,我们后面会和ES结合。

监控

我们TalkingData内部有自己的监控平台OWL[2](已开源),未来我们计划编写metricplugin,将Kubernetes接入OWL中。

混合部署

为了保证Spark作业时刻有可用的计算资源,我们前期采用了物理隔离的方案。显而易见,这种方式大幅降低了物理资源的使用率。下一步我们计划采用混部方案,通过以下三种方式实现:

将HDFS和Kubernetes混合部署

为Spark作业和Kubernetesnode划分优先级,在低优先级的node上同时运行一些无状态的其他生产服务

利用云实现资源水平扩展,以防止资源突增

资源扩展

在采用以下两种方法增加资源使用率时,集群可能会面临资源短缺和可用性的问题:

混合部署

资源超卖

这会导致运行资源大于实际物理资源的情况(我称之为资源挤兑)。一种做法是给资源划分等级,优先保证部分等级的资源供给。另一种做法是实现资源的水平扩展,动态补充可用资源,并在峰值过后自动释放。我在另一篇文章中阐述了这种设计理念:。

TalkingData有自研的多云管理平台,我们的解决方法是实现单独的Kubernetestdcloud-controller-manager作为资源的provider和manager,通过TalkingDataOWL监控告警,实现资源的水平扩展。

作者:开源大数据EMR

版权声明:本站所有作品(图文、音视频)均由用户自行上传分享,仅供网友学习交流,不声明或保证其内容的正确性,如发现本站有涉嫌抄袭侵权/违法违规的内容。请举报,一经查实,本站将立刻删除。

相关推荐