The version of the Kubernetes source code is v1.5.3.
Kubelet Functional Components
PodWorkers
Each pod has a dedicated goroutine responsible for processing updates to that pod.
- isWorking: if an update for a pod is already being processed, further updates are ignored, with the exception of killing updates. A killing update is recorded temporarily in lastUndeliveredWorkUpdate and executed immediately once the in-flight update completes.
- UpdatePod: executes a pod update. If the pod has no update goroutine yet (a new pod, or the kubelet has just restarted), a new goroutine is created for it.
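To make the mechanics concrete, below is a minimal, runnable sketch of this per-pod worker pattern. It is not the actual podWorkers code: the PodUpdate type, the buffer sizes, and the print statement standing in for syncPod are all simplifications.
package main

import (
	"fmt"
	"sync"
	"time"
)

// PodUpdate is a simplified stand-in for the kubelet's pod update type.
type PodUpdate struct {
	PodUID string
	Op     string
}

type podWorkers struct {
	mu sync.Mutex
	// One buffered channel (and goroutine) per pod.
	podUpdates map[string]chan PodUpdate
	// True while a pod's worker is busy processing an update.
	isWorking map[string]bool
	// An update that arrived while the worker was busy; delivered next.
	// (The real code additionally never overwrites a pending kill update.)
	lastUndeliveredWorkUpdate map[string]PodUpdate
}

func newPodWorkers() *podWorkers {
	return &podWorkers{
		podUpdates:                map[string]chan PodUpdate{},
		isWorking:                 map[string]bool{},
		lastUndeliveredWorkUpdate: map[string]PodUpdate{},
	}
}

func (p *podWorkers) UpdatePod(u PodUpdate) {
	p.mu.Lock()
	defer p.mu.Unlock()
	ch, exists := p.podUpdates[u.PodUID]
	if !exists {
		// New pod (or kubelet restart): start a dedicated worker goroutine.
		ch = make(chan PodUpdate, 1)
		p.podUpdates[u.PodUID] = ch
		go p.managePodLoop(ch)
	}
	if !p.isWorking[u.PodUID] {
		p.isWorking[u.PodUID] = true
		ch <- u
	} else {
		// Worker is busy: remember the update and deliver it afterwards.
		p.lastUndeliveredWorkUpdate[u.PodUID] = u
	}
}

func (p *podWorkers) managePodLoop(updates <-chan PodUpdate) {
	for u := range updates {
		fmt.Printf("syncing pod %s (%s)\n", u.PodUID, u.Op) // stands in for syncPod
		p.checkForUpdates(u.PodUID)
	}
}

func (p *podWorkers) checkForUpdates(uid string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if u, ok := p.lastUndeliveredWorkUpdate[uid]; ok {
		delete(p.lastUndeliveredWorkUpdate, uid)
		p.podUpdates[uid] <- u // buffer is empty here, so this cannot block
	} else {
		p.isWorking[uid] = false
	}
}

func main() {
	w := newPodWorkers()
	w.UpdatePod(PodUpdate{PodUID: "pod-a", Op: "ADD"})
	w.UpdatePod(PodUpdate{PodUID: "pod-a", Op: "UPDATE"}) // coalesced while busy
	time.Sleep(100 * time.Millisecond)
}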
PodManager
Creates, deletes, updates, and queries pods, and maintains the mapping between static pods and mirror pods.
static pod: a pod that does not come from the apiserver, i.e., a pod defined via file or HTTP; the apiserver does not know these pods exist. For each static pod, the kubelet creates a mirror pod through the apiserver. A static pod and its mirror pod share the same pod full name (built from the pod's namespace and name).
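A tiny sketch of the shared-key idea (the Pod type and map here are illustrative stand-ins, not the real podManager fields; the exact full-name format is an implementation detail of kubecontainer.GetPodFullName):
package main

import "fmt"

// Pod is a minimal stand-in for api.Pod.
type Pod struct {
	Name, Namespace string
}

// podFullName builds a key that is unique per (name, namespace) and is shared
// by a static pod and its mirror pod.
func podFullName(p *Pod) string {
	return p.Name + "_" + p.Namespace
}

func main() {
	staticPod := &Pod{Name: "etcd", Namespace: "kube-system"} // from a file, unknown to the apiserver
	mirrorPod := &Pod{Name: "etcd", Namespace: "kube-system"} // created by the kubelet via the apiserver

	mirrorPods := map[string]*Pod{podFullName(mirrorPod): mirrorPod}

	// Given the static pod, its mirror pod is found through the shared full name.
	fmt.Println(mirrorPods[podFullName(staticPod)] != nil) // true
}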
PodDNS
Two DNS policies are supported:
- ClusterFirst: if a cluster DNS (kube-dns) is available, use it first; otherwise fall back to Default mode.
- Default: use the default DNS as determined by the kubelet. If the kubelet runs with --cluster-dns=1.2.3.4, then nameserver 1.2.3.4 is added to /etc/resolv.conf of every container on that node. If the kubelet runs without --cluster-dns, the node's own DNS resolution is used by default.
// DNSPolicy defines how a pod's DNS will be configured.
type DNSPolicy string

const (
	// DNSClusterFirst indicates that the pod should use cluster DNS
	// first, if it is available, then fall back on the default (as
	// determined by kubelet) DNS settings.
	DNSClusterFirst DNSPolicy = "ClusterFirst"

	// DNSDefault indicates that the pod should use the default (as
	// determined by kubelet) DNS settings.
	DNSDefault DNSPolicy = "Default"
)
The concrete implementation that obtains the DNS settings for a pod:
// GetClusterDNS returns a list of the DNS servers and a list of the DNS search
// domains of the cluster.
func (kl *Kubelet) GetClusterDNS(pod *api.Pod) ([]string, []string, error) {
	var hostDNS, hostSearch []string
	// Get host DNS settings
	if kl.resolverConfig != "" {
		f, err := os.Open(kl.resolverConfig)
		if err != nil {
			return nil, nil, err
		}
		defer f.Close()

		hostDNS, hostSearch, err = kl.parseResolvConf(f)
		if err != nil {
			return nil, nil, err
		}
	}
	useClusterFirstPolicy := pod.Spec.DNSPolicy == api.DNSClusterFirst
	if useClusterFirstPolicy && kl.clusterDNS == nil {
		// clusterDNS is not known.
		// pod with ClusterDNSFirst Policy cannot be created
		kl.recorder.Eventf(pod, api.EventTypeWarning, "MissingClusterDNS", "kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy)
		log := fmt.Sprintf("kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. pod: %q. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy, format.Pod(pod))
		kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, "MissingClusterDNS", log)

		// fallback to DNSDefault
		useClusterFirstPolicy = false
	}

	if !useClusterFirstPolicy {
		// When the kubelet --resolv-conf flag is set to the empty string, use
		// DNS settings that override the docker default (which is to use
		// /etc/resolv.conf) and effectively disable DNS lookups. According to
		// the bind documentation, the behavior of the DNS client library when
		// "nameservers" are not specified is to "use the nameserver on the
		// local machine". A nameserver setting of localhost is equivalent to
		// this documented behavior.
		if kl.resolverConfig == "" {
			hostDNS = []string{"127.0.0.1"}
			hostSearch = []string{"."}
		}
		return hostDNS, hostSearch, nil
	}

	// for a pod with DNSClusterFirst policy, the cluster DNS server is the only nameserver configured for
	// the pod. The cluster DNS server itself will forward queries to other nameservers that is configured to use,
	// in case the cluster DNS server cannot resolve the DNS query itself
	dns := []string{kl.clusterDNS.String()}

	var dnsSearch []string
	if kl.clusterDomain != "" {
		nsSvcDomain := fmt.Sprintf("%s.svc.%s", pod.Namespace, kl.clusterDomain)
		svcDomain := fmt.Sprintf("svc.%s", kl.clusterDomain)
		dnsSearch = append([]string{nsSvcDomain, svcDomain, kl.clusterDomain}, hostSearch...)
	} else {
		dnsSearch = hostSearch
	}
	return dns, dnsSearch, nil
}
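To make the result concrete: assume (hypothetically) the kubelet runs with clusterDNS 10.0.0.10 and clusterDomain cluster.local, and a pod with the ClusterFirst policy lives in namespace default. The pod's /etc/resolv.conf would then be built from the returned values as:
nameserver 10.0.0.10
search default.svc.cluster.local svc.cluster.local cluster.local
Any host search domains parsed from the kubelet's --resolv-conf are appended after the three cluster-derived entries.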
Note: kube-dns in Kubernetes 1.6 supports Private DNS Zones and Upstream Nameservers, making pod DNS resolution more flexible. See: Configuring Private DNS Zones and Upstream Nameservers in Kubernetes
StatusManager
Caches pod status and syncs it to the apiserver. There are two sync triggers:
- a periodic sync every 10 seconds
- a pod status update: a container becoming ready or terminated, or a direct call to SetPodStatus
pkg/kubelet/status/status_manager.go
const syncPeriod = 10 * time.Second

func (m *manager) Start() {
	// Don't start the status manager if we don't have a client. This will happen
	// on the master, where the kubelet is responsible for bootstrapping the pods
	// of the master components.
	if m.kubeClient == nil {
		glog.Infof("Kubernetes client is nil, not starting status manager.")
		return
	}

	glog.Info("Starting to sync pod status with apiserver")
	syncTicker := time.Tick(syncPeriod)
	// syncPod and syncBatch share the same go routine to avoid sync races.
	go wait.Forever(func() {
		select {
		case syncRequest := <-m.podStatusChannel:
			m.syncPod(syncRequest.podUID, syncRequest.status)
		case <-syncTicker:
			m.syncBatch()
		}
	}, 0)
}
containerRuntime
Both docker and rkt are supported: with docker, the runtime is dockertools.DockerManager; with rkt, it is rkt.Runtime.
- DockerManager
Implements the kubecontainer.Runtime interface and wraps a DockerInterface client that talks to the Docker daemon. Every operation of the Runtime interface is ultimately carried out by calling the Docker client. Some data returned by the Docker client has to be converted from dockertypes to kubecontainer types; for example, toRuntimeImage and toRuntimeContainer in dockertools/convert.go convert dockertypes images and containers respectively (see the sketch after this list).
- rkt.Runtime
Implements the kubecontainer.Runtime interface and wraps apisvc, an rktapi.PublicAPIClient that talks to the rkt apiserver. Every operation of the Runtime interface is ultimately carried out through apisvc.
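As a sketch of what such a conversion amounts to, the following uses minimal stand-in types for dockertypes.Image and kubecontainer.Image (the real structs carry more fields):
package main

import (
	"errors"
	"fmt"
)

// Minimal stand-ins for dockertypes.Image and kubecontainer.Image.
type dockerImage struct {
	ID          string
	RepoTags    []string
	VirtualSize int64
}

type runtimeImage struct {
	ID   string
	Tags []string
	Size int64
}

// toRuntimeImage converts the docker-specific description into the
// runtime-agnostic representation that the rest of the kubelet consumes.
func toRuntimeImage(img *dockerImage) (*runtimeImage, error) {
	if img == nil {
		return nil, errors.New("unable to convert a nil pointer to a runtime image")
	}
	return &runtimeImage{ID: img.ID, Tags: img.RepoTags, Size: img.VirtualSize}, nil
}

func main() {
	ri, err := toRuntimeImage(&dockerImage{
		ID:          "sha256:abc123",
		RepoTags:    []string{"nginx:1.11"},
		VirtualSize: 182000000,
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", ri)
}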
ImageGC
ImageGCPolicy has three parameters:
- MinAge: the minimum age an image must reach before it becomes eligible for GC
- HighThresholdPercent: GC is triggered when image disk usage rises above this value, i.e., the highest disk usage allowed
- LowThresholdPercent: usage below this value never triggers GC, i.e., the usage level GC tries to free down to
pkg/kubelet/images/image_gc_manager.go
// A policy for garbage collecting images. Policy defines an allowed band in
// which garbage collection will be run.
type ImageGCPolicy struct {
	// Any usage above this threshold will always trigger garbage collection.
	// This is the highest usage we will allow.
	HighThresholdPercent int

	// Any usage below this threshold will never trigger garbage collection.
	// This is the lowest threshold we will try to garbage collect to.
	LowThresholdPercent int

	// Minimum age at which an image can be garbage collected.
	MinAge time.Duration
}
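These fields are populated from the kubelet flags --image-gc-high-threshold, --image-gc-low-threshold, and --minimum-image-ttl-duration; their defaults around this version were 90, 80, and 2m respectively (verify against your release). A small sketch of the decision the manager makes with them, using hypothetical disk numbers:
package main

import (
	"fmt"
	"time"
)

// ImageGCPolicy mirrors the struct above.
type ImageGCPolicy struct {
	HighThresholdPercent int
	LowThresholdPercent  int
	MinAge               time.Duration
}

func main() {
	policy := ImageGCPolicy{HighThresholdPercent: 90, LowThresholdPercent: 80, MinAge: 2 * time.Minute}

	capacity := int64(100 << 30) // hypothetical: 100 GiB image filesystem
	usage := int64(95 << 30)     // hypothetical: 95 GiB in use

	usagePercent := int(usage * 100 / capacity)
	if usagePercent >= policy.HighThresholdPercent {
		// Free enough images (only those older than MinAge) to get back
		// under the low threshold.
		target := capacity * int64(policy.LowThresholdPercent) / 100
		fmt.Printf("GC triggered at %d%% usage, need to free %d bytes\n", usagePercent, usage-target)
	}
}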
ContainerGC
ContainerGCPolicy has three parameters:
- MinAge: the minimum age a container must reach before it becomes eligible for GC
- MaxPerPodContainer: the maximum number of dead containers per pod
- MaxContainers: the maximum number of dead containers on the entire node
Its single method, GarbageCollect, simply delegates to the GarbageCollect of the containerRuntime (Docker or rkt).
pkg/kubelet/container/container_gc.go
// Specified a policy for garbage collecting containers.
type ContainerGCPolicy struct {
	// Minimum age at which a container can be garbage collected, zero for no limit.
	MinAge time.Duration

	// Max number of dead containers any single pod (UID, container name) pair is
	// allowed to have, less than zero for no limit.
	MaxPerPodContainer int

	// Max number of total dead containers, less than zero for no limit.
	MaxContainers int
}

// Manages garbage collection of dead containers.
//
// Implementation is thread-compatible.
type ContainerGC interface {
	// Garbage collect containers.
	GarbageCollect(allSourcesReady bool) error
}

func (cgc *realContainerGC) GarbageCollect(allSourcesReady bool) error {
	return cgc.runtime.GarbageCollect(cgc.policy, allSourcesReady)
}
Docker's GarbageCollect splits the non-running containers older than MinAge into two groups (a condensed sketch follows the list):
- evictUnits: containers whose name can be parsed; these are deleted selectively according to the GC policy.
- unidentifiedContainers: containers whose name cannot be parsed; these are always deleted.
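Here parseDockerName is a hypothetical stand-in for the real dockertools name parser, and both types are simplified:
package main

import (
	"fmt"
	"strings"
)

// containerInfo is a stand-in for the data kept per dead container.
type containerInfo struct {
	id   string
	name string // docker container name
}

// evictUnit keys dead containers by (pod UID, container name), the granularity
// at which MaxPerPodContainer is enforced.
type evictUnit struct {
	podUID        string
	containerName string
}

// parseDockerName is a hypothetical stand-in: the real parser splits the
// docker name on "_" and validates the "k8s" prefix the kubelet writes.
func parseDockerName(name string) (evictUnit, bool) {
	parts := strings.Split(name, "_")
	if len(parts) < 6 || parts[0] != "k8s" {
		return evictUnit{}, false
	}
	return evictUnit{podUID: parts[4], containerName: parts[1]}, true
}

func main() {
	dead := []containerInfo{
		{id: "c1", name: "k8s_web.abc_mypod_default_uid1_0"},
		{id: "c2", name: "not-managed-by-kubelet"},
	}

	evictUnits := map[evictUnit][]containerInfo{}
	var unidentified []containerInfo

	for _, c := range dead {
		if unit, ok := parseDockerName(c.name); ok {
			evictUnits[unit] = append(evictUnits[unit], c) // deleted per policy
		} else {
			unidentified = append(unidentified, c) // always deleted
		}
	}
	fmt.Println(len(evictUnits), "evict units,", len(unidentified), "unidentified")
}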
ImageGC and ContainerGC are each started in their own goroutine by StartGarbageCollection() and run periodically according to their GC periods. StartGarbageCollection() is called, together with NewMainKubelet(), from CreateAndInitKubelet() in kubelet/app/server.go.
func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, standaloneMode bool) (k kubelet.KubeletBootstrap, err error) {
	// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
	// up into "per source" synchronizations

	k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, standaloneMode)
	if err != nil {
		return nil, err
	}

	k.BirthCry()

	k.StartGarbageCollection()

	return k, nil
}
// Starts garbage collection threads.
func (kl *Kubelet) StartGarbageCollection() {
	loggedContainerGCFailure := false
	go wait.Until(func() {
		if err := kl.containerGC.GarbageCollect(kl.sourcesReady.AllReady()); err != nil {
			glog.Errorf("Container garbage collection failed: %v", err)
			kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, events.ContainerGCFailed, err.Error())
			loggedContainerGCFailure = true
		} else {
			var vLevel glog.Level = 4
			if loggedContainerGCFailure {
				vLevel = 1
				loggedContainerGCFailure = false
			}

			glog.V(vLevel).Infof("Container garbage collection succeeded")
		}
	}, ContainerGCPeriod, wait.NeverStop)

	loggedImageGCFailure := false
	go wait.Until(func() {
		if err := kl.imageManager.GarbageCollect(); err != nil {
			glog.Errorf("Image garbage collection failed: %v", err)
			kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, events.ImageGCFailed, err.Error())
			loggedImageGCFailure = true
		} else {
			var vLevel glog.Level = 4
			if loggedImageGCFailure {
				vLevel = 1
				loggedImageGCFailure = false
			}

			glog.V(vLevel).Infof("Image garbage collection succeeded")
		}
	}, ImageGCPeriod, wait.NeverStop)
}
NetworkPlugin
The kubelet obtains its network plugins through kubeDeps. kubeDeps.NetworkPlugins is populated by ProbeNetworkPlugins(), which collects all network plugins: besides the CNI network plugin, it also collects those found in NetworkPluginDir.
InitNetworkPlugin() selects the required network plugin by NetworkPluginName and then calls the plugin's Init() to initialize it. For the CNI plugin only the host parameter is used; hairpinMode, nonMasqueradeCIDR, and mtu are not.
Once initialized, the networkPlugin is handed to the containerRuntime, which uses it to manage networking throughout the pod's lifecycle.
The main kubelet code implementing the CNI network plugin lives in pkg/kubelet/network/cni/cni.go.
cniNetwork
cniNetwork has two main fields:
- NetworkConfig: the config found under /etc/cni/net.d
- CNIConfig: all the paths where CNI binaries may be stored, including /opt/cni/bin and ${vendorCNIDirPrefix}/opt/${pluginType}/bin; it also implements the CNI interface.
cniNetwork has two main methods (see the sketch after this list):
- addToNetwork: builds a CNI runtime config from the pod's information via buildCNIRuntimeConf(), then calls the CNI interface's AddNetwork() to attach the pod to the network.
- deleteFromNetwork: analogous to addToNetwork(), except that it calls the CNI interface's DelNetwork() to detach the pod from the network.
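A condensed sketch of these two steps, abridged from cni.go of this era (logging and some error handling trimmed; treat details such as the exact Args entries as approximations):
// buildCNIRuntimeConf packs the pod's identity into the CNI RuntimeConf; the
// plugin later receives these values as CNI_ARGS-style environment data.
func buildCNIRuntimeConf(podName, podNs string, podInfraContainerID kubecontainer.ContainerID, podNetnsPath string) (*libcni.RuntimeConf, error) {
	return &libcni.RuntimeConf{
		ContainerID: podInfraContainerID.ID,
		NetNS:       podNetnsPath,                 // netns of the pod infra container
		IfName:      network.DefaultInterfaceName, // usually "eth0"
		Args: [][2]string{
			{"IgnoreUnknown", "1"},
			{"K8S_POD_NAMESPACE", podNs},
			{"K8S_POD_NAME", podName},
			{"K8S_POD_INFRA_CONTAINER_ID", podInfraContainerID.ID},
		},
	}, nil
}

func (network *cniNetwork) addToNetwork(podName, podNamespace string, podInfraContainerID kubecontainer.ContainerID, podNetnsPath string) (*cnitypes.Result, error) {
	rt, err := buildCNIRuntimeConf(podName, podNamespace, podInfraContainerID, podNetnsPath)
	if err != nil {
		return nil, err
	}
	// Delegate to libcni: locate the plugin binary and exec it with command ADD.
	return network.CNIConfig.AddNetwork(network.NetworkConfig, rt)
}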
cniNetworkPlugin
cniNetworkPlugin holds two networks:
- loNetwork: the loopback network, obtained by getLoNetwork(). It is mandatory; without it the CNI network cannot be invoked:
func getLoNetwork(binDir, vendorDirPrefix string) *cniNetwork {
	loConfig, err := libcni.ConfFromBytes([]byte(`{
  "cniVersion": "0.1.0",
  "name": "cni-loopback",
  "type": "loopback"
}`))
	if err != nil {
		// The hardcoded config above should always be valid and unit tests will
		// catch this
		panic(err)
	}
	cninet := &libcni.CNIConfig{
		Path: []string{vendorCNIDir(vendorDirPrefix, loConfig.Network.Type), binDir},
	}
	loNetwork := &cniNetwork{
		name:          "lo",
		NetworkConfig: loConfig,
		CNIConfig:     cninet,
	}

	return loNetwork
}
- defaultNetwork: reads the config under /etc/cni/net.d. Only the first valid config (in sorted file order) is used, so do not place multiple configs there:
func getDefaultCNINetwork(pluginDir, binDir, vendorCNIDirPrefix string) (*cniNetwork, error) {
	if pluginDir == "" {
		pluginDir = DefaultNetDir
	}
	files, err := libcni.ConfFiles(pluginDir)
	switch {
	case err != nil:
		return nil, err
	case len(files) == 0:
		return nil, fmt.Errorf("No networks found in %s", pluginDir)
	}

	sort.Strings(files)
	for _, confFile := range files {
		conf, err := libcni.ConfFromFile(confFile)
		if err != nil {
			glog.Warningf("Error loading CNI config file %s: %v", confFile, err)
			continue
		}
		// Search for vendor-specific plugins as well as default plugins in the CNI codebase.
		vendorDir := vendorCNIDir(vendorCNIDirPrefix, conf.Network.Type)
		cninet := &libcni.CNIConfig{
			Path: []string{binDir, vendorDir},
		}
		network := &cniNetwork{name: conf.Network.Name, NetworkConfig: conf, CNIConfig: cninet}
		return network, nil
	}
	return nil, fmt.Errorf("No valid networks found in %s", pluginDir)
}
CNI Interface
The CNI interface is defined by containernetworking. It is very simple, with only four methods:
containernetworking/cni/libcni/api.go
type CNI interface {
	AddNetworkList(net *NetworkConfigList, rt *RuntimeConf) (types.Result, error)
	DelNetworkList(net *NetworkConfigList, rt *RuntimeConf) error

	AddNetwork(net *NetworkConfig, rt *RuntimeConf) (types.Result, error)
	DelNetwork(net *NetworkConfig, rt *RuntimeConf) error
}
When managing pod networking, k8s mainly uses two of these methods: AddNetwork() and DelNetwork(). Taking AddNetwork() as an example: it first locates the plugin binary via pluginPath, then injectRuntimeConfig() injects the runtime configuration into net, which is ultimately fed to the plugin on stdin; the network operation (ADD or DEL) and the RuntimeConf are passed to the plugin as environment variables.
containernetworking/cni/libcni/api.go
// AddNetwork executes the plugin with the ADD command
func (c *CNIConfig) AddNetwork(net *NetworkConfig, rt *RuntimeConf) (types.Result, error) {
	pluginPath, err := invoke.FindInPath(net.Network.Type, c.Path)
	if err != nil {
		return nil, err
	}

	net, err = injectRuntimeConfig(net, rt)
	if err != nil {
		return nil, err
	}

	return invoke.ExecPluginWithResult(pluginPath, net.Bytes, c.args("ADD", rt))
}
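For reference, driving this API directly, outside the kubelet, looks roughly like this; the config path, binary directory, container ID, and netns path are all hypothetical:
package main

import (
	"fmt"

	"github.com/containernetworking/cni/libcni"
)

func main() {
	// Hypothetical: the first CNI config on this host.
	conf, err := libcni.ConfFromFile("/etc/cni/net.d/10-mynet.conf")
	if err != nil {
		panic(err)
	}
	cninet := &libcni.CNIConfig{Path: []string{"/opt/cni/bin"}}

	rt := &libcni.RuntimeConf{
		ContainerID: "example-id",         // hypothetical container ID
		NetNS:       "/proc/12345/ns/net", // hypothetical netns path
		IfName:      "eth0",
	}

	// Execs the plugin binary named by the config's "type" with CNI_COMMAND=ADD.
	result, err := cninet.AddNetwork(conf, rt)
	if err != nil {
		panic(err)
	}
	fmt.Println(result)
}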
How the Kubelet Works
Pod Creation Process
The kubelet manages the entire lifecycle of pods through syncLoop. Configuration updates, whether from config files, HTTP requests, or the kube-apiserver, all trigger the syncLoop.
syncLoopIteration continuously listens for pod configuration and lifecycle updates, and also fires periodically.
Pod configuration updates come in five kubetypes: ADD, UPDATE, REMOVE, RECONCILE, and DELETE. These updates are handled through the SyncHandler interface, which the Kubelet itself implements.
Each handler eventually calls dispatchWork to carry out the operation, which mainly means calling UpdatePod on the podWorkers. managePodLoop then invokes the Kubelet's syncPod method to finally reconcile the pod's state locally.
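A condensed outline of the dispatch inside syncLoopIteration, abridged from pkg/kubelet/kubelet.go (case bodies simplified; treat it as an outline rather than verbatim source):
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u := <-configCh:
		// A configuration update from file, HTTP, or the apiserver.
		switch u.Op {
		case kubetypes.ADD:
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			// Graceful deletion is delivered as an update.
			handler.HandlePodUpdates(u.Pods)
		}
	case <-syncCh:
		// Periodic sync of the pods that are due for one.
		handler.HandlePodSyncs(kl.getPodsToSync())
	case e := <-plegCh:
		// A container lifecycle event from the PLEG: the affected pod is
		// looked up via the podManager and re-synced (lookup elided here).
		_ = e
	case <-housekeepingCh:
		handler.HandlePodCleanups()
	}
	return true
}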