// Webhook handler for git events (Note: cache timeouts are hardcoded because API server does not write to cache and not really using them) argoDB := db.NewDB(a.Namespace, a.settingsMgr, a.KubeClientset) acdWebhookHandler := webhook.NewHandler(a.Namespace, a.ArgoCDServerOpts.ApplicationNamespaces, a.AppClientset, a.settings, a.settingsMgr, repocache.NewCache(a.Cache.GetCache(), 24*time.Hour, 3*time.Minute), a.Cache, argoDB)
// requestAppRefresh adds a request for given app to the refresh queue. appName // needs to be the qualified name of the application, i.e. <namespace>/<name>. func(ctrl *ApplicationController)requestAppRefresh(appName string, compareWith *CompareWith, after *time.Duration) { key := ctrl.toAppKey(appName)
if compareWith != nil && after != nil { ctrl.appComparisonTypeRefreshQueue.AddAfter(fmt.Sprintf("%s/%d", key, compareWith), *after) } else { if compareWith != nil { ctrl.refreshRequestedAppsMutex.Lock() ctrl.refreshRequestedApps[key] = compareWith.Max(ctrl.refreshRequestedApps[key]) ctrl.refreshRequestedAppsMutex.Unlock() } if after != nil { ctrl.appRefreshQueue.AddAfter(key, *after) ctrl.appOperationQueue.AddAfter(key, *after) } else { ctrl.appRefreshQueue.Add(key) ctrl.appOperationQueue.Add(key) } } }
go ctrl.appInformer.Run(ctx.Done()) go ctrl.projInformer.Run(ctx.Done())
errors.CheckError(ctrl.stateCache.Init())
if !cache.WaitForCacheSync(ctx.Done(), ctrl.appInformer.HasSynced, ctrl.projInformer.HasSynced) { log.Error("Timed out waiting for caches to sync") return }
// autoSync will initiate a sync operation for an application configured with automated sync func(ctrl *ApplicationController)autoSync(app *appv1.Application, syncStatus *appv1.SyncStatus, resources []appv1.ResourceStatus) *appv1.ApplicationCondition
processAppRefreshQueueItem 最后将会调用 persistAppStatus 方法用于持久化,通过调用 k8s api 更新 applicaition 的 status。
// persistAppStatus persists updates to application status. If no changes were made, it is a no-op func(ctrl *ApplicationController)persistAppStatus(orig *appv1.Application, newStatus *appv1.ApplicationStatus) ``` ##### 2.3.5 processAppOperationQueueItem 跟 `processAppRefreshQueueItem` 类似,从 `appOperationQueue` 队列中拿到待执行同步操作的应用实例,判断该应用的 `Operation` 字段是否为空,如果不为空则执行 `processRequestedAppOperation`。`processRequestedAppOperation` 也会进行一些状态校验,比如是否正在同步中等,最终应用下资源的同步将由 `appStateManager` 实例的 `SyncAppState` 实现。 ```go func(ctrl *ApplicationController)processRequestedAppOperation(app *appv1.Application) { // ... if err := argo.ValidateDestination(context.Background(), &app.Spec.Destination, ctrl.db); err != nil { state.Phase = synccommon.OperationFailed state.Message = err.Error() } else { ctrl.appStateManager.SyncAppState(app, state) } // .... }
func(m *appStateManager)SyncAppState(app *v1alpha1.Application, state *v1alpha1.OperationState) { // Sync requests might be requested with ambiguous revisions (e.g. master, HEAD, v1.2.3). // This can change meaning when resuming operations (e.g a hook sync). After calculating a // concrete git commit SHA, the SHA is remembered in the status.operationState.syncResult field. // This ensures that when resuming an operation, we sync to the same revision that we initially // started with. // 省略... syncCtx, cleanup, err := sync.NewSyncContext( compareResult.syncStatus.Revision, reconciliationResult, restConfig, rawConfig, m.kubectl, app.Spec.Destination.Namespace, openAPISchema, opts..., )
if err != nil { state.Phase = common.OperationError state.Message = fmt.Sprintf("failed to initialize sync context: %v", err) return }
// SyncContext defines an interface that allows to execute sync operation step or terminate it. type SyncContext interface { // Terminate terminates sync operation. The method is asynchronous: it starts deletion is related K8S resources // such as in-flight resource hooks, updates operation status, and exists without waiting for resource completion. Terminate() // Executes next synchronization step and updates operation status. Sync() // Returns current sync operation state and information about resources synchronized so far. GetState() (common.OperationPhase, string, []common.ResourceSyncResult) }