notify.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859
  1. package pq
  2. // Package pq is a pure Go Postgres driver for the database/sql package.
  3. // This module contains support for Postgres LISTEN/NOTIFY.
  4. import (
  5. "context"
  6. "database/sql/driver"
  7. "errors"
  8. "fmt"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. )
  13. // Notification represents a single notification from the database.
  14. type Notification struct {
  15. // Process ID (PID) of the notifying postgres backend.
  16. BePid int
  17. // Name of the channel the notification was sent on.
  18. Channel string
  19. // Payload, or the empty string if unspecified.
  20. Extra string
  21. }
  22. func recvNotification(r *readBuf) *Notification {
  23. bePid := r.int32()
  24. channel := r.string()
  25. extra := r.string()
  26. return &Notification{bePid, channel, extra}
  27. }
  28. // SetNotificationHandler sets the given notification handler on the given
  29. // connection. A runtime panic occurs if c is not a pq connection. A nil handler
  30. // may be used to unset it.
  31. //
  32. // Note: Notification handlers are executed synchronously by pq meaning commands
  33. // won't continue to be processed until the handler returns.
  34. func SetNotificationHandler(c driver.Conn, handler func(*Notification)) {
  35. c.(*conn).notificationHandler = handler
  36. }
  37. // NotificationHandlerConnector wraps a regular connector and sets a notification handler
  38. // on it.
  39. type NotificationHandlerConnector struct {
  40. driver.Connector
  41. notificationHandler func(*Notification)
  42. }
  43. // Connect calls the underlying connector's connect method and then sets the
  44. // notification handler.
  45. func (n *NotificationHandlerConnector) Connect(ctx context.Context) (driver.Conn, error) {
  46. c, err := n.Connector.Connect(ctx)
  47. if err == nil {
  48. SetNotificationHandler(c, n.notificationHandler)
  49. }
  50. return c, err
  51. }
  52. // ConnectorNotificationHandler returns the currently set notification handler, if any. If
  53. // the given connector is not a result of ConnectorWithNotificationHandler, nil is
  54. // returned.
  55. func ConnectorNotificationHandler(c driver.Connector) func(*Notification) {
  56. if c, ok := c.(*NotificationHandlerConnector); ok {
  57. return c.notificationHandler
  58. }
  59. return nil
  60. }
  61. // ConnectorWithNotificationHandler creates or sets the given handler for the given
  62. // connector. If the given connector is a result of calling this function
  63. // previously, it is simply set on the given connector and returned. Otherwise,
  64. // this returns a new connector wrapping the given one and setting the notification
  65. // handler. A nil notification handler may be used to unset it.
  66. //
  67. // The returned connector is intended to be used with database/sql.OpenDB.
  68. //
  69. // Note: Notification handlers are executed synchronously by pq meaning commands
  70. // won't continue to be processed until the handler returns.
  71. func ConnectorWithNotificationHandler(c driver.Connector, handler func(*Notification)) *NotificationHandlerConnector {
  72. if c, ok := c.(*NotificationHandlerConnector); ok {
  73. c.notificationHandler = handler
  74. return c
  75. }
  76. return &NotificationHandlerConnector{Connector: c, notificationHandler: handler}
  77. }
  78. const (
  79. connStateIdle int32 = iota
  80. connStateExpectResponse
  81. connStateExpectReadyForQuery
  82. )
  83. type message struct {
  84. typ byte
  85. err error
  86. }
  87. var errListenerConnClosed = errors.New("pq: ListenerConn has been closed")
  88. // ListenerConn is a low-level interface for waiting for notifications. You
  89. // should use Listener instead.
  90. type ListenerConn struct {
  91. // guards cn and err
  92. connectionLock sync.Mutex
  93. cn *conn
  94. err error
  95. connState int32
  96. // the sending goroutine will be holding this lock
  97. senderLock sync.Mutex
  98. notificationChan chan<- *Notification
  99. replyChan chan message
  100. }
  101. // NewListenerConn creates a new ListenerConn. Use NewListener instead.
  102. func NewListenerConn(name string, notificationChan chan<- *Notification) (*ListenerConn, error) {
  103. return newDialListenerConn(defaultDialer{}, name, notificationChan)
  104. }
  105. func newDialListenerConn(d Dialer, name string, c chan<- *Notification) (*ListenerConn, error) {
  106. cn, err := DialOpen(d, name)
  107. if err != nil {
  108. return nil, err
  109. }
  110. l := &ListenerConn{
  111. cn: cn.(*conn),
  112. notificationChan: c,
  113. connState: connStateIdle,
  114. replyChan: make(chan message, 2),
  115. }
  116. go l.listenerConnMain()
  117. return l, nil
  118. }
  119. // We can only allow one goroutine at a time to be running a query on the
  120. // connection for various reasons, so the goroutine sending on the connection
  121. // must be holding senderLock.
  122. //
  123. // Returns an error if an unrecoverable error has occurred and the ListenerConn
  124. // should be abandoned.
  125. func (l *ListenerConn) acquireSenderLock() error {
  126. // we must acquire senderLock first to avoid deadlocks; see ExecSimpleQuery
  127. l.senderLock.Lock()
  128. l.connectionLock.Lock()
  129. err := l.err
  130. l.connectionLock.Unlock()
  131. if err != nil {
  132. l.senderLock.Unlock()
  133. return err
  134. }
  135. return nil
  136. }
  137. func (l *ListenerConn) releaseSenderLock() {
  138. l.senderLock.Unlock()
  139. }
  140. // setState advances the protocol state to newState. Returns false if moving
  141. // to that state from the current state is not allowed.
  142. func (l *ListenerConn) setState(newState int32) bool {
  143. var expectedState int32
  144. switch newState {
  145. case connStateIdle:
  146. expectedState = connStateExpectReadyForQuery
  147. case connStateExpectResponse:
  148. expectedState = connStateIdle
  149. case connStateExpectReadyForQuery:
  150. expectedState = connStateExpectResponse
  151. default:
  152. panic(fmt.Sprintf("unexpected listenerConnState %d", newState))
  153. }
  154. return atomic.CompareAndSwapInt32(&l.connState, expectedState, newState)
  155. }
  156. // Main logic is here: receive messages from the postgres backend, forward
  157. // notifications and query replies and keep the internal state in sync with the
  158. // protocol state. Returns when the connection has been lost, is about to go
  159. // away or should be discarded because we couldn't agree on the state with the
  160. // server backend.
  161. func (l *ListenerConn) listenerConnLoop() (err error) {
  162. defer errRecoverNoErrBadConn(&err)
  163. r := &readBuf{}
  164. for {
  165. t, err := l.cn.recvMessage(r)
  166. if err != nil {
  167. return err
  168. }
  169. switch t {
  170. case 'A':
  171. // recvNotification copies all the data so we don't need to worry
  172. // about the scratch buffer being overwritten.
  173. l.notificationChan <- recvNotification(r)
  174. case 'T', 'D':
  175. // only used by tests; ignore
  176. case 'E':
  177. // We might receive an ErrorResponse even when not in a query; it
  178. // is expected that the server will close the connection after
  179. // that, but we should make sure that the error we display is the
  180. // one from the stray ErrorResponse, not io.ErrUnexpectedEOF.
  181. if !l.setState(connStateExpectReadyForQuery) {
  182. return parseError(r)
  183. }
  184. l.replyChan <- message{t, parseError(r)}
  185. case 'C', 'I':
  186. if !l.setState(connStateExpectReadyForQuery) {
  187. // protocol out of sync
  188. return fmt.Errorf("unexpected CommandComplete")
  189. }
  190. // ExecSimpleQuery doesn't need to know about this message
  191. case 'Z':
  192. if !l.setState(connStateIdle) {
  193. // protocol out of sync
  194. return fmt.Errorf("unexpected ReadyForQuery")
  195. }
  196. l.replyChan <- message{t, nil}
  197. case 'S':
  198. // ignore
  199. case 'N':
  200. if n := l.cn.noticeHandler; n != nil {
  201. n(parseError(r))
  202. }
  203. default:
  204. return fmt.Errorf("unexpected message %q from server in listenerConnLoop", t)
  205. }
  206. }
  207. }
  208. // This is the main routine for the goroutine receiving on the database
  209. // connection. Most of the main logic is in listenerConnLoop.
  210. func (l *ListenerConn) listenerConnMain() {
  211. err := l.listenerConnLoop()
  212. // listenerConnLoop terminated; we're done, but we still have to clean up.
  213. // Make sure nobody tries to start any new queries by making sure the err
  214. // pointer is set. It is important that we do not overwrite its value; a
  215. // connection could be closed by either this goroutine or one sending on
  216. // the connection -- whoever closes the connection is assumed to have the
  217. // more meaningful error message (as the other one will probably get
  218. // net.errClosed), so that goroutine sets the error we expose while the
  219. // other error is discarded. If the connection is lost while two
  220. // goroutines are operating on the socket, it probably doesn't matter which
  221. // error we expose so we don't try to do anything more complex.
  222. l.connectionLock.Lock()
  223. if l.err == nil {
  224. l.err = err
  225. }
  226. l.cn.Close()
  227. l.connectionLock.Unlock()
  228. // There might be a query in-flight; make sure nobody's waiting for a
  229. // response to it, since there's not going to be one.
  230. close(l.replyChan)
  231. // let the listener know we're done
  232. close(l.notificationChan)
  233. // this ListenerConn is done
  234. }
  235. // Listen sends a LISTEN query to the server. See ExecSimpleQuery.
  236. func (l *ListenerConn) Listen(channel string) (bool, error) {
  237. return l.ExecSimpleQuery("LISTEN " + QuoteIdentifier(channel))
  238. }
  239. // Unlisten sends an UNLISTEN query to the server. See ExecSimpleQuery.
  240. func (l *ListenerConn) Unlisten(channel string) (bool, error) {
  241. return l.ExecSimpleQuery("UNLISTEN " + QuoteIdentifier(channel))
  242. }
  243. // UnlistenAll sends an `UNLISTEN *` query to the server. See ExecSimpleQuery.
  244. func (l *ListenerConn) UnlistenAll() (bool, error) {
  245. return l.ExecSimpleQuery("UNLISTEN *")
  246. }
  247. // Ping the remote server to make sure it's alive. Non-nil error means the
  248. // connection has failed and should be abandoned.
  249. func (l *ListenerConn) Ping() error {
  250. sent, err := l.ExecSimpleQuery("")
  251. if !sent {
  252. return err
  253. }
  254. if err != nil {
  255. // shouldn't happen
  256. panic(err)
  257. }
  258. return nil
  259. }
  260. // Attempt to send a query on the connection. Returns an error if sending the
  261. // query failed, and the caller should initiate closure of this connection.
  262. // The caller must be holding senderLock (see acquireSenderLock and
  263. // releaseSenderLock).
  264. func (l *ListenerConn) sendSimpleQuery(q string) (err error) {
  265. defer errRecoverNoErrBadConn(&err)
  266. // must set connection state before sending the query
  267. if !l.setState(connStateExpectResponse) {
  268. panic("two queries running at the same time")
  269. }
  270. // Can't use l.cn.writeBuf here because it uses the scratch buffer which
  271. // might get overwritten by listenerConnLoop.
  272. b := &writeBuf{
  273. buf: []byte("Q\x00\x00\x00\x00"),
  274. pos: 1,
  275. }
  276. b.string(q)
  277. l.cn.send(b)
  278. return nil
  279. }
  280. // ExecSimpleQuery executes a "simple query" (i.e. one with no bindable
  281. // parameters) on the connection. The possible return values are:
  282. // 1) "executed" is true; the query was executed to completion on the
  283. // database server. If the query failed, err will be set to the error
  284. // returned by the database, otherwise err will be nil.
  285. // 2) If "executed" is false, the query could not be executed on the remote
  286. // server. err will be non-nil.
  287. //
  288. // After a call to ExecSimpleQuery has returned an executed=false value, the
  289. // connection has either been closed or will be closed shortly thereafter, and
  290. // all subsequently executed queries will return an error.
  291. func (l *ListenerConn) ExecSimpleQuery(q string) (executed bool, err error) {
  292. if err = l.acquireSenderLock(); err != nil {
  293. return false, err
  294. }
  295. defer l.releaseSenderLock()
  296. err = l.sendSimpleQuery(q)
  297. if err != nil {
  298. // We can't know what state the protocol is in, so we need to abandon
  299. // this connection.
  300. l.connectionLock.Lock()
  301. // Set the error pointer if it hasn't been set already; see
  302. // listenerConnMain.
  303. if l.err == nil {
  304. l.err = err
  305. }
  306. l.connectionLock.Unlock()
  307. l.cn.c.Close()
  308. return false, err
  309. }
  310. // now we just wait for a reply..
  311. for {
  312. m, ok := <-l.replyChan
  313. if !ok {
  314. // We lost the connection to server, don't bother waiting for a
  315. // a response. err should have been set already.
  316. l.connectionLock.Lock()
  317. err := l.err
  318. l.connectionLock.Unlock()
  319. return false, err
  320. }
  321. switch m.typ {
  322. case 'Z':
  323. // sanity check
  324. if m.err != nil {
  325. panic("m.err != nil")
  326. }
  327. // done; err might or might not be set
  328. return true, err
  329. case 'E':
  330. // sanity check
  331. if m.err == nil {
  332. panic("m.err == nil")
  333. }
  334. // server responded with an error; ReadyForQuery to follow
  335. err = m.err
  336. default:
  337. return false, fmt.Errorf("unknown response for simple query: %q", m.typ)
  338. }
  339. }
  340. }
  341. // Close closes the connection.
  342. func (l *ListenerConn) Close() error {
  343. l.connectionLock.Lock()
  344. if l.err != nil {
  345. l.connectionLock.Unlock()
  346. return errListenerConnClosed
  347. }
  348. l.err = errListenerConnClosed
  349. l.connectionLock.Unlock()
  350. // We can't send anything on the connection without holding senderLock.
  351. // Simply close the net.Conn to wake up everyone operating on it.
  352. return l.cn.c.Close()
  353. }
  354. // Err returns the reason the connection was closed. It is not safe to call
  355. // this function until l.Notify has been closed.
  356. func (l *ListenerConn) Err() error {
  357. return l.err
  358. }
  359. var errListenerClosed = errors.New("pq: Listener has been closed")
  360. // ErrChannelAlreadyOpen is returned from Listen when a channel is already
  361. // open.
  362. var ErrChannelAlreadyOpen = errors.New("pq: channel is already open")
  363. // ErrChannelNotOpen is returned from Unlisten when a channel is not open.
  364. var ErrChannelNotOpen = errors.New("pq: channel is not open")
  365. // ListenerEventType is an enumeration of listener event types.
  366. type ListenerEventType int
  367. const (
  368. // ListenerEventConnected is emitted only when the database connection
  369. // has been initially initialized. The err argument of the callback
  370. // will always be nil.
  371. ListenerEventConnected ListenerEventType = iota
  372. // ListenerEventDisconnected is emitted after a database connection has
  373. // been lost, either because of an error or because Close has been
  374. // called. The err argument will be set to the reason the database
  375. // connection was lost.
  376. ListenerEventDisconnected
  377. // ListenerEventReconnected is emitted after a database connection has
  378. // been re-established after connection loss. The err argument of the
  379. // callback will always be nil. After this event has been emitted, a
  380. // nil pq.Notification is sent on the Listener.Notify channel.
  381. ListenerEventReconnected
  382. // ListenerEventConnectionAttemptFailed is emitted after a connection
  383. // to the database was attempted, but failed. The err argument will be
  384. // set to an error describing why the connection attempt did not
  385. // succeed.
  386. ListenerEventConnectionAttemptFailed
  387. )
  388. // EventCallbackType is the event callback type. See also ListenerEventType
  389. // constants' documentation.
  390. type EventCallbackType func(event ListenerEventType, err error)
  391. // Listener provides an interface for listening to notifications from a
  392. // PostgreSQL database. For general usage information, see section
  393. // "Notifications".
  394. //
  395. // Listener can safely be used from concurrently running goroutines.
  396. type Listener struct {
  397. // Channel for receiving notifications from the database. In some cases a
  398. // nil value will be sent. See section "Notifications" above.
  399. Notify chan *Notification
  400. name string
  401. minReconnectInterval time.Duration
  402. maxReconnectInterval time.Duration
  403. dialer Dialer
  404. eventCallback EventCallbackType
  405. lock sync.Mutex
  406. isClosed bool
  407. reconnectCond *sync.Cond
  408. cn *ListenerConn
  409. connNotificationChan <-chan *Notification
  410. channels map[string]struct{}
  411. }
  412. // NewListener creates a new database connection dedicated to LISTEN / NOTIFY.
  413. //
  414. // name should be set to a connection string to be used to establish the
  415. // database connection (see section "Connection String Parameters" above).
  416. //
  417. // minReconnectInterval controls the duration to wait before trying to
  418. // re-establish the database connection after connection loss. After each
  419. // consecutive failure this interval is doubled, until maxReconnectInterval is
  420. // reached. Successfully completing the connection establishment procedure
  421. // resets the interval back to minReconnectInterval.
  422. //
  423. // The last parameter eventCallback can be set to a function which will be
  424. // called by the Listener when the state of the underlying database connection
  425. // changes. This callback will be called by the goroutine which dispatches the
  426. // notifications over the Notify channel, so you should try to avoid doing
  427. // potentially time-consuming operations from the callback.
  428. func NewListener(name string,
  429. minReconnectInterval time.Duration,
  430. maxReconnectInterval time.Duration,
  431. eventCallback EventCallbackType) *Listener {
  432. return NewDialListener(defaultDialer{}, name, minReconnectInterval, maxReconnectInterval, eventCallback)
  433. }
  434. // NewDialListener is like NewListener but it takes a Dialer.
  435. func NewDialListener(d Dialer,
  436. name string,
  437. minReconnectInterval time.Duration,
  438. maxReconnectInterval time.Duration,
  439. eventCallback EventCallbackType) *Listener {
  440. l := &Listener{
  441. name: name,
  442. minReconnectInterval: minReconnectInterval,
  443. maxReconnectInterval: maxReconnectInterval,
  444. dialer: d,
  445. eventCallback: eventCallback,
  446. channels: make(map[string]struct{}),
  447. Notify: make(chan *Notification, 32),
  448. }
  449. l.reconnectCond = sync.NewCond(&l.lock)
  450. go l.listenerMain()
  451. return l
  452. }
  453. // NotificationChannel returns the notification channel for this listener.
  454. // This is the same channel as Notify, and will not be recreated during the
  455. // life time of the Listener.
  456. func (l *Listener) NotificationChannel() <-chan *Notification {
  457. return l.Notify
  458. }
  459. // Listen starts listening for notifications on a channel. Calls to this
  460. // function will block until an acknowledgement has been received from the
  461. // server. Note that Listener automatically re-establishes the connection
  462. // after connection loss, so this function may block indefinitely if the
  463. // connection can not be re-established.
  464. //
  465. // Listen will only fail in three conditions:
  466. // 1) The channel is already open. The returned error will be
  467. // ErrChannelAlreadyOpen.
  468. // 2) The query was executed on the remote server, but PostgreSQL returned an
  469. // error message in response to the query. The returned error will be a
  470. // pq.Error containing the information the server supplied.
  471. // 3) Close is called on the Listener before the request could be completed.
  472. //
  473. // The channel name is case-sensitive.
  474. func (l *Listener) Listen(channel string) error {
  475. l.lock.Lock()
  476. defer l.lock.Unlock()
  477. if l.isClosed {
  478. return errListenerClosed
  479. }
  480. // The server allows you to issue a LISTEN on a channel which is already
  481. // open, but it seems useful to be able to detect this case to spot for
  482. // mistakes in application logic. If the application genuinely does't
  483. // care, it can check the exported error and ignore it.
  484. _, exists := l.channels[channel]
  485. if exists {
  486. return ErrChannelAlreadyOpen
  487. }
  488. if l.cn != nil {
  489. // If gotResponse is true but error is set, the query was executed on
  490. // the remote server, but resulted in an error. This should be
  491. // relatively rare, so it's fine if we just pass the error to our
  492. // caller. However, if gotResponse is false, we could not complete the
  493. // query on the remote server and our underlying connection is about
  494. // to go away, so we only add relname to l.channels, and wait for
  495. // resync() to take care of the rest.
  496. gotResponse, err := l.cn.Listen(channel)
  497. if gotResponse && err != nil {
  498. return err
  499. }
  500. }
  501. l.channels[channel] = struct{}{}
  502. for l.cn == nil {
  503. l.reconnectCond.Wait()
  504. // we let go of the mutex for a while
  505. if l.isClosed {
  506. return errListenerClosed
  507. }
  508. }
  509. return nil
  510. }
  511. // Unlisten removes a channel from the Listener's channel list. Returns
  512. // ErrChannelNotOpen if the Listener is not listening on the specified channel.
  513. // Returns immediately with no error if there is no connection. Note that you
  514. // might still get notifications for this channel even after Unlisten has
  515. // returned.
  516. //
  517. // The channel name is case-sensitive.
  518. func (l *Listener) Unlisten(channel string) error {
  519. l.lock.Lock()
  520. defer l.lock.Unlock()
  521. if l.isClosed {
  522. return errListenerClosed
  523. }
  524. // Similarly to LISTEN, this is not an error in Postgres, but it seems
  525. // useful to distinguish from the normal conditions.
  526. _, exists := l.channels[channel]
  527. if !exists {
  528. return ErrChannelNotOpen
  529. }
  530. if l.cn != nil {
  531. // Similarly to Listen (see comment in that function), the caller
  532. // should only be bothered with an error if it came from the backend as
  533. // a response to our query.
  534. gotResponse, err := l.cn.Unlisten(channel)
  535. if gotResponse && err != nil {
  536. return err
  537. }
  538. }
  539. // Don't bother waiting for resync if there's no connection.
  540. delete(l.channels, channel)
  541. return nil
  542. }
  543. // UnlistenAll removes all channels from the Listener's channel list. Returns
  544. // immediately with no error if there is no connection. Note that you might
  545. // still get notifications for any of the deleted channels even after
  546. // UnlistenAll has returned.
  547. func (l *Listener) UnlistenAll() error {
  548. l.lock.Lock()
  549. defer l.lock.Unlock()
  550. if l.isClosed {
  551. return errListenerClosed
  552. }
  553. if l.cn != nil {
  554. // Similarly to Listen (see comment in that function), the caller
  555. // should only be bothered with an error if it came from the backend as
  556. // a response to our query.
  557. gotResponse, err := l.cn.UnlistenAll()
  558. if gotResponse && err != nil {
  559. return err
  560. }
  561. }
  562. // Don't bother waiting for resync if there's no connection.
  563. l.channels = make(map[string]struct{})
  564. return nil
  565. }
  566. // Ping the remote server to make sure it's alive. Non-nil return value means
  567. // that there is no active connection.
  568. func (l *Listener) Ping() error {
  569. l.lock.Lock()
  570. defer l.lock.Unlock()
  571. if l.isClosed {
  572. return errListenerClosed
  573. }
  574. if l.cn == nil {
  575. return errors.New("no connection")
  576. }
  577. return l.cn.Ping()
  578. }
  579. // Clean up after losing the server connection. Returns l.cn.Err(), which
  580. // should have the reason the connection was lost.
  581. func (l *Listener) disconnectCleanup() error {
  582. l.lock.Lock()
  583. defer l.lock.Unlock()
  584. // sanity check; can't look at Err() until the channel has been closed
  585. select {
  586. case _, ok := <-l.connNotificationChan:
  587. if ok {
  588. panic("connNotificationChan not closed")
  589. }
  590. default:
  591. panic("connNotificationChan not closed")
  592. }
  593. err := l.cn.Err()
  594. l.cn.Close()
  595. l.cn = nil
  596. return err
  597. }
  598. // Synchronize the list of channels we want to be listening on with the server
  599. // after the connection has been established.
  600. func (l *Listener) resync(cn *ListenerConn, notificationChan <-chan *Notification) error {
  601. doneChan := make(chan error)
  602. go func(notificationChan <-chan *Notification) {
  603. for channel := range l.channels {
  604. // If we got a response, return that error to our caller as it's
  605. // going to be more descriptive than cn.Err().
  606. gotResponse, err := cn.Listen(channel)
  607. if gotResponse && err != nil {
  608. doneChan <- err
  609. return
  610. }
  611. // If we couldn't reach the server, wait for notificationChan to
  612. // close and then return the error message from the connection, as
  613. // per ListenerConn's interface.
  614. if err != nil {
  615. for range notificationChan {
  616. }
  617. doneChan <- cn.Err()
  618. return
  619. }
  620. }
  621. doneChan <- nil
  622. }(notificationChan)
  623. // Ignore notifications while synchronization is going on to avoid
  624. // deadlocks. We have to send a nil notification over Notify anyway as
  625. // we can't possibly know which notifications (if any) were lost while
  626. // the connection was down, so there's no reason to try and process
  627. // these messages at all.
  628. for {
  629. select {
  630. case _, ok := <-notificationChan:
  631. if !ok {
  632. notificationChan = nil
  633. }
  634. case err := <-doneChan:
  635. return err
  636. }
  637. }
  638. }
  639. // caller should NOT be holding l.lock
  640. func (l *Listener) closed() bool {
  641. l.lock.Lock()
  642. defer l.lock.Unlock()
  643. return l.isClosed
  644. }
  645. func (l *Listener) connect() error {
  646. notificationChan := make(chan *Notification, 32)
  647. cn, err := newDialListenerConn(l.dialer, l.name, notificationChan)
  648. if err != nil {
  649. return err
  650. }
  651. l.lock.Lock()
  652. defer l.lock.Unlock()
  653. err = l.resync(cn, notificationChan)
  654. if err != nil {
  655. cn.Close()
  656. return err
  657. }
  658. l.cn = cn
  659. l.connNotificationChan = notificationChan
  660. l.reconnectCond.Broadcast()
  661. return nil
  662. }
  663. // Close disconnects the Listener from the database and shuts it down.
  664. // Subsequent calls to its methods will return an error. Close returns an
  665. // error if the connection has already been closed.
  666. func (l *Listener) Close() error {
  667. l.lock.Lock()
  668. defer l.lock.Unlock()
  669. if l.isClosed {
  670. return errListenerClosed
  671. }
  672. if l.cn != nil {
  673. l.cn.Close()
  674. }
  675. l.isClosed = true
  676. // Unblock calls to Listen()
  677. l.reconnectCond.Broadcast()
  678. return nil
  679. }
  680. func (l *Listener) emitEvent(event ListenerEventType, err error) {
  681. if l.eventCallback != nil {
  682. l.eventCallback(event, err)
  683. }
  684. }
  685. // Main logic here: maintain a connection to the server when possible, wait
  686. // for notifications and emit events.
  687. func (l *Listener) listenerConnLoop() {
  688. var nextReconnect time.Time
  689. reconnectInterval := l.minReconnectInterval
  690. for {
  691. for {
  692. err := l.connect()
  693. if err == nil {
  694. break
  695. }
  696. if l.closed() {
  697. return
  698. }
  699. l.emitEvent(ListenerEventConnectionAttemptFailed, err)
  700. time.Sleep(reconnectInterval)
  701. reconnectInterval *= 2
  702. if reconnectInterval > l.maxReconnectInterval {
  703. reconnectInterval = l.maxReconnectInterval
  704. }
  705. }
  706. if nextReconnect.IsZero() {
  707. l.emitEvent(ListenerEventConnected, nil)
  708. } else {
  709. l.emitEvent(ListenerEventReconnected, nil)
  710. l.Notify <- nil
  711. }
  712. reconnectInterval = l.minReconnectInterval
  713. nextReconnect = time.Now().Add(reconnectInterval)
  714. for {
  715. notification, ok := <-l.connNotificationChan
  716. if !ok {
  717. // lost connection, loop again
  718. break
  719. }
  720. l.Notify <- notification
  721. }
  722. err := l.disconnectCleanup()
  723. if l.closed() {
  724. return
  725. }
  726. l.emitEvent(ListenerEventDisconnected, err)
  727. time.Sleep(time.Until(nextReconnect))
  728. }
  729. }
  730. func (l *Listener) listenerMain() {
  731. l.listenerConnLoop()
  732. close(l.Notify)
  733. }