Argo CD Source Code Walkthrough: Automated Sync

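Argo CD's automated sync feature deploys and updates applications automatically by monitoring changes in a Git repository. This keeps the applications in the Kubernetes cluster consistent with the configuration stored in Git. Development teams only need to keep an application's description and configuration in a Git repository, and Argo CD deploys and updates the application from that information.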

1. Background

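Argo CD is an open-source continuous deployment tool designed for Kubernetes applications. It follows GitOps principles and treats the Git repository as the "single source of truth" for application deployment and infrastructure management. Architecturally, Argo CD uses a component-based design that separates the responsibilities of its deployable units, which improves flexibility, maintainability, and scalability.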

[Figure: Argo CD component dependencies]

For a detailed introduction to the architecture, see the Argo CD architecture documentation.

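According to the official Argo CD documentation, Argo CD polls Git repositories every three minutes to detect changes to manifests. To eliminate this polling delay, the Argo CD API server can be configured to receive Git webhook events.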

2. Source Code Walkthrough

The source code discussed in this article is from version 2.6.0.

2.1 The main.go entry function

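Argo CD uses cobra to build its command-line applications. Starting from the main.go entry function under the cmd directory, it is easy to locate each component. According to the architecture, the API server is the only entry point into the control plane.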

command = apiserver.NewCommand()
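
To make the idea of "one entry function, many components" concrete, here is a minimal sketch of dispatching on the executable's name. It assumes the entry point selects a command by binary name; `componentFor` and its map are hypothetical stand-ins, not code from the Argo CD repository, and the real entry point builds cobra commands rather than returning strings.

```go
package main

import (
    "fmt"
    "os"
    "path/filepath"
)

// componentFor maps a binary name to the component it would start.
// The names mirror Argo CD's deployable units, but the mapping itself
// is a simplified, hypothetical stand-in for the real command selection.
func componentFor(binaryName string) (string, error) {
    components := map[string]string{
        "argocd-server":                 "API server",
        "argocd-application-controller": "application controller",
        "argocd-repo-server":            "repo server",
    }
    if c, ok := components[binaryName]; ok {
        return c, nil
    }
    return "", fmt.Errorf("unknown binary name %q", binaryName)
}

func main() {
    // Use the executable's base name, as a multi-call binary would.
    name := filepath.Base(os.Args[0])
    component, err := componentFor(name)
    if err != nil {
        fmt.Println("no component registered for", name)
        return
    }
    fmt.Println(name, "starts the", component)
}
```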

2.2 argocd-server

Reading the Run method of ArgoCDServer shows that it uses the cmux library to multiplex connections, serving standard HTTP and gRPC requests on the same port.

grpcS, appResourceTreeFn := a.newGRPCServer()
grpcWebS := grpcweb.WrapServer(grpcS)
// omitted...
httpS = a.newHTTPServer(ctx, a.ListenPort, grpcWebS, appResourceTreeFn, listeners.GatewayConn)

tcpm := cmux.New(listeners.Main)

if !a.useTLS() {
    httpL = tcpm.Match(cmux.HTTP1Fast())
    grpcL = tcpm.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
} else {
    // omitted...
}

go func() { a.checkServeErr("grpcS", grpcS.Serve(grpcL)) }()
go func() { a.checkServeErr("httpS", httpS.Serve(httpL)) }()

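The official documentation includes a diagram of how Argo CD implements authentication (authn) and authorization (authz). It shows that when the API server is called from the web UI or the CLI, the request first goes through cmux matching: an HTTP/1.x request is handled by the HTTP mux, while an HTTP/2 request with content-type: application/grpc is handled by the gRPC server.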

[Figure: authentication and authorization flow in the Argo CD API server]
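
The matching rule described above can be sketched at the request level with only the standard library. This is not how cmux works internally (cmux inspects the first bytes of each connection); `isGRPC`, `route`, and `newRequest` are hypothetical helpers that only illustrate the decision "HTTP/2 plus a gRPC content-type goes to gRPC, everything else goes to HTTP".

```go
package main

import (
    "fmt"
    "net/http"
    "strings"
)

// isGRPC reports whether a request looks like gRPC: it must be HTTP/2
// and carry a content-type that starts with application/grpc.
func isGRPC(r *http.Request) bool {
    return r.ProtoMajor == 2 &&
        strings.HasPrefix(r.Header.Get("Content-Type"), "application/grpc")
}

// route returns the name of the server that would handle the request
// in this simplified model.
func route(r *http.Request) string {
    if isGRPC(r) {
        return "grpc"
    }
    return "http"
}

// newRequest builds a request with the given protocol major version
// and content-type, for demonstration only.
func newRequest(protoMajor int, contentType string) *http.Request {
    r, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/", nil)
    r.ProtoMajor = protoMajor
    r.Header.Set("Content-Type", contentType)
    return r
}

func main() {
    fmt.Println(route(newRequest(2, "application/grpc")))
    fmt.Println(route(newRequest(1, "application/json")))
}
```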

Because the vast majority of the Argo CD API server's services are implemented in gRPC, gRPC Gateway is used to expose them as RESTful APIs.

2.2.1 ArgoCDWebhookHandler

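The Git webhook configuration documentation gives /api/webhook as the endpoint for webhook events. These requests arrive over HTTP/1.x, so we look at the newHTTPServer method of ArgoCDServer, where HTTP requests to the path "/api/webhook" are mapped to acdWebhookHandler.Handler.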

// Webhook handler for git events (Note: cache timeouts are hardcoded because API server does not write to cache and not really using them)
argoDB := db.NewDB(a.Namespace, a.settingsMgr, a.KubeClientset)
acdWebhookHandler := webhook.NewHandler(a.Namespace, a.ArgoCDServerOpts.ApplicationNamespaces, a.AppClientset, a.settings, a.settingsMgr, repocache.NewCache(a.Cache.GetCache(), 24*time.Hour, 3*time.Minute), a.Cache, argoDB)

mux.HandleFunc("/api/webhook", acdWebhookHandler.Handler)

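Next, look at the implementation of the Handler method of ArgoCDWebhookHandler. It parses the event payload according to the request headers, which differ between Git service providers, and passes the result to HandleEvent. After a series of validation checks, HandleEvent calls RefreshApp to refresh the application.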

// HandleEvent handles webhook events for repo push events
func (a *ArgoCDWebhookHandler) HandleEvent(payload interface{})

2.2.2 RefreshApp

As the comment makes clear, RefreshApp updates an annotation on the application to force the controller to process it.

// RefreshApp updates the refresh annotation of an application to coerce the controller to process it
func RefreshApp(appIf v1alpha1.ApplicationInterface, name string, refreshType argoappv1.RefreshType) (*argoappv1.Application, error) {
    metadata := map[string]interface{}{
        "metadata": map[string]interface{}{
            "annotations": map[string]string{
                argoappv1.AnnotationKeyRefresh: string(refreshType),
            },
        },
    }
    var err error
    patch, err := json.Marshal(metadata)
    if err != nil {
        return nil, fmt.Errorf("error marshaling metadata: %w", err)
    }
    for attempt := 0; attempt < 5; attempt++ {
        app, err := appIf.Patch(context.Background(), name, types.MergePatchType, patch, metav1.PatchOptions{})
        if err != nil {
            if !apierr.IsConflict(err) {
                return nil, fmt.Errorf("error patching annotations in application %q: %w", name, err)
            }
        } else {
            log.Infof("Requested app '%s' refresh", name)
            return app, nil
        }
        time.Sleep(100 * time.Millisecond)
    }
    return nil, err
}
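
To see what RefreshApp actually sends, the following sketch builds the same merge patch with the standard library only. It assumes that `AnnotationKeyRefresh` resolves to `argocd.argoproj.io/refresh` and that a normal refresh uses the value `normal`; `refreshPatch` is a hypothetical helper, and no Kubernetes client is involved.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// refreshAnnotation is assumed to be the value of AnnotationKeyRefresh.
const refreshAnnotation = "argocd.argoproj.io/refresh"

// refreshPatch builds the JSON merge patch that sets the refresh
// annotation to the given refresh type.
func refreshPatch(refreshType string) ([]byte, error) {
    patch := map[string]interface{}{
        "metadata": map[string]interface{}{
            "annotations": map[string]string{
                refreshAnnotation: refreshType,
            },
        },
    }
    return json.Marshal(patch)
}

func main() {
    patch, err := refreshPatch("normal")
    if err != nil {
        fmt.Println("marshal failed:", err)
        return
    }
    fmt.Println(string(patch))
}
```

Because it is a merge patch, only the annotation is touched; the rest of the Application object is left as it is.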

2.3 application-controller

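Following the same path from the entry function, we find the application controller implementation. appcontroller defines a default resync period of 180 s. The controller creates an ApplicationInformer through newApplicationInformerAndLister to watch application events and add them to its queues.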

const (
    // Default time in seconds for application resync period
    defaultAppResyncPeriod = 180
)
// omitted...
resyncDuration = time.Duration(appResyncPeriod) * time.Second
// omitted...
appController, err = controller.NewApplicationController(
    namespace,
    settingsMgr,
    kubeClient,
    appClient,
    repoClientset,
    cache,
    kubectl,
    resyncDuration,
    hardResyncDuration,
    time.Duration(selfHealTimeoutSeconds)*time.Second,
    metricsPort,
    metricsCacheExpiration,
    metricsAplicationLabels,
    kubectlParallelismLimit,
    persistResourceHealth,
    clusterFilter,
    applicationNamespaces)
// omitted...
go appController.Run(ctx, statusProcessors, operationProcessors)

2.3.1 newApplicationInformerAndLister

The annotation update made by RefreshApp above produces an Update event, which ends up in requestAppRefresh.

informer.AddEventHandler(
    cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            if !ctrl.canProcessApp(obj) {
                return
            }
            key, err := cache.MetaNamespaceKeyFunc(obj)
            if err == nil {
                ctrl.appRefreshQueue.Add(key)
                ctrl.appOperationQueue.Add(key)
            }
        },
        UpdateFunc: func(old, new interface{}) {
            if !ctrl.canProcessApp(new) {
                return
            }

            key, err := cache.MetaNamespaceKeyFunc(new)
            if err != nil {
                return
            }
            var compareWith *CompareWith
            oldApp, oldOK := old.(*appv1.Application)
            newApp, newOK := new.(*appv1.Application)
            if oldOK && newOK && automatedSyncEnabled(oldApp, newApp) {
                log.WithField("application", newApp.QualifiedName()).Info("Enabled automated sync")
                compareWith = CompareWithLatest.Pointer()
            }
            ctrl.requestAppRefresh(newApp.QualifiedName(), compareWith, nil)
            ctrl.appOperationQueue.Add(key)
        },
        DeleteFunc: func(obj interface{}) {
            if !ctrl.canProcessApp(obj) {
                return
            }
            // IndexerInformer uses a delta queue, therefore for deletes we have to use this
            // key function.
            key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
            if err == nil {
                ctrl.appRefreshQueue.Add(key)
            }
        },
    },
)

2.3.2 requestAppRefresh

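In the UpdateFunc call, after is nil, and compareWith is either nil or CompareWithLatest (the latter only when automated sync has just been enabled). In both cases requestAppRefresh adds the update event's key to the appRefreshQueue and appOperationQueue queues.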

// requestAppRefresh adds a request for given app to the refresh queue. appName
// needs to be the qualified name of the application, i.e. <namespace>/<name>.
func (ctrl *ApplicationController) requestAppRefresh(appName string, compareWith *CompareWith, after *time.Duration) {
    key := ctrl.toAppKey(appName)

    if compareWith != nil && after != nil {
        ctrl.appComparisonTypeRefreshQueue.AddAfter(fmt.Sprintf("%s/%d", key, compareWith), *after)
    } else {
        if compareWith != nil {
            ctrl.refreshRequestedAppsMutex.Lock()
            ctrl.refreshRequestedApps[key] = compareWith.Max(ctrl.refreshRequestedApps[key])
            ctrl.refreshRequestedAppsMutex.Unlock()
        }
        if after != nil {
            ctrl.appRefreshQueue.AddAfter(key, *after)
            ctrl.appOperationQueue.AddAfter(key, *after)
        } else {
            ctrl.appRefreshQueue.Add(key)
            ctrl.appOperationQueue.Add(key)
        }
    }
}
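
The refreshRequestedApps bookkeeping above keeps, per application, the strongest comparison level requested so far. A minimal sketch of that idea follows; the numeric ordering of the `CompareWith` constants and the `refreshRequests` type are assumptions made for illustration, not the controller's actual definitions.

```go
package main

import (
    "fmt"
    "sync"
)

// CompareWith is a comparison level; a higher value means a stricter
// comparison. The concrete values here are assumed for illustration.
type CompareWith int

const (
    ComparisonWithNothing CompareWith = iota
    CompareWithRecent
    CompareWithLatest
)

// Max returns the stricter of the two levels.
func (a CompareWith) Max(b CompareWith) CompareWith {
    if a > b {
        return a
    }
    return b
}

// refreshRequests remembers the strictest level requested per app key.
type refreshRequests struct {
    mu     sync.Mutex
    levels map[string]CompareWith
}

// request records a refresh request and returns the level now stored.
func (r *refreshRequests) request(key string, level CompareWith) CompareWith {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.levels[key] = level.Max(r.levels[key])
    return r.levels[key]
}

func main() {
    r := &refreshRequests{levels: map[string]CompareWith{}}
    r.request("argocd/guestbook", CompareWithLatest)
    // A later, weaker request does not downgrade the stored level.
    fmt.Println(r.request("argocd/guestbook", CompareWithRecent) == CompareWithLatest)
}
```

Taking the maximum means that several requests for the same application collapse into a single queue item without losing the most demanding one.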

2.3.3 run

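The controller uses two separate queues to handle application reconciliation (appRefreshQueue) and synchronization (appOperationQueue). They are processed by processAppRefreshQueueItem and processAppOperationQueueItem respectively.

statusProcessors and operationProcessors control how many goroutines are started for each queue.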

// Run starts the Application CRD controller.
func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int, operationProcessors int) {
    defer runtime.HandleCrash()
    defer ctrl.appRefreshQueue.ShutDown()
    defer ctrl.appComparisonTypeRefreshQueue.ShutDown()
    defer ctrl.appOperationQueue.ShutDown()
    defer ctrl.projectRefreshQueue.ShutDown()

    ctrl.metricsServer.RegisterClustersInfoSource(ctx, ctrl.stateCache)
    ctrl.RegisterClusterSecretUpdater(ctx)

    go ctrl.appInformer.Run(ctx.Done())
    go ctrl.projInformer.Run(ctx.Done())

    errors.CheckError(ctrl.stateCache.Init())

    if !cache.WaitForCacheSync(ctx.Done(), ctrl.appInformer.HasSynced, ctrl.projInformer.HasSynced) {
        log.Error("Timed out waiting for caches to sync")
        return
    }

    go func() { errors.CheckError(ctrl.stateCache.Run(ctx)) }()
    go func() { errors.CheckError(ctrl.metricsServer.ListenAndServe()) }()

    for i := 0; i < statusProcessors; i++ {
        go wait.Until(func() {
            for ctrl.processAppRefreshQueueItem() {
            }
        }, time.Second, ctx.Done())
    }

    for i := 0; i < operationProcessors; i++ {
        go wait.Until(func() {
            for ctrl.processAppOperationQueueItem() {
            }
        }, time.Second, ctx.Done())
    }

    go wait.Until(func() {
        for ctrl.processAppComparisonTypeQueueItem() {
        }
    }, time.Second, ctx.Done())

    go wait.Until(func() {
        for ctrl.processProjectQueueItem() {
        }
    }, time.Second, ctx.Done())
    <-ctx.Done()
}
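
The "two queues, each with its own pool of workers" pattern in Run can be sketched with plain channels. This is only an analogy: the controller uses rate-limited work queues and wait.Until, whereas the hypothetical `drain` and `runQueues` helpers below use buffered channels and stop once the queues are empty.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// drain starts the given number of workers and blocks until the queue
// has been closed and every key has been processed.
func drain(queue <-chan string, workers int, process func(string)) {
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for key := range queue {
                process(key)
            }
        }()
    }
    wg.Wait()
}

// runQueues pushes every key onto a refresh queue and an operation
// queue, then processes both queues concurrently with separate pools.
func runQueues(keys []string, statusProcessors, operationProcessors int) (refreshed, synced int64) {
    refreshQueue := make(chan string, len(keys))
    operationQueue := make(chan string, len(keys))
    for _, k := range keys {
        refreshQueue <- k
        operationQueue <- k
    }
    close(refreshQueue)
    close(operationQueue)

    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        drain(refreshQueue, statusProcessors, func(string) { atomic.AddInt64(&refreshed, 1) })
    }()
    go func() {
        defer wg.Done()
        drain(operationQueue, operationProcessors, func(string) { atomic.AddInt64(&synced, 1) })
    }()
    wg.Wait()
    return refreshed, synced
}

func main() {
    refreshed, synced := runQueues([]string{"argocd/a", "argocd/b", "argocd/c"}, 2, 1)
    fmt.Println(refreshed, synced)
}
```

Keeping the pools separate means a slow sync operation cannot starve status refreshes, and the two worker counts can be tuned independently.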

2.3.4 processAppRefreshQueueItem

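After the annotation-update event described above is taken from appRefreshQueue, needRefreshAppStatus is called and returns needRefresh, refreshType, comparisonLevel = true, RefreshTypeNormal, CompareWithLatestForceResolve. CompareAppState then compares the application's Git state with its live state, using the specified revisions and the provided sources.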

appKey, shutdown := ctrl.appRefreshQueue.Get()

obj, exists, err := ctrl.appInformer.GetIndexer().GetByKey(appKey.(string))

origApp, ok := obj.(*appv1.Application)

needRefresh, refreshType, comparisonLevel := ctrl.needRefreshAppStatus(origApp, ctrl.statusRefreshTimeout, ctrl.statusHardRefreshTimeout)

compareResult := ctrl.appStateManager.CompareAppState(app, project, revisions, sources,
    refreshType == appv1.RefreshTypeHard,
    comparisonLevel == CompareWithLatestForceResolve, localManifests, hasMultipleSources)

The three-minute periodic polling mentioned above is also implemented in needRefreshAppStatus.

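Inside CompareAppState, the getRepoObjs method of the appStateManager instance is called to obtain the manifests rendered from the Git repository. getRepoObjs calls the GenerateManifest method of reposerver over gRPC to get the rendered manifests.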

// AppStateManager defines methods which allow to compare application spec and actual application state.
type AppStateManager interface {
    CompareAppState(app *v1alpha1.Application, project *appv1.AppProject, revisions []string, sources []v1alpha1.ApplicationSource, noCache bool, noRevisionCache bool, localObjects []string, hasMultipleSources bool) *comparisonResult
    SyncAppState(app *v1alpha1.Application, state *v1alpha1.OperationState)
}

// After initialization, repoClientset is passed down layer by layer to the AppStateManager instance
repoClientset := apiclient.NewRepoServerClientset(repoServerAddress, repoServerTimeoutSeconds, tlsConfig)

// getRepoObjs calls the GenerateManifest method of reposerver over gRPC
manifestInfo, err := repoClient.GenerateManifest(context.Background(), &apiclient.ManifestRequest{
    Repo:               repo,
    Repos:              permittedHelmRepos,
    Revision:           revisions[i],
    NoCache:            noCache,
    NoRevisionCache:    noRevisionCache,
    AppLabelKey:        appLabelKey,
    AppName:            app.InstanceName(m.namespace),
    Namespace:          app.Spec.Destination.Namespace,
    ApplicationSource:  &source,
    Plugins:            tools,
    KustomizeOptions:   kustomizeOptions,
    KubeVersion:        serverVersion,
    ApiVersions:        argo.APIResourcesToStrings(apiResources, true),
    VerifySignature:    verifySignature,
    HelmRepoCreds:      permittedHelmCredentials,
    TrackingMethod:     string(argo.GetTrackingMethod(m.settingsMgr)),
    EnabledSourceTypes: enabledSourceTypes,
    HelmOptions:        helmOptions,
    HasMultipleSources: app.Spec.HasMultipleSources(),
    RefSources:         refSources,
})

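Once compareResult is available, the autoSync method is called. If automated sync is enabled for the application, it updates the application's Operation field to start a sync operation.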

// autoSync will initiate a sync operation for an application configured with automated sync
func (ctrl *ApplicationController) autoSync(app *appv1.Application, syncStatus *appv1.SyncStatus, resources []appv1.ResourceStatus) *appv1.ApplicationCondition
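
The kind of checks an automated sync has to pass before it starts can be sketched as below. The `app` struct and `shouldAutoSync` are hypothetical and deliberately simplified; the actual autoSync method works on the Application types and handles additional cases (such as self-heal timing, hinted at by selfHealTimeoutSeconds above) that are left out here.

```go
package main

import "fmt"

// app carries only the fields this sketch needs.
type app struct {
    automated             bool   // automated sync policy is configured
    syncStatus            string // "Synced" or "OutOfSync"
    operationInProgress   bool   // a sync operation is already running
    desiredRevision       string // revision resolved by the comparison
    lastAttemptedRevision string // revision of the last automated attempt
}

// shouldAutoSync reports whether a new automated sync should be
// started and explains the decision.
func shouldAutoSync(a app) (bool, string) {
    switch {
    case !a.automated:
        return false, "automated sync is not enabled"
    case a.operationInProgress:
        return false, "another operation is in progress"
    case a.syncStatus != "OutOfSync":
        return false, "application is not out of sync"
    case a.desiredRevision == a.lastAttemptedRevision:
        return false, "this revision was already attempted"
    default:
        return true, "initiating automated sync to " + a.desiredRevision
    }
}

func main() {
    ok, msg := shouldAutoSync(app{
        automated:       true,
        syncStatus:      "OutOfSync",
        desiredRevision: "abc123",
    })
    fmt.Println(ok, msg)
}
```

Skipping a revision that was already attempted is what keeps a failing sync from being retried in a tight loop on every refresh.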

Finally, processAppRefreshQueueItem calls persistAppStatus, which persists the result by updating the application's status through the Kubernetes API.

// persistAppStatus persists updates to application status. If no changes were made, it is a no-op
func (ctrl *ApplicationController) persistAppStatus(orig *appv1.Application, newStatus *appv1.ApplicationStatus)
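
2.3.5 processAppOperationQueueItem

Similar to processAppRefreshQueueItem, this method takes an application awaiting a sync operation from appOperationQueue and checks whether its Operation field is empty. If it is not empty, processRequestedAppOperation is executed. processRequestedAppOperation also performs some state checks, such as whether a sync is already in progress. The actual synchronization of the application's resources is carried out by the SyncAppState method of the appStateManager instance.
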
func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Application) {
    // ...
    if err := argo.ValidateDestination(context.Background(), &app.Spec.Destination, ctrl.db); err != nil {
        state.Phase = synccommon.OperationFailed
        state.Message = err.Error()
    } else {
        ctrl.appStateManager.SyncAppState(app, state)
    }
    // ...
}

func (m *appStateManager) SyncAppState(app *v1alpha1.Application, state *v1alpha1.OperationState) {
    // Sync requests might be requested with ambiguous revisions (e.g. master, HEAD, v1.2.3).
    // This can change meaning when resuming operations (e.g a hook sync). After calculating a
    // concrete git commit SHA, the SHA is remembered in the status.operationState.syncResult field.
    // This ensures that when resuming an operation, we sync to the same revision that we initially
    // started with.

    // omitted...
    syncCtx, cleanup, err := sync.NewSyncContext(
        compareResult.syncStatus.Revision,
        reconciliationResult,
        restConfig,
        rawConfig,
        m.kubectl,
        app.Spec.Destination.Namespace,
        openAPISchema,
        opts...,
    )

    if err != nil {
        state.Phase = common.OperationError
        state.Message = fmt.Sprintf("failed to initialize sync context: %v", err)
        return
    }

    defer cleanup()

    start := time.Now()

    if state.Phase == common.OperationTerminating {
        syncCtx.Terminate()
    } else {
        syncCtx.Sync()
    }
    // omitted...
}

For syncCtx, a look at the interface definition is enough to get the general idea; the concrete implementation is not covered here.

// SyncContext defines an interface that allows to execute sync operation step or terminate it.
type SyncContext interface {
    // Terminate terminates sync operation. The method is asynchronous: it starts deletion is related K8S resources
    // such as in-flight resource hooks, updates operation status, and exists without waiting for resource completion.
    Terminate()
    // Executes next synchronization step and updates operation status.
    Sync()
    // Returns current sync operation state and information about resources synchronized so far.
    GetState() (common.OperationPhase, string, []common.ResourceSyncResult)
}

The SyncContext used here is implemented by the gitops-engine library.

3. Source Code Flow Diagram

[Figure: request flow through the source code]

4. Summary

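Automated sync is a core feature of Argo CD. Understanding its underlying implementation and source code broadens one's technical perspective, deepens understanding of how Argo CD works, and helps in finding solutions when problems arise.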

References