Kubernetes Source Code Reading: Kubelet

Posted by Robin on March 6, 2017

This analysis is based on Kubernetes source code v1.5.3.

Kubelet Functional Components

PodWorkers

Each pod gets its own goroutine that is responsible for processing that pod's updates.

  • isWorking: if an update for a pod is already being processed, further updates are ignored, with one exception: a kill update is recorded in lastUndeliveredWorkUpdate and executed as soon as the in-flight update completes.
  • UpdatePod: performs a pod update. If the pod has no update goroutine yet (a new pod, or a kubelet restart), a new goroutine is created for it. A minimal sketch of this pattern follows.
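
The sketch below is a self-contained simplification of this coalescing pattern, not the actual pkg/kubelet/pod_workers.go code: the types (UpdatePodOptions, podWorker) are hypothetical stand-ins, but the behavior (one in-flight update, at most one pending update, and a pending kill that is never overwritten) mirrors the description above.

package main

import (
	"fmt"
	"sync"
	"time"
)

// UpdatePodOptions is a simplified stand-in for the kubelet's update request.
type UpdatePodOptions struct {
	Name string
	Kill bool
}

type podWorker struct {
	mu        sync.Mutex
	isWorking bool              // true while an update is in flight
	pending   *UpdatePodOptions // at most one undelivered update
	updates   chan UpdatePodOptions
}

func newPodWorker() *podWorker {
	w := &podWorker{updates: make(chan UpdatePodOptions, 1)}
	go w.managePodLoop()
	return w
}

// UpdatePod either delivers the update to the worker goroutine or, if one is
// already being processed, records it for delivery once the current one ends.
// A pending kill update is never overwritten by a later non-kill update.
func (w *podWorker) UpdatePod(u UpdatePodOptions) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.isWorking {
		w.isWorking = true
		w.updates <- u
		return
	}
	if w.pending == nil || !w.pending.Kill {
		w.pending = &u
	}
}

func (w *podWorker) managePodLoop() {
	for u := range w.updates {
		fmt.Printf("syncing pod: %+v\n", u)
		time.Sleep(100 * time.Millisecond) // stand-in for syncPod
		w.wrapUp()
	}
}

// wrapUp delivers the pending update, if any; otherwise marks the worker idle.
func (w *podWorker) wrapUp() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.pending != nil {
		w.updates <- *w.pending
		w.pending = nil
		return
	}
	w.isWorking = false
}

func main() {
	w := newPodWorker()
	w.UpdatePod(UpdatePodOptions{Name: "nginx"})
	w.UpdatePod(UpdatePodOptions{Name: "nginx", Kill: true}) // queued, runs next
	time.Sleep(time.Second)
}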

PodManager

Adds, deletes, updates, and queries pods, and maintains the mapping between static pods and their mirror pods.

A static pod is a pod that does not come from the apiserver, i.e., it comes from a file or an HTTP source, so the apiserver does not know it exists. For each static pod, the kubelet creates a mirror pod through the apiserver. A static pod and its mirror pod share the same pod full name (derived from the pod's name and namespace).
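
A minimal, hypothetical sketch of the bookkeeping this implies (the real PodManager keys by types.UID and stores full pod objects): resolving a mirror pod's UID back to the static pod it shadows via the shared full name.

package main

import "fmt"

// basicPodManager is a hypothetical, condensed stand-in for the kubelet's
// pod manager, reduced to the static/mirror mapping.
type basicPodManager struct {
	// pod full name -> static pod UID
	staticPodByFullName map[string]string
	// mirror pod UID -> pod full name
	fullNameByMirrorUID map[string]string
}

// TranslatePodUID resolves a mirror pod UID to the UID of the static pod the
// kubelet actually runs; unknown UIDs are returned unchanged.
func (pm *basicPodManager) TranslatePodUID(uid string) string {
	if fullName, ok := pm.fullNameByMirrorUID[uid]; ok {
		if staticUID, ok := pm.staticPodByFullName[fullName]; ok {
			return staticUID
		}
	}
	return uid
}

func main() {
	pm := &basicPodManager{
		staticPodByFullName: map[string]string{"kube-system/etcd": "static-123"},
		fullNameByMirrorUID: map[string]string{"mirror-456": "kube-system/etcd"},
	}
	fmt.Println(pm.TranslatePodUID("mirror-456")) // static-123
}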

PodDNS

Two DNSPolicy values are supported:

  • ClusterFirst: if a cluster DNS (kube-dns) is available, use it first; otherwise fall back to Default. When the Kubelet runs with --cluster-dns=1.2.3.4, nameserver 1.2.3.4 is written into the /etc/resolv.conf of every container on that node that uses this policy.
  • Default: use the default DNS settings as determined by the Kubelet, i.e., the node's own DNS resolution. This is also what ClusterFirst pods fall back to when the Kubelet runs without --cluster-dns.

pkg/api/types.go

// DNSPolicy defines how a pod's DNS will be configured.
type DNSPolicy string

const (
	// DNSClusterFirst indicates that the pod should use cluster DNS
	// first, if it is available, then fall back on the default (as
	// determined by kubelet) DNS settings.
	DNSClusterFirst DNSPolicy = "ClusterFirst"

	// DNSDefault indicates that the pod should use the default (as
	// determined by kubelet) DNS settings.
	DNSDefault DNSPolicy = "Default"
)

The concrete implementation that obtains the DNS settings for a pod:

pkg/kubelet/kubelet.go

// GetClusterDNS returns a list of the DNS servers and a list of the DNS search
// domains of the cluster.
func (kl *Kubelet) GetClusterDNS(pod *api.Pod) ([]string, []string, error) {
	var hostDNS, hostSearch []string
	// Get host DNS settings
	if kl.resolverConfig != "" {
		f, err := os.Open(kl.resolverConfig)
		if err != nil {
			return nil, nil, err
		}
		defer f.Close()

		hostDNS, hostSearch, err = kl.parseResolvConf(f)
		if err != nil {
			return nil, nil, err
		}
	}
	useClusterFirstPolicy := pod.Spec.DNSPolicy == api.DNSClusterFirst
	if useClusterFirstPolicy && kl.clusterDNS == nil {
		// clusterDNS is not known.
		// pod with ClusterDNSFirst Policy cannot be created
		kl.recorder.Eventf(pod, api.EventTypeWarning, "MissingClusterDNS", "kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy)
		log := fmt.Sprintf("kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. pod: %q. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy, format.Pod(pod))
		kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, "MissingClusterDNS", log)

		// fallback to DNSDefault
		useClusterFirstPolicy = false
	}

	if !useClusterFirstPolicy {
		// When the kubelet --resolv-conf flag is set to the empty string, use
		// DNS settings that override the docker default (which is to use
		// /etc/resolv.conf) and effectively disable DNS lookups. According to
		// the bind documentation, the behavior of the DNS client library when
		// "nameservers" are not specified is to "use the nameserver on the
		// local machine". A nameserver setting of localhost is equivalent to
		// this documented behavior.
		if kl.resolverConfig == "" {
			hostDNS = []string{"127.0.0.1"}
			hostSearch = []string{"."}
		}
		return hostDNS, hostSearch, nil
	}

	// for a pod with DNSClusterFirst policy, the cluster DNS server is the only nameserver configured for
	// the pod. The cluster DNS server itself will forward queries to other nameservers that it is configured to use,
	// in case the cluster DNS server cannot resolve the DNS query itself
	dns := []string{kl.clusterDNS.String()}

	var dnsSearch []string
	if kl.clusterDomain != "" {
		nsSvcDomain := fmt.Sprintf("%s.svc.%s", pod.Namespace, kl.clusterDomain)
		svcDomain := fmt.Sprintf("svc.%s", kl.clusterDomain)
		dnsSearch = append([]string{nsSvcDomain, svcDomain, kl.clusterDomain}, hostSearch...)
	} else {
		dnsSearch = hostSearch
	}
	return dns, dnsSearch, nil
}
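
As a concrete example (assuming the Kubelet runs with --cluster-dns=10.0.0.10 and --cluster-domain=cluster.local), a ClusterFirst pod in the default namespace ends up with an /etc/resolv.conf along these lines, with the host's own search domains appended at the end:

nameserver 10.0.0.10
search default.svc.cluster.local svc.cluster.local cluster.local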

Note: In Kubernetes 1.6, kube-dns supports Private DNS Zones and Upstream Nameservers, which makes pod DNS resolution more flexible. Reference: Configuring Private DNS Zones and Upstream Nameservers in Kubernetes.

StatusManager

Caches pod statuses and syncs them to the apiserver. Syncing is triggered in two ways:

  • periodically, every 10 seconds
  • on pod status updates: a container becoming ready or terminated, or a direct SetPodStatus call

pkg/kubelet/status/status_manager.go

const syncPeriod = 10 * time.Second

func (m *manager) Start() {
    // Don't start the status manager if we don't have a client. This will happen
    // on the master, where the kubelet is responsible for bootstrapping the pods
    // of the master components.
    if m.kubeClient == nil {
        glog.Infof("Kubernetes client is nil, not starting status manager.")
        return
    }

    glog.Info("Starting to sync pod status with apiserver")
    syncTicker := time.Tick(syncPeriod)
    // syncPod and syncBatch share the same go routine to avoid sync races.
    go wait.Forever(func() {
        select {
        case syncRequest := <-m.podStatusChannel:
            m.syncPod(syncRequest.podUID, syncRequest.status)
        case <-syncTicker:
            m.syncBatch()
        }
    }, 0)
}
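
The status updates mentioned above reach this loop through podStatusChannel via a non-blocking send; if the channel is busy, the periodic syncBatch picks the cached status up later. Below is a minimal, self-contained sketch of that pattern with simplified, hypothetical types, not the actual manager code.

package main

import (
	"fmt"
	"time"
)

type syncRequest struct{ podUID, status string }

func main() {
	ch := make(chan syncRequest, 1)
	ticker := time.Tick(200 * time.Millisecond)

	// The single sync goroutine: immediate per-pod syncs when requests
	// arrive, periodic batch reconciliation otherwise.
	go func() {
		for {
			select {
			case req := <-ch:
				fmt.Println("syncPod:", req.podUID, req.status)
			case <-ticker:
				fmt.Println("syncBatch: reconciling all cached statuses")
			}
		}
	}()

	// SetPodStatus-style non-blocking send: if the channel is full, drop
	// the request and rely on the periodic syncBatch instead.
	for i := 0; i < 3; i++ {
		select {
		case ch <- syncRequest{"pod-1", fmt.Sprintf("update-%d", i)}:
		default:
		}
	}
	time.Sleep(time.Second)
}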

containerRuntime

Supports Docker and rkt: for Docker the runtime is dockertools.DockerManager; for rkt it is rkt.Runtime.

  • DockerManager

Implements the kubecontainer.Runtime interface and wraps a client of DockerInterface, which talks to the Docker daemon. Every operation of the Runtime interface is ultimately carried out by calling the Docker client. Some data returned by the Docker client must be converted from dockertypes to kubecontainer types; for example, toRuntimeImage and toRuntimeContainer in dockertools/convert.go convert dockertypes images and containers, respectively.

  • rkt.Runtime

Implements the kubecontainer.Runtime interface and wraps apisvc, an rktapi.PublicAPIClient that talks to the rkt API service. Every operation of the Runtime interface is ultimately carried out through apisvc.
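
Both implementations satisfy the same interface; an abridged view follows (the full definition has many more methods, and the signatures here are trimmed for illustration).

pkg/kubelet/container/runtime.go (abridged)

type Runtime interface {
	// Type returns the type of the container runtime, e.g. "docker" or "rkt".
	Type() string
	// GetPods returns the pods running on this node, grouped by pod.
	GetPods(all bool) ([]*Pod, error)
	// SyncPod drives the running pod toward the desired pod spec.
	SyncPod(pod *api.Pod, apiPodStatus api.PodStatus, podStatus *PodStatus,
		pullSecrets []api.Secret, backOff *flowcontrol.Backoff) PodSyncResult
	// KillPod kills all the containers of a pod.
	KillPod(pod *api.Pod, runningPod Pod, gracePeriodOverride *int64) error
	// GarbageCollect removes dead containers according to the given policy.
	GarbageCollect(gcPolicy ContainerGCPolicy, allSourcesReady bool) error
}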

ImageGC

ImageGCPolicy takes three parameters:

  • MinAge: the minimum age an image must reach before it may be garbage collected
  • HighThresholdPercent: image disk usage above this value triggers GC; i.e., the highest usage allowed
  • LowThresholdPercent: image disk usage below this value never triggers GC; i.e., the level GC frees space back down to (a worked example follows the struct below)

pkg/kubelet/images/image_gc_manager.go

// A policy for garbage collecting images. Policy defines an allowed band in
// which garbage collection will be run.
type ImageGCPolicy struct {
    // Any usage above this threshold will always trigger garbage collection.
    // This is the highest usage we will allow.
    HighThresholdPercent int

    // Any usage below this threshold will never trigger garbage collection.
    // This is the lowest threshold we will try to garbage collect to.
    LowThresholdPercent int

    // Minimum age at which an image can be garbage collected.
    MinAge time.Duration
}
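
To make the band concrete, here is a small, self-contained sketch (a hypothetical helper, not the actual manager code) of the reclamation arithmetic: once usage crosses HighThresholdPercent, enough image data is freed (oldest unused images first) to bring usage back down to LowThresholdPercent.

package main

import "fmt"

// amountToFree returns how many bytes ImageGC should reclaim, given the
// image disk capacity and current usage in bytes and the two thresholds.
func amountToFree(capacity, usage int64, highPct, lowPct int) int64 {
	usagePercent := int(usage * 100 / capacity)
	if usagePercent < highPct {
		return 0 // inside the allowed band: no GC
	}
	return usage - capacity*int64(lowPct)/100
}

func main() {
	// 100 GiB image disk, 92 GiB used, thresholds 90%/80%:
	// GC frees down to 80 GiB, i.e. reclaims 12 GiB.
	const gib = 1 << 30
	fmt.Println(amountToFree(100*gib, 92*gib, 90, 80) / gib) // 12
}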

ContainerGC

ContainerGCPolicy takes three parameters:

  • MinAge: the minimum age a dead container must reach before it may be garbage collected
  • MaxPerPodContainer: the maximum number of dead containers retained per pod
  • MaxContainers: the maximum number of dead containers on the whole node

Its single method, GarbageCollect, simply delegates to the GarbageCollect of the containerRuntime (Docker or rkt).

pkg/kubelet/container/container_gc.go

// Specifies a policy for garbage collecting containers.
 
type ContainerGCPolicy struct {
    // Minimum age at which a container can be garbage collected, zero for no limit.
    MinAge time.Duration
 
    // Max number of dead containers any single pod (UID, container name) pair is
    // allowed to have, less than zero for no limit.
    MaxPerPodContainer int
 
    // Max number of total dead containers, less than zero for no limit.
    MaxContainers int
}
 
// Manages garbage collection of dead containers.
//
// Implementation is thread-compatible.
type ContainerGC interface {
    // Garbage collect containers.
    GarbageCollect(allSourcesReady bool) error
}
 
func (cgc *realContainerGC) GarbageCollect(allSourcesReady bool) error {
    return cgc.runtime.GarbageCollect(cgc.policy, allSourcesReady)
}

Docker's GarbageCollect splits containers that are not running and are older than MinAge into two categories (a sketch of the per-pod trimming follows the list):

  • evictUnits: the container name can be parsed; these are deleted selectively according to the GC policy.
  • unidentifiedContainers: the container name cannot be parsed; these are always deleted.
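
Below is a simplified, self-contained sketch (hypothetical types and helper, not the actual dockertools code) of the per-pod trimming applied to evictUnits: within each pod's (UID, container name) group, dead containers are sorted newest-first, and those beyond MaxPerPodContainer that are older than MinAge are removed.

package main

import (
	"fmt"
	"sort"
	"time"
)

type deadContainer struct {
	id      string
	created time.Time
}

// enforceMaxPerPod returns the IDs of dead containers in one pod's group
// that should be removed to respect maxPerPod, keeping the newest ones.
func enforceMaxPerPod(containers []deadContainer, maxPerPod int, minAge time.Duration) []string {
	sort.Slice(containers, func(i, j int) bool {
		return containers[i].created.After(containers[j].created) // newest first
	})
	var toRemove []string
	for i, c := range containers {
		if i >= maxPerPod && time.Since(c.created) > minAge {
			toRemove = append(toRemove, c.id)
		}
	}
	return toRemove
}

func main() {
	now := time.Now()
	dead := []deadContainer{
		{"a", now.Add(-3 * time.Hour)},
		{"b", now.Add(-2 * time.Hour)},
		{"c", now.Add(-1 * time.Hour)},
	}
	// Keep at most 1 dead container per pod; older ones must exceed 30m.
	fmt.Println(enforceMaxPerPod(dead, 1, 30*time.Minute)) // [b a]
}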

ImageGC and ContainerGC are each started in a dedicated goroutine by StartGarbageCollection() and run periodically at their respective GC periods. StartGarbageCollection() is called, together with NewMainKubelet(), in CreateAndInitKubelet() in cmd/kubelet/app/server.go.

cmd/kubelet/app/server.go

func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, standaloneMode bool) (k kubelet.KubeletBootstrap, err error) {
	// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
	// up into "per source" synchronizations

	k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, standaloneMode)
	if err != nil {
		return nil, err
	}

	k.BirthCry()

	k.StartGarbageCollection()

	return k, nil
}

pkg/kubelet/kubelet.go

// Starts garbage collection threads.
func (kl *Kubelet) StartGarbageCollection() {
	loggedContainerGCFailure := false
	go wait.Until(func() {
		if err := kl.containerGC.GarbageCollect(kl.sourcesReady.AllReady()); err != nil {
			glog.Errorf("Container garbage collection failed: %v", err)
			kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, events.ContainerGCFailed, err.Error())
			loggedContainerGCFailure = true
		} else {
			var vLevel glog.Level = 4
			if loggedContainerGCFailure {
				vLevel = 1
				loggedContainerGCFailure = false
			}

			glog.V(vLevel).Infof("Container garbage collection succeeded")
		}
	}, ContainerGCPeriod, wait.NeverStop)

	loggedImageGCFailure := false
	go wait.Until(func() {
		if err := kl.imageManager.GarbageCollect(); err != nil {
			glog.Errorf("Image garbage collection failed: %v", err)
			kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, events.ImageGCFailed, err.Error())
			loggedImageGCFailure = true
		} else {
			var vLevel glog.Level = 4
			if loggedImageGCFailure {
				vLevel = 1
				loggedImageGCFailure = false
			}

			glog.V(vLevel).Infof("Image garbage collection succeeded")
		}
	}, ImageGCPeriod, wait.NeverStop)
}

NetworkPlugin

The Kubelet obtains its network plugins through kubeDeps. kubeDeps.NetworkPlugins is populated by ProbeNetworkPlugins(), which collects all available network plugins: besides the CNI network plugin, it also picks up the plugins found in NetworkPluginDir.

InitNetworkPlugin() then selects the plugin matching NetworkPluginName and calls that plugin's Init() to initialize it. For the CNI plugin, only the host parameter is used; hairpinMode, nonMasqueradeCIDR, and mtu are ignored.
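
A condensed view of the selection logic (trimmed from InitNetworkPlugin in pkg/kubelet/network/plugins.go; name validation and error aggregation are omitted here):

pluginMap := map[string]NetworkPlugin{}
for _, plugin := range plugins {
	pluginMap[plugin.Name()] = plugin
}

chosenPlugin := pluginMap[networkPluginName]
if chosenPlugin != nil {
	// For the CNI plugin, Init only records host; hairpinMode,
	// nonMasqueradeCIDR, and mtu are accepted but unused.
	if err := chosenPlugin.Init(host, hairpinMode, nonMasqueradeCIDR, mtu); err != nil {
		return nil, err
	}
}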

Once initialized, the networkPlugin is handed to the containerRuntime, which uses it to manage networking throughout a pod's lifecycle.

The bulk of the CNI network plugin implementation in the Kubelet lives in pkg/kubelet/network/cni/cni.go.

cniNetwork

cniNetwork has two main fields:

  • NetworkConfig: the network configuration read from /etc/cni/net.d
  • CNIConfig: all paths where CNI binaries may live, including /opt/cni/bin and ${vendorCNIDirPrefix}/opt/${pluginType}/bin; it also implements the CNI interface

cniNetwork has two main methods:

  • addToNetwork: builds a runtime configuration from the pod's info via buildCNIRuntimeConf() (see the condensed RuntimeConf after this list), then calls the CNI interface's AddNetwork() to attach the pod to the network.
  • deleteFromNetwork: analogous to addToNetwork(), but calls the CNI interface's DelNetwork() to detach the pod from the network.
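
For reference, the RuntimeConf assembled by buildCNIRuntimeConf() carries the pod's identity to the plugin; condensed from pkg/kubelet/network/cni/cni.go:

rt := &libcni.RuntimeConf{
	ContainerID: podInfraContainerID.ID,       // the pod infra ("pause") container
	NetNS:       podNetnsPath,                 // path to its network namespace
	IfName:      network.DefaultInterfaceName, // "eth0"
	Args: [][2]string{
		{"IgnoreUnknown", "1"},
		{"K8S_POD_NAMESPACE", podNs},
		{"K8S_POD_NAME", podName},
		{"K8S_POD_INFRA_CONTAINER_ID", podInfraContainerID.ID},
	},
}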

cniNetworkPlugin

cniNetworkPlugin holds two networks:

  • loNetwork: obtained via getLoNetwork(); the loopback network is mandatory, since without it the CNI network cannot be brought up:
      func getLoNetwork(binDir, vendorDirPrefix string) *cniNetwork {
          loConfig, err := libcni.ConfFromBytes([]byte(`{
        "cniVersion": "0.1.0",
        "name": "cni-loopback",
        "type": "loopback"
      }`))
          if err != nil {
              // The hardcoded config above should always be valid and unit tests will
              // catch this
              panic(err)
          }
          cninet := &libcni.CNIConfig{
              Path: []string{vendorCNIDir(vendorDirPrefix, loConfig.Network.Type), binDir},
          }
          loNetwork := &cniNetwork{
              name:          "lo",
              NetworkConfig: loConfig,
              CNIConfig:     cninet,
          }
    	
          return loNetwork
      }
    
  • defaultNetwork: reads its config from /etc/cni/net.d; only the first valid config file is used, so do not place more than one there:
      func getDefaultCNINetwork(pluginDir, binDir, vendorCNIDirPrefix string) (*cniNetwork, error) {
          if pluginDir == "" {
              pluginDir = DefaultNetDir
          }
          files, err := libcni.ConfFiles(pluginDir)
          switch {
          case err != nil:
              return nil, err
          case len(files) == 0:
              return nil, fmt.Errorf("No networks found in %s", pluginDir)
          }
    	
          sort.Strings(files)
          for _, confFile := range files {
              conf, err := libcni.ConfFromFile(confFile)
              if err != nil {
                  glog.Warningf("Error loading CNI config file %s: %v", confFile, err)
                  continue
              }
              // Search for vendor-specific plugins as well as default plugins in the CNI codebase.
              vendorDir := vendorCNIDir(vendorCNIDirPrefix, conf.Network.Type)
              cninet := &libcni.CNIConfig{
                  Path: []string{binDir, vendorDir},
              }
              network := &cniNetwork{name: conf.Network.Name, NetworkConfig: conf, CNIConfig: cninet}
              return network, nil
          }
          return nil, fmt.Errorf("No valid networks found in %s", pluginDir)
      }
    

CNI Interface

The CNI interface is defined by containernetworking. The interface is very simple, with only four methods:

containernetworking/cni/libcni/api.go

type CNI interface {
	AddNetworkList(net *NetworkConfigList, rt *RuntimeConf) (types.Result, error)
	DelNetworkList(net *NetworkConfigList, rt *RuntimeConf) error

	AddNetwork(net *NetworkConfig, rt *RuntimeConf) (types.Result, error)
	DelNetwork(net *NetworkConfig, rt *RuntimeConf) error
}

When managing pod networking, K8s mainly uses two of these methods: AddNetwork() and DelNetwork(). Taking AddNetwork() as an example: it first locates the plugin binary along pluginPath, then injectRuntimeConfig() injects the runtime network configuration into net, which is fed to the plugin on stdin when the plugin is executed; the network operation (ADD or DEL) and the RuntimeConf are passed to the plugin as environment variables.

containernetworking/cni/libcni/api.go

// AddNetwork executes the plugin with the ADD command
func (c *CNIConfig) AddNetwork(net *NetworkConfig, rt *RuntimeConf) (types.Result, error) {
	pluginPath, err := invoke.FindInPath(net.Network.Type, c.Path)
	if err != nil {
		return nil, err
	}

	net, err = injectRuntimeConfig(net, rt)
	if err != nil {
		return nil, err
	}

	return invoke.ExecPluginWithResult(pluginPath, net.Bytes, c.args("ADD", rt))
}
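
c.args("ADD", rt) packs the operation and the RuntimeConf into the well-known CNI environment variables defined by the CNI spec, so the plugin process sees roughly the following (placeholder values for illustration):

CNI_COMMAND=ADD
CNI_CONTAINERID=<pod infra container ID>
CNI_NETNS=<path to the pod's network namespace>
CNI_IFNAME=eth0
CNI_ARGS=IgnoreUnknown=1;K8S_POD_NAMESPACE=<ns>;K8S_POD_NAME=<name>;K8S_POD_INFRA_CONTAINER_ID=<id>
CNI_PATH=<plugin search paths, colon-separated>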

How the Kubelet Works

The Pod Creation Flow

The Kubelet manages the entire lifecycle of pods through syncLoop. Configuration updates, whether they come from config files, HTTP requests, or the kube-apiserver, all trigger syncLoop.

syncLoopIteration continuously listens for pod configuration and lifecycle updates, and is also fired periodically.

Pod configuration updates come in five kubetypes: ADD, UPDATE, REMOVE, RECONCILE, and DELETE. They are handed to the SyncHandler interface, which the Kubelet itself implements.

Each handler ultimately calls dispatchWork to carry out the operation; the core of dispatchWork is calling UpdatePod in podWorkers. From there, managePodLoop invokes the Kubelet's syncPod method, which finally reconciles the pod's state on the node.
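
An abridged view of this dispatch in syncLoopIteration (pkg/kubelet/kubelet.go; the PLEG, housekeeping, and periodic-sync channels are omitted here):

select {
case u, open := <-configCh:
	if !open {
		glog.Errorf("Update channel is closed. Exiting the sync loop.")
		return false
	}
	switch u.Op {
	case kubetypes.ADD:
		handler.HandlePodAdditions(u.Pods)
	case kubetypes.UPDATE:
		handler.HandlePodUpdates(u.Pods)
	case kubetypes.REMOVE:
		handler.HandlePodRemoves(u.Pods)
	case kubetypes.RECONCILE:
		handler.HandlePodReconcile(u.Pods)
	case kubetypes.DELETE:
		// DELETE is treated as an UPDATE because of graceful deletion.
		handler.HandlePodUpdates(u.Pods)
	}
}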
