server.go 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756
  /*
   *
   * Copyright 2014 gRPC authors.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   *
   */
  18. package grpc
  19. import (
  20. "context"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "math"
  25. "net"
  26. "net/http"
  27. "reflect"
  28. "runtime"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "time"
  33. "golang.org/x/net/trace"
  34. "google.golang.org/grpc/codes"
  35. "google.golang.org/grpc/credentials"
  36. "google.golang.org/grpc/encoding"
  37. "google.golang.org/grpc/encoding/proto"
  38. "google.golang.org/grpc/grpclog"
  39. "google.golang.org/grpc/internal/binarylog"
  40. "google.golang.org/grpc/internal/channelz"
  41. "google.golang.org/grpc/internal/grpcrand"
  42. "google.golang.org/grpc/internal/grpcsync"
  43. "google.golang.org/grpc/internal/transport"
  44. "google.golang.org/grpc/keepalive"
  45. "google.golang.org/grpc/metadata"
  46. "google.golang.org/grpc/peer"
  47. "google.golang.org/grpc/stats"
  48. "google.golang.org/grpc/status"
  49. "google.golang.org/grpc/tap"
  50. )
  // Message size limits, in bytes, that defaultServerOptions starts from.
  const (
  	// defaultServerMaxReceiveMessageSize is 4 MiB.
  	defaultServerMaxReceiveMessageSize = 4 << 20
  	// defaultServerMaxSendMessageSize is the largest value an int32 can hold.
  	defaultServerMaxSendMessageSize = math.MaxInt32
  )
  55. var statusOK = status.New(codes.OK, "")
  56. var logger = grpclog.Component("core")
  57. type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
  58. // MethodDesc represents an RPC service's method specification.
  59. type MethodDesc struct {
  60. MethodName string
  61. Handler methodHandler
  62. }
  63. // ServiceDesc represents an RPC service's specification.
  64. type ServiceDesc struct {
  65. ServiceName string
  66. // The pointer to the service interface. Used to check whether the user
  67. // provided implementation satisfies the interface requirements.
  68. HandlerType interface{}
  69. Methods []MethodDesc
  70. Streams []StreamDesc
  71. Metadata interface{}
  72. }
  73. // serviceInfo wraps information about a service. It is very similar to
  74. // ServiceDesc and is constructed from it for internal purposes.
  75. type serviceInfo struct {
  76. // Contains the implementation for the methods in this service.
  77. serviceImpl interface{}
  78. methods map[string]*MethodDesc
  79. streams map[string]*StreamDesc
  80. mdata interface{}
  81. }
  82. type serverWorkerData struct {
  83. st transport.ServerTransport
  84. wg *sync.WaitGroup
  85. stream *transport.Stream
  86. }
  87. // Server is a gRPC server to serve RPC requests.
  88. type Server struct {
  89. opts serverOptions
  90. mu sync.Mutex // guards following
  91. lis map[net.Listener]bool
  92. conns map[transport.ServerTransport]bool
  93. serve bool
  94. drain bool
  95. cv *sync.Cond // signaled when connections close for GracefulStop
  96. services map[string]*serviceInfo // service name -> service info
  97. events trace.EventLog
  98. quit *grpcsync.Event
  99. done *grpcsync.Event
  100. channelzRemoveOnce sync.Once
  101. serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
  102. channelzID int64 // channelz unique identification number
  103. czData *channelzData
  104. serverWorkerChannels []chan *serverWorkerData
  105. }
  106. type serverOptions struct {
  107. creds credentials.TransportCredentials
  108. codec baseCodec
  109. cp Compressor
  110. dc Decompressor
  111. unaryInt UnaryServerInterceptor
  112. streamInt StreamServerInterceptor
  113. chainUnaryInts []UnaryServerInterceptor
  114. chainStreamInts []StreamServerInterceptor
  115. inTapHandle tap.ServerInHandle
  116. statsHandler stats.Handler
  117. maxConcurrentStreams uint32
  118. maxReceiveMessageSize int
  119. maxSendMessageSize int
  120. unknownStreamDesc *StreamDesc
  121. keepaliveParams keepalive.ServerParameters
  122. keepalivePolicy keepalive.EnforcementPolicy
  123. initialWindowSize int32
  124. initialConnWindowSize int32
  125. writeBufferSize int
  126. readBufferSize int
  127. connectionTimeout time.Duration
  128. maxHeaderListSize *uint32
  129. headerTableSize *uint32
  130. numServerWorkers uint32
  131. }
  132. var defaultServerOptions = serverOptions{
  133. maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
  134. maxSendMessageSize: defaultServerMaxSendMessageSize,
  135. connectionTimeout: 120 * time.Second,
  136. writeBufferSize: defaultWriteBufSize,
  137. readBufferSize: defaultReadBufSize,
  138. }
  139. // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
  140. type ServerOption interface {
  141. apply(*serverOptions)
  142. }
  143. // EmptyServerOption does not alter the server configuration. It can be embedded
  144. // in another structure to build custom server options.
  145. //
  146. // This API is EXPERIMENTAL.
  147. type EmptyServerOption struct{}
  148. func (EmptyServerOption) apply(*serverOptions) {}
  149. // funcServerOption wraps a function that modifies serverOptions into an
  150. // implementation of the ServerOption interface.
  151. type funcServerOption struct {
  152. f func(*serverOptions)
  153. }
  154. func (fdo *funcServerOption) apply(do *serverOptions) {
  155. fdo.f(do)
  156. }
  157. func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
  158. return &funcServerOption{
  159. f: f,
  160. }
  161. }
  162. // WriteBufferSize determines how much data can be batched before doing a write on the wire.
  163. // The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
  164. // The default value for this buffer is 32KB.
  165. // Zero will disable the write buffer such that each write will be on underlying connection.
  166. // Note: A Send call may not directly translate to a write.
  167. func WriteBufferSize(s int) ServerOption {
  168. return newFuncServerOption(func(o *serverOptions) {
  169. o.writeBufferSize = s
  170. })
  171. }
  172. // ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
  173. // for one read syscall.
  174. // The default value for this buffer is 32KB.
  175. // Zero will disable read buffer for a connection so data framer can access the underlying
  176. // conn directly.
  177. func ReadBufferSize(s int) ServerOption {
  178. return newFuncServerOption(func(o *serverOptions) {
  179. o.readBufferSize = s
  180. })
  181. }
  182. // InitialWindowSize returns a ServerOption that sets window size for stream.
  183. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  184. func InitialWindowSize(s int32) ServerOption {
  185. return newFuncServerOption(func(o *serverOptions) {
  186. o.initialWindowSize = s
  187. })
  188. }
  189. // InitialConnWindowSize returns a ServerOption that sets window size for a connection.
  190. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  191. func InitialConnWindowSize(s int32) ServerOption {
  192. return newFuncServerOption(func(o *serverOptions) {
  193. o.initialConnWindowSize = s
  194. })
  195. }
  196. // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
  197. func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
  198. if kp.Time > 0 && kp.Time < time.Second {
  199. logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
  200. kp.Time = time.Second
  201. }
  202. return newFuncServerOption(func(o *serverOptions) {
  203. o.keepaliveParams = kp
  204. })
  205. }
  206. // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
  207. func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
  208. return newFuncServerOption(func(o *serverOptions) {
  209. o.keepalivePolicy = kep
  210. })
  211. }
  212. // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
  213. //
  214. // This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
  215. //
  216. // Deprecated: register codecs using encoding.RegisterCodec. The server will
  217. // automatically use registered codecs based on the incoming requests' headers.
  218. // See also
  219. // https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
  220. // Will be supported throughout 1.x.
  221. func CustomCodec(codec Codec) ServerOption {
  222. return newFuncServerOption(func(o *serverOptions) {
  223. o.codec = codec
  224. })
  225. }
  226. // RPCCompressor returns a ServerOption that sets a compressor for outbound
  227. // messages. For backward compatibility, all outbound messages will be sent
  228. // using this compressor, regardless of incoming message compression. By
  229. // default, server messages will be sent using the same compressor with which
  230. // request messages were sent.
  231. //
  232. // Deprecated: use encoding.RegisterCompressor instead. Will be supported
  233. // throughout 1.x.
  234. func RPCCompressor(cp Compressor) ServerOption {
  235. return newFuncServerOption(func(o *serverOptions) {
  236. o.cp = cp
  237. })
  238. }
  239. // RPCDecompressor returns a ServerOption that sets a decompressor for inbound
  240. // messages. It has higher priority than decompressors registered via
  241. // encoding.RegisterCompressor.
  242. //
  243. // Deprecated: use encoding.RegisterCompressor instead. Will be supported
  244. // throughout 1.x.
  245. func RPCDecompressor(dc Decompressor) ServerOption {
  246. return newFuncServerOption(func(o *serverOptions) {
  247. o.dc = dc
  248. })
  249. }
  250. // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  251. // If this is not set, gRPC uses the default limit.
  252. //
  253. // Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
  254. func MaxMsgSize(m int) ServerOption {
  255. return MaxRecvMsgSize(m)
  256. }
  257. // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  258. // If this is not set, gRPC uses the default 4MB.
  259. func MaxRecvMsgSize(m int) ServerOption {
  260. return newFuncServerOption(func(o *serverOptions) {
  261. o.maxReceiveMessageSize = m
  262. })
  263. }
  264. // MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
  265. // If this is not set, gRPC uses the default `math.MaxInt32`.
  266. func MaxSendMsgSize(m int) ServerOption {
  267. return newFuncServerOption(func(o *serverOptions) {
  268. o.maxSendMessageSize = m
  269. })
  270. }
  271. // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
  272. // of concurrent streams to each ServerTransport.
  273. func MaxConcurrentStreams(n uint32) ServerOption {
  274. return newFuncServerOption(func(o *serverOptions) {
  275. o.maxConcurrentStreams = n
  276. })
  277. }
  278. // Creds returns a ServerOption that sets credentials for server connections.
  279. func Creds(c credentials.TransportCredentials) ServerOption {
  280. return newFuncServerOption(func(o *serverOptions) {
  281. o.creds = c
  282. })
  283. }
  284. // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
  285. // server. Only one unary interceptor can be installed. The construction of multiple
  286. // interceptors (e.g., chaining) can be implemented at the caller.
  287. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
  288. return newFuncServerOption(func(o *serverOptions) {
  289. if o.unaryInt != nil {
  290. panic("The unary server interceptor was already set and may not be reset.")
  291. }
  292. o.unaryInt = i
  293. })
  294. }
  295. // ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
  296. // for unary RPCs. The first interceptor will be the outer most,
  297. // while the last interceptor will be the inner most wrapper around the real call.
  298. // All unary interceptors added by this method will be chained.
  299. func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
  300. return newFuncServerOption(func(o *serverOptions) {
  301. o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
  302. })
  303. }
  304. // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
  305. // server. Only one stream interceptor can be installed.
  306. func StreamInterceptor(i StreamServerInterceptor) ServerOption {
  307. return newFuncServerOption(func(o *serverOptions) {
  308. if o.streamInt != nil {
  309. panic("The stream server interceptor was already set and may not be reset.")
  310. }
  311. o.streamInt = i
  312. })
  313. }
  314. // ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
  315. // for streaming RPCs. The first interceptor will be the outer most,
  316. // while the last interceptor will be the inner most wrapper around the real call.
  317. // All stream interceptors added by this method will be chained.
  318. func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
  319. return newFuncServerOption(func(o *serverOptions) {
  320. o.chainStreamInts = append(o.chainStreamInts, interceptors...)
  321. })
  322. }
  323. // InTapHandle returns a ServerOption that sets the tap handle for all the server
  324. // transport to be created. Only one can be installed.
  325. func InTapHandle(h tap.ServerInHandle) ServerOption {
  326. return newFuncServerOption(func(o *serverOptions) {
  327. if o.inTapHandle != nil {
  328. panic("The tap handle was already set and may not be reset.")
  329. }
  330. o.inTapHandle = h
  331. })
  332. }
  333. // StatsHandler returns a ServerOption that sets the stats handler for the server.
  334. func StatsHandler(h stats.Handler) ServerOption {
  335. return newFuncServerOption(func(o *serverOptions) {
  336. o.statsHandler = h
  337. })
  338. }
  339. // UnknownServiceHandler returns a ServerOption that allows for adding a custom
  340. // unknown service handler. The provided method is a bidi-streaming RPC service
  341. // handler that will be invoked instead of returning the "unimplemented" gRPC
  342. // error whenever a request is received for an unregistered service or method.
  343. // The handling function and stream interceptor (if set) have full access to
  344. // the ServerStream, including its Context.
  345. func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
  346. return newFuncServerOption(func(o *serverOptions) {
  347. o.unknownStreamDesc = &StreamDesc{
  348. StreamName: "unknown_service_handler",
  349. Handler: streamHandler,
  350. // We need to assume that the users of the streamHandler will want to use both.
  351. ClientStreams: true,
  352. ServerStreams: true,
  353. }
  354. })
  355. }
  356. // ConnectionTimeout returns a ServerOption that sets the timeout for
  357. // connection establishment (up to and including HTTP/2 handshaking) for all
  358. // new connections. If this is not set, the default is 120 seconds. A zero or
  359. // negative value will result in an immediate timeout.
  360. //
  361. // This API is EXPERIMENTAL.
  362. func ConnectionTimeout(d time.Duration) ServerOption {
  363. return newFuncServerOption(func(o *serverOptions) {
  364. o.connectionTimeout = d
  365. })
  366. }
  367. // MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
  368. // of header list that the server is prepared to accept.
  369. func MaxHeaderListSize(s uint32) ServerOption {
  370. return newFuncServerOption(func(o *serverOptions) {
  371. o.maxHeaderListSize = &s
  372. })
  373. }
  374. // HeaderTableSize returns a ServerOption that sets the size of dynamic
  375. // header table for stream.
  376. //
  377. // This API is EXPERIMENTAL.
  378. func HeaderTableSize(s uint32) ServerOption {
  379. return newFuncServerOption(func(o *serverOptions) {
  380. o.headerTableSize = &s
  381. })
  382. }
  383. // NumStreamWorkers returns a ServerOption that sets the number of worker
  384. // goroutines that should be used to process incoming streams. Setting this to
  385. // zero (default) will disable workers and spawn a new goroutine for each
  386. // stream.
  387. //
  388. // This API is EXPERIMENTAL.
  389. func NumStreamWorkers(numServerWorkers uint32) ServerOption {
  390. // TODO: If/when this API gets stabilized (i.e. stream workers become the
  391. // only way streams are processed), change the behavior of the zero value to
  392. // a sane default. Preliminary experiments suggest that a value equal to the
  393. // number of CPUs available is most performant; requires thorough testing.
  394. return newFuncServerOption(func(o *serverOptions) {
  395. o.numServerWorkers = numServerWorkers
  396. })
  397. }
  // serverWorkerResetThreshold controls how often a worker resets its stack.
  // After every N requests a worker replaces itself with a new goroutine, so
  // that large stacks do not stay in memory forever. With 2^16, each
  // goroutine stack should live for at least a few seconds under a typical
  // workload (assuming a QPS of a few thousand requests/sec).
  const serverWorkerResetThreshold = 1 << 16
  404. // serverWorkers blocks on a *transport.Stream channel forever and waits for
  405. // data to be fed by serveStreams. This allows different requests to be
  406. // processed by the same goroutine, removing the need for expensive stack
  407. // re-allocations (see the runtime.morestack problem [1]).
  408. //
  409. // [1] https://github.com/golang/go/issues/18138
  410. func (s *Server) serverWorker(ch chan *serverWorkerData) {
  411. // To make sure all server workers don't reset at the same time, choose a
  412. // random number of iterations before resetting.
  413. threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
  414. for completed := 0; completed < threshold; completed++ {
  415. data, ok := <-ch
  416. if !ok {
  417. return
  418. }
  419. s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
  420. data.wg.Done()
  421. }
  422. go s.serverWorker(ch)
  423. }
  424. // initServerWorkers creates worker goroutines and channels to process incoming
  425. // connections to reduce the time spent overall on runtime.morestack.
  426. func (s *Server) initServerWorkers() {
  427. s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
  428. for i := uint32(0); i < s.opts.numServerWorkers; i++ {
  429. s.serverWorkerChannels[i] = make(chan *serverWorkerData)
  430. go s.serverWorker(s.serverWorkerChannels[i])
  431. }
  432. }
  433. func (s *Server) stopServerWorkers() {
  434. for i := uint32(0); i < s.opts.numServerWorkers; i++ {
  435. close(s.serverWorkerChannels[i])
  436. }
  437. }
  438. // NewServer creates a gRPC server which has no service registered and has not
  439. // started to accept requests yet.
  440. func NewServer(opt ...ServerOption) *Server {
  441. opts := defaultServerOptions
  442. for _, o := range opt {
  443. o.apply(&opts)
  444. }
  445. s := &Server{
  446. lis: make(map[net.Listener]bool),
  447. opts: opts,
  448. conns: make(map[transport.ServerTransport]bool),
  449. services: make(map[string]*serviceInfo),
  450. quit: grpcsync.NewEvent(),
  451. done: grpcsync.NewEvent(),
  452. czData: new(channelzData),
  453. }
  454. chainUnaryServerInterceptors(s)
  455. chainStreamServerInterceptors(s)
  456. s.cv = sync.NewCond(&s.mu)
  457. if EnableTracing {
  458. _, file, line, _ := runtime.Caller(1)
  459. s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
  460. }
  461. if s.opts.numServerWorkers > 0 {
  462. s.initServerWorkers()
  463. }
  464. if channelz.IsOn() {
  465. s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
  466. }
  467. return s
  468. }
  469. // printf records an event in s's event log, unless s has been stopped.
  470. // REQUIRES s.mu is held.
  471. func (s *Server) printf(format string, a ...interface{}) {
  472. if s.events != nil {
  473. s.events.Printf(format, a...)
  474. }
  475. }
  476. // errorf records an error in s's event log, unless s has been stopped.
  477. // REQUIRES s.mu is held.
  478. func (s *Server) errorf(format string, a ...interface{}) {
  479. if s.events != nil {
  480. s.events.Errorf(format, a...)
  481. }
  482. }
  483. // ServiceRegistrar wraps a single method that supports service registration. It
  484. // enables users to pass concrete types other than grpc.Server to the service
  485. // registration methods exported by the IDL generated code.
  486. type ServiceRegistrar interface {
  487. // RegisterService registers a service and its implementation to the
  488. // concrete type implementing this interface. It may not be called
  489. // once the server has started serving.
  490. // desc describes the service and its methods and handlers. impl is the
  491. // service implementation which is passed to the method handlers.
  492. RegisterService(desc *ServiceDesc, impl interface{})
  493. }
  494. // RegisterService registers a service and its implementation to the gRPC
  495. // server. It is called from the IDL generated code. This must be called before
  496. // invoking Serve. If ss is non-nil (for legacy code), its type is checked to
  497. // ensure it implements sd.HandlerType.
  498. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
  499. if ss != nil {
  500. ht := reflect.TypeOf(sd.HandlerType).Elem()
  501. st := reflect.TypeOf(ss)
  502. if !st.Implements(ht) {
  503. logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
  504. }
  505. }
  506. s.register(sd, ss)
  507. }
  508. func (s *Server) register(sd *ServiceDesc, ss interface{}) {
  509. s.mu.Lock()
  510. defer s.mu.Unlock()
  511. s.printf("RegisterService(%q)", sd.ServiceName)
  512. if s.serve {
  513. logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
  514. }
  515. if _, ok := s.services[sd.ServiceName]; ok {
  516. logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
  517. }
  518. info := &serviceInfo{
  519. serviceImpl: ss,
  520. methods: make(map[string]*MethodDesc),
  521. streams: make(map[string]*StreamDesc),
  522. mdata: sd.Metadata,
  523. }
  524. for i := range sd.Methods {
  525. d := &sd.Methods[i]
  526. info.methods[d.MethodName] = d
  527. }
  528. for i := range sd.Streams {
  529. d := &sd.Streams[i]
  530. info.streams[d.StreamName] = d
  531. }
  532. s.services[sd.ServiceName] = info
  533. }
  // MethodInfo describes an RPC: its method name and whether each side
  // streams.
  type MethodInfo struct {
  	// Name is the bare method name, with no service or package name.
  	Name string
  	// IsClientStream reports whether the RPC is a client streaming RPC.
  	IsClientStream bool
  	// IsServerStream reports whether the RPC is a server streaming RPC.
  	IsServerStream bool
  }
  543. // ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
  544. type ServiceInfo struct {
  545. Methods []MethodInfo
  546. // Metadata is the metadata specified in ServiceDesc when registering service.
  547. Metadata interface{}
  548. }
  549. // GetServiceInfo returns a map from service names to ServiceInfo.
  550. // Service names include the package names, in the form of <package>.<service>.
  551. func (s *Server) GetServiceInfo() map[string]ServiceInfo {
  552. ret := make(map[string]ServiceInfo)
  553. for n, srv := range s.services {
  554. methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
  555. for m := range srv.methods {
  556. methods = append(methods, MethodInfo{
  557. Name: m,
  558. IsClientStream: false,
  559. IsServerStream: false,
  560. })
  561. }
  562. for m, d := range srv.streams {
  563. methods = append(methods, MethodInfo{
  564. Name: m,
  565. IsClientStream: d.ClientStreams,
  566. IsServerStream: d.ServerStreams,
  567. })
  568. }
  569. ret[n] = ServiceInfo{
  570. Methods: methods,
  571. Metadata: srv.mdata,
  572. }
  573. }
  574. return ret
  575. }
  // ErrServerStopped is returned when an operation is no longer legal
  // because the server has been stopped.
  var ErrServerStopped error = errors.New("grpc: the server has been stopped")
  579. func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
  580. if s.opts.creds == nil {
  581. return rawConn, nil, nil
  582. }
  583. return s.opts.creds.ServerHandshake(rawConn)
  584. }
  // listenSocket pairs a net.Listener with the channelz ID it is registered
  // under.
  type listenSocket struct {
  	net.Listener
  	channelzID int64
  }
  589. func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
  590. return &channelz.SocketInternalMetric{
  591. SocketOptions: channelz.GetSocketOption(l.Listener),
  592. LocalAddr: l.Listener.Addr(),
  593. }
  594. }
  595. func (l *listenSocket) Close() error {
  596. err := l.Listener.Close()
  597. if channelz.IsOn() {
  598. channelz.RemoveEntry(l.channelzID)
  599. }
  600. return err
  601. }
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// Accept errors that report themselves as temporary are retried with a delay
// that starts at 5ms, doubles on every consecutive failure and is capped at
// 1s. If the server has already been stopped, lis is closed and
// ErrServerStopped is returned.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	// Account for this Serve call in serveWG; Stop and GracefulStop wait on
	// it before they consider the server shut down.
	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	// Register the listener (still under s.mu) so that Stop/GracefulStop can
	// close it.
	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	}
	s.mu.Unlock()

	// On return, close and unregister the listener unless Stop/GracefulStop
	// already took ownership of it (s.lis is nil or no longer contains ls).
	// Deferred calls run last-in-first-out, so this runs before the serveWG
	// defer above.
	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			// Errors exposing Temporary() == true are retried with backoff;
			// everything else ends the accept loop.
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				// Sleep for tempDelay, but wake up immediately if the server
				// is asked to quit.
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			// An Accept failure after Stop/GracefulStop is expected and is
			// not reported to the caller.
			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		// A successful Accept resets the backoff.
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}
  688. // handleRawConn forks a goroutine to handle a just-accepted connection that
  689. // has not had any I/O performed on it yet.
  690. func (s *Server) handleRawConn(rawConn net.Conn) {
  691. if s.quit.HasFired() {
  692. rawConn.Close()
  693. return
  694. }
  695. rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
  696. conn, authInfo, err := s.useTransportAuthenticator(rawConn)
  697. if err != nil {
  698. // ErrConnDispatched means that the connection was dispatched away from
  699. // gRPC; those connections should be left open.
  700. if err != credentials.ErrConnDispatched {
  701. s.mu.Lock()
  702. s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
  703. s.mu.Unlock()
  704. channelz.Warningf(logger, s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
  705. rawConn.Close()
  706. }
  707. rawConn.SetDeadline(time.Time{})
  708. return
  709. }
  710. // Finish handshaking (HTTP2)
  711. st := s.newHTTP2Transport(conn, authInfo)
  712. if st == nil {
  713. return
  714. }
  715. rawConn.SetDeadline(time.Time{})
  716. if !s.addConn(st) {
  717. return
  718. }
  719. go func() {
  720. s.serveStreams(st)
  721. s.removeConn(st)
  722. }()
  723. }
  724. // newHTTP2Transport sets up a http/2 transport (using the
  725. // gRPC http2 server transport in transport/http2_server.go).
  726. func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
  727. config := &transport.ServerConfig{
  728. MaxStreams: s.opts.maxConcurrentStreams,
  729. AuthInfo: authInfo,
  730. InTapHandle: s.opts.inTapHandle,
  731. StatsHandler: s.opts.statsHandler,
  732. KeepaliveParams: s.opts.keepaliveParams,
  733. KeepalivePolicy: s.opts.keepalivePolicy,
  734. InitialWindowSize: s.opts.initialWindowSize,
  735. InitialConnWindowSize: s.opts.initialConnWindowSize,
  736. WriteBufferSize: s.opts.writeBufferSize,
  737. ReadBufferSize: s.opts.readBufferSize,
  738. ChannelzParentID: s.channelzID,
  739. MaxHeaderListSize: s.opts.maxHeaderListSize,
  740. HeaderTableSize: s.opts.headerTableSize,
  741. }
  742. st, err := transport.NewServerTransport("http2", c, config)
  743. if err != nil {
  744. s.mu.Lock()
  745. s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
  746. s.mu.Unlock()
  747. c.Close()
  748. channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
  749. return nil
  750. }
  751. return st
  752. }
  753. func (s *Server) serveStreams(st transport.ServerTransport) {
  754. defer st.Close()
  755. var wg sync.WaitGroup
  756. var roundRobinCounter uint32
  757. st.HandleStreams(func(stream *transport.Stream) {
  758. wg.Add(1)
  759. if s.opts.numServerWorkers > 0 {
  760. data := &serverWorkerData{st: st, wg: &wg, stream: stream}
  761. select {
  762. case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
  763. default:
  764. // If all stream workers are busy, fallback to the default code path.
  765. go func() {
  766. s.handleStream(st, stream, s.traceInfo(st, stream))
  767. wg.Done()
  768. }()
  769. }
  770. } else {
  771. go func() {
  772. defer wg.Done()
  773. s.handleStream(st, stream, s.traceInfo(st, stream))
  774. }()
  775. }
  776. }, func(ctx context.Context, method string) context.Context {
  777. if !EnableTracing {
  778. return ctx
  779. }
  780. tr := trace.New("grpc.Recv."+methodFamily(method), method)
  781. return trace.NewContext(ctx, tr)
  782. })
  783. wg.Wait()
  784. }
// Compile-time assertion that *Server implements http.Handler.
var _ http.Handler = (*Server)(nil)
  786. // ServeHTTP implements the Go standard library's http.Handler
  787. // interface by responding to the gRPC request r, by looking up
  788. // the requested gRPC method in the gRPC server s.
  789. //
  790. // The provided HTTP request must have arrived on an HTTP/2
  791. // connection. When using the Go standard library's server,
  792. // practically this means that the Request must also have arrived
  793. // over TLS.
  794. //
  795. // To share one port (such as 443 for https) between gRPC and an
  796. // existing http.Handler, use a root http.Handler such as:
  797. //
  798. // if r.ProtoMajor == 2 && strings.HasPrefix(
  799. // r.Header.Get("Content-Type"), "application/grpc") {
  800. // grpcServer.ServeHTTP(w, r)
  801. // } else {
  802. // yourMux.ServeHTTP(w, r)
  803. // }
  804. //
  805. // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
  806. // separate from grpc-go's HTTP/2 server. Performance and features may vary
  807. // between the two paths. ServeHTTP does not support some gRPC features
  808. // available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
  809. // and subject to change.
  810. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  811. st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
  812. if err != nil {
  813. http.Error(w, err.Error(), http.StatusInternalServerError)
  814. return
  815. }
  816. if !s.addConn(st) {
  817. return
  818. }
  819. defer s.removeConn(st)
  820. s.serveStreams(st)
  821. }
  822. // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
  823. // If tracing is not enabled, it returns nil.
  824. func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
  825. if !EnableTracing {
  826. return nil
  827. }
  828. tr, ok := trace.FromContext(stream.Context())
  829. if !ok {
  830. return nil
  831. }
  832. trInfo = &traceInfo{
  833. tr: tr,
  834. firstLine: firstLine{
  835. client: false,
  836. remoteAddr: st.RemoteAddr(),
  837. },
  838. }
  839. if dl, ok := stream.Context().Deadline(); ok {
  840. trInfo.firstLine.deadline = time.Until(dl)
  841. }
  842. return trInfo
  843. }
  844. func (s *Server) addConn(st transport.ServerTransport) bool {
  845. s.mu.Lock()
  846. defer s.mu.Unlock()
  847. if s.conns == nil {
  848. st.Close()
  849. return false
  850. }
  851. if s.drain {
  852. // Transport added after we drained our existing conns: drain it
  853. // immediately.
  854. st.Drain()
  855. }
  856. s.conns[st] = true
  857. return true
  858. }
  859. func (s *Server) removeConn(st transport.ServerTransport) {
  860. s.mu.Lock()
  861. defer s.mu.Unlock()
  862. if s.conns != nil {
  863. delete(s.conns, st)
  864. s.cv.Broadcast()
  865. }
  866. }
  867. func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
  868. return &channelz.ServerInternalMetric{
  869. CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
  870. CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
  871. CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
  872. LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
  873. }
  874. }
  875. func (s *Server) incrCallsStarted() {
  876. atomic.AddInt64(&s.czData.callsStarted, 1)
  877. atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
  878. }
  879. func (s *Server) incrCallsSucceeded() {
  880. atomic.AddInt64(&s.czData.callsSucceeded, 1)
  881. }
  882. func (s *Server) incrCallsFailed() {
  883. atomic.AddInt64(&s.czData.callsFailed, 1)
  884. }
  885. func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
  886. data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
  887. if err != nil {
  888. channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
  889. return err
  890. }
  891. compData, err := compress(data, cp, comp)
  892. if err != nil {
  893. channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
  894. return err
  895. }
  896. hdr, payload := msgHeader(data, compData)
  897. // TODO(dfawley): should we be checking len(data) instead?
  898. if len(payload) > s.opts.maxSendMessageSize {
  899. return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
  900. }
  901. err = t.Write(stream, hdr, payload, opts)
  902. if err == nil && s.opts.statsHandler != nil {
  903. s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
  904. }
  905. return err
  906. }
  907. // chainUnaryServerInterceptors chains all unary server interceptors into one.
  908. func chainUnaryServerInterceptors(s *Server) {
  909. // Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
  910. // be executed before any other chained interceptors.
  911. interceptors := s.opts.chainUnaryInts
  912. if s.opts.unaryInt != nil {
  913. interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
  914. }
  915. var chainedInt UnaryServerInterceptor
  916. if len(interceptors) == 0 {
  917. chainedInt = nil
  918. } else if len(interceptors) == 1 {
  919. chainedInt = interceptors[0]
  920. } else {
  921. chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
  922. return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
  923. }
  924. }
  925. s.opts.unaryInt = chainedInt
  926. }
  927. // getChainUnaryHandler recursively generate the chained UnaryHandler
  928. func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
  929. if curr == len(interceptors)-1 {
  930. return finalHandler
  931. }
  932. return func(ctx context.Context, req interface{}) (interface{}, error) {
  933. return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
  934. }
  935. }
// processUnaryRPC handles a single unary RPC on stream: it receives and
// decompresses the request, invokes md.Handler (passing the server's unary
// interceptor), and writes either the reply followed by an OK status or the
// error status. Stats, tracing, channelz and binary logging are updated along
// the way when they are enabled.
//
// The returned error is the failure from receiving, handling or sending, or,
// on success, the result of the final WriteStatus call.
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
	sh := s.opts.statsHandler
	if sh != nil || trInfo != nil || channelz.IsOn() {
		if channelz.IsOn() {
			s.incrCallsStarted()
		}
		var statsBegin *stats.Begin
		if sh != nil {
			beginTime := time.Now()
			statsBegin = &stats.Begin{
				BeginTime: beginTime,
			}
			sh.HandleRPC(stream.Context(), statsBegin)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&trInfo.firstLine, false)
		}
		// The deferred error handling for tracing, stats handler and channelz are
		// combined into one function to reduce stack usage -- a defer takes ~56-64
		// bytes on the stack, so overflowing the stack will require a stack
		// re-allocation, which is expensive.
		//
		// To maintain behavior similar to separate deferred statements, statements
		// should be executed in the reverse order. That is, tracing first, stats
		// handler second, and channelz last. Note that panics *within* defers will
		// lead to different behavior, but that's an acceptable compromise; that
		// would be undefined behavior territory anyway.
		defer func() {
			// err is the named result, so this sees the function's final error.
			// io.EOF is not treated as a failure in any of the three sections.
			if trInfo != nil {
				if err != nil && err != io.EOF {
					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					trInfo.tr.SetError()
				}
				trInfo.tr.Finish()
			}

			if sh != nil {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	// Binary logging: record the client header (metadata, method, timeout,
	// authority and peer address) before any payload is processed.
	binlog := binarylog.GetMethodLogger(stream.Method())
	if binlog != nil {
		ctx := stream.Context()
		// Note: this md shadows the *MethodDesc parameter inside this block.
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			// A deadline that has already passed is logged as a zero timeout.
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		binlog.Log(logEntry)
	}

	// comp and cp are used for compression. decomp and dc are used for
	// decompression. If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			// No decompressor for the request's encoding: reject the RPC.
			// The WriteStatus error is not checked here.
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		stream.SetSendCompress(cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	// payInfo is only allocated when stats or binary logging is enabled.
	var payInfo *payloadInfo
	if sh != nil || binlog != nil {
		payInfo = &payloadInfo{}
	}
	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
	if err != nil {
		if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	// df is the decode function handed to the method handler: it unmarshals
	// the received bytes d into v and reports the inbound payload to the stats
	// handler, binary log and trace.
	df := func(v interface{}) error {
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		if sh != nil {
			sh.HandleRPC(stream.Context(), &stats.InPayload{
				RecvTime:   time.Now(),
				Payload:    v,
				WireLength: payInfo.wireLength,
				Data:       d,
				Length:     len(d),
			})
		}
		if binlog != nil {
			binlog.Log(&binarylog.ClientMessage{
				Message: d,
			})
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appErr = status.Error(codes.Unknown, appErr.Error())
			appStatus, _ = status.FromError(appErr)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if binlog != nil {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if there was header. Otherwise it can
				// be trailer only.
				binlog.Log(&binarylog.ServerHeader{
					Header: h,
				})
			}
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	// The reply is the last (and only) message of a unary RPC.
	opts := &transport.Options{Last: true}

	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		if sts, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, sts); e != nil {
				channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if binlog != nil {
			h, _ := stream.Header()
			binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			// NOTE(review): appErr is always nil at this point (the non-nil
			// case returned above), so the trailer is logged without the send
			// error err — confirm this is intended.
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return err
	}
	if binlog != nil {
		h, _ := stream.Header()
		binlog.Log(&binarylog.ServerHeader{
			Header: h,
		})
		binlog.Log(&binarylog.ServerMessage{
			Message: reply,
		})
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus? Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	err = t.WriteStatus(stream, statusOK)
	if binlog != nil {
		// appErr is nil on this path, so the trailer is logged without an error.
		binlog.Log(&binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     appErr,
		})
	}
	return err
}
  1174. // chainStreamServerInterceptors chains all stream server interceptors into one.
  1175. func chainStreamServerInterceptors(s *Server) {
  1176. // Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
  1177. // be executed before any other chained interceptors.
  1178. interceptors := s.opts.chainStreamInts
  1179. if s.opts.streamInt != nil {
  1180. interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
  1181. }
  1182. var chainedInt StreamServerInterceptor
  1183. if len(interceptors) == 0 {
  1184. chainedInt = nil
  1185. } else if len(interceptors) == 1 {
  1186. chainedInt = interceptors[0]
  1187. } else {
  1188. chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
  1189. return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
  1190. }
  1191. }
  1192. s.opts.streamInt = chainedInt
  1193. }
  1194. // getChainStreamHandler recursively generate the chained StreamHandler
  1195. func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
  1196. if curr == len(interceptors)-1 {
  1197. return finalHandler
  1198. }
  1199. return func(srv interface{}, ss ServerStream) error {
  1200. return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
  1201. }
  1202. }
// processStreamingRPC handles a single streaming RPC on stream: it builds the
// serverStream, selects compression and decompression, invokes sd.Handler
// (through the server's stream interceptor when one is configured) and writes
// the final status. Stats, tracing, channelz and binary logging are updated
// along the way when they are enabled.
//
// info may be nil (handleStream passes nil for the unknown-service handler),
// in which case the handler receives a nil service implementation.
//
// The returned error is the handler's error converted to a status error, or,
// on success, the result of the final WriteStatus call.
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
	}
	sh := s.opts.statsHandler
	var statsBegin *stats.Begin
	if sh != nil {
		beginTime := time.Now()
		statsBegin = &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), statsBegin)
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	ss := &serverStream{
		ctx:                   ctx,
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          sh,
	}

	if sh != nil || trInfo != nil || channelz.IsOn() {
		// See comment in processUnaryRPC on defers.
		defer func() {
			// err is the named result, so this sees the function's final error.
			// io.EOF is not treated as a failure in any of the three sections.
			if trInfo != nil {
				// The trace is accessed under ss.mu and cleared once finished.
				ss.mu.Lock()
				if err != nil && err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
				ss.trInfo.tr.Finish()
				ss.trInfo.tr = nil
				ss.mu.Unlock()
			}

			if sh != nil {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	// Binary logging: record the client header (metadata, method, timeout,
	// authority and peer address) before the handler runs.
	ss.binlog = binarylog.GetMethodLogger(stream.Method())
	if ss.binlog != nil {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			// A deadline that has already passed is logged as a zero timeout.
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		ss.binlog.Log(logEntry)
	}

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			// No decompressor for the request's encoding: reject the RPC.
			// The WriteStatus error is not checked here.
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		stream.SetSendCompress(s.opts.cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
	}
	var appErr error
	// server stays nil when no service info was supplied.
	var server interface{}
	if info != nil {
		server = info.serviceImpl
	}
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		// Note: this info shadows the *serviceInfo parameter inside this block.
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Wrap errors that are not grpc status errors as codes.Unknown.
			appStatus = status.New(codes.Unknown, appErr.Error())
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		t.WriteStatus(ss.s, appStatus)
		if ss.binlog != nil {
			ss.binlog.Log(&binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err:     appErr,
			})
		}
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	err = t.WriteStatus(ss.s, statusOK)
	if ss.binlog != nil {
		// appErr is nil on this path, so the trailer is logged without an error.
		ss.binlog.Log(&binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     appErr,
		})
	}
	return err
}
  1362. func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
  1363. sm := stream.Method()
  1364. if sm != "" && sm[0] == '/' {
  1365. sm = sm[1:]
  1366. }
  1367. pos := strings.LastIndex(sm, "/")
  1368. if pos == -1 {
  1369. if trInfo != nil {
  1370. trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
  1371. trInfo.tr.SetError()
  1372. }
  1373. errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
  1374. if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
  1375. if trInfo != nil {
  1376. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1377. trInfo.tr.SetError()
  1378. }
  1379. channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
  1380. }
  1381. if trInfo != nil {
  1382. trInfo.tr.Finish()
  1383. }
  1384. return
  1385. }
  1386. service := sm[:pos]
  1387. method := sm[pos+1:]
  1388. srv, knownService := s.services[service]
  1389. if knownService {
  1390. if md, ok := srv.methods[method]; ok {
  1391. s.processUnaryRPC(t, stream, srv, md, trInfo)
  1392. return
  1393. }
  1394. if sd, ok := srv.streams[method]; ok {
  1395. s.processStreamingRPC(t, stream, srv, sd, trInfo)
  1396. return
  1397. }
  1398. }
  1399. // Unknown service, or known server unknown method.
  1400. if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
  1401. s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
  1402. return
  1403. }
  1404. var errDesc string
  1405. if !knownService {
  1406. errDesc = fmt.Sprintf("unknown service %v", service)
  1407. } else {
  1408. errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
  1409. }
  1410. if trInfo != nil {
  1411. trInfo.tr.LazyPrintf("%s", errDesc)
  1412. trInfo.tr.SetError()
  1413. }
  1414. if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
  1415. if trInfo != nil {
  1416. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1417. trInfo.tr.SetError()
  1418. }
  1419. channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
  1420. }
  1421. if trInfo != nil {
  1422. trInfo.tr.Finish()
  1423. }
  1424. }
// streamKey is the context key under which NewContextWithServerTransportStream
// saves the ServerTransportStream and ServerTransportStreamFromContext looks
// it up.
type streamKey struct{}
  1427. // NewContextWithServerTransportStream creates a new context from ctx and
  1428. // attaches stream to it.
  1429. //
  1430. // This API is EXPERIMENTAL.
  1431. func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
  1432. return context.WithValue(ctx, streamKey{}, stream)
  1433. }
// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// This API is EXPERIMENTAL.
type ServerTransportStream interface {
	// Method returns the stream's method name; the server parses it as
	// "/<service>/<method>".
	Method() string
	// SetHeader, SendHeader and SetTrailer take the header or trailer
	// metadata to apply to the stream.
	SetHeader(md metadata.MD) error
	SendHeader(md metadata.MD) error
	SetTrailer(md metadata.MD) error
}
  1448. // ServerTransportStreamFromContext returns the ServerTransportStream saved in
  1449. // ctx. Returns nil if the given context has no stream associated with it
  1450. // (which implies it is not an RPC invocation context).
  1451. //
  1452. // This API is EXPERIMENTAL.
  1453. func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
  1454. s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
  1455. return s
  1456. }
// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
// It cancels all active RPCs on the server side and the corresponding
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
	// Signal shutdown first so Serve loops and new connections bail out.
	s.quit.Fire()

	// On the way out, wait for every Serve call and connection-handling
	// goroutine accounted for in serveWG, then mark the server as done (Serve
	// blocks on s.done before returning after a stop).
	defer func() {
		s.serveWG.Wait()
		s.done.Fire()
	}()

	// The channelz entry is removed at most once across Stop/GracefulStop.
	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})

	// Take ownership of the listeners and connections under the lock and nil
	// the fields out, so that Serve and addConn observe the stopped state;
	// the actual closing happens outside the lock.
	s.mu.Lock()
	listeners := s.lis
	s.lis = nil
	st := s.conns
	s.conns = nil
	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
	s.cv.Broadcast()
	s.mu.Unlock()

	for lis := range listeners {
		lis.Close()
	}
	for c := range st {
		c.Close()
	}
	if s.opts.numServerWorkers > 0 {
		s.stopServerWorkers()
	}

	// Finish the server's event log, if any, and clear it so it is only
	// finished once.
	s.mu.Lock()
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}
  1497. // GracefulStop stops the gRPC server gracefully. It stops the server from
  1498. // accepting new connections and RPCs and blocks until all the pending RPCs are
  1499. // finished.
  1500. func (s *Server) GracefulStop() {
  1501. s.quit.Fire()
  1502. defer s.done.Fire()
  1503. s.channelzRemoveOnce.Do(func() {
  1504. if channelz.IsOn() {
  1505. channelz.RemoveEntry(s.channelzID)
  1506. }
  1507. })
  1508. s.mu.Lock()
  1509. if s.conns == nil {
  1510. s.mu.Unlock()
  1511. return
  1512. }
  1513. for lis := range s.lis {
  1514. lis.Close()
  1515. }
  1516. s.lis = nil
  1517. if !s.drain {
  1518. for st := range s.conns {
  1519. st.Drain()
  1520. }
  1521. s.drain = true
  1522. }
  1523. // Wait for serving threads to be ready to exit. Only then can we be sure no
  1524. // new conns will be created.
  1525. s.mu.Unlock()
  1526. s.serveWG.Wait()
  1527. s.mu.Lock()
  1528. for len(s.conns) != 0 {
  1529. s.cv.Wait()
  1530. }
  1531. s.conns = nil
  1532. if s.events != nil {
  1533. s.events.Finish()
  1534. s.events = nil
  1535. }
  1536. s.mu.Unlock()
  1537. }
  1538. // contentSubtype must be lowercase
  1539. // cannot return nil
  1540. func (s *Server) getCodec(contentSubtype string) baseCodec {
  1541. if s.opts.codec != nil {
  1542. return s.opts.codec
  1543. }
  1544. if contentSubtype == "" {
  1545. return encoding.GetCodec(proto.Name)
  1546. }
  1547. codec := encoding.GetCodec(contentSubtype)
  1548. if codec == nil {
  1549. return encoding.GetCodec(proto.Name)
  1550. }
  1551. return codec
  1552. }
  1553. // SetHeader sets the header metadata.
  1554. // When called multiple times, all the provided metadata will be merged.
  1555. // All the metadata will be sent out when one of the following happens:
  1556. // - grpc.SendHeader() is called;
  1557. // - The first response is sent out;
  1558. // - An RPC status is sent out (error or success).
  1559. func SetHeader(ctx context.Context, md metadata.MD) error {
  1560. if md.Len() == 0 {
  1561. return nil
  1562. }
  1563. stream := ServerTransportStreamFromContext(ctx)
  1564. if stream == nil {
  1565. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1566. }
  1567. return stream.SetHeader(md)
  1568. }
  1569. // SendHeader sends header metadata. It may be called at most once.
  1570. // The provided md and headers set by SetHeader() will be sent.
  1571. func SendHeader(ctx context.Context, md metadata.MD) error {
  1572. stream := ServerTransportStreamFromContext(ctx)
  1573. if stream == nil {
  1574. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1575. }
  1576. if err := stream.SendHeader(md); err != nil {
  1577. return toRPCErr(err)
  1578. }
  1579. return nil
  1580. }
  1581. // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
  1582. // When called more than once, all the provided metadata will be merged.
  1583. func SetTrailer(ctx context.Context, md metadata.MD) error {
  1584. if md.Len() == 0 {
  1585. return nil
  1586. }
  1587. stream := ServerTransportStreamFromContext(ctx)
  1588. if stream == nil {
  1589. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1590. }
  1591. return stream.SetTrailer(md)
  1592. }
  1593. // Method returns the method string for the server context. The returned
  1594. // string is in the format of "/service/method".
  1595. func Method(ctx context.Context) (string, bool) {
  1596. s := ServerTransportStreamFromContext(ctx)
  1597. if s == nil {
  1598. return "", false
  1599. }
  1600. return s.Method(), true
  1601. }
  1602. type channelzServer struct {
  1603. s *Server
  1604. }
  1605. func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
  1606. return c.s.channelzMetric()
  1607. }