
Kubernetes Source Code Reading: Kubelet

Posted by Robin on March 6, 2017

The version of Kubernetes source code is V1.5.3.



每个pod都有一个goroutine负责update pod。

  • isWorking:如果pod的一个update正在被处理,其他update会被ignore,但是killing的update除外。killing的update会被临时记录在lastUndeliveredWorkUpdate中,等当前正在处理的update完成后,再立即执行。
  • UpdatePod:执行pod update。如果pod没有update goroutine(新pod,或者kubelet重启),则为该pod新建一个goroutine。


增删改查pod,并维护static pod与mirror pod映射关系。

static pod:非来自apiserver的pod,即来自file或者http的pod,apiserver不知道这些pod的存在。kubelet为这些pod,通过apiserver创建mirror pod。static pod与其对应的mirror pod有相同的pod full name(namespace/podname)。



  • ClusterFirst:如果有Cluster DNS(kube-dns),则优先使用Cluster DNS。否则,回退到Default模式。
  • Default:使用由Kubelet决定的Default DNS。如果Kubelet使用--cluster-dns=<IP>,则该Node上的所有Container的/etc/resolv.conf中都会添加nameserver <IP>。如果Kubelet没有使用--cluster-dns,则默认使用Node上的DNS解析。


// DNSPolicy defines how a pod's DNS will be configured.
// It is a string type; the values shown in this excerpt are declared below.
type DNSPolicy string

// DNSPolicy values shown in this excerpt.
const (
	// DNSClusterFirst indicates that the pod should use cluster DNS
	// first, if it is available, then fall back on the default (as
	// determined by kubelet) DNS settings.
	DNSClusterFirst DNSPolicy = "ClusterFirst"

	// DNSDefault indicates that the pod should use the default (as
	// determined by kubelet) DNS settings.
	DNSDefault DNSPolicy = "Default"



// GetClusterDNS returns a list of the DNS servers and a list of the DNS search
// domains of the cluster.
//
// Host DNS settings are read from the file named by kl.resolverConfig when it
// is non-empty; an error opening or parsing that file is returned to the
// caller. When the pod's policy is not api.DNSClusterFirst, or when
// kl.clusterDNS is nil (which records warning events and falls back), the host
// settings are returned. Otherwise the only nameserver returned is
// kl.clusterDNS, and the search list is built from the pod namespace and
// kl.clusterDomain followed by the host search domains.
//
// NOTE(review): the closing braces of the nested blocks are not present in
// this excerpt; confirm the block boundaries against the upstream source.
func (kl *Kubelet) GetClusterDNS(pod *api.Pod) ([]string, []string, error) {
	var hostDNS, hostSearch []string
	// Get host DNS settings
	if kl.resolverConfig != "" {
		f, err := os.Open(kl.resolverConfig)
		if err != nil {
			return nil, nil, err
		defer f.Close()

		hostDNS, hostSearch, err = kl.parseResolvConf(f)
		if err != nil {
			return nil, nil, err
	useClusterFirstPolicy := pod.Spec.DNSPolicy == api.DNSClusterFirst
	if useClusterFirstPolicy && kl.clusterDNS == nil {
		// clusterDNS is not known.
		// pod with ClusterDNSFirst Policy cannot be created
		// One warning event is recorded on the pod and one on the node.
		kl.recorder.Eventf(pod, api.EventTypeWarning, "MissingClusterDNS", "kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy)
		log := fmt.Sprintf("kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. pod: %q. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy, format.Pod(pod))
		kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, "MissingClusterDNS", log)

		// fallback to DNSDefault
		useClusterFirstPolicy = false

	if !useClusterFirstPolicy {
		// When the kubelet --resolv-conf flag is set to the empty string, use
		// DNS settings that override the docker default (which is to use
		// /etc/resolv.conf) and effectively disable DNS lookups. According to
		// the bind documentation, the behavior of the DNS client library when
		// "nameservers" are not specified is to "use the nameserver on the
		// local machine". A nameserver setting of localhost is equivalent to
		// this documented behavior.
		if kl.resolverConfig == "" {
			// NOTE(review): the comment above describes a localhost nameserver,
			// but the literal below is empty in this excerpt; the address may
			// have been lost in extraction. Confirm the value upstream.
			hostDNS = []string{""}
			hostSearch = []string{"."}
		return hostDNS, hostSearch, nil

	// for a pod with DNSClusterFirst policy, the cluster DNS server is the only nameserver configured for
	// the pod. The cluster DNS server itself will forward queries to other nameservers that is configured to use,
	// in case the cluster DNS server cannot resolve the DNS query itself
	dns := []string{kl.clusterDNS.String()}

	// Search order: <namespace>.svc.<clusterDomain>, svc.<clusterDomain>,
	// <clusterDomain>, then the host search domains.
	var dnsSearch []string
	if kl.clusterDomain != "" {
		nsSvcDomain := fmt.Sprintf("%s.svc.%s", pod.Namespace, kl.clusterDomain)
		svcDomain := fmt.Sprintf("svc.%s", kl.clusterDomain)
		dnsSearch = append([]string{nsSvcDomain, svcDomain, kl.clusterDomain}, hostSearch...)
	} else {
		dnsSearch = hostSearch
	return dns, dnsSearch, nil

备注:Kubernetes 1.6的Kube DNS支持Private DNS Zones和Upstream Nameservers,使Pod的DNS解析更加灵活。参考:Configuring Private DNS Zones and Upstream Nameservers in Kubernetes


cache pod status,并同步pod status到apiserver。两种同步触发机制:

  • 每隔10秒同步一次
  • pod状态更新:container ready,或者terminated,或者直接SetPodStatus


// syncPeriod is the interval of the ticker created in Start.
const syncPeriod = 10 * time.Second

// Start begins syncing pod status with the apiserver. It creates a ticker
// that fires every syncPeriod and launches a goroutine running wait.Forever,
// whose loop body either handles one request from m.podStatusChannel (via
// m.syncPod) or reacts to a tick.
//
// NOTE(review): this excerpt shows no early return in the nil-client branch
// and no body for the ticker case; both appear to have been elided, along
// with the closing braces. Confirm against the upstream source.
func (m *manager) Start() {
    // Don't start the status manager if we don't have a client. This will happen
    // on the master, where the kubelet is responsible for bootstrapping the pods
    // of the master components.
    if m.kubeClient == nil {
        glog.Infof("Kubernetes client is nil, not starting status manager.")

    glog.Info("Starting to sync pod status with apiserver")
    syncTicker := time.Tick(syncPeriod)
    // syncPod and syncBatch share the same go routine to avoid sync races.
    go wait.Forever(func() {
        select {
        case syncRequest := <-m.podStatusChannel:
            m.syncPod(syncRequest.podUID, syncRequest.status)
        case <-syncTicker:
    }, 0)



  • DockerManager

实现kubecontainer.Runtime接口,封装了DockerInterface的Client,负责与Docker Daemon交互。Runtime接口中的各操作,最终是调用Docker client实现。 部分Docker Client返回的数据,需要从dockertypes类型转化为kubecontainer类型。例如,通过dockertools/convert.go中的toRuntimeImage和toRuntimeContainer分别转化dockertypes的image和container。

  • rkt.Runtime

实现kubecontainer.Runtime接口,封装了rktapi.PublicAPIClient的apisvc,负责与rkt apiserver交互。Runtime接口中的各操作,最终是调用apisvc实现。



  • MinAge:image最小存活时间
  • HighThresholdPercent:镜像磁盘使用率高于该值,触发GC。即镜像磁盘的最高使用率
  • LowThresholdPercent: 镜像磁盘使用率低于该值,不触发GC。即镜像磁盘GC后的最低使用率。


// A policy for garbage collecting images. Policy defines an allowed band in
// which garbage collection will be run. The band is bounded by
// LowThresholdPercent below and HighThresholdPercent above.
type ImageGCPolicy struct {
    // Any usage above this threshold will always trigger garbage collection.
    // This is the highest usage we will allow.
    HighThresholdPercent int

    // Any usage below this threshold will never trigger garbage collection.
    // This is the lowest threshold we will try to garbage collect to.
    LowThresholdPercent int

    // Minimum age at which an image can be garbage collected.
    MinAge time.Duration



  • MinAge:容器最小存活时间
  • MaxPerPodContainer:每个pod最大容器数
  • MaxContainers:整个node上最大容器数



// Specifies a policy for garbage collecting containers.
type ContainerGCPolicy struct {
    // Minimum age at which a container can be garbage collected, zero for no limit.
    MinAge time.Duration
    // Max number of dead containers any single pod (UID, container name) pair is
    // allowed to have, less than zero for no limit.
    MaxPerPodContainer int
    // Max number of total dead containers, less than zero for no limit.
    MaxContainers int
// Manages garbage collection of dead containers.
// Implementation is thread-compatible.
type ContainerGC interface {
    // Garbage collect containers.
    // allSourcesReady is supplied by the caller; realContainerGC forwards it
    // unchanged to the container runtime.
    GarbageCollect(allSourcesReady bool) error
// GarbageCollect delegates to the runtime's GarbageCollect, passing the
// configured policy (cgc.policy) together with allSourcesReady, and returns
// whatever error the runtime reports.
func (cgc *realContainerGC) GarbageCollect(allSourcesReady bool) error {
    return cgc.runtime.GarbageCollect(cgc.policy, allSourcesReady)


  • evictUnits:可以解析出container name,根据GCPolicy选择性删除。
  • unidentifiedContainers:不能解析出container name,一定删除。



// CreateAndInitKubelet builds a kubelet via kubelet.NewMainKubelet from the
// given configuration, dependencies and standalone flag. It returns
// (nil, err) if construction fails, otherwise the new kubelet and a nil error.
//
// NOTE(review): the blank lines before the final return look like statements
// elided from this excerpt (closing braces are missing as well); confirm any
// initialization steps against the upstream source.
func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, standaloneMode bool) (k kubelet.KubeletBootstrap, err error) {
	// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
	// up into "per source" synchronizations

	k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, standaloneMode)
	if err != nil {
		return nil, err



	return k, nil


// Starts garbage collection threads.
//
// Two goroutines are launched with wait.Until and wait.NeverStop: one calls
// kl.containerGC.GarbageCollect every ContainerGCPeriod, the other calls
// kl.imageManager.GarbageCollect every ImageGCPeriod. On failure each logs an
// error, records a warning event on the node and remembers that it failed.
// On success it logs at verbosity 4, or at verbosity 1 for the first success
// after a failure, and then clears the failure flag.
//
// NOTE(review): closing braces are missing from this excerpt; confirm the
// block boundaries against the upstream source.
func (kl *Kubelet) StartGarbageCollection() {
	// Set after a failed container GC run; read and reset only by the
	// container GC closure below.
	loggedContainerGCFailure := false
	go wait.Until(func() {
		if err := kl.containerGC.GarbageCollect(kl.sourcesReady.AllReady()); err != nil {
			glog.Errorf("Container garbage collection failed: %v", err)
			kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, events.ContainerGCFailed, err.Error())
			loggedContainerGCFailure = true
		} else {
			var vLevel glog.Level = 4
			if loggedContainerGCFailure {
				vLevel = 1
				loggedContainerGCFailure = false

			glog.V(vLevel).Infof("Container garbage collection succeeded")
	}, ContainerGCPeriod, wait.NeverStop)

	// Same bookkeeping as above, for image GC.
	loggedImageGCFailure := false
	go wait.Until(func() {
		if err := kl.imageManager.GarbageCollect(); err != nil {
			glog.Errorf("Image garbage collection failed: %v", err)
			kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, events.ImageGCFailed, err.Error())
			loggedImageGCFailure = true
		} else {
			var vLevel glog.Level = 4
			if loggedImageGCFailure {
				vLevel = 1
				loggedImageGCFailure = false

			glog.V(vLevel).Infof("Image garbage collection succeeded")
	}, ImageGCPeriod, wait.NeverStop)


Kubelet通过kubeDeps获得NetworkPlugins信息。kubeDeps.NetworkPlugins由ProbeNetworkPlugins()收集所有Network Plugin得到。除了CNI的network plugin,还会收集NetworkPluginDir中的network plugin。

InitNetworkPlugin()时,会根据NetworkPluginName来选择需要的Network Plugin,然后调用plugin的Init()进行初始化。对于CNI plugin,只用到host参数,hairpinMode,nonMasqueradeCIDR以及mtu都没有用到。


Kubelet中实现CNI Network Plugin的主要代码在pkg/kubelet/network/cni/cni.go



  • NetworkConfig:/etc/cni/net.d下面的config
  • CNIConfig:存放CNI二进制文件的所有路径,包括/opt/cni/bin和${vendorCNIDirPrefix}/opt/${pluginType}/bin。并实现CNI接口。


  • addToNetwork:根据Pod信息,buildCNIRuntimeConf(),然后调用CNI接口AddNetwork(),将Pod添加到网络中。
  • deleteFromNetwork:与addToNetwork()类似,只是调用CNI接口DelNetwork(),将Pod从网络中删除。



  • loNetwork:通过getLoNetwork()获取loopback网络。loopback必须存在,否则无法调用CNI network
      // getLoNetwork builds the cniNetwork named "lo" from a hardcoded CNI
      // config (name "cni-loopback", type "loopback"). Its plugin search path
      // lists the vendor directory for the config's type first, then binDir.
      //
      // NOTE(review): the end of the raw config string, the body of the error
      // branch and all closing braces are missing from this excerpt; confirm
      // against the upstream source.
      func getLoNetwork(binDir, vendorDirPrefix string) *cniNetwork {
          loConfig, err := libcni.ConfFromBytes([]byte(`{
        "cniVersion": "0.1.0",
        "name": "cni-loopback",
        "type": "loopback"
          if err != nil {
              // The hardcoded config above should always be valid and unit tests will
              // catch this
          cninet := &libcni.CNIConfig{
              Path: []string{vendorCNIDir(vendorDirPrefix, loConfig.Network.Type), binDir},
          loNetwork := &cniNetwork{
              name:          "lo",
              NetworkConfig: loConfig,
              CNIConfig:     cninet,
          return loNetwork
  • defaultNetwork:从/etc/cni/net.d下面读取config,只会读取第一个正确的,因此不要有多个
      // getDefaultCNINetwork loads a CNI network from the config files found in
      // pluginDir, which defaults to DefaultNetDir when empty. It returns an
      // error if listing the files fails or if no files are found. Otherwise it
      // iterates over the files, logs a warning for any file that fails to load,
      // and returns a cniNetwork for a loaded config. That network's plugin
      // search path is binDir followed by the vendor directory for the config's
      // type. If the loop finishes without returning, a "No valid networks
      // found" error is returned.
      //
      // NOTE(review): closing braces are missing from this excerpt, and nothing
      // is shown after the warning that skips a failed file; presumably a
      // statement was elided there. Confirm against the upstream source.
      func getDefaultCNINetwork(pluginDir, binDir, vendorCNIDirPrefix string) (*cniNetwork, error) {
          if pluginDir == "" {
              pluginDir = DefaultNetDir
          files, err := libcni.ConfFiles(pluginDir)
          switch {
          case err != nil:
              return nil, err
          case len(files) == 0:
              return nil, fmt.Errorf("No networks found in %s", pluginDir)
          for _, confFile := range files {
              conf, err := libcni.ConfFromFile(confFile)
              if err != nil {
                  glog.Warningf("Error loading CNI config file %s: %v", confFile, err)
              // Search for vendor-specific plugins as well as default plugins in the CNI codebase.
              vendorDir := vendorCNIDir(vendorCNIDirPrefix, conf.Network.Type)
              cninet := &libcni.CNIConfig{
                  Path: []string{binDir, vendorDir},
              network := &cniNetwork{name: conf.Network.Name, NetworkConfig: conf, CNIConfig: cninet}
              return network, nil
          return nil, fmt.Errorf("No valid networks found in %s", pluginDir)

CNI Interface



// CNI declares add and delete operations for a list of network configs
// (NetworkConfigList) and for a single network config (NetworkConfig), each
// parameterized by a RuntimeConf. The add operations return a types.Result
// in addition to an error.
type CNI interface {
	AddNetworkList(net *NetworkConfigList, rt *RuntimeConf) (types.Result, error)
	DelNetworkList(net *NetworkConfigList, rt *RuntimeConf) error

	AddNetwork(net *NetworkConfig, rt *RuntimeConf) (types.Result, error)
	DelNetwork(net *NetworkConfig, rt *RuntimeConf) error



// AddNetwork executes the plugin with the ADD command
//
// The plugin binary is located by looking up net.Network.Type in c.Path. The
// runtime configuration rt is then injected into net, and the plugin is
// invoked with the resulting config bytes and the arguments built by
// c.args("ADD", rt). An error from the lookup or the injection is returned
// with a nil result.
//
// NOTE(review): closing braces are missing from this excerpt.
func (c *CNIConfig) AddNetwork(net *NetworkConfig, rt *RuntimeConf) (types.Result, error) {
	pluginPath, err := invoke.FindInPath(net.Network.Type, c.Path)
	if err != nil {
		return nil, err

	net, err = injectRuntimeConfig(net, rt)
	if err != nil {
		return nil, err

	return invoke.ExecPluginWithResult(pluginPath, net.Bytes, c.args("ADD", rt))





Pod配置更新,支持五种kubetypes: ADD,UPDATE,REMOVE,RECONCILE,DELETE。这些更新会交给SyncHandler接口来处理,其实Kubelet自己实现了这个接口。

