本文最后更新于:2020年7月6日 晚上
剖析源码版本为 v0.5.0,commit 010ef715
cmd/kube-batch/main.go:54
// Run the kube-batch server with the parsed options s; on error the error is
// printed to stderr and the process exits with status 1.
if err := app.Run(s); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
cmd/kube-batch/app/server.go:133
// Run the scheduler under leader election on lock rl: `run` is the callback
// invoked when this instance starts leading, and losing leadership is fatal
// (glog.Fatalf terminates the process).
// NOTE(review): context.TODO() is passed, so this call is not tied to a cancellable context.
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
// Leadership loss is treated as unrecoverable.
OnStoppedLeading: func() {
glog.Fatalf("leaderelection lost")
},
},
})
pkg/scheduler/scheduler.go:67
各资源对应的 Informer 开始 List-Watch,监听资源事件的变化
// Run starts the schedulerCache.
//
// Every watched informer's Run loop is launched on its own goroutine (the
// PriorityClass informer only when options.ServerOpts.EnablePriorityClass is
// set). Two background workers, processResyncTask and processCleanupJob, are
// then started via wait.Until with a zero period. All work happens on new
// goroutines, so Run itself returns immediately; everything is tied to stopCh.
func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
	// launch receives an informer that was already obtained by the caller and
	// runs it on a fresh goroutine, keeping the original start order.
	launch := func(informer interface{ Run(<-chan struct{}) }) {
		go informer.Run(stopCh)
	}

	launch(sc.pdbInformer.Informer())
	launch(sc.podInformer.Informer())
	launch(sc.nodeInformer.Informer())
	launch(sc.podGroupInformerv1alpha1.Informer())
	launch(sc.podGroupInformerv1alpha2.Informer())
	launch(sc.pvInformer.Informer())
	launch(sc.pvcInformer.Informer())
	launch(sc.scInformer.Informer())
	launch(sc.queueInformerv1alpha1.Informer())
	launch(sc.queueInformerv1alpha2.Informer())

	// PriorityClass objects are only watched when the feature is enabled.
	if options.ServerOpts.EnablePriorityClass {
		launch(sc.pcInformer.Informer())
	}

	// Re-sync error tasks.
	go wait.Until(sc.processResyncTask, 0, stopCh)
	// Cleanup jobs.
	go wait.Until(sc.processCleanupJob, 0, stopCh)
}
pkg/scheduler/scheduler.go:71
加载调度规则配置,默认为 defaultSchedulerConf
// Run runs the Scheduler.
//
// Steps:
//  1. Start the scheduler cache on its own goroutine and block until it has synced.
//  2. Pick the scheduler configuration. This is the text returned by
//     readSchedulerConf(pc.schedulerConf) when pc.schedulerConf is non-empty and
//     readable; otherwise it is defaultSchedulerConf.
//  3. Load the actions and plugin tiers from that text into pc, panicking on failure.
//  4. Invoke runOnce every pc.schedulePeriod on a goroutine until stopCh is closed.
func (pc *Scheduler) Run(stopCh <-chan struct{}) {
	// Start cache for policy and wait for it to sync.
	go pc.cache.Run(stopCh)
	pc.cache.WaitForCacheSync(stopCh)

	// Load configuration of scheduler; fall back to the built-in default.
	schedConf := defaultSchedulerConf
	if confSource := pc.schedulerConf; len(confSource) != 0 {
		fromSource, readErr := readSchedulerConf(confSource)
		if readErr != nil {
			glog.Errorf("Failed to read scheduler configuration '%s', using default configuration: %v",
				confSource, readErr)
		} else {
			schedConf = fromSource
		}
	}

	var err error
	pc.actions, pc.plugins, err = loadSchedulerConf(schedConf)
	if err != nil {
		panic(err)
	}

	go wait.Until(pc.runOnce, pc.schedulePeriod, stopCh)
}
pkg/scheduler/util.go:31
// defaultSchedulerConf is the scheduler configuration (YAML) used when no
// configuration is given or the given one cannot be read. It enables the
// "allocate" and "backfill" actions and two plugin tiers:
// {priority, gang} followed by {drf, predicates, proportion, nodeorder}.
var defaultSchedulerConf = `
actions: "allocate, backfill"
tiers:
- plugins:
- name: priority
- name: gang
- plugins:
- name: drf
- name: predicates
- name: proportion
- name: nodeorder
`
pkg/scheduler/util.go:44
解析调度配置,返回每个 Action 的 Action interface 实例并设置 Tiers 的具体配置参数
// loadSchedulerConf parses the YAML scheduler configuration in confStr.
//
// It returns the registered actions named in the comma-separated "actions"
// field, in configured order, together with the plugin tiers. Plugin options
// left unset in the configuration are filled with defaults via
// plugins.ApplyPluginConfDefaults.
//
// It returns an error if the YAML cannot be parsed or if any action name
// (after trimming surrounding whitespace) is not registered. Unknown actions
// fail the whole configuration; they are not skipped.
func loadSchedulerConf(confStr string) ([]framework.Action, []conf.Tier, error) {
	schedulerConf := &conf.SchedulerConfiguration{}
	if err := yaml.Unmarshal([]byte(confStr), schedulerConf); err != nil {
		return nil, nil, err
	}

	// Set default settings for each plugin if not set. Index into the slices so
	// the defaults land on the stored PluginOption, not on a range copy.
	for i := range schedulerConf.Tiers {
		for j := range schedulerConf.Tiers[i].Plugins {
			plugins.ApplyPluginConfDefaults(&schedulerConf.Tiers[i].Plugins[j])
		}
	}

	actionNames := strings.Split(schedulerConf.Actions, ",")
	actions := make([]framework.Action, 0, len(actionNames))
	for _, rawName := range actionNames {
		name := strings.TrimSpace(rawName)
		action, found := framework.GetAction(name)
		if !found {
			// %q makes empty names (e.g. from a trailing comma) visible.
			return nil, nil, fmt.Errorf("failed to find Action %q", name)
		}
		actions = append(actions, action)
	}
	return actions, schedulerConf.Tiers, nil
}
pkg/scheduler/plugins/defaults.go:22
为每个 Tier 中各 Plugin 未显式配置的调度开关(EnabledJobOrder、EnabledPredicate、EnabledNodeOrder 等)设置默认值 true
// ApplyPluginConfDefaults sets option's fields to their default value (true) if not set.
//
// Each nil Enabled* switch gets its own freshly allocated true value. The
// switches therefore never share a pointer, and a later write through one
// switch cannot silently change the others.
func ApplyPluginConfDefaults(option *conf.PluginOption) {
	switches := []**bool{
		&option.EnabledJobOrder,
		&option.EnabledJobReady,
		&option.EnabledJobPipelined,
		&option.EnabledTaskOrder,
		&option.EnabledPreemptable,
		&option.EnabledReclaimable,
		&option.EnabledQueueOrder,
		&option.EnabledPredicate,
		&option.EnabledNodeOrder,
	}
	for _, sw := range switches {
		if *sw == nil {
			// Declared inside the loop body, so each iteration allocates a new bool.
			enabled := true
			*sw = &enabled
		}
	}
}
pkg/scheduler/actions/factory.go:28
内置的 4 个 Action(reclaim、allocate、backfill、preempt)已在包的 init() 中注册
// init registers the built-in scheduling actions with the framework, in the
// order reclaim, allocate, backfill, preempt.
func init() {
	builtin := []framework.Action{
		reclaim.New(),
		allocate.New(),
		backfill.New(),
		preempt.New(),
	}
	for _, action := range builtin {
		framework.RegisterAction(action)
	}
}
pkg/scheduler/scheduler.go:85
开始调度:每隔 schedulePeriod(默认 1s)对所有待调度的实例进行一次调度计算
// Run one scheduling cycle (runOnce) every pc.schedulePeriod on a background goroutine until stopCh is closed.
go wait.Until(pc.runOnce, pc.schedulePeriod, stopCh)
pkg/scheduler/scheduler.go:94
初始化贯穿本次调度生命周期的 session,拥有此次调度的 cache snapshot
// Open a session for this cycle from the cache and configured plugin tiers;
// it is closed when the enclosing function returns.
ssn := framework.OpenSession(pc.cache, pc.plugins)
defer framework.CloseSession(ssn)
pkg/scheduler/framework/framework.go:31
// OpenSession start the session.
//
// It creates a session over the given cache and records tiers on it. Every
// plugin named in tiers that has a registered builder is then instantiated;
// plugins without a builder are logged and skipped. Finally each instantiated
// plugin registers its callbacks via OnSessionOpen, and the time each call
// takes is recorded as a metric.
func OpenSession(cache cache.Cache, tiers []conf.Tier) *Session {
	ssn := openSession(cache)
	ssn.Tiers = tiers

	for _, tier := range tiers {
		for _, option := range tier.Plugins {
			builder, ok := GetPluginBuilder(option.Name)
			if !ok {
				glog.Errorf("Failed to get plugin %s.", option.Name)
				continue
			}
			instance := builder(option.Arguments)
			ssn.plugins[instance.Name()] = instance
		}
	}

	// ssn.plugins is a map, so plugins are opened in unspecified order.
	for _, p := range ssn.plugins {
		openedAt := time.Now()
		p.OnSessionOpen(ssn)
		metrics.UpdatePluginDuration(p.Name(), metrics.OnSessionOpen, metrics.Duration(openedAt))
	}

	return ssn
}
pkg/scheduler/framework/session.go:86
初始化 ssn,并保存当前时刻集群部分资源的 Cache Snapshot(快照)
// openSession creates a Session bound to cache, with a fresh UID and every
// job/node/queue map and per-plugin callback registry initialized empty, and
// then takes a snapshot of the cache. The rest of the body is elided ("...")
// in this excerpt.
func openSession(cache cache.Cache) *Session {
ssn := &Session{
UID: uuid.NewUUID(),
cache: cache,
Jobs: map[api.JobID]*api.JobInfo{},
Nodes: map[string]*api.NodeInfo{},
Queues: map[api.QueueID]*api.QueueInfo{},
plugins: map[string]Plugin{},
// Callback registries, keyed by plugin name; populated later by plugins.
jobOrderFns: map[string]api.CompareFn{},
queueOrderFns: map[string]api.CompareFn{},
taskOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
preemptableFns: map[string]api.EvictableFn{},
reclaimableFns: map[string]api.EvictableFn{},
overusedFns: map[string]api.ValidateFn{},
jobReadyFns: map[string]api.ValidateFn{},
jobPipelinedFns: map[string]api.ValidateFn{},
jobValidFns: map[string]api.ValidateExFn{},
nodePrioritizers: map[string][]algorithm.PriorityConfig{},
}
// Copy of the cluster state for this session.
snapshot := cache.Snapshot()
...
}
pkg/scheduler/cache/cache.go:616
Cache SnapShot 包含 Ready Node 信息、Queue 信息和指定了 podGroup 和 Queue 的 Job,并为 Job 设置 Priority
// Build the snapshot. It holds only Ready nodes, all queues, and jobs that
// have a PodGroup or PDB and whose queue exists; each entry is a Clone of the
// cached value.
snapshot := &kbapi.ClusterInfo{
Nodes: make(map[string]*kbapi.NodeInfo),
Jobs: make(map[kbapi.JobID]*kbapi.JobInfo),
Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo),
}
for _, value := range sc.Nodes {
// Nodes that are not Ready are left out of the snapshot.
if !value.Ready() {
continue
}
snapshot.Nodes[value.Name] = value.Clone()
}
for _, value := range sc.Queues {
snapshot.Queues[value.UID] = value.Clone()
}
for _, value := range sc.Jobs {
// If no scheduling spec, does not handle it.
if value.PodGroup == nil && value.PDB == nil {
glog.V(4).Infof("The scheduling spec of Job <%v:%s/%s> is nil, ignore it.",
value.UID, value.Namespace, value.Name)
continue
}
// Jobs whose queue is not in the snapshot are skipped.
if _, found := snapshot.Queues[value.Queue]; !found {
glog.V(3).Infof("The Queue <%v> of Job <%v/%v> does not exist, ignore it.",
value.Queue, value.Namespace, value.Name)
continue
}
// Resolve the job priority from the PodGroup's PriorityClassName, falling
// back to sc.defaultPriority when the class is not known to the cache.
// NOTE(review): Priority is written onto the cached JobInfo (value) itself,
// not just the clone; presumably the caller holds the cache lock — confirm.
if value.PodGroup != nil {
value.Priority = sc.defaultPriority
priName := value.PodGroup.Spec.PriorityClassName
if priorityClass, found := sc.PriorityClasses[priName]; found {
value.Priority = priorityClass.Value
}
glog.V(4).Infof("The priority of job <%s/%s> is <%s/%d>",
value.Namespace, value.Name, priName, value.Priority)
}
snapshot.Jobs[value.UID] = value.Clone()
}
pkg/scheduler/framework/session.go:89
过滤无效的,不需要进行调度计算的 Job,无效的 Job 将从 Cache SnapShot 中被删除
// Remove jobs rejected by the plugins' job-valid callbacks from the session,
// recording a PodGroupUnschedulable condition on each rejected job.
// NOTE(review): if this runs inside openSession, it runs before any plugin's
// OnSessionOpen, so ssn.jobValidFns is still empty and JobValid returns nil
// for every job — confirm.
ssn.Jobs = snapshot.Jobs
for _, job := range ssn.Jobs {
// JobValid returns a non-nil result only for a failing (Pass == false) check.
if vjr := ssn.JobValid(job); vjr != nil {
if !vjr.Pass {
jc := &api.PodGroupCondition{
Type: api.PodGroupUnschedulableType,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
TransitionID: string(ssn.UID),
Reason: vjr.Reason,
Message: vjr.Message,
}
// A failed condition update is logged; the job is still removed.
if err := ssn.UpdateJobCondition(job, jc); err != nil {
glog.Errorf("Failed to update job condition: %v", err)
}
}
// Deleting map entries while ranging over the map is allowed in Go.
delete(ssn.Jobs, job.UID)
}
}
pkg/scheduler/framework/session_plugins.go:224
根据 ssn 中各 Tier 的 Plugin 所注册的 jobValidFn 判断 Job 是否有效。注意:此时 ssn 中的 jobValidFn 还未注册,要等到后续调用每个 Plugin 的 OnSessionOpen 时,才会注册该 Plugin 各个维度的调度函数。
采用短路求值:只要某个 validFn 判定 Job 无效,就立刻返回该结果
// JobValid invoke jobvalid function of the plugins.
//
// Tiers are visited in order and, within each tier, plugins in order. For
// each plugin, the job-valid callback registered under its name is called;
// plugins with no registered callback are skipped. The first result that is
// non-nil and not passing is returned immediately. When no callback rejects
// obj, JobValid returns nil.
func (ssn *Session) JobValid(obj interface{}) *api.ValidateResult {
	tiers := ssn.Tiers
	for t := range tiers {
		plugins := tiers[t].Plugins
		for p := range plugins {
			validate, registered := ssn.jobValidFns[plugins[p].Name]
			if !registered {
				continue
			}
			result := validate(obj)
			if result == nil || result.Pass {
				continue
			}
			return result
		}
	}
	return nil
}
pkg/scheduler/framework/framework.go:36
初始化 ssn 后,使用 pluginBuilders 中存储的 Plugin 对应 New 方法实例化 Plugin 对象
// GetPluginBuilder get the pluginbuilder by name.
//
// It looks up the builder registered under name while holding pluginMutex.
// found reports whether one was registered.
func GetPluginBuilder(name string) (builder PluginBuilder, found bool) {
	pluginMutex.Lock()
	defer pluginMutex.Unlock()

	builder, found = pluginBuilders[name]
	return builder, found
}
pkg/scheduler/plugins/factory.go:31
各 Plugin 的 Builder(即各自的 New 方法)同样已在包的 init() 中注册
// init registers the builders of the built-in plugins with the framework,
// in the order listed below.
func init() {
	builtin := []struct {
		name    string
		builder framework.PluginBuilder
	}{
		// Plugins for Jobs
		{"drf", drf.New},
		{"gang", gang.New},
		{"predicates", predicates.New},
		{"priority", priority.New},
		{"nodeorder", nodeorder.New},
		{"conformance", conformance.New},
		// Plugins for Queues
		{"proportion", proportion.New},
	}
	for _, entry := range builtin {
		framework.RegisterPluginBuilder(entry.name, entry.builder)
	}
}
pkg/scheduler/framework/framework.go:47
进行 Plugin 函数注册,此时才将 Plugin 各个调度维度的函数注册
// Let every instantiated plugin register its callbacks on the session,
// recording how long each OnSessionOpen call takes.
// ssn.plugins is a map, so the plugins are opened in unspecified order.
for _, plugin := range ssn.plugins {
onSessionOpenStart := time.Now()
plugin.OnSessionOpen(ssn)
metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionOpen, metrics.Duration(onSessionOpenStart))
}
return ssn
pkg/scheduler/plugins/gang/gang.go:69
以 Gang Plugin 为例,它会在 OnSessionOpen 中注册 JobValidFn,之后调用 ssn.JobValid 判断 Job 是否有效时就会用到它。注意:openSession 中对 Cache Snapshot 的过滤发生在 Plugin 注册之前。
该函数判断 Job 中 Valid Task 的数量是否小于 job.MinAvailable(对应 podGroup 中设置的 minMember):Valid Task 数量小于 minMember 的 Job 不满足 Gang 批调度的条件
// OnSessionOpen registers the gang plugin's callbacks on the session. This
// excerpt shows only the job-valid callback; the rest of the body is elided ("...").
func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) {
// validJobFn rejects any obj that is not a *api.JobInfo. It also rejects jobs
// whose number of valid tasks is below job.MinAvailable, since gang scheduling
// needs at least MinAvailable tasks. A nil result means the job passes.
validJobFn := func(obj interface{}) *api.ValidateResult {
job, ok := obj.(*api.JobInfo)
if !ok {
return &api.ValidateResult{
Pass: false,
Message: fmt.Sprintf("Failed to convert <%v> to *JobInfo", obj),
}
}
vtn := job.ValidTaskNum()
if vtn < job.MinAvailable {
return &api.ValidateResult{
Pass: false,
Reason: v1alpha1.NotEnoughPodsReason,
Message: fmt.Sprintf("Not enough valid tasks for gang-scheduling, valid: %d, min: %d",
vtn, job.MinAvailable),
}
}
return nil
}
// Register the callback under this plugin's name.
ssn.AddJobValidFn(gp.Name(), validJobFn)
...
}
pkg/scheduler/scheduler.go:99
此时本轮调度的 Session 已初始化完成,包括集群资源的 Cache SnapShot、各个 Plugin 也已注册,开始根据配置的 Action 并按配置的顺序执行
// runOnce runs a single scheduling cycle. It opens a session over the cache
// with the configured plugin tiers and executes every configured action in
// order, recording each action's duration. The session is closed and the
// end-to-end cycle duration recorded when the cycle returns.
func (pc *Scheduler) runOnce() {
	glog.V(4).Infof("Start scheduling ...")
	scheduleStartTime := time.Now()
	defer glog.V(4).Infof("End scheduling ...")
	// The arguments of a deferred call are evaluated when the defer statement
	// executes. The duration must therefore be computed inside a closure;
	// otherwise it is measured here (≈0) instead of at the end of the cycle.
	defer func() {
		metrics.UpdateE2eDuration(metrics.Duration(scheduleStartTime))
	}()

	ssn := framework.OpenSession(pc.cache, pc.plugins)
	defer framework.CloseSession(ssn)

	for _, action := range pc.actions {
		actionStartTime := time.Now()
		action.Execute(ssn)
		metrics.UpdateActionDuration(action.Name(), metrics.Duration(actionStartTime))
	}
}
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!