本文最后更新于:2020年7月6日 晚上

pkg/scheduler/cache/cache.go:318
为 PodGroup informer 注册 list-watch 事件对应的回调处理函数(Add / Update / Delete)

kbinformer := kbinfo.NewSharedInformerFactory(sc.kbclient, 0)

// Callbacks invoked on add/update/delete events, one set per PodGroup API version.
podGroupAlpha1Handlers := cache.ResourceEventHandlerFuncs{
	AddFunc:    sc.AddPodGroupAlpha1,
	UpdateFunc: sc.UpdatePodGroupAlpha1,
	DeleteFunc: sc.DeletePodGroupAlpha1,
}
podGroupAlpha2Handlers := cache.ResourceEventHandlerFuncs{
	AddFunc:    sc.AddPodGroupAlpha2,
	UpdateFunc: sc.UpdatePodGroupAlpha2,
	DeleteFunc: sc.DeletePodGroupAlpha2,
}

// Informer for PodGroup (v1alpha1) objects.
sc.podGroupInformerv1alpha1 = kbinformer.Scheduling().V1alpha1().PodGroups()
sc.podGroupInformerv1alpha1.Informer().AddEventHandler(podGroupAlpha1Handlers)

// Informer for PodGroup (v1alpha2) objects.
sc.podGroupInformerv1alpha2 = kbinformer.Scheduling().V1alpha2().PodGroups()
sc.podGroupInformerv1alpha2.Informer().AddEventHandler(podGroupAlpha2Handlers)

pkg/scheduler/cache/event_handlers.go:414
将事件对象断言为 *kbv1.PodGroup,经 JSON 序列化/反序列化转换为调度器内部的 api.PodGroup,然后加锁调用 setPodGroup 写入 cache

// AddPodGroupAlpha1 is the add-event handler for v1alpha1 PodGroups.
//
// obj is expected to be a *kbv1.PodGroup; any other type is logged and
// ignored. The object is converted to the scheduler's api.PodGroup through a
// JSON marshal/unmarshal round trip, tagged with api.PodGroupVersionV1Alpha1,
// and stored via setPodGroup while sc.Mutex is held. Every failure is logged
// and ends the handler before the cache is modified.
func (sc *SchedulerCache) AddPodGroupAlpha1(obj interface{}) {
	ss, ok := obj.(*kbv1.PodGroup)
	if !ok {
		glog.Errorf("Cannot convert to *kbv1.PodGroup: %v", obj)
		return
	}

	marshalled, err := json.Marshal(*ss)
	if err != nil {
		glog.Errorf("Failed to Marshal podgroup %s with error: %v", ss.Name, err)
		// Without the encoded bytes there is nothing to convert.
		return
	}

	pg := &api.PodGroup{}
	if err = json.Unmarshal(marshalled, pg); err != nil {
		glog.Errorf("Failed to Unmarshal Data into api.PodGroup type with error: %v", err)
		// Stop here: pg may be empty or partially populated, and caching it
		// would register a job under a bogus ID.
		return
	}
	pg.Version = api.PodGroupVersionV1Alpha1

	sc.Mutex.Lock()
	defer sc.Mutex.Unlock()

	glog.V(4).Infof("Add PodGroup(%s) into cache, spec(%#v)", ss.Name, ss.Spec)

	if err = sc.setPodGroup(pg); err != nil {
		glog.Errorf("Failed to add PodGroup %s into cache: %v", ss.Name, err)
	}
}

pkg/scheduler/cache/event_handlers.go:370
通过 getJobID 计算该 PodGroup 对应的 Job ID;若 cache 中尚无此 Job 则新建,并把 PodGroup 关联到该 Job 上;当 PodGroup 未指定 Queue 时,使用默认 Queue

// setPodGroup attaches ss to the cached JobInfo keyed by its job ID, creating
// that JobInfo first when the cache has no entry for it. When ss.Spec.Queue is
// empty, the job's Queue is set to sc.defaultQueue. An error is returned if
// the computed job ID is empty.
//
// The caller must already hold the cache lock.
func (sc *SchedulerCache) setPodGroup(ss *api.PodGroup) error {
	jobID := getJobID(ss)
	if len(jobID) == 0 {
		return fmt.Errorf("the identity of PodGroup is empty")
	}

	// Look the job up once and keep the pointer for the updates below.
	jobInfo, exists := sc.Jobs[jobID]
	if !exists {
		jobInfo = kbapi.NewJobInfo(jobID)
		sc.Jobs[jobID] = jobInfo
	}
	jobInfo.SetPodGroup(ss)

	// TODO(k82cn): set default queue in admission.
	if len(ss.Spec.Queue) == 0 {
		jobInfo.Queue = kbapi.QueueID(sc.defaultQueue)
	}

	return nil
}

pkg/scheduler/cache/event_handlers.go:365
将 pg.Namespace 与 pg.Name 以 “/” 拼接(形如 namespace/name),作为 Job ID 的唯一标识

// getJobID builds the job ID for pg by joining its Namespace and Name with
// a "/" separator.
func getJobID(pg *api.PodGroup) kbapi.JobID {
	id := fmt.Sprintf("%s/%s", pg.Namespace, pg.Name)
	return kbapi.JobID(id)
}

综上所述,在 Kube-Batch 中 Job 与 PodGroup 是一一对应的关系:每个 PodGroup 都以 namespace/name 为 ID 映射到唯一的一个 Job