本文最后更新于:2020年7月11日 下午

剖析源码版本为 v0.5.3,commit d0b973be

cmd/tf-operator.v1/main.go:62

func main() {
	s := options.NewServerOption()
	s.AddFlags(flag.CommandLine)

	flag.Parse()

	if s.JSONLogFormat {
		// Output logs in a json format so that it can be parsed by services like Stackdriver.
		log.SetFormatter(&log.JSONFormatter{})
	}

	startMonitoring(s.MonitoringPort)

	if err := app.Run(s); err != nil {
		log.Fatalf("%v\n", err)
	}

}

cmd/tf-operator.v1/app/server.go:98

通过 BuildConfigFromFlags 配置,创建 kubeClient

// Get kubernetes config.
kcfg, err := clientcmd.BuildConfigFromFlags(opt.MasterURL, opt.Kubeconfig)
if err != nil {
	log.Fatalf("Error building kubeconfig: %s", err.Error())
}

// Create clients.
kubeClientSet, leaderElectionClientSet, tfJobClientSet, kubeBatchClientSet, err := createClientSets(kcfg)
if err != nil {
	return err
}

cmd/tf-operator.v1/app/server.go:113

创建 kubeInformerFactory 作为各个资源的 Informer 工厂

tfJobInformerFactory 实际上没用到

创建 unstructuredInformer, 即 TFJobInformer,通过 NewTFController 创建 TFJob 控制器

// Create informer factory.
kubeInformerFactory := kubeinformers.NewFilteredSharedInformerFactory(kubeClientSet, opt.ResyncPeriod, opt.Namespace, nil)
tfJobInformerFactory := tfjobinformers.NewSharedInformerFactory(tfJobClientSet, opt.ResyncPeriod)

unstructuredInformer := controller.NewUnstructuredTFJobInformer(kcfg, opt.Namespace)

// Create tf controller.
tc := controller.NewTFController(unstructuredInformer, kubeClientSet, kubeBatchClientSet, tfJobClientSet, kubeInformerFactory, tfJobInformerFactory, *opt)

// Start informer goroutines.
go kubeInformerFactory.Start(stopCh)

// We do not use the generated informer because of
// https://github.com/kubeflow/tf-operator/issues/561
// go tfJobInformerFactory.Start(stopCh)
go unstructuredInformer.Informer().Run(stopCh)

pkg/controller.v1/tensorflow/controller.go:140

设置 TFJob 事件处理回调函数,callBack Funcs 在 List-Watch 新事件时被 Push 进新的 Item

// Set up an event handler for when tfjob resources change.
tfJobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    tc.addTFJob,
	UpdateFunc: tc.updateTFJob,
	// This will enter the sync loop and no-op,
	// because the tfjob has been deleted from the store.
	DeleteFunc: tc.enqueueTFJob,
})

tc.tfJobInformer = tfJobInformer.Informer()
tc.tfJobLister = tfJobInformer.Lister()
tc.tfJobInformerSynced = tfJobInformer.Informer().HasSynced

pkg/controller.v1/tensorflow/controller.go:153

设置 Pod 事件处理回调函数,保证 ownerReferences.kind: TFJob 的 Pod 与相对应的 TFJob 副本数正常

// Create pod informer.
podInformer := kubeInformerFactory.Core().V1().Pods()

// Set up an event handler for when pod resources change
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    jc.AddPod,
	UpdateFunc: jc.UpdatePod,
	DeleteFunc: jc.DeletePod,
})

tc.PodLister = podInformer.Lister()
tc.PodInformerSynced = podInformer.Informer().HasSynced

pkg/controller.v1/tensorflow/controller.go:166

同 Pod 原理,设置 Service 事件处理函数,TFJob 中各个角色通过读取 TF_CONFIG 环境变量,利用 Headless Service进行通信

保证 ownerReferences.kind: TFJob 的 Service 与相对应 Pod、TFJob正常

// Create service informer.
	serviceInformer := kubeInformerFactory.Core().V1().Services()

	// Set up an event handler for when service resources change.
	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    jc.AddService,
		UpdateFunc: jc.UpdateService,
		DeleteFunc: jc.DeleteService,
	})

	tc.ServiceLister = serviceInformer.Lister()
	tc.ServiceInformerSynced = serviceInformer.Informer().HasSynced

pkg/controller.v1/tensorflow/controller.go:185

等待 tfjobInformer、podInformer 和 serviceInformer 第一次 List 同步后,启动 Goroutine Worker 处理 Work-Queue

// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (tc *TFController) Run(threadiness int, stopCh <-chan struct{}) error {
	defer utilruntime.HandleCrash()
	defer tc.WorkQueue.ShutDown()

	// Start the informer factories to begin populating the informer caches.
	log.Info("Starting TFJob controller")

	// Wait for the caches to be synced before starting workers.
	log.Info("Waiting for informer caches to sync")

	if ok := cache.WaitForCacheSync(stopCh, tc.tfJobInformerSynced,
		tc.PodInformerSynced, tc.ServiceInformerSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}
	log.Infof("Starting %v workers", threadiness)
	// Launch workers to process TFJob resources.
	for i := 0; i < threadiness; i++ {
		go wait.Until(tc.runWorker, time.Second, stopCh)
	}

	log.Info("Started workers")
	<-stopCh
	log.Info("Shutting down workers")

	return nil
}

pkg/controller.v1/tensorflow/controller.go:289

WorkQueue.Get() 的 Key 通过 syncTFJob 进行同步处理

// syncTFJob syncs the tfjob with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods/services created or deleted.
// This function is not meant to be invoked concurrently with the same key.
func (tc *TFController) syncTFJob(key string) (bool, error) {
	startTime := time.Now()
	logger := tflogger.LoggerForKey(key)
	defer func() {
		logger.Infof("Finished syncing tfjob %q (%v)", key, time.Since(startTime))
	}()
	...
}

pkg/controller.v1/tensorflow/controller.go:304

解析 Key 中的 Namespace 和 Name 进行组合后,使用 tc.tfJobInformer.GetIndexer().GetByKey(key) 获取 TFJob obj

sharedTFJob, err := tc.getTFJobFromName(namespace, name)
if err != nil {
	if err == errNotExists {
		logger.Infof("TFJob has been deleted: %v", key)
		tfJobsDeletedCount.Inc()
		// jm.expectations.DeleteExpectations(key)
		return true, nil
	}
	return false, err
}

pkg/controller.v1beta2/tensorflow/controller.go:307

解析 TFJob obj 并组装 Expectation Key 检查 TFJob 所期望的各个角色的、类型的资源数量与 Expectation 中保存的数量现状是否一致

expectationPodsKey(namespace/name/replicaType/pods)

expectationServicesKey(namespace/name/replicaType/services)

// satisfiedExpectations returns true if the required adds/dels for the given tfjob have been observed.
// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller
// manager.
func (tc *TFController) satisfiedExpectations(tfjob *tfv1beta2.TFJob) bool {
   satisfied := false
   tfjobKey, err := KeyFunc(tfjob)
   if err != nil {
      utilruntime.HandleError(fmt.Errorf("couldn't get key for tfjob object %#v: %v", tfjob, err))
      return false
   }

   for rtype := range tfjob.Spec.TFReplicaSpecs {
      // Check the expectations of the pods.
      expectationPodsKey := jobcontroller.GenExpectationPodsKey(tfjobKey, string(rtype))
      satisfied = satisfied || tc.Expectations.SatisfiedExpectations(expectationPodsKey)

      // Check the expectations of the services.
      expectationServicesKey := jobcontroller.GenExpectationServicesKey(tfjobKey, string(rtype))
      satisfied = satisfied || tc.Expectations.SatisfiedExpectations(expectationServicesKey)
   }

   return satisfied
}

在 podInformer 和 serviceInformer 的 ADD/DELETE callBack Func 中对 Expectations 进行操作,针对 expectation key 所对应的值 +/- 1

// A TTLCache of pod/services creates/deletes each job expects to see
// We use Job namespace/name + ReplicaType + pods/services as an expectation key,
// For example, there is a TFJob with namespace "tf-operator" and name "tfjob-abc":
// {
//     "PS": {
//         "Replicas": 2,
//     },
//     "Worker": {
//         "Replicas": 4,
//     }
// }
// We will create 4 expectations:
// - "tf-operator/tfjob-abc/ps/services", expects 2 adds.
// - "tf-operator/tfjob-abc/ps/pods", expects 2 adds.
// - "tf-operator/tfjob-abc/worker/services", expects 4 adds.
// - "tf-operator/tfjob-abc/worker/pods", expects 4 adds.
Expectations controller.ControllerExpectationsInterface

pkg/controller.v1/tensorflow/controller.go:335

与期望状态不一致的 TFJob 进行调和

// reconcileTFJobs checks and updates replicas for each given TFReplicaSpec.
// It will requeue the tfjob in case of an error while creating/deleting pods/services.
func (tc *TFController) reconcileTFJobs(tfjob *tfv1.TFJob) error {
	tfjobKey, err := KeyFunc(tfjob)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for tfjob object %#v: %v", tfjob, err))
		return err
	}
	logger := tflogger.LoggerForJob(tfjob)
	logger.Infof("Reconcile TFJobs %s", tfjob.Name)
	...
}

pkg/controller.v1/tensorflow/controller.go:399

判断 TFJob.Status 是否为 isSucceeded、Failed 或 Pod backoff 达到 Limit,若为上述状态将删除 TFJob,否则继续执行后续逻辑

// If the TFJob is terminated, delete all pods and services.
if isSucceeded(tfjob.Status) || isFailed(tfjob.Status) || tfJobExceedsLimit {
	if err := tc.deletePodsAndServices(tfjob, pods); err != nil {
		return err
	}

	if tfJobExceedsLimit {
		tc.Recorder.Event(tfjob, v1.EventTypeNormal, tfJobFailedReason, failureMessage)
		if tfjob.Status.CompletionTime == nil {
			now := metav1.Now()
			tfjob.Status.CompletionTime = &now
		}
		err := updateTFJobConditions(tfjob, common.JobFailed, tfJobFailedReason, failureMessage)
		if err != nil {
			tflogger.LoggerForJob(tfjob).Infof("Append tfjob condition error: %v", err)
			return err
		}
	}

	if err := tc.cleanupTFJob(tfjob); err != nil {
		return err
	}

	if tc.Config.EnableGangScheduling {
		if err := tc.DeletePodGroup(tfjob); err != nil {
			return err
		}
	}

	// At this point the pods may have been deleted, so if the job succeeded, we need to manually set the replica status.
	// If any replicas are still Active, set their status to succeeded.
	if isSucceeded(tfjob.Status) {
		for rtype := range tfjob.Status.ReplicaStatuses {
			tfjob.Status.ReplicaStatuses[rtype].Succeeded += tfjob.Status.ReplicaStatuses[rtype].Active
			tfjob.Status.ReplicaStatuses[rtype].Active = 0
		}
	}
	return tc.updateStatusHandler(tfjob)
}

pkg/controller.v1/tensorflow/controller.go:438

判断是否配置了批调度,将会主动创建 podGroup

if tc.Config.EnableGangScheduling {
	minAvailableReplicas := getTotalReplicas(tfjob)
	_, err := tc.SyncPodGroup(tfjob, minAvailableReplicas)
	if err != nil {
		logger.Warnf("Sync PodGroup %v: %v", tfjob.Name, err)
	}
}

pkg/controller.v1/tensorflow/controller.go:447

分角色开始一次调和 Pods 和 Services 资源

// Save the current state of the replicas
replicasStatus := make(map[string]v1.PodPhase)

// Diff current active pods/services with replicas.
for rtype, spec := range tfjob.Spec.TFReplicaSpecs {
	err = tc.reconcilePods(tfjob, pods, rtype, spec, replicasStatus)
	if err != nil {
		logger.Warnf("reconcilePods error %v", err)
		return err
	}

	err = tc.reconcileServices(tfjob, services, rtype, spec)

	if err != nil {
		logger.Warnf("reconcileServices error %v", err)
		return err
	}
}

pkg/controller.v1/tensorflow/pod.go:74

解析某种角色的 Spec,输出根据序号排列的 podSlice[][]

// reconcilePods checks and updates pods for each given TFReplicaSpec.
// It will requeue the tfjob in case of an error while creating/deleting pods.
func (tc *TFController) reconcilePods(
	tfjob *tfv1.TFJob,
	pods []*v1.Pod,
	rtype tfv1.TFReplicaType,
	spec *common.ReplicaSpec, rstatus map[string]v1.PodPhase) error {

	// Convert TFReplicaType to lower string.
	rt := strings.ToLower(string(rtype))
	logger := tflogger.LoggerForReplica(tfjob, rt)
	// Get all pods for the type rt.
	pods, err := tc.FilterPodsForReplicaType(pods, rt)
	if err != nil {
		return err
	}
	replicas := int(*spec.Replicas)
	restart := false
	worker0Completed := false
	masterRole := false

	initializeTFReplicaStatuses(tfjob, rtype)

	podSlices := tc.GetPodSlices(pods, replicas, logger)
	...
}

pkg/controller.v1/tensorflow/pod.go:75

判断角色是否为 Chief/Master 或 Worker,如果此刻 TFJob 中某种角色的某个 Index Pod 数量为0,则开始创建此 Index Pod

并设置 OwnerReference,将 Pod 与TFJob 进行级联

	for index, podSlice := range podSlices {
		masterRole = false
		if len(podSlice) > 1 {
			logger.Warningf("We have too many pods for %s %d", rt, index)
			// TODO(gaocegege): Kill some pods.
		} else if len(podSlice) == 0 {
			logger.Infof("Need to create new pod: %s-%d", rt, index)

			// if master pod is present, select the master pod
			// if master is not present, first worker pod is selected as the master.
			if ContainChieforMasterSpec(tfjob) {
				if tfv1.IsChieforMaster(rtype) {
					masterRole = true
				}
			} else {
				if tfv1.IsWorker(rtype) && (index == 0) {
					masterRole = true
				}
			}
			err = tc.createNewPod(tfjob, rt, strconv.Itoa(index), spec, masterRole)
			if err != nil {
				return err
			}
		} else {
      ...
    }
    ...
  }
  ...
}

pkg/controller.v1/tensorflow/pod.go:213

创建 Pod 时设置相关 Containers ENV TF_CONFIG,Tensorflow 任务各个角色之间通过 TF_CONFIG 环境变量进行通信

func setClusterSpec(podTemplateSpec *v1.PodTemplateSpec, tfjob *tfv1.TFJob, rt, index string) error {
	// Generate TF_CONFIG JSON string.
	tfConfigStr, err := genTFConfigJSONStr(tfjob, rt, index)
	if err != nil {
		return err
	}

	if tfConfigStr == "" {
		return nil
	}
	// Add TF_CONFIG environment variable.
	for i := range podTemplateSpec.Spec.Containers {
		if len(podTemplateSpec.Spec.Containers[i].Env) == 0 {
			podTemplateSpec.Spec.Containers[i].Env = make([]v1.EnvVar, 0)
		}
		podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, v1.EnvVar{
			Name:  tfConfig,
			Value: tfConfigStr,
		})
	}
	return nil
}

TF_CONFIG Cluster 字段与各个角色创建的 Headless Service 一一对应

- name: TF_CONFIG
      value: '{"cluster":{"chief":["tplus-mix-tf-158698-qiwu-chief-0:2222"],"ps":["tplus-mix-tf-158698-qiwu-ps-0:2222","tplus-mix-tf-158698-qiwu-ps-1:2222","tplus-mix-tf-158698-qiwu-ps-2:2222","tplus-mix-tf-158698-qiwu-ps-3:2222","tplus-mix-tf-158698-qiwu-ps-4:2222","tplus-mix-tf-158698-qiwu-ps-5:2222","tplus-mix-tf-158698-qiwu-ps-6:2222","tplus-mix-tf-158698-qiwu-ps-7:2222"],"tplusmaster":["tplus-mix-tf-158698-qiwu-tplusmaster-0:2222"],"worker":["tplus-mix-tf-158698-qiwu-worker-0:2222","tplus-mix-tf-158698-qiwu-worker-1:2222","tplus-mix-tf-158698-qiwu-worker-2:2222","tplus-mix-tf-158698-qiwu-worker-3:2222","tplus-mix-tf-158698-qiwu-worker-4:2222"]},"task":{"type":"chief","index":0},"environment":"cloud"}'

如果启用了 EnableGangScheduling,则配置调度器为 Kube-Batch

// if gang-scheduling is enabled:
// 1. if user has specified other scheduler, we report a warning without overriding any fields.
// 2. if no SchedulerName is set for pods, then we set the SchedulerName to "kube-batch".
if tc.Config.EnableGangScheduling {
	if isNonGangSchedulerSet(tfjob) {
		errMsg := "Another scheduler is specified when gang-scheduling is enabled and it will not be overwritten"
		logger.Warning(errMsg)
		tc.Recorder.Event(tfjob, v1.EventTypeWarning, podTemplateSchedulerNameReason, errMsg)
	} else {
		podTemplate.Spec.SchedulerName = gangSchedulerName
	}
}

pkg/controller.v1/tensorflow/status.go:61

更新 TFJob 资源状态,当 TFJob 中存在 Chief 或 Master 角色时,以 Chief 或 Master Pod 的副本状态表示整个 TFJob 状态

if ContainChieforMasterSpec(tfjob) {
		if tfv1.IsChieforMaster(rtype) {
			if running > 0 {
				msg := fmt.Sprintf("TFJob %s is running.", tfjob.Name)
				err := updateTFJobConditions(tfjob, common.JobRunning, tfJobRunningReason, msg)
				if err != nil {
					tflogger.LoggerForJob(tfjob).Infof("Append tfjob condition error: %v", err)
					return err
				}
			}
			if expected == 0 {
				msg := fmt.Sprintf("TFJob %s successfully completed.", tfjob.Name)
				tc.Recorder.Event(tfjob, v1.EventTypeNormal, tfJobSucceededReason, msg)
				if tfjob.Status.CompletionTime == nil {
					now := metav1.Now()
					tfjob.Status.CompletionTime = &now
				}
				err := updateTFJobConditions(tfjob, common.JobSucceeded, tfJobSucceededReason, msg)
				if err != nil {
					tflogger.LoggerForJob(tfjob).Infof("Append tfjob condition error: %v", err)
					return err
				}
				tfJobsSuccessCount.Inc()
			}
		}
	}

当不包含 Chief 或 Master 角色时,以 Worker-0 副本的状态表示整个TFJob 状态

else {
		if rtype == tfv1.TFReplicaTypeWorker {
			// All workers are succeeded or worker 0 completed, leave a succeeded condition.
			if expected == 0 || worker0Completed {
				msg := fmt.Sprintf("TFJob %s successfully completed.", tfjob.Name)
				tc.Recorder.Event(tfjob, v1.EventTypeNormal, tfJobSucceededReason, msg)
				if tfjob.Status.CompletionTime == nil {
					now := metav1.Now()
					tfjob.Status.CompletionTime = &now
				}
				err := updateTFJobConditions(tfjob, common.JobSucceeded, tfJobSucceededReason, msg)
				if err != nil {
					tflogger.LoggerForJob(tfjob).Infof("Append tfjob condition error: %v", err)
					return err
				}
				tfJobsSuccessCount.Inc()
			} else if running > 0 {
				// Some workers are still running, leave a running condition.
				msg := fmt.Sprintf("TFJob %s is running.", tfjob.Name)
				err := updateTFJobConditions(tfjob, common.JobRunning, tfJobRunningReason, msg)
				if err != nil {
					tflogger.LoggerForJob(tfjob).Infof("Append tfjob condition error: %v", err)
					return err
				}
			}
		}
	}

pkg/controller.v1/tensorflow/controller.go:467

经过调和各个角色 Pod 和 Service 资源后,更新 TFJob 状态

// no need to update the tfjob if the status hasn't changed since last time.
if !reflect.DeepEqual(*oldStatus, tfjob.Status) {
	return tc.updateStatusHandler(tfjob)
}
return nil