Argo CD 消息推送之 SSE

服务器发送事件 (Server-Sent Events) 是一种服务器推送技术,使客户端能够通过 HTTP 连接从服务器接收自动更新,并描述了在建立初始客户端连接后服务器如何向客户端发起数据传输。

1.背景

在使用Argo CD的时候可以发现页面可以实时刷新应用的状态,比如同步中,那Argo CD是如何实现消息的实时推送的呢?本文将简要介绍Argo CD使用到的消息推送技术:SSE。

2. 源码解析

通过浏览器开发者工具可以发现前端请求的地址是 stream/applications,响应头表示这是一个服务器发送事件(Server-Sent Events,SSE)。

本文的源码基于2.6.0版本


Content-Type: text/event-stream:这个响应头告诉客户端,服务器将发送的数据是以文本格式的事件流(Event Stream)。

2.1 客户端实现

Argo CD的前端使用的是 React,相关代码 application-details.tsx 实现如下:

private loadAppInfo(name: string, appNamespace: string): Observable<{application: appModels.Application; tree: appModels.ApplicationTree}> {
return from(services.applications.get(name, appNamespace))
.pipe(
mergeMap(app => {
const fallbackTree = {
nodes: app.status.resources.map(res => ({...res, parentRefs: [], info: [], resourceVersion: '', uid: ''})),
orphanedNodes: [],
hosts: []
} as appModels.ApplicationTree;
return combineLatest(
merge(
from([app]),
this.appChanged.pipe(filter(item => !!item)),
AppUtils.handlePageVisibility(() =>
services.applications
.watch({name, appNamespace})
.pipe(
map(watchEvent => {
if (watchEvent.type === 'DELETED') {
this.onAppDeleted();
}
return watchEvent.application;
})
)
.pipe(repeat())
.pipe(retryWhen(errors => errors.pipe(delay(500))))
)
),
merge(
from([fallbackTree]),
services.applications.resourceTree(name, appNamespace).catch(() => fallbackTree),
AppUtils.handlePageVisibility(() =>
services.applications
.watchResourceTree(name, appNamespace)
.pipe(repeat())
.pipe(retryWhen(errors => errors.pipe(delay(500))))
)
)
);
})
)
.pipe(filter(([application, tree]) => !!application && !!tree))
.pipe(map(([application, tree]) => ({application, tree})));
}

loadAppInfo 方法的主要目的是从服务器加载应用信息,并在页面可见时实时更新。它返回一个 Observable,包含应用信息 services.applications.watch 和应用资源树 services.applications.watchResourceTree。我们继续查看应用信息的 watch 函数实现。

public watch(query?: {name?: string; resourceVersion?: string; projects?: string[]; appNamespace?: string}, options?: QueryOptions): Observable<models.ApplicationWatchEvent> {
const search = new URLSearchParams();
if (query) {
if (query.name) {
search.set('name', query.name);
}
if (query.resourceVersion) {
search.set('resourceVersion', query.resourceVersion);
}
if (query.appNamespace) {
search.set('appNamespace', query.appNamespace);
}
}
if (options) {
const searchOptions = optionsToSearch(options);
search.set('fields', searchOptions.fields);
search.set('selector', searchOptions.selector);
search.set('appNamespace', searchOptions.appNamespace);
query?.projects?.forEach(project => search.append('projects', project));
}
const searchStr = search.toString();
const url = `/stream/applications${(searchStr && '?' + searchStr) || ''}`;
return requests
.loadEventSource(url)
.pipe(repeat())
.pipe(retry())
.pipe(map(data => JSON.parse(data).result as models.ApplicationWatchEvent))
.pipe(
map(watchEvent => {
watchEvent.application = this.parseAppFields(watchEvent.application);
return watchEvent;
})
);
}

可以看到该函数使用 requests.loadEventSource(url) 加载服务器发送的事件流(Event Stream),请求地址为 /stream/applicationsloadEventSource 的实现是基于 EventSource 对象的, EventSource 实例设置 onmessage 事件处理程序,当接收到新消息时,调用 observer.next(msg.data) 将消息数据推送到 Observable 实现页面的实时刷新。在文档 Using server-sent events 中有介绍, 这里就不详细介绍了。

export default {
loadEventSource(url: string): Observable<string> {
return Observable.create((observer: Observer<any>) => {
let eventSource = new EventSource(`${apiRoot()}${url}`);
eventSource.onmessage = msg => observer.next(msg.data);
eventSource.onerror = e => () => {
observer.error(e);
onError.next(e);
};

// EventSource does not provide easy way to get notification when connection closed.
// check readyState periodically instead.
const interval = setInterval(() => {
if (eventSource && eventSource.readyState === ReadyState.CLOSED) {
observer.error('connection got closed unexpectedly');
}
}, 500);
return () => {
clearInterval(interval);
eventSource.close();
eventSource = null;
};
});
}
};

2.2 服务端实现

Argo CD的后端使用的是 Go,相关代码 application.go 实现如下:

type ApplicationService_WatchServer interface {
Send(*v1alpha1.ApplicationWatchEvent) error
grpc.ServerStream
}

func (s *Server) Watch(q *application.ApplicationQuery, ws application.ApplicationService_WatchServer) error {
appName := q.GetName()
appNs := s.appNamespaceOrDefault(q.GetAppNamespace())
logCtx := log.NewEntry(log.New())
if q.Name != nil {
logCtx = logCtx.WithField("application", *q.Name)
}
projects := map[string]bool{}
for i := range q.Projects {
projects[q.Projects[i]] = true
}
claims := ws.Context().Value("claims")
selector, err := labels.Parse(q.GetSelector())
if err != nil {
return fmt.Errorf("error parsing labels with selectors: %w", err)
}
minVersion := 0
if q.GetResourceVersion() != "" {
if minVersion, err = strconv.Atoi(q.GetResourceVersion()); err != nil {
minVersion = 0
}
}

// sendIfPermitted is a helper to send the application to the client's streaming channel if the
// caller has RBAC privileges permissions to view it
sendIfPermitted := func(a appv1.Application, eventType watch.EventType) {
if len(projects) > 0 && !projects[a.Spec.GetProject()] {
return
}

if appVersion, err := strconv.Atoi(a.ResourceVersion); err == nil && appVersion < minVersion {
return
}
matchedEvent := (appName == "" || (a.Name == appName && a.Namespace == appNs)) && selector.Matches(labels.Set(a.Labels))
if !matchedEvent {
return
}

if !s.enf.Enforce(claims, rbacpolicy.ResourceApplications, rbacpolicy.ActionGet, a.RBACName(s.ns)) {
// do not emit apps user does not have accessing
return
}
s.inferResourcesStatusHealth(&a)
err := ws.Send(&appv1.ApplicationWatchEvent{
Type: eventType,
Application: a,
})
if err != nil {
logCtx.Warnf("Unable to send stream message: %v", err)
return
}
}

events := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize)
// Mimic watch API behavior: send ADDED events if no resource version provided
// If watch API is executed for one application when emit event even if resource version is provided
// This is required since single app watch API is used for during operations like app syncing and it is
// critical to never miss events.
if q.GetResourceVersion() == "" || q.GetName() != "" {
apps, err := s.appLister.List(selector)
if err != nil {
return fmt.Errorf("error listing apps with selector: %w", err)
}
sort.Slice(apps, func(i, j int) bool {
return apps[i].QualifiedName() < apps[j].QualifiedName()
})
for i := range apps {
sendIfPermitted(*apps[i], watch.Added)
}
}
unsubscribe := s.appBroadcaster.Subscribe(events)
defer unsubscribe()
for {
select {
case event := <-events:
sendIfPermitted(event.Application, event.Type)
case <-ws.Context().Done():
return nil
}
}
}

在这段代码中,ws 用于发送实时更新的应用程序事件。当有新的事件发生时,sendIfPermitted 函数会将事件发送到客户端。

NOTE: ws 是一个实现了 ApplicationService_WatchServer 接口的对象。ApplicationService_WatchServer 接口继承了 grpc.ServerStream 接口,并定义了一个名为 Send 的方法。这意味着 ws 可以用作服务器端的流,并具有发送 ApplicationWatchEvent 类型消息的能力。

3 实现一个简单的SSE接口服务样例

相关代码地址,实现如下效果,页面上通过 sse 获取数据(当前时间和推送人),后端服务由 grpc|grpc-gateway stream 推送。

NOTE: 后续补充更新中

3.1 使用 protobuf 定义 gRPC 服务

新建一个项目sse-demo,在项目目录下执行go mod init命令完成go module初始化。 在项目目录下创建一个 proto/chat.proto 文件,其内容如下:

syntax = "proto3";
option go_package = "github.com/blazehu/sse-demo/server/apiclient/chat";

package chat;

import "google/api/annotations.proto";
import "google/protobuf/empty.proto";

service ChatService {
rpc Chat(google.protobuf.Empty) returns (stream Message) {
option (google.api.http) = {
get: "/api/v1/stream/chat"
};
}
}

message Message {
string user = 1;
string content = 2;
}

3.2 生成代码

这用 buf 生成代码,首先在项目目录下执行 buf mod init 生成 buf.yaml 文件,我们需要修改该文件内容如下:

version: v1
name: buf.build/blazehu/sse-demo
deps:
- buf.build/googleapis/googleapis
breaking:
use:
- FILE
lint:
use:
- DEFAULT

NOTE: 这里主要是添加依赖项 googleapis,相关详细文档

修改完成后执行 buf mod update 来选择要使用的依赖项的版本。 然后我们创建一个 buf.gen.yaml 文件用于生成存根。buf.gen.yaml文件内容如下:

version: v1
plugins:
- plugin: go
out: gen
opt: paths=source_relative
- plugin: go-grpc
out: gen
opt: paths=source_relative,require_unimplemented_servers=false
- plugin: grpc-gateway
out: gen
opt: paths=source_relative

这时候我们可以通过 buf generate 生成存根,也可以使用 protoc 来生成存根,这里就不展开说明。

$ buf generate

这时候将会由如下的目录结构:

├── buf.gen.yaml
├── buf.lock
├── buf.yaml
├── gen
│   └── proto
│   ├── chat.pb.go
│   ├── chat.pb.gw.go
│   └── chat_grpc.pb.go
├── go.mod
└── proto
└── chat.proto

3.3 实现 GRPC 服务

上述生成 pbgrpc 相关代码后,实现一个 gRPC Server 服务,相关代码如下:

package main

import (
"github.com/blazehu/sse-demo/gen/proto"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
"log"
"net"
"time"
)

// Server provides chat service
type Server struct {
chat.UnimplementedChatServiceServer
}

// Chat returns chat content
func (s *Server) Chat(_ *emptypb.Empty, stream chat.ChatService_ChatServer) error {
for {
msg := chat.Message{
User: "blazehu",
Content: time.Now().Format(time.RFC3339),
}
if err := stream.Send(&msg); err != nil {
return err
}
time.Sleep(time.Second * 1)
}
}

func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)
chat.RegisterChatServiceServer(grpcServer, &Server{})
grpcServer.Serve(lis)
}

代码逻辑非常简单,定义 Server 对象然后实现了相关的 Chat 方法,然后 main 函数注册并启动了一个 gRPC Server 服务。

NOTE: Chat 方法是每隔一秒钟就发送一条消息,该消息内容就是当前的时间。

3.4 实现 HTTP 服务

新增 main.go 文件,在 main.go 文件中添加和启动 gRPC-Gateway mux相关代码如下:

package main

import (
"context"
"github.com/blazehu/sse-demo/gen/proto"
"github.com/blazehu/sse-demo/util"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
googleproto "google.golang.org/protobuf/proto"
"log"
"net/http"
)

const (
grpcEndpoint = "localhost:50051"
httpPort = ":8080"
)

func myFilter(ctx context.Context, w http.ResponseWriter, resp googleproto.Message) error {
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Cache-Control", "no-cache")
return nil
}

func main() {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// 创建 gRPC 连接
var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
grpcConn, err := grpc.DialContext(ctx, grpcEndpoint, opts...)
if err != nil {
log.Fatalf("Failed to dial server: %v", err)
}
defer grpcConn.Close()

// 创建 gRPC-Gateway 服务器
gwmux := runtime.NewServeMux(
runtime.WithMarshalerOption(runtime.MIMEWildcard, util.NewCustomTranscoder(&runtime.JSONPb{})),
runtime.WithForwardResponseOption(myFilter),
)
err = chat.RegisterChatServiceHandler(ctx, gwmux, grpcConn)
if err != nil {
log.Fatalf("Failed to register gateway: %v", err)
}

// 设置 CORS 策略
mux := http.NewServeMux()
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")
if r.Method == "OPTIONS" {
w.WriteHeader(http.StatusOK)
return
}
gwmux.ServeHTTP(w, r)
})
mux.Handle("/", handler)

// 启动 gRPC-Gateway 服务器
log.Printf("Starting gRPC-Gateway on %s", httpPort)
if err := http.ListenAndServe(httpPort, mux); err != nil {
log.Fatalf("Failed to serve: %v", err)
}
}

因为前端 sse 访问需要跨域所以需要设置 CORS 策略,并且返回的消息类型为 text/event-stream,所以我们这里自定义一个 CustomTranscoder,它将 JSON 转换为 text/event-stream 格式。相关实现如下:

package util

import (
"bytes"
"encoding/json"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"google.golang.org/protobuf/proto"
"io"
)

type CustomTranscoder struct {
marshaler runtime.Marshaler
}

func NewCustomTranscoder(marshaler runtime.Marshaler) *CustomTranscoder {
return &CustomTranscoder{marshaler: marshaler}
}

func (c *CustomTranscoder) ContentType(_ interface{}) string {
return "text/event-stream"
}

func (c *CustomTranscoder) Marshal(v interface{}) ([]byte, error) {
var jsonBytes []byte
var err error

if pb, ok := v.(proto.Message); ok {
// Marshal message to JSON
jsonBytes, err = c.marshaler.Marshal(pb)
} else {
// If not a proto.Message, try to marshal it as a regular JSON object
jsonBytes, err = json.Marshal(v)
}

if err != nil {
return nil, err
}

var buf bytes.Buffer
buf.WriteString("data: ")
buf.Write(jsonBytes)
buf.WriteString("\n\n")

return buf.Bytes(), nil
}

func (c *CustomTranscoder) Unmarshal(data []byte, v interface{}) error {
return c.marshaler.Unmarshal(data, v)
}

func (c *CustomTranscoder) NewDecoder(r io.Reader) runtime.Decoder {
return c.marshaler.NewDecoder(r)
}

func (c *CustomTranscoder) NewEncoder(w io.Writer) runtime.Encoder {
return c.marshaler.NewEncoder(w)
}

NOTE: 这里需要注意输入值转化为 proto.Message 类型可能会失败,所以我们在 Marshal 方法中添加一个检查,以确保输入值是一个 proto.Message 类型。如果不是,我们可以尝试将其序列化为 JSON

3.5 测试

同时启动 gRPC ServerHTTP Server,然后浏览器访问地址: http://127.0.0.1:8080/api/v1/stream/chat ,可以发现浏览器将会每秒刷新一条数据。

img.png

3.6 前端实现

前端使用 vue2,这里就不详细说明了,页面通过 EventSource 与服务端建立通信,然后一直在接收服务端推送的消息,并将消息更新至页面上。

mounted() {
this.loadData();
},
methods: {
async loadData() {
const vm = this;
const eventSource = new EventSource("http://127.0.0.1:8080/api/v1/stream/chat");
eventSource.onopen = function () {
console.log('connect eventSource success.');
};
eventSource.onmessage = (e) => {
const data = JSON.parse(e.data);
vm.msg = `${data.result.content} - ${data.result.user} send`;
};
eventSource.onerror = function () {
console.log('connect eventSource failed.');
};
this.$forceUpdate();
},
}

参考资料