本文最后更新于:2020年7月6日 晚上

pkg/scheduler/plugins/gang/gang.go:47

Session 初始化时,Gang Plugin 将注册 validJobFn。validJobFn 会对每一轮调度的 Job 进行过滤

// OnSessionOpen builds the gang plugin's job-validation closure for the
// session. The remainder of the function is elided in this excerpt ("...").
func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) {
	// validJobFn rejects a job that cannot currently satisfy its gang size.
	// It returns a failing *api.ValidateResult on rejection and nil otherwise.
	// NOTE(review): a nil result is presumably treated as "pass" by the caller;
	// the code that consumes validJobFn is not visible here.
	validJobFn := func(obj interface{}) *api.ValidateResult {
		// Only *api.JobInfo values can be validated; anything else fails.
		job, ok := obj.(*api.JobInfo)
		if !ok {
			return &api.ValidateResult{
				Pass:    false,
				Message: fmt.Sprintf("Failed to convert <%v> to *JobInfo", obj),
			}
		}

		// Compare the number of valid tasks against the job's MinAvailable.
		vtn := job.ValidTaskNum()
		if vtn < job.MinAvailable {
			return &api.ValidateResult{
				Pass:   false,
				Reason: v1alpha1.NotEnoughPodsReason,
				Message: fmt.Sprintf("Not enough valid tasks for gang-scheduling, valid: %d, min: %d",
					vtn, job.MinAvailable),
			}
		}
		return nil
	}
  ...
}

pkg/scheduler/api/job_info.go:400

Gang 的 validJobFn 会判断有效 Task 的个数(job.ValidTaskNum())是否小于 job.MinAvailable(即 podGroup 中设置的 minMember)。若小于,则说明此 Job 不能满足 Gang 批调度条件

被视为有效的 Task 状态如下。可是 TaskStatusIndex 中的这些状态是从哪里来的呢?

// ValidTaskNum reports how many of the job's tasks are in a valid status.
// A status is valid when AllocatedStatus accepts it, or when it is
// Succeeded, Pipelined or Pending.
func (ji *JobInfo) ValidTaskNum() int32 {
	var valid int

	for status, tasks := range ji.TaskStatusIndex {
		// Case expressions are tried left to right and stop at the first match.
		switch {
		case AllocatedStatus(status),
			status == Succeeded,
			status == Pipelined,
			status == Pending:
			valid += len(tasks)
		}
	}

	return int32(valid)
}

pkg/scheduler/cache/cache.go:282

Pod Informer 设置 List-Watch 相关的事件回调函数

// create informer for pod information
sc.podInformer = informerFactory.Core().V1().Pods()
sc.podInformer.Informer().AddEventHandler(
	cache.FilteringResourceEventHandler{
		FilterFunc: func(obj interface{}) bool {
			switch obj.(type) {
			case *v1.Pod:
				pod := obj.(*v1.Pod)
				if strings.Compare(pod.Spec.SchedulerName, schedulerName) == 0 && pod.Status.Phase == v1.PodPending {
					return true
				}
				return pod.Status.Phase != v1.PodPending
			default:
				return false
			}
		},
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc:    sc.AddPod,
			UpdateFunc: sc.UpdatePod,
			DeleteFunc: sc.DeletePod,
		},
	})

pkg/scheduler/cache/event_handlers.go:185

将 obj 断言为 *v1.Pod,加锁后调用 addPod

// AddPod adds a Pod to the scheduler cache.
//
// obj must be a *v1.Pod; any other value is logged and ignored. The cache
// mutex is held while the Pod is inserted, and a failed insertion is logged
// rather than returned.
func (sc *SchedulerCache) AddPod(obj interface{}) {
	pod, isPod := obj.(*v1.Pod)
	if !isPod {
		glog.Errorf("Cannot convert to *v1.Pod: %v", obj)
		return
	}

	sc.Mutex.Lock()
	defer sc.Mutex.Unlock()

	if err := sc.addPod(pod); err != nil {
		glog.Errorf("Failed to add pod <%s/%s> into cache: %v",
			pod.Namespace, pod.Name, err)
		return
	}

	glog.V(3).Infof("Added pod <%s/%v> into cache.", pod.Namespace, pod.Name)
}

pkg/scheduler/cache/event_handlers.go:93

调用 NewTaskInfo,将 Pod 转换为 Task

调用 addTask,将构造的新 Task 添加到 Cache 中

// addPod converts the Pod into a TaskInfo and hands it to addTask.
// The caller must already hold the cache lock.
func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
	return sc.addTask(kbapi.NewTaskInfo(pod))
}

pkg/scheduler/api/job_info.go:69

构造 Task,调用 getTaskStatus 将 Pod 的状态转换为 task.Status

// NewTaskInfo builds the TaskInfo that represents the given Pod.
//
// Identity fields are copied from the Pod, the status comes from
// getTaskStatus, and the priority defaults to 1 unless the Pod spec carries
// an explicit priority.
func NewTaskInfo(pod *v1.Pod) *TaskInfo {
	// The helpers are called in the same order as the fields they feed.
	resreq := GetPodResourceWithoutInitContainers(pod)
	initResreq := GetPodResourceRequest(pod)
	job := getJobID(pod)
	status := getTaskStatus(pod)

	task := &TaskInfo{
		UID:        TaskID(pod.UID),
		Job:        job,
		Name:       pod.Name,
		Namespace:  pod.Namespace,
		NodeName:   pod.Spec.NodeName,
		Status:     status,
		Priority:   1,
		Pod:        pod,
		Resreq:     resreq,
		InitResreq: initResreq,
	}

	// An explicit priority on the Pod spec overrides the default.
	if p := pod.Spec.Priority; p != nil {
		task.Priority = *p
	}

	return task
}

pkg/scheduler/api/helpers.go:35

解析 pod.Status.Phase 转换为 Task 的状态

// getTaskStatus maps a Pod's phase to a TaskStatus.
//
// A running or pending Pod with a deletion timestamp is Releasing. Otherwise
// a running Pod is Running, and a pending Pod is Pending when it has no node
// name and Bound when it has one. Succeeded and Failed map directly; every
// other phase is Unknown.
func getTaskStatus(pod *v1.Pod) TaskStatus {
	phase := pod.Status.Phase

	switch phase {
	case v1.PodRunning, v1.PodPending:
		// Deletion takes precedence for both live phases.
		if pod.DeletionTimestamp != nil {
			return Releasing
		}
		if phase == v1.PodRunning {
			return Running
		}
		if pod.Spec.NodeName == "" {
			return Pending
		}
		return Bound
	case v1.PodSucceeded:
		return Succeeded
	case v1.PodFailed:
		return Failed
	default:
		// Covers v1.PodUnknown and any phase not listed above.
		return Unknown
	}
}

pkg/scheduler/cache/event_handlers.go:72

调用 getOrCreateJob 得到 Task 对应的 Job

调用 AddTaskInfo 将 Task 加入到 Cache Job 中

// addTask records the task under its job and, when the task names a node,
// under that node as well.
//
// A node missing from sc.Nodes gets a placeholder created with
// NewNodeInfo(nil). Tasks whose status is terminated are not added to the
// node. The returned error, if any, comes from the node's AddTask.
func (sc *SchedulerCache) addTask(pi *kbapi.TaskInfo) error {
	if job := sc.getOrCreateJob(pi); job != nil {
		job.AddTaskInfo(pi)
	}

	// A task without a node has nothing more to record.
	if len(pi.NodeName) == 0 {
		return nil
	}

	node, known := sc.Nodes[pi.NodeName]
	if !known {
		node = kbapi.NewNodeInfo(nil)
		sc.Nodes[pi.NodeName] = node
	}

	if isTerminated(pi.Status) {
		return nil
	}

	return node.AddTask(pi)
}

pkg/scheduler/api/job_info.go:233

将构造好的 Task 添加到相应的 Cache Job 中

// AddTaskInfo registers a task with the job.
//
// The task is stored by UID, indexed by status, and its request is added to
// TotalRequest. The request is also added to Allocated when AllocatedStatus
// accepts the task's status.
func (ji *JobInfo) AddTaskInfo(ti *TaskInfo) {
	ji.Tasks[ti.UID] = ti
	ji.addTaskIndex(ti)

	ji.TotalRequest.Add(ti.Resreq)

	// Only tasks in an allocated status contribute to Allocated.
	if !AllocatedStatus(ti.Status) {
		return
	}
	ji.Allocated.Add(ti.Resreq)
}

pkg/scheduler/api/job_info.go:224

现在可以回答一开始提出的问题了:Gang validJobFn 中 Task 的状态,是由 Pod 的 status 映射而来,并在 addTaskIndex 中按状态记入 TaskStatusIndex 的


// addTaskIndex files the task under its current status in TaskStatusIndex,
// creating the per-status map the first time that status is seen.
func (ji *JobInfo) addTaskIndex(ti *TaskInfo) {
	bucket, ok := ji.TaskStatusIndex[ti.Status]
	if !ok {
		bucket = tasksMap{}
		ji.TaskStatusIndex[ti.Status] = bucket
	}

	bucket[ti.UID] = ti
}