本文最后更新于:2020年7月6日 晚上
pkg/scheduler/cache/cache.go:318
为 PodGroup informer 设置相应的 list-watch 回调处理函数
// Build a shared informer factory from the scheduler cache's kube-batch client.
// NOTE(review): the second argument (0) is presumably the default resync
// period, i.e. no periodic resync — confirm against the informer package.
kbinformer := kbinfo.NewSharedInformerFactory(sc.kbclient, 0)
// Create the informer for v1alpha1 PodGroups and route its add/update/delete
// events to the cache's v1alpha1 handlers.
sc.podGroupInformerv1alpha1 = kbinformer.Scheduling().V1alpha1().PodGroups()
sc.podGroupInformerv1alpha1.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sc.AddPodGroupAlpha1,
UpdateFunc: sc.UpdatePodGroupAlpha1,
DeleteFunc: sc.DeletePodGroupAlpha1,
})
// Create the informer for v1alpha2 PodGroups and route its add/update/delete
// events to the cache's v1alpha2 handlers.
sc.podGroupInformerv1alpha2 = kbinformer.Scheduling().V1alpha2().PodGroups()
sc.podGroupInformerv1alpha2.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sc.AddPodGroupAlpha2,
UpdateFunc: sc.UpdatePodGroupAlpha2,
DeleteFunc: sc.DeletePodGroupAlpha2,
})
pkg/scheduler/cache/event_handlers.go:414
解析 podgroup 信息并调用 setPodGroup
// AddPodGroupAlpha1 adds a v1alpha1 PodGroup to the scheduler cache.
//
// obj is expected to be a *kbv1.PodGroup. It is converted to the internal
// api.PodGroup through a JSON round trip, tagged with
// api.PodGroupVersionV1Alpha1, and stored via setPodGroup while sc.Mutex is
// held. Every failure is logged and the object is dropped; nothing is
// reported back to the caller.
func (sc *SchedulerCache) AddPodGroupAlpha1(obj interface{}) {
	ss, ok := obj.(*kbv1.PodGroup)
	if !ok {
		glog.Errorf("Cannot convert to *kbv1.PodGroup: %v", obj)
		return
	}

	marshalled, err := json.Marshal(*ss)
	if err != nil {
		glog.Errorf("Failed to Marshal podgroup %s with error: %v", ss.Name, err)
		// Without the serialized form there is nothing to convert.
		return
	}

	pg := &api.PodGroup{}
	if err = json.Unmarshal(marshalled, pg); err != nil {
		glog.Errorf("Failed to Unmarshal Data into api.PodGroup type with error: %v", err)
		// Do not cache a PodGroup that failed to decode: its fields may be
		// empty or incomplete.
		return
	}
	pg.Version = api.PodGroupVersionV1Alpha1

	sc.Mutex.Lock()
	defer sc.Mutex.Unlock()

	glog.V(4).Infof("Add PodGroup(%s) into cache, spec(%#v)", ss.Name, ss.Spec)
	if err = sc.setPodGroup(pg); err != nil {
		glog.Errorf("Failed to add PodGroup %s into cache: %v", ss.Name, err)
	}
}
pkg/scheduler/cache/event_handlers.go:370
通过 getJobID 得到 podGroup 对应的 Job ID,并尝试将此 podGroup 对应的 Job 和 Queue 加入 cache 中
// setPodGroup attaches ss to the job identified by getJobID(ss), creating the
// entry in sc.Jobs first when none exists. When ss.Spec.Queue is empty the
// job's Queue is set to the cache's default queue. It returns an error only
// when the computed job ID is empty.
//
// Assumes that the lock is already acquired by the caller.
func (sc *SchedulerCache) setPodGroup(ss *api.PodGroup) error {
	jobID := getJobID(ss)
	if len(jobID) == 0 {
		return fmt.Errorf("the identity of PodGroup is empty")
	}

	// Look the job up once and reuse the entry for every later step.
	jobInfo, exists := sc.Jobs[jobID]
	if !exists {
		jobInfo = kbapi.NewJobInfo(jobID)
		sc.Jobs[jobID] = jobInfo
	}
	jobInfo.SetPodGroup(ss)

	// TODO(k82cn): set default queue in admission.
	if len(ss.Spec.Queue) == 0 {
		jobInfo.Queue = kbapi.QueueID(sc.defaultQueue)
	}

	return nil
}
pkg/scheduler/cache/event_handlers.go:365
组合 pg.Namespace/pg.Name 的字符串为 job ID 唯一标识
// getJobID builds the job identifier for pg in the form "<namespace>/<name>".
func getJobID(pg *api.PodGroup) kbapi.JobID {
	id := fmt.Sprintf("%s/%s", pg.Namespace, pg.Name)
	return kbapi.JobID(id)
}
综上所述,在 Kube-Batch 中 Job 与 podGroup 将进行一一映射
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议,转载请注明出处!