服务器发送事件 (Server-Sent Events) 是一种服务器推送技术,使客户端能够通过 HTTP 连接从服务器接收自动更新,并描述了在建立初始客户端连接后服务器如何向客户端发起数据传输。
1. 背景 在使用Argo CD的时候可以发现页面可以实时刷新应用的状态,比如同步中,那Argo CD是如何实现消息的实时推送的呢?本文将简要介绍Argo CD使用到的消息推送技术:SSE。
2. 源码解析 通过浏览器开发者工具可以发现前端请求的地址是 stream/applications,响应头表示这是一个服务器发送事件(Server-Sent Events,SSE)。
本文的源码基于2.6.0版本
Content-Type: text/event-stream:这个响应头告诉客户端,服务器将发送的数据是以文本格式的事件流(Event Stream)。
2.1 客户端实现 Argo CD的前端使用的是 React,相关代码 application-details.tsx  实现如下:
private loadAppInfo(name: string, appNamespace : string): Observable<{application : appModels.Application; tree: appModels.ApplicationTree}> {     return  from (services.applications.get(name, appNamespace))         .pipe(             mergeMap(app  =>                 const  fallbackTree = {                     nodes: app.status.resources.map(res  =>parentRefs : [], info : [], resourceVersion : '' , uid : '' })),                     orphanedNodes: [],                     hosts: []                 } as  appModels.ApplicationTree;                 return  combineLatest(                     merge(                         from ([app]),                         this .appChanged.pipe(filter(item  =>                         AppUtils.handlePageVisibility(()  =>                             services.applications                                 .watch({name, appNamespace})                                 .pipe(                                     map(watchEvent  =>                                         if  (watchEvent.type === 'DELETED' ) {                                             this .onAppDeleted();                                         }                                         return  watchEvent.application;                                     })                                 )                                 .pipe(repeat())                                 .pipe(retryWhen(errors  =>500 ))))                         )                     ),                     merge(                         from ([fallbackTree]),                         services.applications.resourceTree(name, appNamespace).catch(()  =>                         AppUtils.handlePageVisibility(()  =>                             services.applications                                 .watchResourceTree(name, appNamespace)                                 .pipe(repeat())                                 .pipe(retryWhen(errors  =>500 ))))                         )                     )                 );             })         )         .pipe(filter(([application, tree] ) =>  !!application && !!tree))         .pipe(map(([application, tree] ) =>  ({application, tree}))); } 
loadAppInfo 方法的主要目的是从服务器加载应用信息,并在页面可见时实时更新。它返回一个 Observable,包含应用信息 services.applications.watch 和应用资源树 services.applications.watchResourceTree。我们继续查看应用信息的 watch 函数实现。
public watch(query?: {name?: string; resourceVersion?: string; projects?: string[]; appNamespace?: string}, options?: QueryOptions): Observable<models.ApplicationWatchEvent> {     const  search = new  URLSearchParams();     if  (query) {         if  (query.name) {             search.set('name' , query.name);         }         if  (query.resourceVersion) {             search.set('resourceVersion' , query.resourceVersion);         }         if  (query.appNamespace) {             search.set('appNamespace' , query.appNamespace);         }     }     if  (options) {         const  searchOptions = optionsToSearch(options);         search.set('fields' , searchOptions.fields);         search.set('selector' , searchOptions.selector);         search.set('appNamespace' , searchOptions.appNamespace);         query?.projects?.forEach(project  =>'projects' , project));     }     const  searchStr = search.toString();     const  url = `/stream/applications${(searchStr && '?'  + searchStr) || '' } ` ;     return  requests         .loadEventSource(url)         .pipe(repeat())         .pipe(retry())         .pipe(map(data  =>JSON .parse(data).result as  models.ApplicationWatchEvent))         .pipe(             map(watchEvent  =>                 watchEvent.application = this .parseAppFields(watchEvent.application);                 return  watchEvent;             })         ); } 
可以看到该函数使用 requests.loadEventSource(url) 加载服务器发送的事件流(Event Stream),请求地址为 /stream/applications。loadEventSource 的实现是基于 EventSource 对象的, EventSource 实例设置 onmessage 事件处理程序,当接收到新消息时,调用 observer.next(msg.data) 将消息数据推送到 Observable 实现页面的实时刷新。在文档 Using server-sent events  中有介绍, 这里就不详细介绍了。
export  default  {    loadEventSource(url: string): Observable<string> {         return  Observable.create((observer: Observer<any> ) =>  {             let  eventSource = new  EventSource(`${apiRoot()} ${url} ` );             eventSource.onmessage = msg  =>             eventSource.onerror = e  =>()  =>                 observer.error(e);                 onError.next(e);             };                                       const  interval = setInterval(()  =>                 if  (eventSource && eventSource.readyState === ReadyState.CLOSED) {                     observer.error('connection got closed unexpectedly' );                 }             }, 500 );             return  ()  =>                 clearInterval(interval);                 eventSource.close();                 eventSource = null ;             };         });     } }; 
2.2 服务端实现 Argo CD的后端使用的是 Go,相关代码 application.go  实现如下:
type  ApplicationService_WatchServer interface  {    Send(*v1alpha1.ApplicationWatchEvent) error     grpc.ServerStream } func  (s *Server)  Watch (q *application.ApplicationQuery, ws application.ApplicationService_WatchServer)  error 	appName := q.GetName() 	appNs := s.appNamespaceOrDefault(q.GetAppNamespace()) 	logCtx := log.NewEntry(log.New()) 	if  q.Name != nil  { 		logCtx = logCtx.WithField("application" , *q.Name) 	} 	projects := map [string ]bool {} 	for  i := range  q.Projects { 		projects[q.Projects[i]] = true  	} 	claims := ws.Context().Value("claims" ) 	selector, err := labels.Parse(q.GetSelector()) 	if  err != nil  { 		return  fmt.Errorf("error parsing labels with selectors: %w" , err) 	} 	minVersion := 0  	if  q.GetResourceVersion() != ""  { 		if  minVersion, err = strconv.Atoi(q.GetResourceVersion()); err != nil  { 			minVersion = 0  		} 	} 	 	 	sendIfPermitted := func (a appv1.Application, eventType watch.EventType)  		if  len (projects) > 0  && !projects[a.Spec.GetProject()] { 			return  		} 		if  appVersion, err := strconv.Atoi(a.ResourceVersion); err == nil  && appVersion < minVersion { 			return  		} 		matchedEvent := (appName == ""  || (a.Name == appName && a.Namespace == appNs)) && selector.Matches(labels.Set(a.Labels)) 		if  !matchedEvent { 			return  		} 		if  !s.enf.Enforce(claims, rbacpolicy.ResourceApplications, rbacpolicy.ActionGet, a.RBACName(s.ns)) { 			 			return  		} 		s.inferResourcesStatusHealth(&a) 		err := ws.Send(&appv1.ApplicationWatchEvent{ 			Type:        eventType, 			Application: a, 		}) 		if  err != nil  { 			logCtx.Warnf("Unable to send stream message: %v" , err) 			return  		} 	} 	events := make (chan  *appv1.ApplicationWatchEvent, watchAPIBufferSize) 	 	 	 	 	if  q.GetResourceVersion() == ""  || q.GetName() != ""  { 		apps, err := s.appLister.List(selector) 		if  err != nil  { 			return  fmt.Errorf("error listing apps with selector: %w" , err) 		} 		sort.Slice(apps, func (i, j int )  bool  			return  apps[i].QualifiedName() < apps[j].QualifiedName() 		}) 		for  i := range  apps { 			sendIfPermitted(*apps[i], watch.Added) 		} 	} 	unsubscribe := s.appBroadcaster.Subscribe(events) 	defer  unsubscribe() 	for  { 		select  { 		case  event := <-events: 			sendIfPermitted(event.Application, event.Type) 		case  <-ws.Context().Done(): 			return  nil  		} 	} } 
在这段代码中,ws 用于发送实时更新的应用程序事件。当有新的事件发生时,sendIfPermitted 函数会将事件发送到客户端。
NOTE: ws 是一个实现了 ApplicationService_WatchServer 接口的对象。ApplicationService_WatchServer 接口继承了 grpc.ServerStream 接口,并定义了一个名为 Send 的方法。这意味着 ws 可以用作服务器端的流,并具有发送 ApplicationWatchEvent 类型消息的能力。
3 实现一个简单的SSE接口服务样例 相关代码地址 ,实现如下效果,页面上通过 sse 获取数据(当前时间和推送人),后端服务由 grpc|grpc-gateway stream 推送。
NOTE: 后续补充更新中
3.1 使用 protobuf 定义 gRPC 服务 新建一个项目sse-demo ,在项目目录下执行go mod init命令完成go module初始化。 在项目目录下创建一个 proto/chat.proto  文件,其内容如下:
syntax = "proto3" ; option  go_package = "github.com/blazehu/sse-demo/server/apiclient/chat" ;package  chat;import  "google/api/annotations.proto" ;import  "google/protobuf/empty.proto" ;service  ChatService    rpc  Chat(google.protobuf.Empty) returns  (stream Message) {     option (google.api.http) = {       get: "/api/v1/stream/chat"     } ;  } } message  Message    string  user = 1 ;   string  content = 2 ; } 
3.2 生成代码 这用 buf  生成代码,首先在项目目录下执行 buf mod init 生成 buf.yaml 文件,我们需要修改该文件内容如下:
version:  v1 name:  buf.build/blazehu/sse-demo deps:   -  buf.build/googleapis/googleapis  breaking:   use:      -  FILE  lint:   use:      -  DEFAULT  
NOTE: 这里主要是添加依赖项 googleapis,相关详细文档 。
修改完成后执行 buf mod update 来选择要使用的依赖项的版本。 然后我们创建一个 buf.gen.yaml 文件用于生成存根。buf.gen.yaml文件内容如下:
version:  v1 plugins:   -  plugin:  go      out:  gen      opt:  paths=source_relative    -  plugin:  go-grpc      out:  gen      opt:  paths=source_relative,require_unimplemented_servers=false    -  plugin:  grpc-gateway      out:  gen      opt:  paths=source_relative  
这时候我们可以通过 buf generate 生成存根,也可以使用 protoc 来生成存根,这里就不展开说明。
这时候将会由如下的目录结构:
├── buf .gen .yaml  ├── buf .lock  ├── buf .yaml  ├── gen  │   └── proto  │       ├── chat .pb .go  │       ├── chat .pb .gw .go  │       └── chat_grpc .pb .go  ├── go .mod  └── proto      └── chat .proto  
3.3 实现 GRPC 服务 上述生成 pb 和 grpc 相关代码后,实现一个 gRPC Server 服务,相关代码 如下:
package  mainimport  (	"github.com/blazehu/sse-demo/gen/proto"  	"google.golang.org/grpc"  	"google.golang.org/protobuf/types/known/emptypb"  	"log"  	"net"  	"time"  ) type  Server struct  {	chat.UnimplementedChatServiceServer } func  (s *Server)  Chat (_ *emptypb.Empty, stream chat.ChatService_ChatServer)  error 	for  { 		msg := chat.Message{ 			User:    "blazehu" , 			Content: time.Now().Format(time.RFC3339), 		} 		if  err := stream.Send(&msg); err != nil  { 			return  err 		} 		time.Sleep(time.Second * 1 ) 	} } func  main () 	lis, err := net.Listen("tcp" , ":50051" ) 	if  err != nil  { 		log.Fatalf("failed to listen: %v" , err) 	} 	var  opts []grpc.ServerOption 	grpcServer := grpc.NewServer(opts...) 	chat.RegisterChatServiceServer(grpcServer, &Server{}) 	grpcServer.Serve(lis) } 
代码逻辑非常简单,定义 Server 对象然后实现了相关的 Chat 方法,然后 main 函数注册并启动了一个 gRPC Server 服务。
NOTE: Chat 方法是每隔一秒钟就发送一条消息,该消息内容就是当前的时间。
3.4 实现 HTTP 服务 新增 main.go 文件,在 main.go 文件中添加和启动 gRPC-Gateway mux。相关代码 如下:
package  mainimport  (	"context"  	"github.com/blazehu/sse-demo/gen/proto"  	"github.com/blazehu/sse-demo/util"  	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"  	"google.golang.org/grpc"  	"google.golang.org/grpc/credentials/insecure"  	googleproto "google.golang.org/protobuf/proto"  	"log"  	"net/http"  ) const  (	grpcEndpoint = "localhost:50051"  	httpPort     = ":8080"  ) func  myFilter (ctx context.Context, w http.ResponseWriter, resp googleproto.Message)  error 	w.Header().Set("Connection" , "keep-alive" ) 	w.Header().Set("Cache-Control" , "no-cache" ) 	return  nil  } func  main () 	ctx := context.Background() 	ctx, cancel := context.WithCancel(ctx) 	defer  cancel() 	 	var  opts []grpc.DialOption 	opts = append (opts, grpc.WithTransportCredentials(insecure.NewCredentials())) 	grpcConn, err := grpc.DialContext(ctx, grpcEndpoint, opts...) 	if  err != nil  { 		log.Fatalf("Failed to dial server: %v" , err) 	} 	defer  grpcConn.Close() 	 	gwmux := runtime.NewServeMux( 		runtime.WithMarshalerOption(runtime.MIMEWildcard, util.NewCustomTranscoder(&runtime.JSONPb{})), 		runtime.WithForwardResponseOption(myFilter), 	) 	err = chat.RegisterChatServiceHandler(ctx, gwmux, grpcConn) 	if  err != nil  { 		log.Fatalf("Failed to register gateway: %v" , err) 	} 	 	mux := http.NewServeMux() 	handler := http.HandlerFunc(func (w http.ResponseWriter, r *http.Request)  		w.Header().Set("Access-Control-Allow-Origin" , "*" ) 		w.Header().Set("Access-Control-Allow-Methods" , "GET, POST, PUT, DELETE, OPTIONS" ) 		w.Header().Set("Access-Control-Allow-Headers" , "Content-Type, Authorization" ) 		if  r.Method == "OPTIONS"  { 			w.WriteHeader(http.StatusOK) 			return  		} 		gwmux.ServeHTTP(w, r) 	}) 	mux.Handle("/" , handler) 	 	log.Printf("Starting gRPC-Gateway on %s" , httpPort) 	if  err := http.ListenAndServe(httpPort, mux); err != nil  { 		log.Fatalf("Failed to serve: %v" , err) 	} } 
因为前端 sse 访问需要跨域所以需要设置 CORS 策略,并且返回的消息类型为 text/event-stream,所以我们这里自定义一个 CustomTranscoder,它将 JSON 转换为 text/event-stream 格式。相关实现 如下:
package  utilimport  (	"bytes"  	"encoding/json"  	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"  	"google.golang.org/protobuf/proto"  	"io"  ) type  CustomTranscoder struct  {	marshaler runtime.Marshaler } func  NewCustomTranscoder (marshaler runtime.Marshaler)  *CustomTranscoder 	return  &CustomTranscoder{marshaler: marshaler} } func  (c *CustomTranscoder)  ContentType (_ interface {})  string 	return  "text/event-stream"  } func  (c *CustomTranscoder)  Marshal (v interface {})  ([]byte , error) 	var  jsonBytes []byte  	var  err error 	if  pb, ok := v.(proto.Message); ok { 		 		jsonBytes, err = c.marshaler.Marshal(pb) 	} else  { 		 		jsonBytes, err = json.Marshal(v) 	} 	if  err != nil  { 		return  nil , err 	} 	var  buf bytes.Buffer 	buf.WriteString("data: " ) 	buf.Write(jsonBytes) 	buf.WriteString("\n\n" ) 	return  buf.Bytes(), nil  } func  (c *CustomTranscoder)  Unmarshal (data []byte , v interface {})  error 	return  c.marshaler.Unmarshal(data, v) } func  (c *CustomTranscoder)  NewDecoder (r io.Reader)  runtime .Decoder 	return  c.marshaler.NewDecoder(r) } func  (c *CustomTranscoder)  NewEncoder (w io.Writer)  runtime .Encoder 	return  c.marshaler.NewEncoder(w) } 
NOTE: 这里需要注意输入值转化为 proto.Message 类型可能会失败,所以我们在 Marshal 方法中添加一个检查,以确保输入值是一个 proto.Message 类型。如果不是,我们可以尝试将其序列化为 JSON。
3.5 测试 同时启动 gRPC Server 和 HTTP Server,然后浏览器访问地址: http://127.0.0.1:8080/api/v1/stream/chat  ,可以发现浏览器将会每秒刷新一条数据。
3.6 前端实现 前端使用 vue2,这里就不详细说明了,页面通过 EventSource 与服务端建立通信,然后一直在接收服务端推送的消息,并将消息更新至页面上。
mounted() {     this .loadData(); }, methods: {     async  loadData() {         const  vm = this ;         const  eventSource = new  EventSource("http://127.0.0.1:8080/api/v1/stream/chat" );         eventSource.onopen = function  (             console .log('connect eventSource success.' );         };         eventSource.onmessage =  (e ) =>  {             const  data = JSON .parse(e.data);             vm.msg = `${data.result.content}  - ${data.result.user}  send` ;         };         eventSource.onerror = function  (             console .log('connect eventSource failed.' );         };         this .$forceUpdate();     }, } 
4. 参考资料