server.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. /*
  2. *
  3. * Copyright 2020 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package xds
  19. import (
  20. "context"
  21. "errors"
  22. "fmt"
  23. "net"
  24. "strings"
  25. "sync"
  26. "google.golang.org/grpc"
  27. "google.golang.org/grpc/credentials"
  28. "google.golang.org/grpc/grpclog"
  29. "google.golang.org/grpc/internal"
  30. internalgrpclog "google.golang.org/grpc/internal/grpclog"
  31. "google.golang.org/grpc/internal/grpcsync"
  32. xdsclient "google.golang.org/grpc/xds/internal/client"
  33. "google.golang.org/grpc/xds/internal/client/bootstrap"
  34. "google.golang.org/grpc/xds/internal/server"
  35. )
  36. const serverPrefix = "[xds-server %p] "
  37. var (
  38. // These new functions will be overridden in unit tests.
  39. newXDSClient = func() (xdsClientInterface, error) {
  40. return xdsclient.New()
  41. }
  42. newGRPCServer = func(opts ...grpc.ServerOption) grpcServerInterface {
  43. return grpc.NewServer(opts...)
  44. }
  45. // Unexported function to retrieve transport credentials from a gRPC server.
  46. grpcGetServerCreds = internal.GetServerCredentials.(func(*grpc.Server) credentials.TransportCredentials)
  47. logger = grpclog.Component("xds")
  48. )
  49. func prefixLogger(p *GRPCServer) *internalgrpclog.PrefixLogger {
  50. return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(serverPrefix, p))
  51. }
  52. // xdsClientInterface contains methods from xdsClient.Client which are used by
  53. // the server. This is useful for overriding in unit tests.
  54. type xdsClientInterface interface {
  55. WatchListener(string, func(xdsclient.ListenerUpdate, error)) func()
  56. BootstrapConfig() *bootstrap.Config
  57. Close()
  58. }
  59. // grpcServerInterface contains methods from grpc.Server which are used by the
  60. // GRPCServer type here. This is useful for overriding in unit tests.
  61. type grpcServerInterface interface {
  62. RegisterService(*grpc.ServiceDesc, interface{})
  63. Serve(net.Listener) error
  64. Stop()
  65. GracefulStop()
  66. }
  67. // GRPCServer wraps a gRPC server and provides server-side xDS functionality, by
  68. // communication with a management server using xDS APIs. It implements the
  69. // grpc.ServiceRegistrar interface and can be passed to service registration
  70. // functions in IDL generated code.
  71. //
  72. // Experimental
  73. //
  74. // Notice: This type is EXPERIMENTAL and may be changed or removed in a
  75. // later release.
  76. type GRPCServer struct {
  77. gs grpcServerInterface
  78. quit *grpcsync.Event
  79. logger *internalgrpclog.PrefixLogger
  80. xdsCredsInUse bool
  81. // clientMu is used only in initXDSClient(), which is called at the
  82. // beginning of Serve(), where we have to decide if we have to create a
  83. // client or use an existing one.
  84. clientMu sync.Mutex
  85. xdsC xdsClientInterface
  86. }
  87. // NewGRPCServer creates an xDS-enabled gRPC server using the passed in opts.
  88. // The underlying gRPC server has no service registered and has not started to
  89. // accept requests yet.
  90. //
  91. // Experimental
  92. //
  93. // Notice: This API is EXPERIMENTAL and may be changed or removed in a later
  94. // release.
  95. func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {
  96. newOpts := []grpc.ServerOption{
  97. grpc.ChainUnaryInterceptor(xdsUnaryInterceptor),
  98. grpc.ChainStreamInterceptor(xdsStreamInterceptor),
  99. }
  100. newOpts = append(newOpts, opts...)
  101. s := &GRPCServer{
  102. gs: newGRPCServer(newOpts...),
  103. quit: grpcsync.NewEvent(),
  104. }
  105. s.logger = prefixLogger(s)
  106. s.logger.Infof("Created xds.GRPCServer")
  107. // We type assert our underlying gRPC server to the real grpc.Server here
  108. // before trying to retrieve the configured credentials. This approach
  109. // avoids performing the same type assertion in the grpc package which
  110. // provides the implementation for internal.GetServerCredentials, and allows
  111. // us to use a fake gRPC server in tests.
  112. if gs, ok := s.gs.(*grpc.Server); ok {
  113. creds := grpcGetServerCreds(gs)
  114. if xc, ok := creds.(interface{ UsesXDS() bool }); ok && xc.UsesXDS() {
  115. s.xdsCredsInUse = true
  116. }
  117. }
  118. s.logger.Infof("xDS credentials in use: %v", s.xdsCredsInUse)
  119. return s
  120. }
  121. // RegisterService registers a service and its implementation to the underlying
  122. // gRPC server. It is called from the IDL generated code. This must be called
  123. // before invoking Serve.
  124. func (s *GRPCServer) RegisterService(sd *grpc.ServiceDesc, ss interface{}) {
  125. s.gs.RegisterService(sd, ss)
  126. }
  127. // initXDSClient creates a new xdsClient if there is no existing one available.
  128. func (s *GRPCServer) initXDSClient() error {
  129. s.clientMu.Lock()
  130. defer s.clientMu.Unlock()
  131. if s.xdsC != nil {
  132. return nil
  133. }
  134. client, err := newXDSClient()
  135. if err != nil {
  136. return fmt.Errorf("xds: failed to create xds-client: %v", err)
  137. }
  138. s.xdsC = client
  139. s.logger.Infof("Created an xdsClient")
  140. return nil
  141. }
  142. // Serve gets the underlying gRPC server to accept incoming connections on the
  143. // listener lis, which is expected to be listening on a TCP port.
  144. //
  145. // A connection to the management server, to receive xDS configuration, is
  146. // initiated here.
  147. //
  148. // Serve will return a non-nil error unless Stop or GracefulStop is called.
  149. // TODO: Support callback to get notified on serving state changes.
  150. func (s *GRPCServer) Serve(lis net.Listener) error {
  151. s.logger.Infof("Serve() passed a net.Listener on %s", lis.Addr().String())
  152. if _, ok := lis.Addr().(*net.TCPAddr); !ok {
  153. return fmt.Errorf("xds: GRPCServer expects listener to return a net.TCPAddr. Got %T", lis.Addr())
  154. }
  155. // If this is the first time Serve() is being called, we need to initialize
  156. // our xdsClient. If not, we can use the existing one.
  157. if err := s.initXDSClient(); err != nil {
  158. return err
  159. }
  160. cfg := s.xdsC.BootstrapConfig()
  161. if cfg == nil {
  162. return errors.New("bootstrap configuration is empty")
  163. }
  164. // If xds credentials were specified by the user, but bootstrap configs do
  165. // not contain any certificate provider configuration, it is better to fail
  166. // right now rather than failing when attempting to create certificate
  167. // providers after receiving an LDS response with security configuration.
  168. if s.xdsCredsInUse {
  169. if len(cfg.CertProviderConfigs) == 0 {
  170. return errors.New("xds: certificate_providers config missing in bootstrap file")
  171. }
  172. }
  173. // The server listener resource name template from the bootstrap
  174. // configuration contains a template for the name of the Listener resource
  175. // to subscribe to for a gRPC server. If the token `%s` is present in the
  176. // string, it will be replaced with the server's listening "IP:port" (e.g.,
  177. // "0.0.0.0:8080", "[::]:8080"). The absence of a template will be treated
  178. // as an error since we do not have any default value for this.
  179. if cfg.ServerListenerResourceNameTemplate == "" {
  180. return errors.New("missing server_listener_resource_name_template in the bootstrap configuration")
  181. }
  182. name := cfg.ServerListenerResourceNameTemplate
  183. if strings.Contains(cfg.ServerListenerResourceNameTemplate, "%s") {
  184. name = strings.Replace(cfg.ServerListenerResourceNameTemplate, "%s", lis.Addr().String(), -1)
  185. }
  186. // Create a listenerWrapper which handles all functionality required by
  187. // this particular instance of Serve().
  188. lw, goodUpdateCh := server.NewListenerWrapper(server.ListenerWrapperParams{
  189. Listener: lis,
  190. ListenerResourceName: name,
  191. XDSCredsInUse: s.xdsCredsInUse,
  192. XDSClient: s.xdsC,
  193. })
  194. // Block until a good LDS response is received or the server is stopped.
  195. select {
  196. case <-s.quit.Done():
  197. // Since the listener has not yet been handed over to gs.Serve(), we
  198. // need to explicitly close the listener. Cancellation of the xDS watch
  199. // is handled by the listenerWrapper.
  200. lw.Close()
  201. return nil
  202. case <-goodUpdateCh:
  203. }
  204. return s.gs.Serve(lw)
  205. }
  206. // Stop stops the underlying gRPC server. It immediately closes all open
  207. // connections. It cancels all active RPCs on the server side and the
  208. // corresponding pending RPCs on the client side will get notified by connection
  209. // errors.
  210. func (s *GRPCServer) Stop() {
  211. s.quit.Fire()
  212. s.gs.Stop()
  213. if s.xdsC != nil {
  214. s.xdsC.Close()
  215. }
  216. }
  217. // GracefulStop stops the underlying gRPC server gracefully. It stops the server
  218. // from accepting new connections and RPCs and blocks until all the pending RPCs
  219. // are finished.
  220. func (s *GRPCServer) GracefulStop() {
  221. s.quit.Fire()
  222. s.gs.GracefulStop()
  223. if s.xdsC != nil {
  224. s.xdsC.Close()
  225. }
  226. }
  227. // xdsUnaryInterceptor is the unary interceptor added to the gRPC server to
  228. // perform any xDS specific functionality on unary RPCs.
  229. //
  230. // This is a no-op at this point.
  231. func xdsUnaryInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
  232. return handler(ctx, req)
  233. }
  234. // xdsStreamInterceptor is the stream interceptor added to the gRPC server to
  235. // perform any xDS specific functionality on streaming RPCs.
  236. //
  237. // This is a no-op at this point.
  238. func xdsStreamInterceptor(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  239. return handler(srv, ss)
  240. }