服务器发送事件 (Server-Sent Events) 是一种服务器推送技术,使客户端能够通过 HTTP 连接从服务器接收自动更新,并描述了在建立初始客户端连接后服务器如何向客户端发起数据传输。
1.背景 在使用Argo CD的时候可以发现页面可以实时刷新应用的状态,比如同步中,那Argo CD是如何实现消息的实时推送的呢?本文将简要介绍Argo CD使用到的消息推送技术:SSE。
2. 源码解析 通过浏览器开发者工具可以发现前端请求的地址是 stream/applications
,响应头表示这是一个服务器发送事件(Server-Sent Events,SSE)。
本文的源码基于2.6.0版本
Content-Type: text/event-stream:这个响应头告诉客户端,服务器将发送的数据是以文本格式的事件流(Event Stream)。
2.1 客户端实现 Argo CD的前端使用的是 React,相关代码 application-details.tsx 实现如下:
private loadAppInfo(name: string, appNamespace : string): Observable<{application : appModels.Application; tree: appModels.ApplicationTree}> { return from (services.applications.get(name, appNamespace)) .pipe( mergeMap(app => { const fallbackTree = { nodes: app.status.resources.map(res => ({...res, parentRefs : [], info : [], resourceVersion : '' , uid : '' })), orphanedNodes: [], hosts: [] } as appModels.ApplicationTree; return combineLatest( merge( from ([app]), this .appChanged.pipe(filter(item => !!item)), AppUtils.handlePageVisibility(() => services.applications .watch({name, appNamespace}) .pipe( map(watchEvent => { if (watchEvent.type === 'DELETED' ) { this .onAppDeleted(); } return watchEvent.application; }) ) .pipe(repeat()) .pipe(retryWhen(errors => errors.pipe(delay(500 )))) ) ), merge( from ([fallbackTree]), services.applications.resourceTree(name, appNamespace).catch(() => fallbackTree), AppUtils.handlePageVisibility(() => services.applications .watchResourceTree(name, appNamespace) .pipe(repeat()) .pipe(retryWhen(errors => errors.pipe(delay(500 )))) ) ) ); }) ) .pipe(filter(([application, tree] ) => !!application && !!tree)) .pipe(map(([application, tree] ) => ({application, tree}))); }
loadAppInfo
方法的主要目的是从服务器加载应用信息,并在页面可见时实时更新。它返回一个 Observable,包含应用信息 services.applications.watch
和应用资源树 services.applications.watchResourceTree
。我们继续查看应用信息的 watch
函数实现。
public watch(query?: {name?: string; resourceVersion?: string; projects?: string[]; appNamespace?: string}, options?: QueryOptions): Observable<models.ApplicationWatchEvent> { const search = new URLSearchParams(); if (query) { if (query.name) { search.set('name' , query.name); } if (query.resourceVersion) { search.set('resourceVersion' , query.resourceVersion); } if (query.appNamespace) { search.set('appNamespace' , query.appNamespace); } } if (options) { const searchOptions = optionsToSearch(options); search.set('fields' , searchOptions.fields); search.set('selector' , searchOptions.selector); search.set('appNamespace' , searchOptions.appNamespace); query?.projects?.forEach(project => search.append('projects' , project)); } const searchStr = search.toString(); const url = `/stream/applications${(searchStr && '?' + searchStr) || '' } ` ; return requests .loadEventSource(url) .pipe(repeat()) .pipe(retry()) .pipe(map(data => JSON .parse(data).result as models.ApplicationWatchEvent)) .pipe( map(watchEvent => { watchEvent.application = this .parseAppFields(watchEvent.application); return watchEvent; }) ); }
可以看到该函数使用 requests.loadEventSource(url)
加载服务器发送的事件流(Event Stream),请求地址为 /stream/applications
。loadEventSource
的实现是基于 EventSource
对象的, EventSource
实例设置 onmessage
事件处理程序,当接收到新消息时,调用 observer.next(msg.data)
将消息数据推送到 Observable
实现页面的实时刷新。在文档 Using server-sent events 中有介绍, 这里就不详细介绍了。
export default { loadEventSource(url: string): Observable<string> { return Observable.create((observer: Observer<any> ) => { let eventSource = new EventSource(`${apiRoot()} ${url} ` ); eventSource.onmessage = msg => observer.next(msg.data); eventSource.onerror = e => () => { observer.error(e); onError.next(e); }; const interval = setInterval(() => { if (eventSource && eventSource.readyState === ReadyState.CLOSED) { observer.error('connection got closed unexpectedly' ); } }, 500 ); return () => { clearInterval(interval); eventSource.close(); eventSource = null ; }; }); } };
2.2 服务端实现 Argo CD的后端使用的是 Go,相关代码 application.go 实现如下:
type ApplicationService_WatchServer interface { Send(*v1alpha1.ApplicationWatchEvent) error grpc.ServerStream } func (s *Server) Watch (q *application.ApplicationQuery, ws application.ApplicationService_WatchServer) error { appName := q.GetName() appNs := s.appNamespaceOrDefault(q.GetAppNamespace()) logCtx := log.NewEntry(log.New()) if q.Name != nil { logCtx = logCtx.WithField("application" , *q.Name) } projects := map [string ]bool {} for i := range q.Projects { projects[q.Projects[i]] = true } claims := ws.Context().Value("claims" ) selector, err := labels.Parse(q.GetSelector()) if err != nil { return fmt.Errorf("error parsing labels with selectors: %w" , err) } minVersion := 0 if q.GetResourceVersion() != "" { if minVersion, err = strconv.Atoi(q.GetResourceVersion()); err != nil { minVersion = 0 } } sendIfPermitted := func (a appv1.Application, eventType watch.EventType) { if len (projects) > 0 && !projects[a.Spec.GetProject()] { return } if appVersion, err := strconv.Atoi(a.ResourceVersion); err == nil && appVersion < minVersion { return } matchedEvent := (appName == "" || (a.Name == appName && a.Namespace == appNs)) && selector.Matches(labels.Set(a.Labels)) if !matchedEvent { return } if !s.enf.Enforce(claims, rbacpolicy.ResourceApplications, rbacpolicy.ActionGet, a.RBACName(s.ns)) { return } s.inferResourcesStatusHealth(&a) err := ws.Send(&appv1.ApplicationWatchEvent{ Type: eventType, Application: a, }) if err != nil { logCtx.Warnf("Unable to send stream message: %v" , err) return } } events := make (chan *appv1.ApplicationWatchEvent, watchAPIBufferSize) if q.GetResourceVersion() == "" || q.GetName() != "" { apps, err := s.appLister.List(selector) if err != nil { return fmt.Errorf("error listing apps with selector: %w" , err) } sort.Slice(apps, func (i, j int ) bool { return apps[i].QualifiedName() < apps[j].QualifiedName() }) for i := range apps { sendIfPermitted(*apps[i], watch.Added) } } unsubscribe := s.appBroadcaster.Subscribe(events) defer unsubscribe() for { select { case event := <-events: sendIfPermitted(event.Application, event.Type) case <-ws.Context().Done(): return nil } } }
在这段代码中,ws
用于发送实时更新的应用程序事件。当有新的事件发生时,sendIfPermitted
函数会将事件发送到客户端。
NOTE: ws
是一个实现了 ApplicationService_WatchServer
接口的对象。ApplicationService_WatchServer
接口继承了 grpc.ServerStream
接口,并定义了一个名为 Send
的方法。这意味着 ws
可以用作服务器端的流,并具有发送 ApplicationWatchEvent
类型消息的能力。
3 实现一个简单的SSE接口服务样例 相关代码地址 ,实现如下效果,页面上通过 sse 获取数据(当前时间和推送人),后端服务由 grpc|grpc-gateway stream 推送。
NOTE: 后续补充更新中
3.1 使用 protobuf 定义 gRPC 服务 新建一个项目sse-demo ,在项目目录下执行go mod init
命令完成go module
初始化。 在项目目录下创建一个 proto/chat.proto 文件,其内容如下:
syntax = "proto3" ; option go_package = "github.com/blazehu/sse-demo/server/apiclient/chat" ;package chat;import "google/api/annotations.proto" ;import "google/protobuf/empty.proto" ;service ChatService { rpc Chat(google.protobuf.Empty) returns (stream Message) { option (google.api.http) = { get: "/api/v1/stream/chat" } ; } } message Message { string user = 1 ; string content = 2 ; }
3.2 生成代码 这用 buf 生成代码,首先在项目目录下执行 buf mod init
生成 buf.yaml
文件,我们需要修改该文件内容如下:
version: v1 name: buf.build/blazehu/sse-demo deps: - buf.build/googleapis/googleapis breaking: use: - FILE lint: use: - DEFAULT
NOTE: 这里主要是添加依赖项 googleapis,相关详细文档 。
修改完成后执行 buf mod update
来选择要使用的依赖项的版本。 然后我们创建一个 buf.gen.yaml
文件用于生成存根。buf.gen.yaml
文件内容如下:
version: v1 plugins: - plugin: go out: gen opt: paths=source_relative - plugin: go-grpc out: gen opt: paths=source_relative,require_unimplemented_servers=false - plugin: grpc-gateway out: gen opt: paths=source_relative
这时候我们可以通过 buf generate
生成存根,也可以使用 protoc
来生成存根,这里就不展开说明。
这时候将会由如下的目录结构:
├── buf .gen .yaml ├── buf .lock ├── buf .yaml ├── gen │ └── proto │ ├── chat .pb .go │ ├── chat .pb .gw .go │ └── chat_grpc .pb .go ├── go .mod └── proto └── chat .proto
3.3 实现 GRPC 服务 上述生成 pb
和 grpc
相关代码后,实现一个 gRPC Server
服务,相关代码 如下:
package mainimport ( "github.com/blazehu/sse-demo/gen/proto" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" "log" "net" "time" ) type Server struct { chat.UnimplementedChatServiceServer } func (s *Server) Chat (_ *emptypb.Empty, stream chat.ChatService_ChatServer) error { for { msg := chat.Message{ User: "blazehu" , Content: time.Now().Format(time.RFC3339), } if err := stream.Send(&msg); err != nil { return err } time.Sleep(time.Second * 1 ) } } func main () { lis, err := net.Listen("tcp" , ":50051" ) if err != nil { log.Fatalf("failed to listen: %v" , err) } var opts []grpc.ServerOption grpcServer := grpc.NewServer(opts...) chat.RegisterChatServiceServer(grpcServer, &Server{}) grpcServer.Serve(lis) }
代码逻辑非常简单,定义 Server
对象然后实现了相关的 Chat
方法,然后 main 函数注册并启动了一个 gRPC Server
服务。
NOTE: Chat
方法是每隔一秒钟就发送一条消息,该消息内容就是当前的时间。
3.4 实现 HTTP 服务 新增 main.go
文件,在 main.go
文件中添加和启动 gRPC-Gateway mux
。相关代码 如下:
package mainimport ( "context" "github.com/blazehu/sse-demo/gen/proto" "github.com/blazehu/sse-demo/util" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" googleproto "google.golang.org/protobuf/proto" "log" "net/http" ) const ( grpcEndpoint = "localhost:50051" httpPort = ":8080" ) func myFilter (ctx context.Context, w http.ResponseWriter, resp googleproto.Message) error { w.Header().Set("Connection" , "keep-alive" ) w.Header().Set("Cache-Control" , "no-cache" ) return nil } func main () { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) defer cancel() var opts []grpc.DialOption opts = append (opts, grpc.WithTransportCredentials(insecure.NewCredentials())) grpcConn, err := grpc.DialContext(ctx, grpcEndpoint, opts...) if err != nil { log.Fatalf("Failed to dial server: %v" , err) } defer grpcConn.Close() gwmux := runtime.NewServeMux( runtime.WithMarshalerOption(runtime.MIMEWildcard, util.NewCustomTranscoder(&runtime.JSONPb{})), runtime.WithForwardResponseOption(myFilter), ) err = chat.RegisterChatServiceHandler(ctx, gwmux, grpcConn) if err != nil { log.Fatalf("Failed to register gateway: %v" , err) } mux := http.NewServeMux() handler := http.HandlerFunc(func (w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin" , "*" ) w.Header().Set("Access-Control-Allow-Methods" , "GET, POST, PUT, DELETE, OPTIONS" ) w.Header().Set("Access-Control-Allow-Headers" , "Content-Type, Authorization" ) if r.Method == "OPTIONS" { w.WriteHeader(http.StatusOK) return } gwmux.ServeHTTP(w, r) }) mux.Handle("/" , handler) log.Printf("Starting gRPC-Gateway on %s" , httpPort) if err := http.ListenAndServe(httpPort, mux); err != nil { log.Fatalf("Failed to serve: %v" , err) } }
因为前端 sse 访问需要跨域所以需要设置 CORS
策略,并且返回的消息类型为 text/event-stream
,所以我们这里自定义一个 CustomTranscoder
,它将 JSON
转换为 text/event-stream
格式。相关实现 如下:
package utilimport ( "bytes" "encoding/json" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "google.golang.org/protobuf/proto" "io" ) type CustomTranscoder struct { marshaler runtime.Marshaler } func NewCustomTranscoder (marshaler runtime.Marshaler) *CustomTranscoder { return &CustomTranscoder{marshaler: marshaler} } func (c *CustomTranscoder) ContentType (_ interface {}) string { return "text/event-stream" } func (c *CustomTranscoder) Marshal (v interface {}) ([]byte , error) { var jsonBytes []byte var err error if pb, ok := v.(proto.Message); ok { jsonBytes, err = c.marshaler.Marshal(pb) } else { jsonBytes, err = json.Marshal(v) } if err != nil { return nil , err } var buf bytes.Buffer buf.WriteString("data: " ) buf.Write(jsonBytes) buf.WriteString("\n\n" ) return buf.Bytes(), nil } func (c *CustomTranscoder) Unmarshal (data []byte , v interface {}) error { return c.marshaler.Unmarshal(data, v) } func (c *CustomTranscoder) NewDecoder (r io.Reader) runtime .Decoder { return c.marshaler.NewDecoder(r) } func (c *CustomTranscoder) NewEncoder (w io.Writer) runtime .Encoder { return c.marshaler.NewEncoder(w) }
NOTE: 这里需要注意输入值转化为 proto.Message
类型可能会失败,所以我们在 Marshal 方法中添加一个检查,以确保输入值是一个 proto.Message
类型。如果不是,我们可以尝试将其序列化为 JSON
。
3.5 测试 同时启动 gRPC Server
和 HTTP Server
,然后浏览器访问地址: http://127.0.0.1:8080/api/v1/stream/chat ,可以发现浏览器将会每秒刷新一条数据。
3.6 前端实现 前端使用 vue2
,这里就不详细说明了,页面通过 EventSource
与服务端建立通信,然后一直在接收服务端推送的消息,并将消息更新至页面上。
mounted() { this .loadData(); }, methods: { async loadData() { const vm = this ; const eventSource = new EventSource("http://127.0.0.1:8080/api/v1/stream/chat" ); eventSource.onopen = function ( ) { console .log('connect eventSource success.' ); }; eventSource.onmessage = (e ) => { const data = JSON .parse(e.data); vm.msg = `${data.result.content} - ${data.result.user} send` ; }; eventSource.onerror = function ( ) { console .log('connect eventSource failed.' ); }; this .$forceUpdate(); }, }
参考资料