三、源码剖析

在介绍代码之前,zouyee先带各位看一看CPU manager的启动图(CPU manager属于Container Manager模块的子系统)

对于上图的内容,zouyee总结流程如下:

1、在命令行启动局部,Kubelet中调用NewContainerManager构建ContainerManager

2、NewContainerManager函数调用topologymanager.NewManager构建拓扑管理器

3、NewContainerManager函数调用cpumanager.NewManager构建CPU管理器

4、拓扑管理器应用AddHintPriovider办法将CPU管理器退出治理

5、回到命令行启动局部,调用NewMainKubelet(),构建Kubelet构造体

6、构建Kubelet构造体时,将CPU管理器跟拓扑管理器封装为InternalContainerLifecycle接口,其实现Pod相干的生命周期资源管理操作,波及CPU相干的是PreStart办法

7、构建Kubelet构造体时,调用AddPodmitHandler将GetAllocateResourcesPodAdmitHandler办法退出到Pod准入插件中,在Pod创立时,资源预调配查看

8、构建Kubelet构造体后,调用ContainerManager的Start办法,ContainerManager在Start办法中调用CPU管理器的Start办法,其做一些解决工作并孵化一个goroutine,执行reconcileState()

上面顺次进行解说。

STEP 1

Kubelet中调用NewContainerManager构建ContainerManager, 波及代码为cmd/kubelet/app/server.go

在run函数中实现ContainerManager初始化工作

func run(ctx context.Context, 参数太长,不写全了){
    ....
    if kubeDeps.ContainerManager == nil {
    ...
    kubeDeps.ContainerManager, err = cm.NewContainerManager(
    ...
    )
    ...
    }
}

STEP 2-4    

NewContainerManager函数调用topologymanager.NewManager构建拓扑管理器,波及代码

pkg/kubelet/cm/container_manager_linux.go

func NewContainerManager(参数太长,不写全了) {
    if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager){
        // 判断个性是否开启,构建拓扑治理
        cm.topologyManager, err = topologymanager.NewManager(
            machineInfo.Topology,
            nodeConfig.ExperimentalTopologyManagerPolicy,
            nodeConfig.ExperimentalTopologyManagerScope,
        )
    }
    // 判断个性是否开启,构建CPU治理
    if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
        cm.cpuManager, err = cpumanager.NewManager(
            nodeConfig.ExperimentalCPUManagerPolicy,
            nodeConfig.ExperimentalCPUManagerReconcilePeriod,
            machineInfo,
            nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
            cm.GetNodeAllocatableReservation(),
            nodeConfig.KubeletRootDir,
            cm.topologyManager,
        )
        if err != nil {
            klog.Errorf("failed to initialize cpu manager: %v", err)
            return nil, err
        }
        // 拓扑管理器应用AddHintPriovider办法将CPU管理器退出治理 
        cm.topologyManager.AddHintProvider(cm.cpuManager)
    }
}

其中对于CPU管理器的初始化:

type CPUTopology struct {
    NumCPUs    int
    NumCores   int
    NumSockets int
    CPUDetails CPUDetails
}

type CPUDetails map[int]CPUInfo

type CPUInfo struct {
    NUMANodeID int
    SocketID   int
    CoreID     int
}

func NewManager(参数太多,省略了) (Manager, error) {
    // 依据cpuPolicyName,决定初始化policy,以后反对none和static
    switch policyName(cpuPolicyName) {
        ...
        case PolicyStatic:
            // 1. 依据cadvisor的数据,生产topology构造体
            topo, err = topology.Discover(machineInfo)
            // 2. 查看reserved的CPU是否为0,须要kube+system reserved的CPU > 0
            // 3. 初始化policy
            policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity)
        ...
    }
}
   

STEP 5-7

在run函数中实现ContainerManager初始化工作后,调用RunKubelet函数构建Kubelet构造体,其最终调用NewMainKubelet(),实现Kubelet构造体构建。波及代码pkg/kubelet/kubelet.go

func NewMainKubelet(参数太长,不写全了)(*Kubelet, error) {
    ...
    klet := &Kubelet{
    ...
    containerManager: kubeDeps.ContainerManager,
    ...
    }
    ...
    runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
    ...
    // 构建Kubelet构造体时,将CPU管理器跟拓扑管理器封装为InternalContainerLifecycle接口
    kubeDeps.ContainerManager.InternalContainerLifecycle(),
    ...
    )
    ...
    // 调用AddPodmitHandler将GetAllocateResourcesPodAdmitHandler办法退出到Pod准入插件中,在Pod创立时,资源预调配查看
klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetAllocateResourcesPodAdmitHandler())
    ...
}

   其中在InternalContainerLifecycle接口,波及CPU局部在PreStartContainer办法,波及代码pkg/kubelet/cm/internal_container_lifecycle.go

func (i *internalContainerLifecycleImpl) PreStartContainer(参数太长,不写全了) error {
   if i.cpuManager != nil {
      i.cpuManager.AddContainer(pod, container, containerID)
   }

   if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
      err := i.topologyManager.AddContainer(pod, containerID)
      if err != nil {
         return err
      }
   }
   return nil
}

那么何时调用呢?

下面咱们提到了kuberuntime.NewKubeGenericRuntimeManager,该函数实例化KubeGenericRuntimeManager构造体(后续具体介绍),而该构造体在startContainer办法中,进行调用,波及代码pkg/kubelet/kuberuntime/kuberuntime_container.go

// 用于启动容器,该构造体实现了Runtime接口
func (m *kubeGenericRuntimeManager) startContainer(参数太多,不写了) (string, error) {
    ...
    err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
    ...
}

另外GetAllocateResourcesPodAdmitHandler 须要实现返回的构造体须要实现Admit接口

波及代码pkg/kubelet/cm/container_manager_linux.go

func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
   pod := attrs.Pod

   for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
      ...

      if m.cpuManager != nil {
         err = m.cpuManager.Allocate(pod, &container)
         ...
      }
   }

   return lifecycle.PodAdmitResult{Admit: true}
}

 理论调用逻辑为m.cpuManager.Allocate->m.policy.Allocate->func (p *staticPolicy) Allocate (none策略无需操作),波及代码pkg/kubelet/cm/cpumanager/policy_static.go

func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
     // 1. 如介绍所说,查看是否满足调配,即QOS为Guaranteed,且调配CPU为整型
   if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
           // 2. 获取是否调配过,调配过则更新即可
      if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
       ...
      }
      // 3. 获取亲和性拓扑
      hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
      // 4. 依据numa亲和性进行调配
            cpuset, err := p.allocateCPUs(.. )
            // 5. 设置调配后果
            s.SetCPUSet(string(pod.UID), container.Name, cpuset)
            // 6. 设置reuse字段
            p.updateCPUsToReuse(pod, container, cpuset)

    }
    // container belongs in the shared pool (nothing to do; use default cpuset)
    return nil
}

STEP 8

构建实现Kubelet构造体后,在Kubelet办法initializeRuntimeDependentModules中调用ContainerManager的Start办法,波及代码pkg/kubelet/kubelet.go

func (kl *Kubelet) initializeRuntimeDependentModules() {
    ...
    // 这里依据咱们后面阐明的,须要cadvisor的数据,因而须要提前启动
    if err := kl.containerManager.Start(省略); err != nil {
        ...
    }
    ...
}

ContainerManager在Start办法中调用CPU管理器的Start办法,具体步骤如下:

a. 构建Checkpoint,其中蕴含文件及内存的操作

b. 依据初始化的policy,运行Start, 理论只有static起到作用,次要是校验工作

c. 孵化一个goroutine,执行reconcileState()

func (cm *containerManagerImpl) Start(参数太多,省略) error {
    ...
    // 初始化CPU管理器
    if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
        ...
        err = cm.cpuManager.Start(参数太多,省略)
        ...
    }
    ...
}
// 波及代码pkg/kubelet/cm/cpumanager/cpu_manager.go
func (m *manager) Start(参数太多,省略) error {
    ...
    // 该处为Checkpoint解决,理论为文件管理工作,即调配等状况的数据保留
    
    stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, m.policy.Name(), m.containerMap)
    ...
    // 孵化一个goroutine,执行reconcileState()
    // 解决以后理论CPU调配的工作,相似actual与desired
    go wait.Until(func() { m.reconcileState() }, m.reconcilePeriod, wait.NeverStop)
}

    其中reconcileState  次要实现以下工作

    a. 解决以后沉闷Pod,更新containerMap构造体

    b. 通过CRI接口更新容器底层的CPU配置(即m.containerRuntime.UpdateContainerResources)

后续zouyee将带各位看看ContainerManager各大组件:拓扑治理、设施治理、容器治理等。

点击查看全文

后续相干内容,请查看公众号:DCOS

四、参考资料

1、cpuset

2、cpu topology

3、cpu manager policy