notify.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783
  1. package pq
  2. // Package pq is a pure Go Postgres driver for the database/sql package.
  3. // This module contains support for Postgres LISTEN/NOTIFY.
  4. import (
  5. "errors"
  6. "fmt"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. )
  11. // Notification represents a single notification from the database.
  12. type Notification struct {
  13. // Process ID (PID) of the notifying postgres backend.
  14. BePid int
  15. // Name of the channel the notification was sent on.
  16. Channel string
  17. // Payload, or the empty string if unspecified.
  18. Extra string
  19. }
  20. func recvNotification(r *readBuf) *Notification {
  21. bePid := r.int32()
  22. channel := r.string()
  23. extra := r.string()
  24. return &Notification{bePid, channel, extra}
  25. }
  26. const (
  27. connStateIdle int32 = iota
  28. connStateExpectResponse
  29. connStateExpectReadyForQuery
  30. )
  31. type message struct {
  32. typ byte
  33. err error
  34. }
  35. var errListenerConnClosed = errors.New("pq: ListenerConn has been closed")
  36. // ListenerConn is a low-level interface for waiting for notifications. You
  37. // should use Listener instead.
  38. type ListenerConn struct {
  39. // guards cn and err
  40. connectionLock sync.Mutex
  41. cn *conn
  42. err error
  43. connState int32
  44. // the sending goroutine will be holding this lock
  45. senderLock sync.Mutex
  46. notificationChan chan<- *Notification
  47. replyChan chan message
  48. }
  49. // Creates a new ListenerConn. Use NewListener instead.
  50. func NewListenerConn(name string, notificationChan chan<- *Notification) (*ListenerConn, error) {
  51. return newDialListenerConn(defaultDialer{}, name, notificationChan)
  52. }
  53. func newDialListenerConn(d Dialer, name string, c chan<- *Notification) (*ListenerConn, error) {
  54. cn, err := DialOpen(d, name)
  55. if err != nil {
  56. return nil, err
  57. }
  58. l := &ListenerConn{
  59. cn: cn.(*conn),
  60. notificationChan: c,
  61. connState: connStateIdle,
  62. replyChan: make(chan message, 2),
  63. }
  64. go l.listenerConnMain()
  65. return l, nil
  66. }
  67. // We can only allow one goroutine at a time to be running a query on the
  68. // connection for various reasons, so the goroutine sending on the connection
  69. // must be holding senderLock.
  70. //
  71. // Returns an error if an unrecoverable error has occurred and the ListenerConn
  72. // should be abandoned.
  73. func (l *ListenerConn) acquireSenderLock() error {
  74. // we must acquire senderLock first to avoid deadlocks; see ExecSimpleQuery
  75. l.senderLock.Lock()
  76. l.connectionLock.Lock()
  77. err := l.err
  78. l.connectionLock.Unlock()
  79. if err != nil {
  80. l.senderLock.Unlock()
  81. return err
  82. }
  83. return nil
  84. }
  85. func (l *ListenerConn) releaseSenderLock() {
  86. l.senderLock.Unlock()
  87. }
  88. // setState advances the protocol state to newState. Returns false if moving
  89. // to that state from the current state is not allowed.
  90. func (l *ListenerConn) setState(newState int32) bool {
  91. var expectedState int32
  92. switch newState {
  93. case connStateIdle:
  94. expectedState = connStateExpectReadyForQuery
  95. case connStateExpectResponse:
  96. expectedState = connStateIdle
  97. case connStateExpectReadyForQuery:
  98. expectedState = connStateExpectResponse
  99. default:
  100. panic(fmt.Sprintf("unexpected listenerConnState %d", newState))
  101. }
  102. return atomic.CompareAndSwapInt32(&l.connState, expectedState, newState)
  103. }
  104. // Main logic is here: receive messages from the postgres backend, forward
  105. // notifications and query replies and keep the internal state in sync with the
  106. // protocol state. Returns when the connection has been lost, is about to go
  107. // away or should be discarded because we couldn't agree on the state with the
  108. // server backend.
  109. func (l *ListenerConn) listenerConnLoop() (err error) {
  110. defer errRecoverNoErrBadConn(&err)
  111. r := &readBuf{}
  112. for {
  113. t, err := l.cn.recvMessage(r)
  114. if err != nil {
  115. return err
  116. }
  117. switch t {
  118. case 'A':
  119. // recvNotification copies all the data so we don't need to worry
  120. // about the scratch buffer being overwritten.
  121. l.notificationChan <- recvNotification(r)
  122. case 'T', 'D':
  123. // only used by tests; ignore
  124. case 'E':
  125. // We might receive an ErrorResponse even when not in a query; it
  126. // is expected that the server will close the connection after
  127. // that, but we should make sure that the error we display is the
  128. // one from the stray ErrorResponse, not io.ErrUnexpectedEOF.
  129. if !l.setState(connStateExpectReadyForQuery) {
  130. return parseError(r)
  131. }
  132. l.replyChan <- message{t, parseError(r)}
  133. case 'C', 'I':
  134. if !l.setState(connStateExpectReadyForQuery) {
  135. // protocol out of sync
  136. return fmt.Errorf("unexpected CommandComplete")
  137. }
  138. // ExecSimpleQuery doesn't need to know about this message
  139. case 'Z':
  140. if !l.setState(connStateIdle) {
  141. // protocol out of sync
  142. return fmt.Errorf("unexpected ReadyForQuery")
  143. }
  144. l.replyChan <- message{t, nil}
  145. case 'N', 'S':
  146. // ignore
  147. default:
  148. return fmt.Errorf("unexpected message %q from server in listenerConnLoop", t)
  149. }
  150. }
  151. }
  152. // This is the main routine for the goroutine receiving on the database
  153. // connection. Most of the main logic is in listenerConnLoop.
  154. func (l *ListenerConn) listenerConnMain() {
  155. err := l.listenerConnLoop()
  156. // listenerConnLoop terminated; we're done, but we still have to clean up.
  157. // Make sure nobody tries to start any new queries by making sure the err
  158. // pointer is set. It is important that we do not overwrite its value; a
  159. // connection could be closed by either this goroutine or one sending on
  160. // the connection -- whoever closes the connection is assumed to have the
  161. // more meaningful error message (as the other one will probably get
  162. // net.errClosed), so that goroutine sets the error we expose while the
  163. // other error is discarded. If the connection is lost while two
  164. // goroutines are operating on the socket, it probably doesn't matter which
  165. // error we expose so we don't try to do anything more complex.
  166. l.connectionLock.Lock()
  167. if l.err == nil {
  168. l.err = err
  169. }
  170. l.cn.Close()
  171. l.connectionLock.Unlock()
  172. // There might be a query in-flight; make sure nobody's waiting for a
  173. // response to it, since there's not going to be one.
  174. close(l.replyChan)
  175. // let the listener know we're done
  176. close(l.notificationChan)
  177. // this ListenerConn is done
  178. }
  179. // Send a LISTEN query to the server. See ExecSimpleQuery.
  180. func (l *ListenerConn) Listen(channel string) (bool, error) {
  181. return l.ExecSimpleQuery("LISTEN " + QuoteIdentifier(channel))
  182. }
  183. // Send an UNLISTEN query to the server. See ExecSimpleQuery.
  184. func (l *ListenerConn) Unlisten(channel string) (bool, error) {
  185. return l.ExecSimpleQuery("UNLISTEN " + QuoteIdentifier(channel))
  186. }
  187. // Send `UNLISTEN *` to the server. See ExecSimpleQuery.
  188. func (l *ListenerConn) UnlistenAll() (bool, error) {
  189. return l.ExecSimpleQuery("UNLISTEN *")
  190. }
  191. // Ping the remote server to make sure it's alive. Non-nil error means the
  192. // connection has failed and should be abandoned.
  193. func (l *ListenerConn) Ping() error {
  194. sent, err := l.ExecSimpleQuery("")
  195. if !sent {
  196. return err
  197. }
  198. if err != nil {
  199. // shouldn't happen
  200. panic(err)
  201. }
  202. return nil
  203. }
  204. // Attempt to send a query on the connection. Returns an error if sending the
  205. // query failed, and the caller should initiate closure of this connection.
  206. // The caller must be holding senderLock (see acquireSenderLock and
  207. // releaseSenderLock).
  208. func (l *ListenerConn) sendSimpleQuery(q string) (err error) {
  209. defer errRecoverNoErrBadConn(&err)
  210. // must set connection state before sending the query
  211. if !l.setState(connStateExpectResponse) {
  212. panic("two queries running at the same time")
  213. }
  214. // Can't use l.cn.writeBuf here because it uses the scratch buffer which
  215. // might get overwritten by listenerConnLoop.
  216. b := &writeBuf{
  217. buf: []byte("Q\x00\x00\x00\x00"),
  218. pos: 1,
  219. }
  220. b.string(q)
  221. l.cn.send(b)
  222. return nil
  223. }
  224. // Execute a "simple query" (i.e. one with no bindable parameters) on the
  225. // connection. The possible return values are:
  226. // 1) "executed" is true; the query was executed to completion on the
  227. // database server. If the query failed, err will be set to the error
  228. // returned by the database, otherwise err will be nil.
  229. // 2) If "executed" is false, the query could not be executed on the remote
  230. // server. err will be non-nil.
  231. //
  232. // After a call to ExecSimpleQuery has returned an executed=false value, the
  233. // connection has either been closed or will be closed shortly thereafter, and
  234. // all subsequently executed queries will return an error.
  235. func (l *ListenerConn) ExecSimpleQuery(q string) (executed bool, err error) {
  236. if err = l.acquireSenderLock(); err != nil {
  237. return false, err
  238. }
  239. defer l.releaseSenderLock()
  240. err = l.sendSimpleQuery(q)
  241. if err != nil {
  242. // We can't know what state the protocol is in, so we need to abandon
  243. // this connection.
  244. l.connectionLock.Lock()
  245. // Set the error pointer if it hasn't been set already; see
  246. // listenerConnMain.
  247. if l.err == nil {
  248. l.err = err
  249. }
  250. l.connectionLock.Unlock()
  251. l.cn.c.Close()
  252. return false, err
  253. }
  254. // now we just wait for a reply..
  255. for {
  256. m, ok := <-l.replyChan
  257. if !ok {
  258. // We lost the connection to server, don't bother waiting for a
  259. // a response. err should have been set already.
  260. l.connectionLock.Lock()
  261. err := l.err
  262. l.connectionLock.Unlock()
  263. return false, err
  264. }
  265. switch m.typ {
  266. case 'Z':
  267. // sanity check
  268. if m.err != nil {
  269. panic("m.err != nil")
  270. }
  271. // done; err might or might not be set
  272. return true, err
  273. case 'E':
  274. // sanity check
  275. if m.err == nil {
  276. panic("m.err == nil")
  277. }
  278. // server responded with an error; ReadyForQuery to follow
  279. err = m.err
  280. default:
  281. return false, fmt.Errorf("unknown response for simple query: %q", m.typ)
  282. }
  283. }
  284. }
  285. func (l *ListenerConn) Close() error {
  286. l.connectionLock.Lock()
  287. if l.err != nil {
  288. l.connectionLock.Unlock()
  289. return errListenerConnClosed
  290. }
  291. l.err = errListenerConnClosed
  292. l.connectionLock.Unlock()
  293. // We can't send anything on the connection without holding senderLock.
  294. // Simply close the net.Conn to wake up everyone operating on it.
  295. return l.cn.c.Close()
  296. }
  297. // Err() returns the reason the connection was closed. It is not safe to call
  298. // this function until l.Notify has been closed.
  299. func (l *ListenerConn) Err() error {
  300. return l.err
  301. }
  302. var errListenerClosed = errors.New("pq: Listener has been closed")
  303. var ErrChannelAlreadyOpen = errors.New("pq: channel is already open")
  304. var ErrChannelNotOpen = errors.New("pq: channel is not open")
  305. type ListenerEventType int
  306. const (
  307. // Emitted only when the database connection has been initially
  308. // initialized. err will always be nil.
  309. ListenerEventConnected ListenerEventType = iota
  310. // Emitted after a database connection has been lost, either because of an
  311. // error or because Close has been called. err will be set to the reason
  312. // the database connection was lost.
  313. ListenerEventDisconnected
  314. // Emitted after a database connection has been re-established after
  315. // connection loss. err will always be nil. After this event has been
  316. // emitted, a nil pq.Notification is sent on the Listener.Notify channel.
  317. ListenerEventReconnected
  318. // Emitted after a connection to the database was attempted, but failed.
  319. // err will be set to an error describing why the connection attempt did
  320. // not succeed.
  321. ListenerEventConnectionAttemptFailed
  322. )
  323. type EventCallbackType func(event ListenerEventType, err error)
  324. // Listener provides an interface for listening to notifications from a
  325. // PostgreSQL database. For general usage information, see section
  326. // "Notifications".
  327. //
  328. // Listener can safely be used from concurrently running goroutines.
  329. type Listener struct {
  330. // Channel for receiving notifications from the database. In some cases a
  331. // nil value will be sent. See section "Notifications" above.
  332. Notify chan *Notification
  333. name string
  334. minReconnectInterval time.Duration
  335. maxReconnectInterval time.Duration
  336. dialer Dialer
  337. eventCallback EventCallbackType
  338. lock sync.Mutex
  339. isClosed bool
  340. reconnectCond *sync.Cond
  341. cn *ListenerConn
  342. connNotificationChan <-chan *Notification
  343. channels map[string]struct{}
  344. }
  345. // NewListener creates a new database connection dedicated to LISTEN / NOTIFY.
  346. //
  347. // name should be set to a connection string to be used to establish the
  348. // database connection (see section "Connection String Parameters" above).
  349. //
  350. // minReconnectInterval controls the duration to wait before trying to
  351. // re-establish the database connection after connection loss. After each
  352. // consecutive failure this interval is doubled, until maxReconnectInterval is
  353. // reached. Successfully completing the connection establishment procedure
  354. // resets the interval back to minReconnectInterval.
  355. //
  356. // The last parameter eventCallback can be set to a function which will be
  357. // called by the Listener when the state of the underlying database connection
  358. // changes. This callback will be called by the goroutine which dispatches the
  359. // notifications over the Notify channel, so you should try to avoid doing
  360. // potentially time-consuming operations from the callback.
  361. func NewListener(name string,
  362. minReconnectInterval time.Duration,
  363. maxReconnectInterval time.Duration,
  364. eventCallback EventCallbackType) *Listener {
  365. return NewDialListener(defaultDialer{}, name, minReconnectInterval, maxReconnectInterval, eventCallback)
  366. }
  367. // NewDialListener is like NewListener but it takes a Dialer.
  368. func NewDialListener(d Dialer,
  369. name string,
  370. minReconnectInterval time.Duration,
  371. maxReconnectInterval time.Duration,
  372. eventCallback EventCallbackType) *Listener {
  373. l := &Listener{
  374. name: name,
  375. minReconnectInterval: minReconnectInterval,
  376. maxReconnectInterval: maxReconnectInterval,
  377. dialer: d,
  378. eventCallback: eventCallback,
  379. channels: make(map[string]struct{}),
  380. Notify: make(chan *Notification, 32),
  381. }
  382. l.reconnectCond = sync.NewCond(&l.lock)
  383. go l.listenerMain()
  384. return l
  385. }
  386. // Returns the notification channel for this listener. This is the same
  387. // channel as Notify, and will not be recreated during the life time of the
  388. // Listener.
  389. func (l *Listener) NotificationChannel() <-chan *Notification {
  390. return l.Notify
  391. }
  392. // Listen starts listening for notifications on a channel. Calls to this
  393. // function will block until an acknowledgement has been received from the
  394. // server. Note that Listener automatically re-establishes the connection
  395. // after connection loss, so this function may block indefinitely if the
  396. // connection can not be re-established.
  397. //
  398. // Listen will only fail in three conditions:
  399. // 1) The channel is already open. The returned error will be
  400. // ErrChannelAlreadyOpen.
  401. // 2) The query was executed on the remote server, but PostgreSQL returned an
  402. // error message in response to the query. The returned error will be a
  403. // pq.Error containing the information the server supplied.
  404. // 3) Close is called on the Listener before the request could be completed.
  405. //
  406. // The channel name is case-sensitive.
  407. func (l *Listener) Listen(channel string) error {
  408. l.lock.Lock()
  409. defer l.lock.Unlock()
  410. if l.isClosed {
  411. return errListenerClosed
  412. }
  413. // The server allows you to issue a LISTEN on a channel which is already
  414. // open, but it seems useful to be able to detect this case to spot for
  415. // mistakes in application logic. If the application genuinely does't
  416. // care, it can check the exported error and ignore it.
  417. _, exists := l.channels[channel]
  418. if exists {
  419. return ErrChannelAlreadyOpen
  420. }
  421. if l.cn != nil {
  422. // If gotResponse is true but error is set, the query was executed on
  423. // the remote server, but resulted in an error. This should be
  424. // relatively rare, so it's fine if we just pass the error to our
  425. // caller. However, if gotResponse is false, we could not complete the
  426. // query on the remote server and our underlying connection is about
  427. // to go away, so we only add relname to l.channels, and wait for
  428. // resync() to take care of the rest.
  429. gotResponse, err := l.cn.Listen(channel)
  430. if gotResponse && err != nil {
  431. return err
  432. }
  433. }
  434. l.channels[channel] = struct{}{}
  435. for l.cn == nil {
  436. l.reconnectCond.Wait()
  437. // we let go of the mutex for a while
  438. if l.isClosed {
  439. return errListenerClosed
  440. }
  441. }
  442. return nil
  443. }
  444. // Unlisten removes a channel from the Listener's channel list. Returns
  445. // ErrChannelNotOpen if the Listener is not listening on the specified channel.
  446. // Returns immediately with no error if there is no connection. Note that you
  447. // might still get notifications for this channel even after Unlisten has
  448. // returned.
  449. //
  450. // The channel name is case-sensitive.
  451. func (l *Listener) Unlisten(channel string) error {
  452. l.lock.Lock()
  453. defer l.lock.Unlock()
  454. if l.isClosed {
  455. return errListenerClosed
  456. }
  457. // Similarly to LISTEN, this is not an error in Postgres, but it seems
  458. // useful to distinguish from the normal conditions.
  459. _, exists := l.channels[channel]
  460. if !exists {
  461. return ErrChannelNotOpen
  462. }
  463. if l.cn != nil {
  464. // Similarly to Listen (see comment in that function), the caller
  465. // should only be bothered with an error if it came from the backend as
  466. // a response to our query.
  467. gotResponse, err := l.cn.Unlisten(channel)
  468. if gotResponse && err != nil {
  469. return err
  470. }
  471. }
  472. // Don't bother waiting for resync if there's no connection.
  473. delete(l.channels, channel)
  474. return nil
  475. }
  476. // UnlistenAll removes all channels from the Listener's channel list. Returns
  477. // immediately with no error if there is no connection. Note that you might
  478. // still get notifications for any of the deleted channels even after
  479. // UnlistenAll has returned.
  480. func (l *Listener) UnlistenAll() error {
  481. l.lock.Lock()
  482. defer l.lock.Unlock()
  483. if l.isClosed {
  484. return errListenerClosed
  485. }
  486. if l.cn != nil {
  487. // Similarly to Listen (see comment in that function), the caller
  488. // should only be bothered with an error if it came from the backend as
  489. // a response to our query.
  490. gotResponse, err := l.cn.UnlistenAll()
  491. if gotResponse && err != nil {
  492. return err
  493. }
  494. }
  495. // Don't bother waiting for resync if there's no connection.
  496. l.channels = make(map[string]struct{})
  497. return nil
  498. }
  499. // Ping the remote server to make sure it's alive. Non-nil return value means
  500. // that there is no active connection.
  501. func (l *Listener) Ping() error {
  502. l.lock.Lock()
  503. defer l.lock.Unlock()
  504. if l.isClosed {
  505. return errListenerClosed
  506. }
  507. if l.cn == nil {
  508. return errors.New("no connection")
  509. }
  510. return l.cn.Ping()
  511. }
  512. // Clean up after losing the server connection. Returns l.cn.Err(), which
  513. // should have the reason the connection was lost.
  514. func (l *Listener) disconnectCleanup() error {
  515. l.lock.Lock()
  516. defer l.lock.Unlock()
  517. // sanity check; can't look at Err() until the channel has been closed
  518. select {
  519. case _, ok := <-l.connNotificationChan:
  520. if ok {
  521. panic("connNotificationChan not closed")
  522. }
  523. default:
  524. panic("connNotificationChan not closed")
  525. }
  526. err := l.cn.Err()
  527. l.cn.Close()
  528. l.cn = nil
  529. return err
  530. }
  531. // Synchronize the list of channels we want to be listening on with the server
  532. // after the connection has been established.
  533. func (l *Listener) resync(cn *ListenerConn, notificationChan <-chan *Notification) error {
  534. doneChan := make(chan error)
  535. go func() {
  536. for channel := range l.channels {
  537. // If we got a response, return that error to our caller as it's
  538. // going to be more descriptive than cn.Err().
  539. gotResponse, err := cn.Listen(channel)
  540. if gotResponse && err != nil {
  541. doneChan <- err
  542. return
  543. }
  544. // If we couldn't reach the server, wait for notificationChan to
  545. // close and then return the error message from the connection, as
  546. // per ListenerConn's interface.
  547. if err != nil {
  548. for _ = range notificationChan {
  549. }
  550. doneChan <- cn.Err()
  551. return
  552. }
  553. }
  554. doneChan <- nil
  555. }()
  556. // Ignore notifications while synchronization is going on to avoid
  557. // deadlocks. We have to send a nil notification over Notify anyway as
  558. // we can't possibly know which notifications (if any) were lost while
  559. // the connection was down, so there's no reason to try and process
  560. // these messages at all.
  561. for {
  562. select {
  563. case _, ok := <-notificationChan:
  564. if !ok {
  565. notificationChan = nil
  566. }
  567. case err := <-doneChan:
  568. return err
  569. }
  570. }
  571. }
  572. // caller should NOT be holding l.lock
  573. func (l *Listener) closed() bool {
  574. l.lock.Lock()
  575. defer l.lock.Unlock()
  576. return l.isClosed
  577. }
  578. func (l *Listener) connect() error {
  579. notificationChan := make(chan *Notification, 32)
  580. cn, err := newDialListenerConn(l.dialer, l.name, notificationChan)
  581. if err != nil {
  582. return err
  583. }
  584. l.lock.Lock()
  585. defer l.lock.Unlock()
  586. err = l.resync(cn, notificationChan)
  587. if err != nil {
  588. cn.Close()
  589. return err
  590. }
  591. l.cn = cn
  592. l.connNotificationChan = notificationChan
  593. l.reconnectCond.Broadcast()
  594. return nil
  595. }
  596. // Close disconnects the Listener from the database and shuts it down.
  597. // Subsequent calls to its methods will return an error. Close returns an
  598. // error if the connection has already been closed.
  599. func (l *Listener) Close() error {
  600. l.lock.Lock()
  601. defer l.lock.Unlock()
  602. if l.isClosed {
  603. return errListenerClosed
  604. }
  605. if l.cn != nil {
  606. l.cn.Close()
  607. }
  608. l.isClosed = true
  609. return nil
  610. }
  611. func (l *Listener) emitEvent(event ListenerEventType, err error) {
  612. if l.eventCallback != nil {
  613. l.eventCallback(event, err)
  614. }
  615. }
  616. // Main logic here: maintain a connection to the server when possible, wait
  617. // for notifications and emit events.
  618. func (l *Listener) listenerConnLoop() {
  619. var nextReconnect time.Time
  620. reconnectInterval := l.minReconnectInterval
  621. for {
  622. for {
  623. err := l.connect()
  624. if err == nil {
  625. break
  626. }
  627. if l.closed() {
  628. return
  629. }
  630. l.emitEvent(ListenerEventConnectionAttemptFailed, err)
  631. time.Sleep(reconnectInterval)
  632. reconnectInterval *= 2
  633. if reconnectInterval > l.maxReconnectInterval {
  634. reconnectInterval = l.maxReconnectInterval
  635. }
  636. }
  637. if nextReconnect.IsZero() {
  638. l.emitEvent(ListenerEventConnected, nil)
  639. } else {
  640. l.emitEvent(ListenerEventReconnected, nil)
  641. l.Notify <- nil
  642. }
  643. reconnectInterval = l.minReconnectInterval
  644. nextReconnect = time.Now().Add(reconnectInterval)
  645. for {
  646. notification, ok := <-l.connNotificationChan
  647. if !ok {
  648. // lost connection, loop again
  649. break
  650. }
  651. l.Notify <- notification
  652. }
  653. err := l.disconnectCleanup()
  654. if l.closed() {
  655. return
  656. }
  657. l.emitEvent(ListenerEventDisconnected, err)
  658. time.Sleep(nextReconnect.Sub(time.Now()))
  659. }
  660. }
  661. func (l *Listener) listenerMain() {
  662. l.listenerConnLoop()
  663. close(l.Notify)
  664. }