server.go 54 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "math"
  25. "net"
  26. "net/http"
  27. "reflect"
  28. "runtime"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "time"
  33. "golang.org/x/net/trace"
  34. "google.golang.org/grpc/codes"
  35. "google.golang.org/grpc/credentials"
  36. "google.golang.org/grpc/encoding"
  37. "google.golang.org/grpc/encoding/proto"
  38. "google.golang.org/grpc/grpclog"
  39. "google.golang.org/grpc/internal"
  40. "google.golang.org/grpc/internal/binarylog"
  41. "google.golang.org/grpc/internal/channelz"
  42. "google.golang.org/grpc/internal/grpcrand"
  43. "google.golang.org/grpc/internal/grpcsync"
  44. "google.golang.org/grpc/internal/transport"
  45. "google.golang.org/grpc/keepalive"
  46. "google.golang.org/grpc/metadata"
  47. "google.golang.org/grpc/peer"
  48. "google.golang.org/grpc/stats"
  49. "google.golang.org/grpc/status"
  50. "google.golang.org/grpc/tap"
  51. )
// Default message-size limits, used to seed defaultServerOptions when the
// MaxRecvMsgSize / MaxSendMsgSize options are not supplied.
const (
	// defaultServerMaxReceiveMessageSize is the default cap on a received
	// message: 4 MiB.
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultServerMaxSendMessageSize is the default cap on a sent message:
	// math.MaxInt32.
	defaultServerMaxSendMessageSize = math.MaxInt32
)
  56. func init() {
  57. internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
  58. return srv.opts.creds
  59. }
  60. }
// statusOK is a status value carrying codes.OK and an empty message.
var statusOK = status.New(codes.OK, "")

// logger is the grpclog component logger registered under the name "core".
var logger = grpclog.Component("core")
// methodHandler is the function signature stored in MethodDesc.Handler. It is
// given the service implementation (srv), the call context, a dec callback and
// a unary interceptor, and returns a result value or an error.
// NOTE(review): dec presumably decodes the request into the value passed to
// it, and interceptor may be nil — confirm against the generated code.
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	// MethodName is the name the method is indexed under when its service is
	// registered.
	MethodName string
	// Handler is the function associated with this method.
	Handler methodHandler
}
// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	// ServiceName is the key under which the service is stored on the server;
	// registering the same name twice is reported as a fatal error.
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	// Methods lists the method descriptors, indexed by MethodName at
	// registration time.
	Methods []MethodDesc
	// Streams lists the stream descriptors, indexed by StreamName at
	// registration time.
	Streams []StreamDesc
	// Metadata is opaque to the server; it is copied at registration and
	// returned unchanged by GetServiceInfo.
	Metadata interface{}
}
// serviceInfo wraps information about a service. It is very similar to
// ServiceDesc and is constructed from it for internal purposes.
type serviceInfo struct {
	// Contains the implementation for the methods in this service.
	serviceImpl interface{}
	// methods maps MethodDesc.MethodName to its descriptor.
	methods map[string]*MethodDesc
	// streams maps StreamDesc.StreamName to its descriptor.
	streams map[string]*StreamDesc
	// mdata is the ServiceDesc.Metadata value supplied at registration.
	mdata interface{}
}
// serverWorkerData is the unit of work sent to a server worker goroutine over
// its channel.
type serverWorkerData struct {
	// st is the transport passed to handleStream together with stream.
	st transport.ServerTransport
	// wg has Done called on it by the worker once the stream has been handled.
	wg *sync.WaitGroup
	// stream is the stream to handle.
	stream *transport.Stream
}
// Server is a gRPC server to serve RPC requests.
type Server struct {
	// opts is the configuration NewServer assembled from defaultServerOptions
	// and the supplied ServerOption values.
	opts serverOptions

	mu sync.Mutex // guards following
	// lis is the set of listeners added by Serve. Serve treats a nil map as
	// "server already stopped" and returns ErrServerStopped.
	lis map[net.Listener]bool
	// NOTE(review): presumably the set of live transports — confirm.
	conns map[transport.ServerTransport]bool
	// serve is set to true by Serve; register reports a fatal error if a
	// service is added once it is set.
	serve bool
	drain bool
	cv    *sync.Cond // signaled when connections close for GracefulStop
	services map[string]*serviceInfo // service name -> service info
	// events is the trace event log; NewServer only creates it when
	// EnableTracing is true, so printf/errorf check it for nil.
	events trace.EventLog

	// quit and done are created by NewServer. When quit has fired, Serve
	// blocks on done before returning.
	quit               *grpcsync.Event
	done               *grpcsync.Event
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID int64 // channelz unique identification number
	czData     *channelzData

	// serverWorkerChannels holds one channel per stream worker; it is
	// populated by initServerWorkers when numServerWorkers > 0.
	serverWorkerChannels []chan *serverWorkerData
}
// serverOptions is the full set of tunables for a Server. NewServer starts
// from defaultServerOptions and lets each ServerOption mutate a copy.
type serverOptions struct {
	creds credentials.TransportCredentials // set by Creds; nil means no transport handshake is performed
	codec baseCodec                        // set by the deprecated CustomCodec
	cp    Compressor                       // set by the deprecated RPCCompressor
	dc    Decompressor                     // set by the deprecated RPCDecompressor
	// unaryInt and streamInt are the single interceptors installed through
	// UnaryInterceptor / StreamInterceptor; installing either twice panics.
	unaryInt  UnaryServerInterceptor
	streamInt StreamServerInterceptor
	// chainUnaryInts and chainStreamInts accumulate the interceptors passed to
	// ChainUnaryInterceptor / ChainStreamInterceptor, in call order.
	chainUnaryInts  []UnaryServerInterceptor
	chainStreamInts []StreamServerInterceptor
	inTapHandle     tap.ServerInHandle // set by InTapHandle; installing twice panics
	statsHandler    stats.Handler      // set by StatsHandler
	maxConcurrentStreams  uint32 // set by MaxConcurrentStreams
	maxReceiveMessageSize int    // set by MaxRecvMsgSize / MaxMsgSize
	maxSendMessageSize    int    // set by MaxSendMsgSize
	unknownStreamDesc     *StreamDesc // set by UnknownServiceHandler
	keepaliveParams       keepalive.ServerParameters // set by KeepaliveParams
	keepalivePolicy       keepalive.EnforcementPolicy // set by KeepaliveEnforcementPolicy
	initialWindowSize     int32 // set by InitialWindowSize
	initialConnWindowSize int32 // set by InitialConnWindowSize
	writeBufferSize       int   // set by WriteBufferSize
	readBufferSize        int   // set by ReadBufferSize
	connectionTimeout     time.Duration // set by ConnectionTimeout
	// maxHeaderListSize and headerTableSize are pointers; they stay nil
	// unless MaxHeaderListSize / HeaderTableSize is applied.
	maxHeaderListSize *uint32
	headerTableSize   *uint32
	numServerWorkers  uint32 // set by NumStreamWorkers; zero disables stream workers
}
// defaultServerOptions is the baseline configuration. NewServer copies it and
// then applies the caller's options to the copy, so any field not listed here
// starts at its zero value.
var defaultServerOptions = serverOptions{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	// NOTE(review): both buffer defaults are declared elsewhere; the option
	// docs describe them as 32KB — confirm.
	writeBufferSize: defaultWriteBufSize,
	readBufferSize:  defaultReadBufSize,
}
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
	// apply mutates the given serverOptions in place. NewServer calls it once
	// per option, in the order the options were passed.
	apply(*serverOptions)
}
// EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options: the embedding type
// inherits the no-op apply method and therefore satisfies ServerOption.
//
// Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type EmptyServerOption struct{}
  157. func (EmptyServerOption) apply(*serverOptions) {}
// funcServerOption wraps a function that modifies serverOptions into an
// implementation of the ServerOption interface.
type funcServerOption struct {
	// f is invoked by apply with the options being built.
	f func(*serverOptions)
}
  163. func (fdo *funcServerOption) apply(do *serverOptions) {
  164. fdo.f(do)
  165. }
  166. func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
  167. return &funcServerOption{
  168. f: f,
  169. }
  170. }
  171. // WriteBufferSize determines how much data can be batched before doing a write on the wire.
  172. // The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
  173. // The default value for this buffer is 32KB.
  174. // Zero will disable the write buffer such that each write will be on underlying connection.
  175. // Note: A Send call may not directly translate to a write.
  176. func WriteBufferSize(s int) ServerOption {
  177. return newFuncServerOption(func(o *serverOptions) {
  178. o.writeBufferSize = s
  179. })
  180. }
  181. // ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
  182. // for one read syscall.
  183. // The default value for this buffer is 32KB.
  184. // Zero will disable read buffer for a connection so data framer can access the underlying
  185. // conn directly.
  186. func ReadBufferSize(s int) ServerOption {
  187. return newFuncServerOption(func(o *serverOptions) {
  188. o.readBufferSize = s
  189. })
  190. }
  191. // InitialWindowSize returns a ServerOption that sets window size for stream.
  192. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  193. func InitialWindowSize(s int32) ServerOption {
  194. return newFuncServerOption(func(o *serverOptions) {
  195. o.initialWindowSize = s
  196. })
  197. }
  198. // InitialConnWindowSize returns a ServerOption that sets window size for a connection.
  199. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  200. func InitialConnWindowSize(s int32) ServerOption {
  201. return newFuncServerOption(func(o *serverOptions) {
  202. o.initialConnWindowSize = s
  203. })
  204. }
  205. // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
  206. func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
  207. if kp.Time > 0 && kp.Time < time.Second {
  208. logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
  209. kp.Time = time.Second
  210. }
  211. return newFuncServerOption(func(o *serverOptions) {
  212. o.keepaliveParams = kp
  213. })
  214. }
  215. // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
  216. func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
  217. return newFuncServerOption(func(o *serverOptions) {
  218. o.keepalivePolicy = kep
  219. })
  220. }
  221. // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
  222. //
  223. // This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
  224. //
  225. // Deprecated: register codecs using encoding.RegisterCodec. The server will
  226. // automatically use registered codecs based on the incoming requests' headers.
  227. // See also
  228. // https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
  229. // Will be supported throughout 1.x.
  230. func CustomCodec(codec Codec) ServerOption {
  231. return newFuncServerOption(func(o *serverOptions) {
  232. o.codec = codec
  233. })
  234. }
  235. // RPCCompressor returns a ServerOption that sets a compressor for outbound
  236. // messages. For backward compatibility, all outbound messages will be sent
  237. // using this compressor, regardless of incoming message compression. By
  238. // default, server messages will be sent using the same compressor with which
  239. // request messages were sent.
  240. //
  241. // Deprecated: use encoding.RegisterCompressor instead. Will be supported
  242. // throughout 1.x.
  243. func RPCCompressor(cp Compressor) ServerOption {
  244. return newFuncServerOption(func(o *serverOptions) {
  245. o.cp = cp
  246. })
  247. }
  248. // RPCDecompressor returns a ServerOption that sets a decompressor for inbound
  249. // messages. It has higher priority than decompressors registered via
  250. // encoding.RegisterCompressor.
  251. //
  252. // Deprecated: use encoding.RegisterCompressor instead. Will be supported
  253. // throughout 1.x.
  254. func RPCDecompressor(dc Decompressor) ServerOption {
  255. return newFuncServerOption(func(o *serverOptions) {
  256. o.dc = dc
  257. })
  258. }
  259. // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  260. // If this is not set, gRPC uses the default limit.
  261. //
  262. // Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
  263. func MaxMsgSize(m int) ServerOption {
  264. return MaxRecvMsgSize(m)
  265. }
  266. // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  267. // If this is not set, gRPC uses the default 4MB.
  268. func MaxRecvMsgSize(m int) ServerOption {
  269. return newFuncServerOption(func(o *serverOptions) {
  270. o.maxReceiveMessageSize = m
  271. })
  272. }
  273. // MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
  274. // If this is not set, gRPC uses the default `math.MaxInt32`.
  275. func MaxSendMsgSize(m int) ServerOption {
  276. return newFuncServerOption(func(o *serverOptions) {
  277. o.maxSendMessageSize = m
  278. })
  279. }
  280. // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
  281. // of concurrent streams to each ServerTransport.
  282. func MaxConcurrentStreams(n uint32) ServerOption {
  283. return newFuncServerOption(func(o *serverOptions) {
  284. o.maxConcurrentStreams = n
  285. })
  286. }
  287. // Creds returns a ServerOption that sets credentials for server connections.
  288. func Creds(c credentials.TransportCredentials) ServerOption {
  289. return newFuncServerOption(func(o *serverOptions) {
  290. o.creds = c
  291. })
  292. }
  293. // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
  294. // server. Only one unary interceptor can be installed. The construction of multiple
  295. // interceptors (e.g., chaining) can be implemented at the caller.
  296. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
  297. return newFuncServerOption(func(o *serverOptions) {
  298. if o.unaryInt != nil {
  299. panic("The unary server interceptor was already set and may not be reset.")
  300. }
  301. o.unaryInt = i
  302. })
  303. }
  304. // ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
  305. // for unary RPCs. The first interceptor will be the outer most,
  306. // while the last interceptor will be the inner most wrapper around the real call.
  307. // All unary interceptors added by this method will be chained.
  308. func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
  309. return newFuncServerOption(func(o *serverOptions) {
  310. o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
  311. })
  312. }
  313. // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
  314. // server. Only one stream interceptor can be installed.
  315. func StreamInterceptor(i StreamServerInterceptor) ServerOption {
  316. return newFuncServerOption(func(o *serverOptions) {
  317. if o.streamInt != nil {
  318. panic("The stream server interceptor was already set and may not be reset.")
  319. }
  320. o.streamInt = i
  321. })
  322. }
  323. // ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
  324. // for streaming RPCs. The first interceptor will be the outer most,
  325. // while the last interceptor will be the inner most wrapper around the real call.
  326. // All stream interceptors added by this method will be chained.
  327. func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
  328. return newFuncServerOption(func(o *serverOptions) {
  329. o.chainStreamInts = append(o.chainStreamInts, interceptors...)
  330. })
  331. }
  332. // InTapHandle returns a ServerOption that sets the tap handle for all the server
  333. // transport to be created. Only one can be installed.
  334. func InTapHandle(h tap.ServerInHandle) ServerOption {
  335. return newFuncServerOption(func(o *serverOptions) {
  336. if o.inTapHandle != nil {
  337. panic("The tap handle was already set and may not be reset.")
  338. }
  339. o.inTapHandle = h
  340. })
  341. }
  342. // StatsHandler returns a ServerOption that sets the stats handler for the server.
  343. func StatsHandler(h stats.Handler) ServerOption {
  344. return newFuncServerOption(func(o *serverOptions) {
  345. o.statsHandler = h
  346. })
  347. }
  348. // UnknownServiceHandler returns a ServerOption that allows for adding a custom
  349. // unknown service handler. The provided method is a bidi-streaming RPC service
  350. // handler that will be invoked instead of returning the "unimplemented" gRPC
  351. // error whenever a request is received for an unregistered service or method.
  352. // The handling function and stream interceptor (if set) have full access to
  353. // the ServerStream, including its Context.
  354. func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
  355. return newFuncServerOption(func(o *serverOptions) {
  356. o.unknownStreamDesc = &StreamDesc{
  357. StreamName: "unknown_service_handler",
  358. Handler: streamHandler,
  359. // We need to assume that the users of the streamHandler will want to use both.
  360. ClientStreams: true,
  361. ServerStreams: true,
  362. }
  363. })
  364. }
  365. // ConnectionTimeout returns a ServerOption that sets the timeout for
  366. // connection establishment (up to and including HTTP/2 handshaking) for all
  367. // new connections. If this is not set, the default is 120 seconds. A zero or
  368. // negative value will result in an immediate timeout.
  369. //
  370. // Experimental
  371. //
  372. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  373. // later release.
  374. func ConnectionTimeout(d time.Duration) ServerOption {
  375. return newFuncServerOption(func(o *serverOptions) {
  376. o.connectionTimeout = d
  377. })
  378. }
  379. // MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
  380. // of header list that the server is prepared to accept.
  381. func MaxHeaderListSize(s uint32) ServerOption {
  382. return newFuncServerOption(func(o *serverOptions) {
  383. o.maxHeaderListSize = &s
  384. })
  385. }
  386. // HeaderTableSize returns a ServerOption that sets the size of dynamic
  387. // header table for stream.
  388. //
  389. // Experimental
  390. //
  391. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  392. // later release.
  393. func HeaderTableSize(s uint32) ServerOption {
  394. return newFuncServerOption(func(o *serverOptions) {
  395. o.headerTableSize = &s
  396. })
  397. }
  398. // NumStreamWorkers returns a ServerOption that sets the number of worker
  399. // goroutines that should be used to process incoming streams. Setting this to
  400. // zero (default) will disable workers and spawn a new goroutine for each
  401. // stream.
  402. //
  403. // Experimental
  404. //
  405. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  406. // later release.
  407. func NumStreamWorkers(numServerWorkers uint32) ServerOption {
  408. // TODO: If/when this API gets stabilized (i.e. stream workers become the
  409. // only way streams are processed), change the behavior of the zero value to
  410. // a sane default. Preliminary experiments suggest that a value equal to the
  411. // number of CPUs available is most performant; requires thorough testing.
  412. return newFuncServerOption(func(o *serverOptions) {
  413. o.numServerWorkers = numServerWorkers
  414. })
  415. }
// serverWorkerResetThreshold defines how often the stack must be reset. Every
// N requests, by spawning a new goroutine in its place, a worker can reset its
// stack so that large stacks don't live in memory forever. 2^16 should allow
// each goroutine stack to live for at least a few seconds in a typical
// workload (assuming a QPS of a few thousand requests/sec).
//
// serverWorker adds a random amount in [0, serverWorkerResetThreshold) on top
// of this value, so the effective per-worker limit varies between workers.
const serverWorkerResetThreshold = 1 << 16
  422. // serverWorkers blocks on a *transport.Stream channel forever and waits for
  423. // data to be fed by serveStreams. This allows different requests to be
  424. // processed by the same goroutine, removing the need for expensive stack
  425. // re-allocations (see the runtime.morestack problem [1]).
  426. //
  427. // [1] https://github.com/golang/go/issues/18138
  428. func (s *Server) serverWorker(ch chan *serverWorkerData) {
  429. // To make sure all server workers don't reset at the same time, choose a
  430. // random number of iterations before resetting.
  431. threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
  432. for completed := 0; completed < threshold; completed++ {
  433. data, ok := <-ch
  434. if !ok {
  435. return
  436. }
  437. s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
  438. data.wg.Done()
  439. }
  440. go s.serverWorker(ch)
  441. }
  442. // initServerWorkers creates worker goroutines and channels to process incoming
  443. // connections to reduce the time spent overall on runtime.morestack.
  444. func (s *Server) initServerWorkers() {
  445. s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
  446. for i := uint32(0); i < s.opts.numServerWorkers; i++ {
  447. s.serverWorkerChannels[i] = make(chan *serverWorkerData)
  448. go s.serverWorker(s.serverWorkerChannels[i])
  449. }
  450. }
  451. func (s *Server) stopServerWorkers() {
  452. for i := uint32(0); i < s.opts.numServerWorkers; i++ {
  453. close(s.serverWorkerChannels[i])
  454. }
  455. }
  456. // NewServer creates a gRPC server which has no service registered and has not
  457. // started to accept requests yet.
  458. func NewServer(opt ...ServerOption) *Server {
  459. opts := defaultServerOptions
  460. for _, o := range opt {
  461. o.apply(&opts)
  462. }
  463. s := &Server{
  464. lis: make(map[net.Listener]bool),
  465. opts: opts,
  466. conns: make(map[transport.ServerTransport]bool),
  467. services: make(map[string]*serviceInfo),
  468. quit: grpcsync.NewEvent(),
  469. done: grpcsync.NewEvent(),
  470. czData: new(channelzData),
  471. }
  472. chainUnaryServerInterceptors(s)
  473. chainStreamServerInterceptors(s)
  474. s.cv = sync.NewCond(&s.mu)
  475. if EnableTracing {
  476. _, file, line, _ := runtime.Caller(1)
  477. s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
  478. }
  479. if s.opts.numServerWorkers > 0 {
  480. s.initServerWorkers()
  481. }
  482. if channelz.IsOn() {
  483. s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
  484. }
  485. return s
  486. }
  487. // printf records an event in s's event log, unless s has been stopped.
  488. // REQUIRES s.mu is held.
  489. func (s *Server) printf(format string, a ...interface{}) {
  490. if s.events != nil {
  491. s.events.Printf(format, a...)
  492. }
  493. }
  494. // errorf records an error in s's event log, unless s has been stopped.
  495. // REQUIRES s.mu is held.
  496. func (s *Server) errorf(format string, a ...interface{}) {
  497. if s.events != nil {
  498. s.events.Errorf(format, a...)
  499. }
  500. }
// ServiceRegistrar wraps a single method that supports service registration. It
// enables users to pass concrete types other than grpc.Server to the service
// registration methods exported by the IDL generated code. *Server satisfies
// this interface through its RegisterService method.
type ServiceRegistrar interface {
	// RegisterService registers a service and its implementation to the
	// concrete type implementing this interface. It may not be called
	// once the server has started serving.
	// desc describes the service and its methods and handlers. impl is the
	// service implementation which is passed to the method handlers.
	RegisterService(desc *ServiceDesc, impl interface{})
}
  512. // RegisterService registers a service and its implementation to the gRPC
  513. // server. It is called from the IDL generated code. This must be called before
  514. // invoking Serve. If ss is non-nil (for legacy code), its type is checked to
  515. // ensure it implements sd.HandlerType.
  516. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
  517. if ss != nil {
  518. ht := reflect.TypeOf(sd.HandlerType).Elem()
  519. st := reflect.TypeOf(ss)
  520. if !st.Implements(ht) {
  521. logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
  522. }
  523. }
  524. s.register(sd, ss)
  525. }
  526. func (s *Server) register(sd *ServiceDesc, ss interface{}) {
  527. s.mu.Lock()
  528. defer s.mu.Unlock()
  529. s.printf("RegisterService(%q)", sd.ServiceName)
  530. if s.serve {
  531. logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
  532. }
  533. if _, ok := s.services[sd.ServiceName]; ok {
  534. logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
  535. }
  536. info := &serviceInfo{
  537. serviceImpl: ss,
  538. methods: make(map[string]*MethodDesc),
  539. streams: make(map[string]*StreamDesc),
  540. mdata: sd.Metadata,
  541. }
  542. for i := range sd.Methods {
  543. d := &sd.Methods[i]
  544. info.methods[d.MethodName] = d
  545. }
  546. for i := range sd.Streams {
  547. d := &sd.Streams[i]
  548. info.streams[d.StreamName] = d
  549. }
  550. s.services[sd.ServiceName] = info
  551. }
// MethodInfo contains the information of an RPC including its method name and type.
// GetServiceInfo reports unary methods with both stream flags false.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}
// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	// Methods lists unary and streaming methods together. GetServiceInfo
	// builds it by iterating maps, so the order is not stable.
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}
  567. // GetServiceInfo returns a map from service names to ServiceInfo.
  568. // Service names include the package names, in the form of <package>.<service>.
  569. func (s *Server) GetServiceInfo() map[string]ServiceInfo {
  570. ret := make(map[string]ServiceInfo)
  571. for n, srv := range s.services {
  572. methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
  573. for m := range srv.methods {
  574. methods = append(methods, MethodInfo{
  575. Name: m,
  576. IsClientStream: false,
  577. IsServerStream: false,
  578. })
  579. }
  580. for m, d := range srv.streams {
  581. methods = append(methods, MethodInfo{
  582. Name: m,
  583. IsClientStream: d.ClientStreams,
  584. IsServerStream: d.ServerStreams,
  585. })
  586. }
  587. ret[n] = ServiceInfo{
  588. Methods: methods,
  589. Metadata: srv.mdata,
  590. }
  591. }
  592. return ret
  593. }
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped. Serve returns it when called after Stop or
// GracefulStop.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
  597. func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
  598. if s.opts.creds == nil {
  599. return rawConn, nil, nil
  600. }
  601. return s.opts.creds.ServerHandshake(rawConn)
  602. }
// listenSocket wraps a net.Listener together with the channelz ID it was
// registered under.
type listenSocket struct {
	net.Listener
	// channelzID is the value returned by channelz.RegisterListenSocket. Serve
	// only assigns it when channelz is on; otherwise it stays zero.
	channelzID int64
}
  607. func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
  608. return &channelz.SocketInternalMetric{
  609. SocketOptions: channelz.GetSocketOption(l.Listener),
  610. LocalAddr: l.Listener.Addr(),
  611. }
  612. }
  613. func (l *listenSocket) Close() error {
  614. err := l.Listener.Close()
  615. if channelz.IsOn() {
  616. channelz.RemoveEntry(l.channelzID)
  617. }
  618. return err
  619. }
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// Accept errors whose Temporary method reports true are retried after a delay
// that starts at 5ms, doubles on every consecutive failure and is capped at
// one second. If s.lis is nil when Serve is called, lis is closed and
// ErrServerStopped is returned.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	// Account for this Serve call in serveWG; the matching Done runs when
	// Serve returns.
	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	}
	s.mu.Unlock()

	// On return, close the listener and drop it from s.lis, unless s.lis has
	// been set to nil or no longer contains this entry.
	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				// Temporary failure: back off exponentially (5ms, 10ms, ...
				// capped at 1s) and try Accept again.
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				timer := time.NewTimer(tempDelay)
				// Wait out the delay, but give up right away if the server
				// is being shut down.
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			// An Accept failure after quit has fired is the expected result
			// of shutdown and is not reported as an error.
			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		// A successful Accept resets the backoff.
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}
  706. // handleRawConn forks a goroutine to handle a just-accepted connection that
  707. // has not had any I/O performed on it yet.
  708. func (s *Server) handleRawConn(rawConn net.Conn) {
  709. if s.quit.HasFired() {
  710. rawConn.Close()
  711. return
  712. }
  713. rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
  714. conn, authInfo, err := s.useTransportAuthenticator(rawConn)
  715. if err != nil {
  716. // ErrConnDispatched means that the connection was dispatched away from
  717. // gRPC; those connections should be left open.
  718. if err != credentials.ErrConnDispatched {
  719. s.mu.Lock()
  720. s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
  721. s.mu.Unlock()
  722. channelz.Warningf(logger, s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
  723. rawConn.Close()
  724. }
  725. rawConn.SetDeadline(time.Time{})
  726. return
  727. }
  728. // Finish handshaking (HTTP2)
  729. st := s.newHTTP2Transport(conn, authInfo)
  730. if st == nil {
  731. return
  732. }
  733. rawConn.SetDeadline(time.Time{})
  734. if !s.addConn(st) {
  735. return
  736. }
  737. go func() {
  738. s.serveStreams(st)
  739. s.removeConn(st)
  740. }()
  741. }
  742. // newHTTP2Transport sets up a http/2 transport (using the
  743. // gRPC http2 server transport in transport/http2_server.go).
  744. func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
  745. config := &transport.ServerConfig{
  746. MaxStreams: s.opts.maxConcurrentStreams,
  747. AuthInfo: authInfo,
  748. InTapHandle: s.opts.inTapHandle,
  749. StatsHandler: s.opts.statsHandler,
  750. KeepaliveParams: s.opts.keepaliveParams,
  751. KeepalivePolicy: s.opts.keepalivePolicy,
  752. InitialWindowSize: s.opts.initialWindowSize,
  753. InitialConnWindowSize: s.opts.initialConnWindowSize,
  754. WriteBufferSize: s.opts.writeBufferSize,
  755. ReadBufferSize: s.opts.readBufferSize,
  756. ChannelzParentID: s.channelzID,
  757. MaxHeaderListSize: s.opts.maxHeaderListSize,
  758. HeaderTableSize: s.opts.headerTableSize,
  759. }
  760. st, err := transport.NewServerTransport("http2", c, config)
  761. if err != nil {
  762. s.mu.Lock()
  763. s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
  764. s.mu.Unlock()
  765. c.Close()
  766. channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
  767. return nil
  768. }
  769. return st
  770. }
  771. func (s *Server) serveStreams(st transport.ServerTransport) {
  772. defer st.Close()
  773. var wg sync.WaitGroup
  774. var roundRobinCounter uint32
  775. st.HandleStreams(func(stream *transport.Stream) {
  776. wg.Add(1)
  777. if s.opts.numServerWorkers > 0 {
  778. data := &serverWorkerData{st: st, wg: &wg, stream: stream}
  779. select {
  780. case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
  781. default:
  782. // If all stream workers are busy, fallback to the default code path.
  783. go func() {
  784. s.handleStream(st, stream, s.traceInfo(st, stream))
  785. wg.Done()
  786. }()
  787. }
  788. } else {
  789. go func() {
  790. defer wg.Done()
  791. s.handleStream(st, stream, s.traceInfo(st, stream))
  792. }()
  793. }
  794. }, func(ctx context.Context, method string) context.Context {
  795. if !EnableTracing {
  796. return ctx
  797. }
  798. tr := trace.New("grpc.Recv."+methodFamily(method), method)
  799. return trace.NewContext(ctx, tr)
  800. })
  801. wg.Wait()
  802. }
// Compile-time check that *Server implements http.Handler.
var _ http.Handler = (*Server)(nil)
  804. // ServeHTTP implements the Go standard library's http.Handler
  805. // interface by responding to the gRPC request r, by looking up
  806. // the requested gRPC method in the gRPC server s.
  807. //
  808. // The provided HTTP request must have arrived on an HTTP/2
  809. // connection. When using the Go standard library's server,
  810. // practically this means that the Request must also have arrived
  811. // over TLS.
  812. //
  813. // To share one port (such as 443 for https) between gRPC and an
  814. // existing http.Handler, use a root http.Handler such as:
  815. //
  816. // if r.ProtoMajor == 2 && strings.HasPrefix(
  817. // r.Header.Get("Content-Type"), "application/grpc") {
  818. // grpcServer.ServeHTTP(w, r)
  819. // } else {
  820. // yourMux.ServeHTTP(w, r)
  821. // }
  822. //
  823. // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
  824. // separate from grpc-go's HTTP/2 server. Performance and features may vary
  825. // between the two paths. ServeHTTP does not support some gRPC features
  826. // available through grpc-go's HTTP/2 server.
  827. //
  828. // Experimental
  829. //
  830. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  831. // later release.
  832. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  833. st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
  834. if err != nil {
  835. http.Error(w, err.Error(), http.StatusInternalServerError)
  836. return
  837. }
  838. if !s.addConn(st) {
  839. return
  840. }
  841. defer s.removeConn(st)
  842. s.serveStreams(st)
  843. }
  844. // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
  845. // If tracing is not enabled, it returns nil.
  846. func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
  847. if !EnableTracing {
  848. return nil
  849. }
  850. tr, ok := trace.FromContext(stream.Context())
  851. if !ok {
  852. return nil
  853. }
  854. trInfo = &traceInfo{
  855. tr: tr,
  856. firstLine: firstLine{
  857. client: false,
  858. remoteAddr: st.RemoteAddr(),
  859. },
  860. }
  861. if dl, ok := stream.Context().Deadline(); ok {
  862. trInfo.firstLine.deadline = time.Until(dl)
  863. }
  864. return trInfo
  865. }
  866. func (s *Server) addConn(st transport.ServerTransport) bool {
  867. s.mu.Lock()
  868. defer s.mu.Unlock()
  869. if s.conns == nil {
  870. st.Close()
  871. return false
  872. }
  873. if s.drain {
  874. // Transport added after we drained our existing conns: drain it
  875. // immediately.
  876. st.Drain()
  877. }
  878. s.conns[st] = true
  879. return true
  880. }
  881. func (s *Server) removeConn(st transport.ServerTransport) {
  882. s.mu.Lock()
  883. defer s.mu.Unlock()
  884. if s.conns != nil {
  885. delete(s.conns, st)
  886. s.cv.Broadcast()
  887. }
  888. }
  889. func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
  890. return &channelz.ServerInternalMetric{
  891. CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
  892. CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
  893. CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
  894. LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
  895. }
  896. }
  897. func (s *Server) incrCallsStarted() {
  898. atomic.AddInt64(&s.czData.callsStarted, 1)
  899. atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
  900. }
  901. func (s *Server) incrCallsSucceeded() {
  902. atomic.AddInt64(&s.czData.callsSucceeded, 1)
  903. }
  904. func (s *Server) incrCallsFailed() {
  905. atomic.AddInt64(&s.czData.callsFailed, 1)
  906. }
  907. func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
  908. data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
  909. if err != nil {
  910. channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
  911. return err
  912. }
  913. compData, err := compress(data, cp, comp)
  914. if err != nil {
  915. channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
  916. return err
  917. }
  918. hdr, payload := msgHeader(data, compData)
  919. // TODO(dfawley): should we be checking len(data) instead?
  920. if len(payload) > s.opts.maxSendMessageSize {
  921. return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
  922. }
  923. err = t.Write(stream, hdr, payload, opts)
  924. if err == nil && s.opts.statsHandler != nil {
  925. s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
  926. }
  927. return err
  928. }
  929. // chainUnaryServerInterceptors chains all unary server interceptors into one.
  930. func chainUnaryServerInterceptors(s *Server) {
  931. // Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
  932. // be executed before any other chained interceptors.
  933. interceptors := s.opts.chainUnaryInts
  934. if s.opts.unaryInt != nil {
  935. interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
  936. }
  937. var chainedInt UnaryServerInterceptor
  938. if len(interceptors) == 0 {
  939. chainedInt = nil
  940. } else if len(interceptors) == 1 {
  941. chainedInt = interceptors[0]
  942. } else {
  943. chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
  944. return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
  945. }
  946. }
  947. s.opts.unaryInt = chainedInt
  948. }
  949. // getChainUnaryHandler recursively generate the chained UnaryHandler
  950. func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
  951. if curr == len(interceptors)-1 {
  952. return finalHandler
  953. }
  954. return func(ctx context.Context, req interface{}) (interface{}, error) {
  955. return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
  956. }
  957. }
// processUnaryRPC serves one unary RPC on stream using the handler in md.
//
// It receives and decompresses the single request message, runs md.Handler
// (passing s.opts.unaryInt as the interceptor), and then either writes the
// handler's error as the stream status or sends the reply followed by an OK
// status. Tracing, stats-handler, channelz and binary-log events are emitted
// along the way when the corresponding facility is enabled.
//
// The result is a named return value because the deferred bookkeeping below
// reads it to decide whether the call succeeded.
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
	sh := s.opts.statsHandler
	if sh != nil || trInfo != nil || channelz.IsOn() {
		if channelz.IsOn() {
			s.incrCallsStarted()
		}
		var statsBegin *stats.Begin
		if sh != nil {
			beginTime := time.Now()
			statsBegin = &stats.Begin{
				BeginTime: beginTime,
			}
			sh.HandleRPC(stream.Context(), statsBegin)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&trInfo.firstLine, false)
		}
		// The deferred error handling for tracing, stats handler and channelz are
		// combined into one function to reduce stack usage -- a defer takes ~56-64
		// bytes on the stack, so overflowing the stack will require a stack
		// re-allocation, which is expensive.
		//
		// To maintain behavior similar to separate deferred statements, statements
		// should be executed in the reverse order. That is, tracing first, stats
		// handler second, and channelz last. Note that panics *within* defers will
		// lead to different behavior, but that's an acceptable compromise; that
		// would be undefined behavior territory anyway.
		defer func() {
			// io.EOF is not treated as a failure in any of the three sections.
			if trInfo != nil {
				if err != nil && err != io.EOF {
					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					trInfo.tr.SetError()
				}
				trInfo.tr.Finish()
			}

			if sh != nil {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	// Binary logging: record the client header (metadata, method, timeout,
	// authority and peer address) before anything else is logged.
	binlog := binarylog.GetMethodLogger(stream.Method())
	if binlog != nil {
		ctx := stream.Context()
		// md here shadows the *MethodDesc parameter for the rest of this block.
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			// A deadline that has already passed is logged as a zero timeout.
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		binlog.Log(logEntry)
	}

	// comp and cp are used for compression.  decomp and dc are used for
	// decompression.  If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			// The request uses an encoding nothing is registered for.
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		stream.SetSendCompress(cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	// payInfo is only allocated when a stats handler or binary logger is
	// present.
	var payInfo *payloadInfo
	if sh != nil || binlog != nil {
		payInfo = &payloadInfo{}
	}
	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
	if err != nil {
		if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	// df is the decode function handed to the method handler: it unmarshals
	// the received bytes into v and then reports the inbound payload to the
	// stats handler, binary logger and trace.
	df := func(v interface{}) error {
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		if sh != nil {
			sh.HandleRPC(stream.Context(), &stats.InPayload{
				RecvTime:   time.Now(),
				Payload:    v,
				WireLength: payInfo.wireLength + headerLen,
				Data:       d,
				Length:     len(d),
			})
		}
		if binlog != nil {
			binlog.Log(&binarylog.ClientMessage{
				Message: d,
			})
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		// The handler failed: report its error as the stream status.
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appErr = status.Error(codes.Unknown, appErr.Error())
			appStatus, _ = status.FromError(appErr)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if binlog != nil {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if there was header. Otherwise it can
				// be trailer only.
				binlog.Log(&binarylog.ServerHeader{
					Header: h,
				})
			}
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	// Last marks this write as the final message on the stream.
	opts := &transport.Options{Last: true}

	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		if sts, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, sts); e != nil {
				channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			// Anything that is neither a status error nor a transport
			// ConnectionError is unexpected and panics.
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if binlog != nil {
			h, _ := stream.Header()
			binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			// NOTE(review): appErr is always nil on this path (the non-nil
			// case returned above), so the logged trailer carries no error
			// even though sendResponse failed -- confirm this is intended.
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return err
	}
	if binlog != nil {
		h, _ := stream.Header()
		binlog.Log(&binarylog.ServerHeader{
			Header: h,
		})
		binlog.Log(&binarylog.ServerMessage{
			Message: reply,
		})
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	err = t.WriteStatus(stream, statusOK)
	if binlog != nil {
		binlog.Log(&binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     appErr,
		})
	}
	return err
}
  1196. // chainStreamServerInterceptors chains all stream server interceptors into one.
  1197. func chainStreamServerInterceptors(s *Server) {
  1198. // Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
  1199. // be executed before any other chained interceptors.
  1200. interceptors := s.opts.chainStreamInts
  1201. if s.opts.streamInt != nil {
  1202. interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
  1203. }
  1204. var chainedInt StreamServerInterceptor
  1205. if len(interceptors) == 0 {
  1206. chainedInt = nil
  1207. } else if len(interceptors) == 1 {
  1208. chainedInt = interceptors[0]
  1209. } else {
  1210. chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
  1211. return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
  1212. }
  1213. }
  1214. s.opts.streamInt = chainedInt
  1215. }
  1216. // getChainStreamHandler recursively generate the chained StreamHandler
  1217. func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
  1218. if curr == len(interceptors)-1 {
  1219. return finalHandler
  1220. }
  1221. return func(srv interface{}, ss ServerStream) error {
  1222. return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
  1223. }
  1224. }
// processStreamingRPC serves one streaming RPC on stream using the handler in
// sd.
//
// It wraps the transport stream in a serverStream, selects the decompressor
// and compressor, runs sd.Handler (through s.opts.streamInt when one is set)
// and finally writes the resulting status to the stream. Tracing,
// stats-handler, channelz and binary-log events are emitted when the
// corresponding facility is enabled.
//
// info may be nil, in which case the handler is invoked with a nil service
// implementation. The result is a named return value because the deferred
// bookkeeping below reads it to decide whether the call succeeded.
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
	}
	sh := s.opts.statsHandler
	var statsBegin *stats.Begin
	if sh != nil {
		beginTime := time.Now()
		statsBegin = &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), statsBegin)
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	ss := &serverStream{
		ctx:                   ctx,
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          sh,
	}

	if sh != nil || trInfo != nil || channelz.IsOn() {
		// See comment in processUnaryRPC on defers.
		defer func() {
			// io.EOF is not treated as a failure in any of the three sections.
			if trInfo != nil {
				// The trace is accessed under ss.mu and cleared once it has
				// been finished.
				ss.mu.Lock()
				if err != nil && err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
				ss.trInfo.tr.Finish()
				ss.trInfo.tr = nil
				ss.mu.Unlock()
			}

			if sh != nil {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	// Binary logging: record the client header (metadata, method, timeout,
	// authority and peer address).
	ss.binlog = binarylog.GetMethodLogger(stream.Method())
	if ss.binlog != nil {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			// A deadline that has already passed is logged as a zero timeout.
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		ss.binlog.Log(logEntry)
	}

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			// The request uses an encoding nothing is registered for.
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		stream.SetSendCompress(s.opts.cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
	}
	var appErr error
	// server stays nil when no serviceInfo was supplied.
	var server interface{}
	if info != nil {
		server = info.serviceImpl
	}
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		// This info shadows the *serviceInfo parameter inside the else branch.
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		// The handler failed: convert a non-status error to codes.Unknown and
		// report it as the stream status.
		appStatus, ok := status.FromError(appErr)
		if !ok {
			appStatus = status.New(codes.Unknown, appErr.Error())
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		t.WriteStatus(ss.s, appStatus)
		if ss.binlog != nil {
			ss.binlog.Log(&binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err:     appErr,
			})
		}
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	err = t.WriteStatus(ss.s, statusOK)
	if ss.binlog != nil {
		// appErr is nil on this path.
		ss.binlog.Log(&binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     appErr,
		})
	}
	return err
}
  1384. func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
  1385. sm := stream.Method()
  1386. if sm != "" && sm[0] == '/' {
  1387. sm = sm[1:]
  1388. }
  1389. pos := strings.LastIndex(sm, "/")
  1390. if pos == -1 {
  1391. if trInfo != nil {
  1392. trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
  1393. trInfo.tr.SetError()
  1394. }
  1395. errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
  1396. if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
  1397. if trInfo != nil {
  1398. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1399. trInfo.tr.SetError()
  1400. }
  1401. channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
  1402. }
  1403. if trInfo != nil {
  1404. trInfo.tr.Finish()
  1405. }
  1406. return
  1407. }
  1408. service := sm[:pos]
  1409. method := sm[pos+1:]
  1410. srv, knownService := s.services[service]
  1411. if knownService {
  1412. if md, ok := srv.methods[method]; ok {
  1413. s.processUnaryRPC(t, stream, srv, md, trInfo)
  1414. return
  1415. }
  1416. if sd, ok := srv.streams[method]; ok {
  1417. s.processStreamingRPC(t, stream, srv, sd, trInfo)
  1418. return
  1419. }
  1420. }
  1421. // Unknown service, or known server unknown method.
  1422. if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
  1423. s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
  1424. return
  1425. }
  1426. var errDesc string
  1427. if !knownService {
  1428. errDesc = fmt.Sprintf("unknown service %v", service)
  1429. } else {
  1430. errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
  1431. }
  1432. if trInfo != nil {
  1433. trInfo.tr.LazyPrintf("%s", errDesc)
  1434. trInfo.tr.SetError()
  1435. }
  1436. if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
  1437. if trInfo != nil {
  1438. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1439. trInfo.tr.SetError()
  1440. }
  1441. channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
  1442. }
  1443. if trInfo != nil {
  1444. trInfo.tr.Finish()
  1445. }
  1446. }
// streamKey is the context key under which NewContextWithServerTransportStream
// stores a ServerTransportStream and ServerTransportStreamFromContext looks it
// up.
type streamKey struct{}
  1449. // NewContextWithServerTransportStream creates a new context from ctx and
  1450. // attaches stream to it.
  1451. //
  1452. // Experimental
  1453. //
  1454. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  1455. // later release.
  1456. func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
  1457. return context.WithValue(ctx, streamKey{}, stream)
  1458. }
// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// A value of this type is attached to a context with
// NewContextWithServerTransportStream and read back with
// ServerTransportStreamFromContext.
//
// See also NewContextWithServerTransportStream.
//
// Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type ServerTransportStream interface {
	Method() string
	SetHeader(md metadata.MD) error
	SendHeader(md metadata.MD) error
	SetTrailer(md metadata.MD) error
}
  1476. // ServerTransportStreamFromContext returns the ServerTransportStream saved in
  1477. // ctx. Returns nil if the given context has no stream associated with it
  1478. // (which implies it is not an RPC invocation context).
  1479. //
  1480. // Experimental
  1481. //
  1482. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  1483. // later release.
  1484. func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
  1485. s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
  1486. return s
  1487. }
  1488. // Stop stops the gRPC server. It immediately closes all open
  1489. // connections and listeners.
  1490. // It cancels all active RPCs on the server side and the corresponding
  1491. // pending RPCs on the client side will get notified by connection
  1492. // errors.
  1493. func (s *Server) Stop() {
  1494. s.quit.Fire()
  1495. defer func() {
  1496. s.serveWG.Wait()
  1497. s.done.Fire()
  1498. }()
  1499. s.channelzRemoveOnce.Do(func() {
  1500. if channelz.IsOn() {
  1501. channelz.RemoveEntry(s.channelzID)
  1502. }
  1503. })
  1504. s.mu.Lock()
  1505. listeners := s.lis
  1506. s.lis = nil
  1507. st := s.conns
  1508. s.conns = nil
  1509. // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
  1510. s.cv.Broadcast()
  1511. s.mu.Unlock()
  1512. for lis := range listeners {
  1513. lis.Close()
  1514. }
  1515. for c := range st {
  1516. c.Close()
  1517. }
  1518. if s.opts.numServerWorkers > 0 {
  1519. s.stopServerWorkers()
  1520. }
  1521. s.mu.Lock()
  1522. if s.events != nil {
  1523. s.events.Finish()
  1524. s.events = nil
  1525. }
  1526. s.mu.Unlock()
  1527. }
  1528. // GracefulStop stops the gRPC server gracefully. It stops the server from
  1529. // accepting new connections and RPCs and blocks until all the pending RPCs are
  1530. // finished.
  1531. func (s *Server) GracefulStop() {
  1532. s.quit.Fire()
  1533. defer s.done.Fire()
  1534. s.channelzRemoveOnce.Do(func() {
  1535. if channelz.IsOn() {
  1536. channelz.RemoveEntry(s.channelzID)
  1537. }
  1538. })
  1539. s.mu.Lock()
  1540. if s.conns == nil {
  1541. s.mu.Unlock()
  1542. return
  1543. }
  1544. for lis := range s.lis {
  1545. lis.Close()
  1546. }
  1547. s.lis = nil
  1548. if !s.drain {
  1549. for st := range s.conns {
  1550. st.Drain()
  1551. }
  1552. s.drain = true
  1553. }
  1554. // Wait for serving threads to be ready to exit. Only then can we be sure no
  1555. // new conns will be created.
  1556. s.mu.Unlock()
  1557. s.serveWG.Wait()
  1558. s.mu.Lock()
  1559. for len(s.conns) != 0 {
  1560. s.cv.Wait()
  1561. }
  1562. s.conns = nil
  1563. if s.events != nil {
  1564. s.events.Finish()
  1565. s.events = nil
  1566. }
  1567. s.mu.Unlock()
  1568. }
  1569. // contentSubtype must be lowercase
  1570. // cannot return nil
  1571. func (s *Server) getCodec(contentSubtype string) baseCodec {
  1572. if s.opts.codec != nil {
  1573. return s.opts.codec
  1574. }
  1575. if contentSubtype == "" {
  1576. return encoding.GetCodec(proto.Name)
  1577. }
  1578. codec := encoding.GetCodec(contentSubtype)
  1579. if codec == nil {
  1580. return encoding.GetCodec(proto.Name)
  1581. }
  1582. return codec
  1583. }
  1584. // SetHeader sets the header metadata.
  1585. // When called multiple times, all the provided metadata will be merged.
  1586. // All the metadata will be sent out when one of the following happens:
  1587. // - grpc.SendHeader() is called;
  1588. // - The first response is sent out;
  1589. // - An RPC status is sent out (error or success).
  1590. func SetHeader(ctx context.Context, md metadata.MD) error {
  1591. if md.Len() == 0 {
  1592. return nil
  1593. }
  1594. stream := ServerTransportStreamFromContext(ctx)
  1595. if stream == nil {
  1596. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1597. }
  1598. return stream.SetHeader(md)
  1599. }
  1600. // SendHeader sends header metadata. It may be called at most once.
  1601. // The provided md and headers set by SetHeader() will be sent.
  1602. func SendHeader(ctx context.Context, md metadata.MD) error {
  1603. stream := ServerTransportStreamFromContext(ctx)
  1604. if stream == nil {
  1605. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1606. }
  1607. if err := stream.SendHeader(md); err != nil {
  1608. return toRPCErr(err)
  1609. }
  1610. return nil
  1611. }
  1612. // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
  1613. // When called more than once, all the provided metadata will be merged.
  1614. func SetTrailer(ctx context.Context, md metadata.MD) error {
  1615. if md.Len() == 0 {
  1616. return nil
  1617. }
  1618. stream := ServerTransportStreamFromContext(ctx)
  1619. if stream == nil {
  1620. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1621. }
  1622. return stream.SetTrailer(md)
  1623. }
  1624. // Method returns the method string for the server context. The returned
  1625. // string is in the format of "/service/method".
  1626. func Method(ctx context.Context) (string, bool) {
  1627. s := ServerTransportStreamFromContext(ctx)
  1628. if s == nil {
  1629. return "", false
  1630. }
  1631. return s.Method(), true
  1632. }
  1633. type channelzServer struct {
  1634. s *Server
  1635. }
  1636. func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
  1637. return c.s.channelzMetric()
  1638. }