The source code analyzed here is v0.5.3, commit d0b973be.
cmd/tf-operator.v1/main.go:62
func main() {
    s := options.NewServerOption()
    s.AddFlags(flag.CommandLine)
    flag.Parse()

    if s.JSONLogFormat {
        // Output logs in a json format so that it can be parsed by services like Stackdriver.
        log.SetFormatter(&log.JSONFormatter{})
    }

    startMonitoring(s.MonitoringPort)

    if err := app.Run(s); err != nil {
        log.Fatalf("%v\n", err)
    }
}
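startMonitoring is not shown in the excerpt. A minimal sketch, assuming it serves the default Prometheus registry on the configured port (the promhttp handler and the error logging are assumptions, not the verbatim source):

import (
    "fmt"
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"
    log "github.com/sirupsen/logrus"
)

// Sketch: expose controller metrics on :<monitoringPort>/metrics in a
// background goroutine so it never blocks app.Run.
func startMonitoring(monitoringPort int) {
    go func() {
        http.Handle("/metrics", promhttp.Handler())
        if err := http.ListenAndServe(fmt.Sprintf(":%d", monitoringPort), nil); err != nil {
            log.Errorf("Monitoring endpoint setup failed: %v", err)
        }
    }()
}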
cmd/tf-operator.v1/app/server.go:98
Build the Kubernetes config via BuildConfigFromFlags, then create the client sets (createClientSets is sketched after the excerpt):
// Get kubernetes config.
kcfg, err := clientcmd.BuildConfigFromFlags(opt.MasterURL, opt.Kubeconfig)
if err != nil {
    log.Fatalf("Error building kubeconfig: %s", err.Error())
}

// Create clients.
kubeClientSet, leaderElectionClientSet, tfJobClientSet, kubeBatchClientSet, err := createClientSets(kcfg)
if err != nil {
    return err
}
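createClientSets is a thin wrapper around the generated NewForConfig constructors. A sketch, assuming every client set is built from the same rest.Config (the import paths and user-agent strings are assumptions):

import (
    tfjobclientset "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned"
    kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
    kubeclientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

// Sketch of createClientSets; details assumed from the call site above.
func createClientSets(config *rest.Config) (kubeclientset.Interface, kubeclientset.Interface, tfjobclientset.Interface, kubebatchclient.Interface, error) {
    // Client for core resources (pods, services, events).
    kubeClientSet, err := kubeclientset.NewForConfig(rest.AddUserAgent(config, "tf-operator"))
    if err != nil {
        return nil, nil, nil, nil, err
    }
    // Separate client so leader-election traffic is isolated from normal calls.
    leaderElectionClientSet, err := kubeclientset.NewForConfig(rest.AddUserAgent(config, "leader-election"))
    if err != nil {
        return nil, nil, nil, nil, err
    }
    // Typed client for the TFJob CRD.
    tfJobClientSet, err := tfjobclientset.NewForConfig(config)
    if err != nil {
        return nil, nil, nil, nil, err
    }
    // Client for kube-batch PodGroups (gang scheduling).
    kubeBatchClientSet, err := kubebatchclient.NewForConfig(config)
    if err != nil {
        return nil, nil, nil, nil, err
    }
    return kubeClientSet, leaderElectionClientSet, tfJobClientSet, kubeBatchClientSet, nil
}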
cmd/tf-operator.v1/app/server.go:113
Create kubeInformerFactory as the informer factory for the core resources.
tfJobInformerFactory is created but never actually used.
Create unstructuredInformer, i.e. the TFJobInformer, and build the TFJob controller via NewTFController:
// Create informer factory.
kubeInformerFactory := kubeinformers.NewFilteredSharedInformerFactory(kubeClientSet, opt.ResyncPeriod, opt.Namespace, nil)
tfJobInformerFactory := tfjobinformers.NewSharedInformerFactory(tfJobClientSet, opt.ResyncPeriod)
unstructuredInformer := controller.NewUnstructuredTFJobInformer(kcfg, opt.Namespace)

// Create tf controller.
tc := controller.NewTFController(unstructuredInformer, kubeClientSet, kubeBatchClientSet, tfJobClientSet, kubeInformerFactory, tfJobInformerFactory, *opt)

// Start informer goroutines.
go kubeInformerFactory.Start(stopCh)

// We do not use the generated informer because of
// https://github.com/kubeflow/tf-operator/issues/561
// go tfJobInformerFactory.Start(stopCh)
go unstructuredInformer.Informer().Run(stopCh)
pkg/controller.v1/tensorflow/controller.go:140
Register the TFJob event handler callbacks; whenever the List-Watch observes a new event, these callbacks push a new item onto the work queue (the enqueue path is sketched after the excerpt):
// Set up an event handler for when tfjob resources change.
tfJobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    tc.addTFJob,
    UpdateFunc: tc.updateTFJob,
    // This will enter the sync loop and no-op,
    // because the tfjob has been deleted from the store.
    DeleteFunc: tc.enqueueTFJob,
})
tc.tfJobInformer = tfJobInformer.Informer()
tc.tfJobLister = tfJobInformer.Lister()
tc.tfJobInformerSynced = tfJobInformer.Informer().HasSynced
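addTFJob and updateTFJob are not shown here; both ultimately funnel into the standard client-go enqueue pattern. A sketch (KeyFunc and WorkQueue both appear elsewhere in the excerpts; the exact body is assumed):

import (
    "fmt"

    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)

// Sketch: convert the object into a namespace/name key and push the key
// onto the work queue for the workers to pick up.
func (tc *TFController) enqueueTFJob(obj interface{}) {
    key, err := KeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
        return
    }
    tc.WorkQueue.Add(key)
}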
pkg/controller.v1/tensorflow/controller.go:153
Register the Pod event handler callbacks, which keep Pods with ownerReferences.kind: TFJob consistent with the replica counts of their owning TFJob:
// Create pod informer.
podInformer := kubeInformerFactory.Core().V1().Pods()

// Set up an event handler for when pod resources change
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    jc.AddPod,
    UpdateFunc: jc.UpdatePod,
    DeleteFunc: jc.DeletePod,
})
tc.PodLister = podInformer.Lister()
tc.PodInformerSynced = podInformer.Informer().HasSynced
pkg/controller.v1/tensorflow/controller.go:166
As with Pods, register the Service event handlers. The roles of a TFJob read the TF_CONFIG environment variable and talk to each other through headless Services, so these handlers keep Services with ownerReferences.kind: TFJob consistent with their corresponding Pods and TFJob:
// Create service informer.
serviceInformer := kubeInformerFactory.Core().V1().Services()

// Set up an event handler for when service resources change.
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    jc.AddService,
    UpdateFunc: jc.UpdateService,
    DeleteFunc: jc.DeleteService,
})
tc.ServiceLister = serviceInformer.Lister()
tc.ServiceInformerSynced = serviceInformer.Informer().HasSynced
pkg/controller.v1/tensorflow/controller.go:185
After the tfJobInformer, podInformer, and serviceInformer have completed their initial List sync, start goroutine workers to drain the work queue (the worker loop is sketched after the excerpt):
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (tc *TFController) Run(threadiness int, stopCh <-chan struct{}) error {
    defer utilruntime.HandleCrash()
    defer tc.WorkQueue.ShutDown()

    // Start the informer factories to begin populating the informer caches.
    log.Info("Starting TFJob controller")

    // Wait for the caches to be synced before starting workers.
    log.Info("Waiting for informer caches to sync")
    if ok := cache.WaitForCacheSync(stopCh, tc.tfJobInformerSynced,
        tc.PodInformerSynced, tc.ServiceInformerSynced); !ok {
        return fmt.Errorf("failed to wait for caches to sync")
    }

    log.Infof("Starting %v workers", threadiness)
    // Launch workers to process TFJob resources.
    for i := 0; i < threadiness; i++ {
        go wait.Until(tc.runWorker, time.Second, stopCh)
    }

    log.Info("Started workers")
    <-stopCh
    log.Info("Shutting down workers")

    return nil
}
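runWorker itself is not in the excerpt; it follows the standard client-go worker loop. A sketch, assuming syncHandler is the field that syncTFJob is wired into:

// Sketch: runWorker keeps processing items until the queue is shut down.
func (tc *TFController) runWorker() {
    for tc.processNextWorkItem() {
    }
}

// processNextWorkItem pops one key, hands it to the sync handler, and
// requeues the key with rate limiting if the sync returned an error.
func (tc *TFController) processNextWorkItem() bool {
    key, quit := tc.WorkQueue.Get()
    if quit {
        return false
    }
    defer tc.WorkQueue.Done(key)

    // syncHandler is assumed to be wired to tc.syncTFJob.
    forget, err := tc.syncHandler(key.(string))
    if err == nil {
        if forget {
            tc.WorkQueue.Forget(key)
        }
        return true
    }

    utilruntime.HandleError(fmt.Errorf("error syncing tfjob %q: %v", key, err))
    tc.WorkQueue.AddRateLimited(key)
    return true
}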
pkg/controller.v1/tensorflow/controller.go:289
Each key popped via WorkQueue.Get() is handled by syncTFJob:
// syncTFJob syncs the tfjob with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods/services created or deleted.
// This function is not meant to be invoked concurrently with the same key.
func (tc *TFController) syncTFJob(key string) (bool, error) {
    startTime := time.Now()
    logger := tflogger.LoggerForKey(key)
    defer func() {
        logger.Infof("Finished syncing tfjob %q (%v)", key, time.Since(startTime))
    }()
    ...
}
pkg/controller.v1/tensorflow/controller.go:304
The namespace and name are parsed from the key and recombined, then tc.tfJobInformer.GetIndexer().GetByKey(key) fetches the TFJob object (see the sketch after the excerpt):
sharedTFJob, err := tc.getTFJobFromName(namespace, name)
if err != nil {
    if err == errNotExists {
        logger.Infof("TFJob has been deleted: %v", key)
        tfJobsDeletedCount.Inc()
        // jm.expectations.DeleteExpectations(key)
        return true, nil
    }
    return false, err
}
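A sketch of the lookup behind getTFJobFromName: the namespace/name key goes through the informer's indexer, and because the informer is unstructured, the cached object is converted back into a typed TFJob (tfJobFromUnstructured is the conversion helper assumed here):

// Sketch: resolve a work-queue key back to a typed TFJob.
func (tc *TFController) getTFJobFromKey(key string) (*tfv1.TFJob, error) {
    // Look the object up in the informer's local store.
    obj, exists, err := tc.tfJobInformer.GetIndexer().GetByKey(key)
    if err != nil {
        return nil, err
    }
    if !exists {
        return nil, errNotExists
    }
    // The dynamic informer caches unstructured objects; convert to tfv1.TFJob.
    // tfJobFromUnstructured is an assumed helper name.
    return tfJobFromUnstructured(obj)
}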
pkg/controller.v1beta2/tensorflow/controller.go:307
Parse the TFJob object and assemble the expectation keys, then check whether the resource counts the TFJob expects for each replica type match what is currently recorded in Expectations:
expectationPodsKey (namespace/name/replicaType/pods)
expectationServicesKey (namespace/name/replicaType/services)
// satisfiedExpectations returns true if the required adds/dels for the given tfjob have been observed.
// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller
// manager.
func (tc *TFController) satisfiedExpectations(tfjob *tfv1beta2.TFJob) bool {
    satisfied := false
    tfjobKey, err := KeyFunc(tfjob)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for tfjob object %#v: %v", tfjob, err))
        return false
    }

    for rtype := range tfjob.Spec.TFReplicaSpecs {
        // Check the expectations of the pods.
        expectationPodsKey := jobcontroller.GenExpectationPodsKey(tfjobKey, string(rtype))
        satisfied = satisfied || tc.Expectations.SatisfiedExpectations(expectationPodsKey)

        // Check the expectations of the services.
        expectationServicesKey := jobcontroller.GenExpectationServicesKey(tfjobKey, string(rtype))
        satisfied = satisfied || tc.Expectations.SatisfiedExpectations(expectationServicesKey)
    }

    return satisfied
}
The ADD/DELETE callbacks of podInformer and serviceInformer update Expectations, incrementing or decrementing the counter stored under the corresponding expectation key (a sketch follows the excerpt below):
// A TTLCache of pod/services creates/deletes each job expects to see
// We use Job namespace/name + ReplicaType + pods/services as an expectation key,
// For example, there is a TFJob with namespace "tf-operator" and name "tfjob-abc":
// {
//     "PS": {
//         "Replicas": 2,
//     },
//     "Worker": {
//         "Replicas": 4,
//     }
// }
// We will create 4 expectations:
// - "tf-operator/tfjob-abc/ps/services", expects 2 adds.
// - "tf-operator/tfjob-abc/ps/pods", expects 2 adds.
// - "tf-operator/tfjob-abc/worker/services", expects 4 adds.
// - "tf-operator/tfjob-abc/worker/pods", expects 4 adds.
Expectations controller.ControllerExpectationsInterface
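As a sketch of that +/- 1 bookkeeping: CreationObserved and DeletionObserved are the standard ControllerExpectations methods that decrement the outstanding adds/dels recorded at sync time. observePod below is a hypothetical helper for illustration; the real call sites live in the shared job controller's Pod/Service callbacks.

// Sketch: record that one expected pod create/delete has been seen.
// observePod is a hypothetical helper, not a function from the source.
func (tc *TFController) observePod(tfjobKey, rtype string, deleted bool) {
    expectationPodsKey := jobcontroller.GenExpectationPodsKey(tfjobKey, rtype)
    if deleted {
        // One expected deletion observed: decrement the "dels" counter.
        tc.Expectations.DeletionObserved(expectationPodsKey)
    } else {
        // One expected creation observed: decrement the "adds" counter.
        tc.Expectations.CreationObserved(expectationPodsKey)
    }
}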
pkg/controller.v1/tensorflow/controller.go:335
TFJobs that have drifted from the desired state are reconciled:
// reconcileTFJobs checks and updates replicas for each given TFReplicaSpec.
// It will requeue the tfjob in case of an error while creating/deleting pods/services.
func (tc *TFController) reconcileTFJobs(tfjob *tfv1.TFJob) error {
    tfjobKey, err := KeyFunc(tfjob)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for tfjob object %#v: %v", tfjob, err))
        return err
    }
    logger := tflogger.LoggerForJob(tfjob)
    logger.Infof("Reconcile TFJobs %s", tfjob.Name)
    ...
}
pkg/controller.v1/tensorflow/controller.go:399
Check whether the TFJob status is succeeded or failed, or whether the Pod backoff has reached its limit. In any of these cases the job's Pods and Services are deleted and the job is cleaned up; otherwise execution proceeds to the subsequent logic:
// If the TFJob is terminated, delete all pods and services.
if isSucceeded(tfjob.Status) || isFailed(tfjob.Status) || tfJobExceedsLimit {
    if err := tc.deletePodsAndServices(tfjob, pods); err != nil {
        return err
    }

    if tfJobExceedsLimit {
        tc.Recorder.Event(tfjob, v1.EventTypeNormal, tfJobFailedReason, failureMessage)
        if tfjob.Status.CompletionTime == nil {
            now := metav1.Now()
            tfjob.Status.CompletionTime = &now
        }
        err := updateTFJobConditions(tfjob, common.JobFailed, tfJobFailedReason, failureMessage)
        if err != nil {
            tflogger.LoggerForJob(tfjob).Infof("Append tfjob condition error: %v", err)
            return err
        }
    }

    if err := tc.cleanupTFJob(tfjob); err != nil {
        return err
    }

    if tc.Config.EnableGangScheduling {
        if err := tc.DeletePodGroup(tfjob); err != nil {
            return err
        }
    }

    // At this point the pods may have been deleted, so if the job succeeded, we need to manually set the replica status.
    // If any replicas are still Active, set their status to succeeded.
    if isSucceeded(tfjob.Status) {
        for rtype := range tfjob.Status.ReplicaStatuses {
            tfjob.Status.ReplicaStatuses[rtype].Succeeded += tfjob.Status.ReplicaStatuses[rtype].Active
            tfjob.Status.ReplicaStatuses[rtype].Active = 0
        }
    }
    return tc.updateStatusHandler(tfjob)
}
pkg/controller.v1/tensorflow/controller.go:438
If gang scheduling is enabled, a PodGroup is proactively created (synced) for the job (getTotalReplicas is sketched after the excerpt):
if tc.Config.EnableGangScheduling {
    minAvailableReplicas := getTotalReplicas(tfjob)
    _, err := tc.SyncPodGroup(tfjob, minAvailableReplicas)
    if err != nil {
        logger.Warnf("Sync PodGroup %v: %v", tfjob.Name, err)
    }
}
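getTotalReplicas is not shown above; a sketch, assuming it simply sums the replica counts across all replica specs so the PodGroup's minimum member count covers every pod of the job:

// Sketch: sum *spec.Replicas over all replica types, e.g.
// 1 Chief + 8 PS + 5 Workers => minAvailable of 14.
func getTotalReplicas(tfjob *tfv1.TFJob) int32 {
    total := int32(0)
    for _, spec := range tfjob.Spec.TFReplicaSpecs {
        total += *spec.Replicas
    }
    return total
}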
pkg/controller.v1/tensorflow/controller.go:447
Reconcile the Pod and Service resources for each replica type in turn:
// Save the current state of the replicas
replicasStatus := make(map[string]v1.PodPhase)

// Diff current active pods/services with replicas.
for rtype, spec := range tfjob.Spec.TFReplicaSpecs {
    err = tc.reconcilePods(tfjob, pods, rtype, spec, replicasStatus)
    if err != nil {
        logger.Warnf("reconcilePods error %v", err)
        return err
    }

    err = tc.reconcileServices(tfjob, services, rtype, spec)
    if err != nil {
        logger.Warnf("reconcileServices error %v", err)
        return err
    }
}
pkg/controller.v1/tensorflow/pod.go:74
Parse the spec of the given replica type and produce podSlices, a slice of Pod slices ordered by replica index (GetPodSlices is sketched after the excerpt):
// reconcilePods checks and updates pods for each given TFReplicaSpec.
// It will requeue the tfjob in case of an error while creating/deleting pods.
func (tc *TFController) reconcilePods(
    tfjob *tfv1.TFJob,
    pods []*v1.Pod,
    rtype tfv1.TFReplicaType,
    spec *common.ReplicaSpec, rstatus map[string]v1.PodPhase) error {

    // Convert TFReplicaType to lower string.
    rt := strings.ToLower(string(rtype))
    logger := tflogger.LoggerForReplica(tfjob, rt)
    // Get all pods for the type rt.
    pods, err := tc.FilterPodsForReplicaType(pods, rt)
    if err != nil {
        return err
    }
    replicas := int(*spec.Replicas)
    restart := false
    worker0Completed := false
    masterRole := false

    initializeTFReplicaStatuses(tfjob, rtype)

    podSlices := tc.GetPodSlices(pods, replicas, logger)
    ...
}
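GetPodSlices buckets the filtered pods by their replica-index label so the loop in the next excerpt can spot missing (len == 0) or duplicated (len > 1) indices. A sketch, assuming the index is carried in a tf-replica-index pod label:

// Sketch: group pods so that podSlices[i] holds every pod claiming index i.
func getPodSlices(pods []*v1.Pod, replicas int) [][]*v1.Pod {
    podSlices := make([][]*v1.Pod, replicas)
    for _, pod := range pods {
        indexStr, ok := pod.Labels["tf-replica-index"] // label key assumed
        if !ok {
            continue // no index label; ignore the pod
        }
        index, err := strconv.Atoi(indexStr)
        if err != nil || index < 0 || index >= replicas {
            continue // invalid or out-of-range index
        }
        podSlices[index] = append(podSlices[index], pod)
    }
    return podSlices
}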
pkg/controller.v1/tensorflow/pod.go:75
Determine whether the role is Chief/Master or Worker. If the Pod count at some index of a replica type is currently 0, create the Pod for that index and set its OwnerReference so the Pod is cascade-deleted along with the TFJob:
for index, podSlice := range podSlices {
    masterRole = false
    if len(podSlice) > 1 {
        logger.Warningf("We have too many pods for %s %d", rt, index)
        // TODO(gaocegege): Kill some pods.
    } else if len(podSlice) == 0 {
        logger.Infof("Need to create new pod: %s-%d", rt, index)

        // if master pod is present, select the master pod
        // if master is not present, first worker pod is selected as the master.
        if ContainChieforMasterSpec(tfjob) {
            if tfv1.IsChieforMaster(rtype) {
                masterRole = true
            }
        } else {
            if tfv1.IsWorker(rtype) && (index == 0) {
                masterRole = true
            }
        }
        err = tc.createNewPod(tfjob, rt, strconv.Itoa(index), spec, masterRole)
        if err != nil {
            return err
        }
    } else {
        ...
    }
    ...
}
...
}
pkg/controller.v1/tensorflow/pod.go:213
When creating a Pod, set the TF_CONFIG environment variable on its containers; the roles of a TensorFlow job discover and talk to each other through TF_CONFIG:
func setClusterSpec(podTemplateSpec *v1.PodTemplateSpec, tfjob *tfv1.TFJob, rt, index string) error {
    // Generate TF_CONFIG JSON string.
    tfConfigStr, err := genTFConfigJSONStr(tfjob, rt, index)
    if err != nil {
        return err
    }

    if tfConfigStr == "" {
        return nil
    }

    // Add TF_CONFIG environment variable.
    for i := range podTemplateSpec.Spec.Containers {
        if len(podTemplateSpec.Spec.Containers[i].Env) == 0 {
            podTemplateSpec.Spec.Containers[i].Env = make([]v1.EnvVar, 0)
        }
        podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, v1.EnvVar{
            Name:  tfConfig,
            Value: tfConfigStr,
        })
    }
    return nil
}
The cluster field of TF_CONFIG corresponds one-to-one with the headless Services created for each role (the struct behind this JSON is sketched after the example):
- name: TF_CONFIG
  value: '{"cluster":{"chief":["tplus-mix-tf-158698-qiwu-chief-0:2222"],"ps":["tplus-mix-tf-158698-qiwu-ps-0:2222","tplus-mix-tf-158698-qiwu-ps-1:2222","tplus-mix-tf-158698-qiwu-ps-2:2222","tplus-mix-tf-158698-qiwu-ps-3:2222","tplus-mix-tf-158698-qiwu-ps-4:2222","tplus-mix-tf-158698-qiwu-ps-5:2222","tplus-mix-tf-158698-qiwu-ps-6:2222","tplus-mix-tf-158698-qiwu-ps-7:2222"],"tplusmaster":["tplus-mix-tf-158698-qiwu-tplusmaster-0:2222"],"worker":["tplus-mix-tf-158698-qiwu-worker-0:2222","tplus-mix-tf-158698-qiwu-worker-1:2222","tplus-mix-tf-158698-qiwu-worker-2:2222","tplus-mix-tf-158698-qiwu-worker-3:2222","tplus-mix-tf-158698-qiwu-worker-4:2222"]},"task":{"type":"chief","index":0},"environment":"cloud"}'
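The JSON above is marshaled from a small struct. A sketch of its shape, mirroring the JSON keys (the cluster map keys are the lower-cased replica types; the values are the host:port addresses of the headless Services):

// Sketch of the TF_CONFIG payload; type names assumed from the JSON shape.
// ClusterSpec maps a replica type ("chief", "ps", "worker", ...) to the
// addresses of that type's headless Services.
type ClusterSpec map[string][]string

type TaskSpec struct {
    Type  string `json:"type"`
    Index int    `json:"index"`
}

// TFConfig is serialized into the TF_CONFIG environment variable.
type TFConfig struct {
    Cluster     ClusterSpec `json:"cluster"`
    Task        TaskSpec    `json:"task"`
    Environment string      `json:"environment"`
}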
If EnableGangScheduling is on, the Pod's scheduler is set to kube-batch:
// if gang-scheduling is enabled:
// 1. if user has specified other scheduler, we report a warning without overriding any fields.
// 2. if no SchedulerName is set for pods, then we set the SchedulerName to "kube-batch".
if tc.Config.EnableGangScheduling {
    if isNonGangSchedulerSet(tfjob) {
        errMsg := "Another scheduler is specified when gang-scheduling is enabled and it will not be overwritten"
        logger.Warning(errMsg)
        tc.Recorder.Event(tfjob, v1.EventTypeWarning, podTemplateSchedulerNameReason, errMsg)
    } else {
        podTemplate.Spec.SchedulerName = gangSchedulerName
    }
}
pkg/controller.v1/tensorflow/status.go:61
Update the TFJob status. When the TFJob contains a Chief or Master role, the replica status of the Chief/Master Pod represents the status of the whole TFJob:
if ContainChieforMasterSpec(tfjob) {
    if tfv1.IsChieforMaster(rtype) {
        if running > 0 {
            msg := fmt.Sprintf("TFJob %s is running.", tfjob.Name)
            err := updateTFJobConditions(tfjob, common.JobRunning, tfJobRunningReason, msg)
            if err != nil {
                tflogger.LoggerForJob(tfjob).Infof("Append tfjob condition error: %v", err)
                return err
            }
        }
        if expected == 0 {
            msg := fmt.Sprintf("TFJob %s successfully completed.", tfjob.Name)
            tc.Recorder.Event(tfjob, v1.EventTypeNormal, tfJobSucceededReason, msg)
            if tfjob.Status.CompletionTime == nil {
                now := metav1.Now()
                tfjob.Status.CompletionTime = &now
            }
            err := updateTFJobConditions(tfjob, common.JobSucceeded, tfJobSucceededReason, msg)
            if err != nil {
                tflogger.LoggerForJob(tfjob).Infof("Append tfjob condition error: %v", err)
                return err
            }
            tfJobsSuccessCount.Inc()
        }
    }
}
When there is no Chief or Master role, the status of the Worker-0 replica represents the status of the whole TFJob:
else {
    if rtype == tfv1.TFReplicaTypeWorker {
        // All workers are succeeded or worker 0 completed, leave a succeeded condition.
        if expected == 0 || worker0Completed {
            msg := fmt.Sprintf("TFJob %s successfully completed.", tfjob.Name)
            tc.Recorder.Event(tfjob, v1.EventTypeNormal, tfJobSucceededReason, msg)
            if tfjob.Status.CompletionTime == nil {
                now := metav1.Now()
                tfjob.Status.CompletionTime = &now
            }
            err := updateTFJobConditions(tfjob, common.JobSucceeded, tfJobSucceededReason, msg)
            if err != nil {
                tflogger.LoggerForJob(tfjob).Infof("Append tfjob condition error: %v", err)
                return err
            }
            tfJobsSuccessCount.Inc()
        } else if running > 0 {
            // Some workers are still running, leave a running condition.
            msg := fmt.Sprintf("TFJob %s is running.", tfjob.Name)
            err := updateTFJobConditions(tfjob, common.JobRunning, tfJobRunningReason, msg)
            if err != nil {
                tflogger.LoggerForJob(tfjob).Infof("Append tfjob condition error: %v", err)
                return err
            }
        }
    }
}
pkg/controller.v1/tensorflow/controller.go:467
After reconciling the Pods and Services of each role, update the TFJob status only if it has changed:
// no need to update the tfjob if the status hasn't changed since last time.
if !reflect.DeepEqual(*oldStatus, tfjob.Status) {
    return tc.updateStatusHandler(tfjob)
}
return nil