本文最后更新于:2020年7月6日 晚上

剖析源码版本为 v0.5.0,commit 010ef715

cmd/kube-batch/main.go:54

// Run the kube-batch app; on any error print it to stderr and exit with
// a non-zero status so the supervisor (e.g. kubelet) can restart it.
if err := app.Run(s); err != nil {
	fmt.Fprintf(os.Stderr, "%v\n", err)
	os.Exit(1)
}

cmd/kube-batch/app/server.go:133

// Run the scheduler under client-go leader election so that only one
// kube-batch instance is active at a time. RunOrDie blocks here.
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
	Lock:          rl, // resource lock built earlier (not shown in this excerpt)
	LeaseDuration: leaseDuration,
	RenewDeadline: renewDeadline,
	RetryPeriod:   retryPeriod,
	Callbacks: leaderelection.LeaderCallbacks{
		// run starts the scheduler once leadership is acquired.
		OnStartedLeading: run,
		// Losing the lease is fatal: the process exits and restarts cleanly.
		OnStoppedLeading: func() {
			glog.Fatalf("leaderelection lost")
		},
	},
})

pkg/scheduler/scheduler.go:67

相应的资源 Informer 开始 List-Watch 监听事件变化

// Run starts the schedulerCache: it launches list-watch for every
// resource the cache mirrors and the background resync/cleanup loops.
// It returns immediately; all work happens in goroutines bound to stopCh.
func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
	// Collect the informer run functions, then start each in its own goroutine.
	informerRuns := []func(<-chan struct{}){
		sc.pdbInformer.Informer().Run,
		sc.podInformer.Informer().Run,
		sc.nodeInformer.Informer().Run,
		sc.podGroupInformerv1alpha1.Informer().Run,
		sc.podGroupInformerv1alpha2.Informer().Run,
		sc.pvInformer.Informer().Run,
		sc.pvcInformer.Informer().Run,
		sc.scInformer.Informer().Run,
		sc.queueInformerv1alpha1.Informer().Run,
		sc.queueInformerv1alpha2.Informer().Run,
	}
	if options.ServerOpts.EnablePriorityClass {
		// PriorityClass watching is optional and gated by a server flag.
		informerRuns = append(informerRuns, sc.pcInformer.Informer().Run)
	}
	for _, run := range informerRuns {
		go run(stopCh)
	}

	// Continuously re-sync tasks that previously hit errors.
	go wait.Until(sc.processResyncTask, 0, stopCh)

	// Continuously clean up finished jobs.
	go wait.Until(sc.processCleanupJob, 0, stopCh)
}

pkg/scheduler/scheduler.go:71

加载调度规则配置,默认为 defaultSchedulerConf

// Run runs the Scheduler: it starts the cache, waits for the initial
// sync, loads the action/plugin configuration (falling back to the
// built-in default on error), and then schedules in a loop every
// schedulePeriod until stopCh is closed.
func (pc *Scheduler) Run(stopCh <-chan struct{}) {
	// Start cache for policy and block until it has synced.
	go pc.cache.Run(stopCh)
	pc.cache.WaitForCacheSync(stopCh)

	// Load configuration of scheduler; an unreadable file is logged and
	// the built-in default configuration is used instead.
	schedConf := defaultSchedulerConf
	if len(pc.schedulerConf) != 0 {
		fileConf, err := readSchedulerConf(pc.schedulerConf)
		if err != nil {
			glog.Errorf("Failed to read scheduler configuration '%s', using default configuration: %v",
				pc.schedulerConf, err)
		} else {
			schedConf = fileConf
		}
	}

	actions, plugins, err := loadSchedulerConf(schedConf)
	if err != nil {
		// Even the default configuration failed to parse: nothing sane to do.
		panic(err)
	}
	pc.actions = actions
	pc.plugins = plugins

	go wait.Until(pc.runOnce, pc.schedulePeriod, stopCh)
}

pkg/scheduler/util.go:31

// defaultSchedulerConf is the built-in YAML scheduler configuration used
// when no configuration file is supplied (or it fails to load): two
// actions run in order (allocate, then backfill) and the plugins are
// grouped into two tiers.
var defaultSchedulerConf = `
actions: "allocate, backfill"
tiers:
- plugins:
  - name: priority
  - name: gang
- plugins:
  - name: drf
  - name: predicates
  - name: proportion
  - name: nodeorder
`

pkg/scheduler/util.go:44

解析调度配置,返回每个 Action 的 Action interface 实例并设置 Tiers 的具体配置参数

// loadSchedulerConf parses the YAML scheduler configuration and returns
// the ordered list of Action instances named in `actions` plus the plugin
// tiers with per-plugin defaults applied.
//
// It returns an error when the YAML is malformed or when an action name
// has no registered Action.
func loadSchedulerConf(confStr string) ([]framework.Action, []conf.Tier, error) {
	schedulerConf := &conf.SchedulerConfiguration{}

	// []byte(confStr) already copies the string, so the original manual
	// make+copy buffer was redundant.
	if err := yaml.Unmarshal([]byte(confStr), schedulerConf); err != nil {
		return nil, nil, err
	}

	// Set default settings for each plugin if not set.
	for i, tier := range schedulerConf.Tiers {
		for j := range tier.Plugins {
			plugins.ApplyPluginConfDefaults(&schedulerConf.Tiers[i].Plugins[j])
		}
	}

	// Resolve each comma-separated action name against the registry,
	// preserving the configured execution order. An unknown name is a
	// hard error (the original message wrongly claimed it was ignored).
	actionNames := strings.Split(schedulerConf.Actions, ",")
	actions := make([]framework.Action, 0, len(actionNames))
	for _, actionName := range actionNames {
		action, found := framework.GetAction(strings.TrimSpace(actionName))
		if !found {
			return nil, nil, fmt.Errorf("unknown action %q in scheduler configuration", actionName)
		}
		actions = append(actions, action)
	}

	return actions, schedulerConf.Tiers, nil
}

pkg/scheduler/plugins/defaults.go:22

根据配置设置每个 Tier 的具体调度参数

// ApplyPluginConfDefaults sets option's field to its default value if not
// set: every Enabled* knob left nil in the configuration is turned on.
func ApplyPluginConfDefaults(option *conf.PluginOption) {
	enabled := true

	// All knobs share the same default, so walk them as a table instead
	// of repeating nine identical nil checks.
	knobs := []**bool{
		&option.EnabledJobOrder,
		&option.EnabledJobReady,
		&option.EnabledJobPipelined,
		&option.EnabledTaskOrder,
		&option.EnabledPreemptable,
		&option.EnabledReclaimable,
		&option.EnabledQueueOrder,
		&option.EnabledPredicate,
		&option.EnabledNodeOrder,
	}
	for _, knob := range knobs {
		if *knob == nil {
			*knob = &enabled
		}
	}
}

pkg/scheduler/actions/factory.go:28

内置的 4 个 Action 之前已通过 init 函数注册

// init registers the four built-in actions (reclaim, allocate, backfill,
// preempt) with the framework registry so loadSchedulerConf can resolve
// them by name.
func init() {
	framework.RegisterAction(reclaim.New())
	framework.RegisterAction(allocate.New())
	framework.RegisterAction(backfill.New())
	framework.RegisterAction(preempt.New())
}

pkg/scheduler/scheduler.go:85

开始调度,默认间隔1s尝试对所有待调度实例,进行一次调度计算

// Trigger one scheduling pass every schedulePeriod until stopCh closes.
go wait.Until(pc.runOnce, pc.schedulePeriod, stopCh)

pkg/scheduler/scheduler.go:94

初始化贯穿本次调度生命周期的 session,拥有此次调度的 cache snapshot

// One Session spans a single scheduling pass; the deferred CloseSession
// ensures it is torn down when the pass ends.
ssn := framework.OpenSession(pc.cache, pc.plugins)
defer framework.CloseSession(ssn)

pkg/scheduler/framework/framework.go:31

// OpenSession start the session: it snapshots the cache into a new
// Session, instantiates every plugin listed in the tiers via its
// registered builder, and invokes each plugin's OnSessionOpen callback,
// recording its duration as a metric.
func OpenSession(cache cache.Cache, tiers []conf.Tier) *Session {
	ssn := openSession(cache)
	ssn.Tiers = tiers

	// Build the configured plugins; unknown names are logged and skipped.
	for _, tier := range tiers {
		for _, opt := range tier.Plugins {
			pb, found := GetPluginBuilder(opt.Name)
			if !found {
				glog.Errorf("Failed to get plugin %s.", opt.Name)
				continue
			}
			plugin := pb(opt.Arguments)
			ssn.plugins[plugin.Name()] = plugin
		}
	}

	// Let each plugin register its callbacks on the session.
	for _, plugin := range ssn.plugins {
		onSessionOpenStart := time.Now()
		plugin.OnSessionOpen(ssn)
		metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionOpen, metrics.Duration(onSessionOpenStart))
	}

	return ssn
}

pkg/scheduler/framework/session.go:86

初始化 ssn 并保存当前时刻集群某些资源的 Cache SnapShot 快照

// openSession builds a fresh Session for one scheduling pass: a new UID,
// empty Jobs/Nodes/Queues views to be filled from the cache snapshot, and
// empty registries for every kind of plugin callback (plugins populate
// these later in OnSessionOpen).
func openSession(cache cache.Cache) *Session {
	ssn := &Session{
		UID:   uuid.NewUUID(),
		cache: cache,

		// Per-pass views of the cluster, populated from the snapshot below.
		Jobs:   map[api.JobID]*api.JobInfo{},
		Nodes:  map[string]*api.NodeInfo{},
		Queues: map[api.QueueID]*api.QueueInfo{},

		// Callback registries keyed by plugin name; empty until each
		// plugin's OnSessionOpen registers its functions.
		plugins:          map[string]Plugin{},
		jobOrderFns:      map[string]api.CompareFn{},
		queueOrderFns:    map[string]api.CompareFn{},
		taskOrderFns:     map[string]api.CompareFn{},
		predicateFns:     map[string]api.PredicateFn{},
		preemptableFns:   map[string]api.EvictableFn{},
		reclaimableFns:   map[string]api.EvictableFn{},
		overusedFns:      map[string]api.ValidateFn{},
		jobReadyFns:      map[string]api.ValidateFn{},
		jobPipelinedFns:  map[string]api.ValidateFn{},
		jobValidFns:      map[string]api.ValidateExFn{},
		nodePrioritizers: map[string][]algorithm.PriorityConfig{},
	}

	// Take a point-in-time snapshot of the cache (rest of body elided in
	// this excerpt).
	snapshot := cache.Snapshot()
	...
}

pkg/scheduler/cache/cache.go:616

Cache SnapShot 包含 Ready Node 信息、Queue 信息和指定了 podGroup 和 Queue 的 Job,并为 Job 设置 Priority

// Build a point-in-time snapshot of the cluster for one scheduling pass.
snapshot := &kbapi.ClusterInfo{
	Nodes:  make(map[string]*kbapi.NodeInfo),
	Jobs:   make(map[kbapi.JobID]*kbapi.JobInfo),
	Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo),
}

// Only Ready nodes are visible to the scheduler; Clone keeps the snapshot
// isolated from concurrent cache updates.
for _, value := range sc.Nodes {
	if !value.Ready() {
		continue
	}

	snapshot.Nodes[value.Name] = value.Clone()
}

for _, value := range sc.Queues {
	snapshot.Queues[value.UID] = value.Clone()
}

for _, value := range sc.Jobs {
	// If no scheduling spec, does not handle it.
	if value.PodGroup == nil && value.PDB == nil {
		glog.V(4).Infof("The scheduling spec of Job <%v:%s/%s> is nil, ignore it.",
			value.UID, value.Namespace, value.Name)

		continue
	}

	// Skip jobs whose queue is not present in the snapshot.
	if _, found := snapshot.Queues[value.Queue]; !found {
		glog.V(3).Infof("The Queue <%v> of Job <%v/%v> does not exist, ignore it.",
			value.Queue, value.Namespace, value.Name)
		continue
	}

	if value.PodGroup != nil {
		// Priority comes from the PodGroup's PriorityClass when one is
		// registered; otherwise the cache's default priority is used.
		value.Priority = sc.defaultPriority

		priName := value.PodGroup.Spec.PriorityClassName
		if priorityClass, found := sc.PriorityClasses[priName]; found {
			value.Priority = priorityClass.Value
		}

		glog.V(4).Infof("The priority of job <%s/%s> is <%s/%d>",
			value.Namespace, value.Name, priName, value.Priority)
	}

	snapshot.Jobs[value.UID] = value.Clone()
}

pkg/scheduler/framework/session.go:89

过滤无效的,不需要进行调度计算的 Job,无效的 Job 将从 Cache SnapShot 中被删除

// Drop jobs that any plugin's job-valid function rejects: record the
// rejection as an Unschedulable PodGroup condition, then remove the job
// from this session. JobValid returns non-nil only for rejected jobs, so
// the delete below removes only invalid jobs. (Deleting the current key
// while ranging over a map is safe in Go.)
ssn.Jobs = snapshot.Jobs
for _, job := range ssn.Jobs {
	if vjr := ssn.JobValid(job); vjr != nil {
		if !vjr.Pass {
			jc := &api.PodGroupCondition{
				Type:               api.PodGroupUnschedulableType,
				Status:             v1.ConditionTrue,
				LastTransitionTime: metav1.Now(),
				TransitionID:       string(ssn.UID),
				Reason:             vjr.Reason,
				Message:            vjr.Message,
			}

			if err := ssn.UpdateJobCondition(job, jc); err != nil {
				glog.Errorf("Failed to update job condition: %v", err)
			}
		}

		delete(ssn.Jobs, job.UID)
	}
}

pkg/scheduler/framework/session_plugins.go:224

根据 ssn 中注册的 Tiers Plugin 所提供的 jobValidFn(此时 ssn 中的 jobValidFn 还并未注册,后续通过每个 Plugin 的 OnSessionOpen 方法,将注册 Plugin 所包含的各个维度的调度函数)

短路逻辑:只要遇到 validFn 判断为无效的 Job 即立刻返回

// JobValid invoke jobvalid function of the plugins: tiers are walked in
// order and the first plugin that rejects the job short-circuits the
// walk. A nil result means every plugin accepted the job.
func (ssn *Session) JobValid(obj interface{}) *api.ValidateResult {
	for _, tier := range ssn.Tiers {
		for _, plugin := range tier.Plugins {
			validate, ok := ssn.jobValidFns[plugin.Name]
			if !ok {
				// Plugin did not register a job-valid function; skip it.
				continue
			}

			vr := validate(obj)
			if vr != nil && !vr.Pass {
				return vr
			}
		}
	}

	return nil
}

pkg/scheduler/framework/framework.go:36

初始化 ssn 后,使用 pluginBuilders 中存储的 Plugin 对应 New 方法实例化 Plugin 对象

// GetPluginBuilder get the pluginbuilder by name. The registry is guarded
// by pluginMutex since builders may be registered concurrently.
func GetPluginBuilder(name string) (PluginBuilder, bool) {
	pluginMutex.Lock()
	defer pluginMutex.Unlock()

	if pb, found := pluginBuilders[name]; found {
		return pb, true
	}
	return nil, false
}

pkg/scheduler/plugins/factory.go:31

Plugin 的 Builder 方法在之前也已注册

// init registers the builder (New) function of every built-in plugin;
// OpenSession later instantiates plugins from this registry by name.
func init() {
	// Plugins for Jobs
	framework.RegisterPluginBuilder("drf", drf.New)
	framework.RegisterPluginBuilder("gang", gang.New)
	framework.RegisterPluginBuilder("predicates", predicates.New)
	framework.RegisterPluginBuilder("priority", priority.New)
	framework.RegisterPluginBuilder("nodeorder", nodeorder.New)
	framework.RegisterPluginBuilder("conformance", conformance.New)

	// Plugins for Queues
	framework.RegisterPluginBuilder("proportion", proportion.New)
}

pkg/scheduler/framework/framework.go:47

进行 Plugin 函数注册,此时才将 Plugin 各个调度维度的函数注册

// With all plugins instantiated, give each one the chance to register its
// callback functions on the session; the time spent in each OnSessionOpen
// call is exported as a metric.
for _, plugin := range ssn.plugins {
	onSessionOpenStart := time.Now()
	plugin.OnSessionOpen(ssn)
	metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionOpen, metrics.Duration(onSessionOpenStart))
}

return ssn

pkg/scheduler/plugins/gang/gang.go:69

举例 Gang Plugin,将会注册 JobValidFn 方法,在过滤 Cache SnapShot 中无效 Job 时被调用

判断 Job 中包含的 Valid Task 数量是否小于 podGroup 中设置的 minMember,因为 Valid Task 数量小于 minMember 时不满足 Gang 批调度条件

// OnSessionOpen registers the gang plugin's callbacks on the session.
// Shown here: the job-valid function used when the session filters out
// invalid jobs (rest of the method elided in this excerpt).
func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) {
	// A job passes gang validation only when its number of valid tasks
	// reaches the PodGroup's minimum (job.MinAvailable); otherwise the
	// whole job is rejected with NotEnoughPodsReason.
	validJobFn := func(obj interface{}) *api.ValidateResult {
		job, ok := obj.(*api.JobInfo)
		if !ok {
			return &api.ValidateResult{
				Pass:    false,
				Message: fmt.Sprintf("Failed to convert <%v> to *JobInfo", obj),
			}
		}

		vtn := job.ValidTaskNum()
		if vtn < job.MinAvailable {
			return &api.ValidateResult{
				Pass:   false,
				Reason: v1alpha1.NotEnoughPodsReason,
				Message: fmt.Sprintf("Not enough valid tasks for gang-scheduling, valid: %d, min: %d",
					vtn, job.MinAvailable),
			}
		}
		return nil
	}

	ssn.AddJobValidFn(gp.Name(), validJobFn)
  ...
}

pkg/scheduler/scheduler.go:99

此时本轮调度的 Session 已初始化完成,包括集群资源的 Cache SnapShot、各个 Plugin 也已注册,开始根据配置的 Action 并按配置的顺序执行

// runOnce performs a single scheduling pass: it opens a session over the
// current cache snapshot, executes the configured actions in order, and
// records per-action plus end-to-end latency metrics.
func (pc *Scheduler) runOnce() {
	glog.V(4).Infof("Start scheduling ...")
	passStart := time.Now()
	// Deferred in reverse order: metrics first, then the closing log line.
	defer glog.V(4).Infof("End scheduling ...")
	defer metrics.UpdateE2eDuration(metrics.Duration(passStart))

	// The session (and its snapshot) lives only for this pass.
	ssn := framework.OpenSession(pc.cache, pc.plugins)
	defer framework.CloseSession(ssn)

	for _, action := range pc.actions {
		begin := time.Now()
		action.Execute(ssn)
		metrics.UpdateActionDuration(action.Name(), metrics.Duration(begin))
	}
}