pkg/scheduler/plugins/gang/gang.go:47
When the Session is opened, the gang plugin registers validJobFn. validJobFn filters the Jobs considered in each scheduling cycle.
func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) {
    validJobFn := func(obj interface{}) *api.ValidateResult {
        job, ok := obj.(*api.JobInfo)
        if !ok {
            return &api.ValidateResult{
                Pass:    false,
                Message: fmt.Sprintf("Failed to convert <%v> to *JobInfo", obj),
            }
        }

        vtn := job.ValidTaskNum()
        if vtn < job.MinAvailable {
            return &api.ValidateResult{
                Pass:   false,
                Reason: v1alpha1.NotEnoughPodsReason,
                Message: fmt.Sprintf("Not enough valid tasks for gang-scheduling, valid: %d, min: %d",
                    vtn, job.MinAvailable),
            }
        }
        return nil
    }
    ...
}
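Before moving on, here is a self-contained toy (simplified stand-in types, not the real framework types) that shows how such a ValidateResult-style gang check behaves for a couple of jobs:

    package main

    import "fmt"

    // Simplified stand-ins for the real api.JobInfo / api.ValidateResult types.
    type jobInfo struct {
        Name         string
        MinAvailable int32
        ValidTasks   int32
    }

    type validateResult struct {
        Pass    bool
        Message string
    }

    // validJob mirrors the gang condition: a job passes only if it has at
    // least MinAvailable valid tasks.
    func validJob(job jobInfo) *validateResult {
        if job.ValidTasks < job.MinAvailable {
            return &validateResult{
                Pass: false,
                Message: fmt.Sprintf("Not enough valid tasks for gang-scheduling, valid: %d, min: %d",
                    job.ValidTasks, job.MinAvailable),
            }
        }
        return nil // nil means the job passes validation
    }

    func main() {
        for _, j := range []jobInfo{
            {Name: "tf-job", MinAvailable: 4, ValidTasks: 3},
            {Name: "mpi-job", MinAvailable: 2, ValidTasks: 2},
        } {
            if r := validJob(j); r != nil {
                fmt.Printf("%s rejected: %s\n", j.Name, r.Message)
                continue
            }
            fmt.Printf("%s passes the gang check\n", j.Name)
        }
    }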
pkg/scheduler/api/job_info.go:400
The gang validJobFn checks whether the number of valid Tasks is smaller than the minMember configured on the PodGroup. If it is, the Job does not satisfy the gang-scheduling condition.
The Task statuses that count as valid are listed below. But where do the statuses in TaskStatusIndex come from?
// ValidTaskNum returns the number of tasks that are valid.
func (ji *JobInfo) ValidTaskNum() int32 {
    occupied := 0
    for status, tasks := range ji.TaskStatusIndex {
        if AllocatedStatus(status) ||
            status == Succeeded ||
            status == Pipelined ||
            status == Pending {
            occupied = occupied + len(tasks)
        }
    }

    return int32(occupied)
}
pkg/scheduler/cache/cache.go:282
The Pod informer is created and the List-Watch event callbacks are registered, together with a filter.
// create informer for pod information
sc.podInformer = informerFactory.Core().V1().Pods()
sc.podInformer.Informer().AddEventHandler(
    cache.FilteringResourceEventHandler{
        FilterFunc: func(obj interface{}) bool {
            switch obj.(type) {
            case *v1.Pod:
                pod := obj.(*v1.Pod)
                if strings.Compare(pod.Spec.SchedulerName, schedulerName) == 0 && pod.Status.Phase == v1.PodPending {
                    return true
                }
                return pod.Status.Phase != v1.PodPending
            default:
                return false
            }
        },
        Handler: cache.ResourceEventHandlerFuncs{
            AddFunc:    sc.AddPod,
            UpdateFunc: sc.UpdatePod,
            DeleteFunc: sc.DeletePod,
        },
    })
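The filter is easy to misread: pending pods are cached only when they are assigned to this scheduler, while pods in any other phase are always cached, because they already occupy node resources no matter which scheduler placed them. The same predicate, pulled out into a standalone helper (the helper name is mine, not from the repo) with two illustrative cases:

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
    )

    // shouldCache mirrors the informer's FilterFunc: pending pods are cached
    // only when they belong to this scheduler; pods in any other phase are
    // always cached, since they consume node resources regardless of owner.
    func shouldCache(pod *v1.Pod, schedulerName string) bool {
        if pod.Spec.SchedulerName == schedulerName && pod.Status.Phase == v1.PodPending {
            return true
        }
        return pod.Status.Phase != v1.PodPending
    }

    func main() {
        pending := &v1.Pod{}
        pending.Spec.SchedulerName = "default-scheduler"
        pending.Status.Phase = v1.PodPending

        running := &v1.Pod{}
        running.Spec.SchedulerName = "default-scheduler"
        running.Status.Phase = v1.PodRunning

        // Pending pod owned by another scheduler: skipped.
        fmt.Println(shouldCache(pending, "kube-batch")) // false
        // Running pod owned by another scheduler: still cached.
        fmt.Println(shouldCache(running, "kube-batch")) // true
    }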
pkg/scheduler/cache/event_handlers.go:185
Convert the object to a *v1.Pod and call addPod.
// AddPod add pod to scheduler cache
func (sc *SchedulerCache) AddPod(obj interface{}) {
    pod, ok := obj.(*v1.Pod)
    if !ok {
        glog.Errorf("Cannot convert to *v1.Pod: %v", obj)
        return
    }

    sc.Mutex.Lock()
    defer sc.Mutex.Unlock()

    err := sc.addPod(pod)
    if err != nil {
        glog.Errorf("Failed to add pod <%s/%s> into cache: %v",
            pod.Namespace, pod.Name, err)
        return
    }
    glog.V(3).Infof("Added pod <%s/%v> into cache.", pod.Namespace, pod.Name)
    return
}
pkg/scheduler/cache/event_handlers.go:93
Call NewTaskInfo to convert the Pod into a Task.
Call addTask to add the newly built Task to the cache.
// Assumes that lock is already acquired.
func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
    pi := kbapi.NewTaskInfo(pod)

    return sc.addTask(pi)
}
pkg/scheduler/api/job_info.go:69
Build the TaskInfo; getTaskStatus converts the Pod's status into the task's Status.
// NewTaskInfo creates new taskInfo object for a Pod
func NewTaskInfo(pod *v1.Pod) *TaskInfo {
    req := GetPodResourceWithoutInitContainers(pod)
    initResreq := GetPodResourceRequest(pod)

    jobID := getJobID(pod)

    ti := &TaskInfo{
        UID:        TaskID(pod.UID),
        Job:        jobID,
        Name:       pod.Name,
        Namespace:  pod.Namespace,
        NodeName:   pod.Spec.NodeName,
        Status:     getTaskStatus(pod),
        Priority:   1,
        Pod:        pod,
        Resreq:     req,
        InitResreq: initResreq,
    }

    if pod.Spec.Priority != nil {
        ti.Priority = *pod.Spec.Priority
    }

    return ti
}
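Resreq and InitResreq differ in how init containers are counted: the plain request sums the app containers, while the init-aware request additionally takes the max against each init container, since Kubernetes runs init containers one at a time before the app containers start. A sketch of the two helpers, following the kube-batch pkg/scheduler/api package (exact code may differ by version):

    // GetPodResourceWithoutInitContainers sums the requests of the app
    // containers only (sketch).
    func GetPodResourceWithoutInitContainers(pod *v1.Pod) *Resource {
        result := EmptyResource()
        for _, container := range pod.Spec.Containers {
            result.Add(NewResource(container.Resources.Requests))
        }
        return result
    }

    // GetPodResourceRequest additionally takes the max against every init
    // container, because init containers run sequentially (sketch).
    func GetPodResourceRequest(pod *v1.Pod) *Resource {
        result := GetPodResourceWithoutInitContainers(pod)
        for _, container := range pod.Spec.InitContainers {
            result.SetMaxResource(NewResource(container.Resources.Requests))
        }
        return result
    }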
pkg/scheduler/api/helpers.go:35
Map pod.Status.Phase to the corresponding Task status.
func getTaskStatus(pod *v1.Pod) TaskStatus {
    switch pod.Status.Phase {
    case v1.PodRunning:
        if pod.DeletionTimestamp != nil {
            return Releasing
        }
        return Running
    case v1.PodPending:
        if pod.DeletionTimestamp != nil {
            return Releasing
        }
        if len(pod.Spec.NodeName) == 0 {
            return Pending
        }
        return Bound
    case v1.PodUnknown:
        return Unknown
    case v1.PodSucceeded:
        return Succeeded
    case v1.PodFailed:
        return Failed
    }

    return Unknown
}
pkg/scheduler/cache/event_handlers.go:72
Call getOrCreateJob to find the Job this Task belongs to.
Call AddTaskInfo to add the Task to the cached Job.
func (sc *SchedulerCache) addTask(pi *kbapi.TaskInfo) error {
    job := sc.getOrCreateJob(pi)
    if job != nil {
        job.AddTaskInfo(pi)
    }

    if len(pi.NodeName) != 0 {
        if _, found := sc.Nodes[pi.NodeName]; !found {
            sc.Nodes[pi.NodeName] = kbapi.NewNodeInfo(nil)
        }

        node := sc.Nodes[pi.NodeName]
        if !isTerminated(pi.Status) {
            return node.AddTask(pi)
        }
    }

    return nil
}
pkg/scheduler/api/job_info.go:233
Add the constructed Task to the corresponding cached Job.
// AddTaskInfo is used to add a task to a job
func (ji *JobInfo) AddTaskInfo(ti *TaskInfo) {
    ji.Tasks[ti.UID] = ti
    ji.addTaskIndex(ti)

    ji.TotalRequest.Add(ti.Resreq)

    if AllocatedStatus(ti.Status) {
        ji.Allocated.Add(ti.Resreq)
    }
}
pkg/scheduler/api/job_info.go:224
Now the question raised at the beginning can be answered: the task statuses that the gang validJobFn sees are mapped from each Pod's status onto its Task.
func (ji *JobInfo) addTaskIndex(ti *TaskInfo) {
    if _, found := ji.TaskStatusIndex[ti.Status]; !found {
        ji.TaskStatusIndex[ti.Status] = tasksMap{}
    }

    ji.TaskStatusIndex[ti.Status][ti.UID] = ti
}
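Putting the pieces together: each Pod event becomes a TaskInfo whose Status is derived from the Pod, AddTaskInfo files the task into TaskStatusIndex under that status, and ValidTaskNum counts the buckets that gang scheduling treats as valid. A self-contained toy (my own simplified types, not the scheduler's) that walks the same chain:

    package main

    import "fmt"

    type taskStatus string

    const (
        pending taskStatus = "Pending"
        running taskStatus = "Running"
        failed  taskStatus = "Failed"
    )

    type task struct {
        uid    string
        status taskStatus
    }

    type job struct {
        minAvailable    int
        taskStatusIndex map[taskStatus]map[string]task
    }

    // addTask mirrors addTaskIndex: bucket tasks by their status.
    func (j *job) addTask(t task) {
        if j.taskStatusIndex == nil {
            j.taskStatusIndex = map[taskStatus]map[string]task{}
        }
        if _, ok := j.taskStatusIndex[t.status]; !ok {
            j.taskStatusIndex[t.status] = map[string]task{}
        }
        j.taskStatusIndex[t.status][t.uid] = t
    }

    // validTaskNum mirrors ValidTaskNum: count only the statuses that can
    // still contribute to the gang (just Pending and Running here, for brevity).
    func (j *job) validTaskNum() int {
        n := 0
        for status, tasks := range j.taskStatusIndex {
            if status == pending || status == running {
                n += len(tasks)
            }
        }
        return n
    }

    func main() {
        j := &job{minAvailable: 3}
        j.addTask(task{uid: "a", status: running})
        j.addTask(task{uid: "b", status: pending})
        j.addTask(task{uid: "c", status: failed})

        valid := j.validTaskNum()
        fmt.Printf("valid=%d min=%d pass=%v\n", valid, j.minAvailable, valid >= j.minAvailable)
        // valid=2 min=3 pass=false -> the gang plugin would reject this job.
    }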