snowflake-broker_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522
  1. package main
  2. import (
  3. "bytes"
  4. "container/heap"
  5. . "github.com/smartystreets/goconvey/convey"
  6. "io/ioutil"
  7. "log"
  8. "net"
  9. "net/http"
  10. "net/http/httptest"
  11. "os"
  12. "testing"
  13. "time"
  14. )
  15. func NullLogger() *log.Logger {
  16. logger := log.New(os.Stdout, "", 0)
  17. logger.SetOutput(ioutil.Discard)
  18. return logger
  19. }
  20. func TestBroker(t *testing.T) {
  21. Convey("Context", t, func() {
  22. ctx := NewBrokerContext(NullLogger())
  23. Convey("Adds Snowflake", func() {
  24. So(ctx.snowflakes.Len(), ShouldEqual, 0)
  25. So(len(ctx.idToSnowflake), ShouldEqual, 0)
  26. ctx.AddSnowflake("foo")
  27. So(ctx.snowflakes.Len(), ShouldEqual, 1)
  28. So(len(ctx.idToSnowflake), ShouldEqual, 1)
  29. })
  30. Convey("Broker goroutine matches clients with proxies", func() {
  31. p := new(ProxyPoll)
  32. p.id = "test"
  33. p.offerChannel = make(chan []byte)
  34. go func(ctx *BrokerContext) {
  35. ctx.proxyPolls <- p
  36. close(ctx.proxyPolls)
  37. }(ctx)
  38. ctx.Broker()
  39. So(ctx.snowflakes.Len(), ShouldEqual, 1)
  40. snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
  41. snowflake.offerChannel <- []byte("test offer")
  42. offer := <-p.offerChannel
  43. So(ctx.idToSnowflake["test"], ShouldNotBeNil)
  44. So(offer, ShouldResemble, []byte("test offer"))
  45. So(ctx.snowflakes.Len(), ShouldEqual, 0)
  46. })
  47. Convey("Request an offer from the Snowflake Heap", func() {
  48. done := make(chan []byte)
  49. go func() {
  50. offer := ctx.RequestOffer("test")
  51. done <- offer
  52. }()
  53. request := <-ctx.proxyPolls
  54. request.offerChannel <- []byte("test offer")
  55. offer := <-done
  56. So(offer, ShouldResemble, []byte("test offer"))
  57. })
  58. Convey("Responds to client offers...", func() {
  59. w := httptest.NewRecorder()
  60. data := bytes.NewReader([]byte("test"))
  61. r, err := http.NewRequest("POST", "snowflake.broker/client", data)
  62. So(err, ShouldBeNil)
  63. Convey("with 503 when no snowflakes are available.", func() {
  64. clientOffers(ctx, w, r)
  65. So(w.Code, ShouldEqual, http.StatusServiceUnavailable)
  66. So(w.Body.String(), ShouldEqual, "")
  67. })
  68. Convey("with a proxy answer if available.", func() {
  69. done := make(chan bool)
  70. // Prepare a fake proxy to respond with.
  71. snowflake := ctx.AddSnowflake("fake")
  72. go func() {
  73. clientOffers(ctx, w, r)
  74. done <- true
  75. }()
  76. offer := <-snowflake.offerChannel
  77. So(offer, ShouldResemble, []byte("test"))
  78. snowflake.answerChannel <- []byte("fake answer")
  79. <-done
  80. So(w.Body.String(), ShouldEqual, "fake answer")
  81. So(w.Code, ShouldEqual, http.StatusOK)
  82. })
  83. Convey("Times out when no proxy responds.", func() {
  84. if testing.Short() {
  85. return
  86. }
  87. done := make(chan bool)
  88. snowflake := ctx.AddSnowflake("fake")
  89. go func() {
  90. clientOffers(ctx, w, r)
  91. // Takes a few seconds here...
  92. done <- true
  93. }()
  94. offer := <-snowflake.offerChannel
  95. So(offer, ShouldResemble, []byte("test"))
  96. <-done
  97. So(w.Code, ShouldEqual, http.StatusGatewayTimeout)
  98. })
  99. })
  100. Convey("Responds to proxy polls...", func() {
  101. done := make(chan bool)
  102. w := httptest.NewRecorder()
  103. data := bytes.NewReader([]byte("test"))
  104. r, err := http.NewRequest("POST", "snowflake.broker/proxy", data)
  105. r.Header.Set("X-Session-ID", "test")
  106. So(err, ShouldBeNil)
  107. Convey("with a client offer if available.", func() {
  108. go func(ctx *BrokerContext) {
  109. proxyPolls(ctx, w, r)
  110. done <- true
  111. }(ctx)
  112. // Pass a fake client offer to this proxy
  113. p := <-ctx.proxyPolls
  114. So(p.id, ShouldEqual, "test")
  115. p.offerChannel <- []byte("fake offer")
  116. <-done
  117. So(w.Code, ShouldEqual, http.StatusOK)
  118. So(w.Body.String(), ShouldEqual, "fake offer")
  119. })
  120. Convey("times out when no client offer is available.", func() {
  121. go func(ctx *BrokerContext) {
  122. proxyPolls(ctx, w, r)
  123. done <- true
  124. }(ctx)
  125. p := <-ctx.proxyPolls
  126. So(p.id, ShouldEqual, "test")
  127. // nil means timeout
  128. p.offerChannel <- nil
  129. <-done
  130. So(w.Body.String(), ShouldEqual, "")
  131. So(w.Code, ShouldEqual, http.StatusGatewayTimeout)
  132. })
  133. })
  134. Convey("Responds to proxy answers...", func() {
  135. s := ctx.AddSnowflake("test")
  136. w := httptest.NewRecorder()
  137. data := bytes.NewReader([]byte("fake answer"))
  138. Convey("by passing to the client if valid.", func() {
  139. r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
  140. So(err, ShouldBeNil)
  141. r.Header.Set("X-Session-ID", "test")
  142. go func(ctx *BrokerContext) {
  143. proxyAnswers(ctx, w, r)
  144. }(ctx)
  145. answer := <-s.answerChannel
  146. So(w.Code, ShouldEqual, http.StatusOK)
  147. So(answer, ShouldResemble, []byte("fake answer"))
  148. })
  149. Convey("with error if the proxy is not recognized", func() {
  150. r, err := http.NewRequest("POST", "snowflake.broker/answer", nil)
  151. So(err, ShouldBeNil)
  152. r.Header.Set("X-Session-ID", "invalid")
  153. proxyAnswers(ctx, w, r)
  154. So(w.Code, ShouldEqual, http.StatusGone)
  155. })
  156. Convey("with error if the proxy gives invalid answer", func() {
  157. data := bytes.NewReader(nil)
  158. r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
  159. r.Header.Set("X-Session-ID", "test")
  160. So(err, ShouldBeNil)
  161. proxyAnswers(ctx, w, r)
  162. So(w.Code, ShouldEqual, http.StatusBadRequest)
  163. })
  164. Convey("with error if the proxy writes too much data", func() {
  165. data := bytes.NewReader(make([]byte, 100001, 100001))
  166. r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
  167. r.Header.Set("X-Session-ID", "test")
  168. So(err, ShouldBeNil)
  169. proxyAnswers(ctx, w, r)
  170. So(w.Code, ShouldEqual, http.StatusBadRequest)
  171. })
  172. })
  173. })
  174. Convey("End-To-End", t, func() {
  175. done := make(chan bool)
  176. polled := make(chan bool)
  177. ctx := NewBrokerContext(NullLogger())
  178. // Proxy polls with its ID first...
  179. dataP := bytes.NewReader([]byte("test"))
  180. wP := httptest.NewRecorder()
  181. rP, err := http.NewRequest("POST", "snowflake.broker/proxy", dataP)
  182. So(err, ShouldBeNil)
  183. rP.Header.Set("X-Session-ID", "test")
  184. go func() {
  185. proxyPolls(ctx, wP, rP)
  186. polled <- true
  187. }()
  188. // Manually do the Broker goroutine action here for full control.
  189. p := <-ctx.proxyPolls
  190. So(p.id, ShouldEqual, "test")
  191. s := ctx.AddSnowflake(p.id)
  192. go func() {
  193. offer := <-s.offerChannel
  194. p.offerChannel <- offer
  195. }()
  196. So(ctx.idToSnowflake["test"], ShouldNotBeNil)
  197. // Client request blocks until proxy answer arrives.
  198. dataC := bytes.NewReader([]byte("fake offer"))
  199. wC := httptest.NewRecorder()
  200. rC, err := http.NewRequest("POST", "snowflake.broker/client", dataC)
  201. So(err, ShouldBeNil)
  202. go func() {
  203. clientOffers(ctx, wC, rC)
  204. done <- true
  205. }()
  206. <-polled
  207. So(wP.Code, ShouldEqual, http.StatusOK)
  208. So(wP.Body.String(), ShouldResemble, "fake offer")
  209. So(ctx.idToSnowflake["test"], ShouldNotBeNil)
  210. // Follow up with the answer request afterwards
  211. wA := httptest.NewRecorder()
  212. dataA := bytes.NewReader([]byte("fake answer"))
  213. rA, err := http.NewRequest("POST", "snowflake.broker/proxy", dataA)
  214. So(err, ShouldBeNil)
  215. rA.Header.Set("X-Session-ID", "test")
  216. proxyAnswers(ctx, wA, rA)
  217. So(wA.Code, ShouldEqual, http.StatusOK)
  218. <-done
  219. So(wC.Code, ShouldEqual, http.StatusOK)
  220. So(wC.Body.String(), ShouldEqual, "fake answer")
  221. })
  222. }
  223. func TestSnowflakeHeap(t *testing.T) {
  224. Convey("SnowflakeHeap", t, func() {
  225. h := new(SnowflakeHeap)
  226. heap.Init(h)
  227. So(h.Len(), ShouldEqual, 0)
  228. s1 := new(Snowflake)
  229. s2 := new(Snowflake)
  230. s3 := new(Snowflake)
  231. s4 := new(Snowflake)
  232. s1.clients = 4
  233. s2.clients = 5
  234. s3.clients = 3
  235. s4.clients = 1
  236. heap.Push(h, s1)
  237. So(h.Len(), ShouldEqual, 1)
  238. heap.Push(h, s2)
  239. So(h.Len(), ShouldEqual, 2)
  240. heap.Push(h, s3)
  241. So(h.Len(), ShouldEqual, 3)
  242. heap.Push(h, s4)
  243. So(h.Len(), ShouldEqual, 4)
  244. heap.Remove(h, 0)
  245. So(h.Len(), ShouldEqual, 3)
  246. r := heap.Pop(h).(*Snowflake)
  247. So(h.Len(), ShouldEqual, 2)
  248. So(r.clients, ShouldEqual, 3)
  249. So(r.index, ShouldEqual, -1)
  250. r = heap.Pop(h).(*Snowflake)
  251. So(h.Len(), ShouldEqual, 1)
  252. So(r.clients, ShouldEqual, 4)
  253. So(r.index, ShouldEqual, -1)
  254. r = heap.Pop(h).(*Snowflake)
  255. So(h.Len(), ShouldEqual, 0)
  256. So(r.clients, ShouldEqual, 5)
  257. So(r.index, ShouldEqual, -1)
  258. })
  259. }
  260. func TestGeoip(t *testing.T) {
  261. Convey("Geoip", t, func() {
  262. tv4 := new(GeoIPv4Table)
  263. err := GeoIPLoadFile(tv4, "test_geoip")
  264. So(err, ShouldEqual, nil)
  265. tv6 := new(GeoIPv6Table)
  266. err = GeoIPLoadFile(tv6, "test_geoip6")
  267. So(err, ShouldEqual, nil)
  268. Convey("IPv4 Country Mapping Tests", func() {
  269. for _, test := range []struct {
  270. addr, cc string
  271. ok bool
  272. }{
  273. {
  274. "129.97.208.23", //uwaterloo
  275. "CA",
  276. true,
  277. },
  278. {
  279. "127.0.0.1",
  280. "",
  281. false,
  282. },
  283. {
  284. "255.255.255.255",
  285. "",
  286. false,
  287. },
  288. {
  289. "0.0.0.0",
  290. "",
  291. false,
  292. },
  293. {
  294. "223.252.127.255", //test high end of range
  295. "JP",
  296. true,
  297. },
  298. {
  299. "223.252.127.255", //test low end of range
  300. "JP",
  301. true,
  302. },
  303. } {
  304. country, ok := GetCountryByAddr(tv4, net.ParseIP(test.addr))
  305. So(country, ShouldEqual, test.cc)
  306. So(ok, ShouldResemble, test.ok)
  307. }
  308. })
  309. Convey("IPv6 Country Mapping Tests", func() {
  310. for _, test := range []struct {
  311. addr, cc string
  312. ok bool
  313. }{
  314. {
  315. "2620:101:f000:0:250:56ff:fe80:168e", //uwaterloo
  316. "CA",
  317. true,
  318. },
  319. {
  320. "fd00:0:0:0:0:0:0:1",
  321. "",
  322. false,
  323. },
  324. {
  325. "0:0:0:0:0:0:0:0",
  326. "",
  327. false,
  328. },
  329. {
  330. "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
  331. "",
  332. false,
  333. },
  334. {
  335. "2a07:2e47:ffff:ffff:ffff:ffff:ffff:ffff", //test high end of range
  336. "FR",
  337. true,
  338. },
  339. {
  340. "2a07:2e40::", //test low end of range
  341. "FR",
  342. true,
  343. },
  344. } {
  345. country, ok := GetCountryByAddr(tv6, net.ParseIP(test.addr))
  346. So(country, ShouldEqual, test.cc)
  347. So(ok, ShouldResemble, test.ok)
  348. }
  349. })
  350. // Make sure things behave properly if geoip file fails to load
  351. ctx := NewBrokerContext(NullLogger())
  352. ctx.metrics.LoadGeoipDatabases("invalid_filename", "invalid_filename6")
  353. ctx.metrics.UpdateCountryStats("127.0.0.1")
  354. So(ctx.metrics.tablev4, ShouldEqual, nil)
  355. })
  356. }
  357. func TestMetrics(t *testing.T) {
  358. Convey("Test metrics...", t, func() {
  359. done := make(chan bool)
  360. buf := new(bytes.Buffer)
  361. ctx := NewBrokerContext(log.New(buf, "", 0))
  362. err := ctx.metrics.LoadGeoipDatabases("test_geoip", "test_geoip6")
  363. So(err, ShouldEqual, nil)
  364. //Test addition of proxy polls
  365. Convey("for proxy polls", func() {
  366. w := httptest.NewRecorder()
  367. data := bytes.NewReader([]byte("test"))
  368. r, err := http.NewRequest("POST", "snowflake.broker/proxy", data)
  369. r.Header.Set("X-Session-ID", "test")
  370. r.RemoteAddr = "129.97.208.23:8888" //CA geoip
  371. So(err, ShouldBeNil)
  372. go func(ctx *BrokerContext) {
  373. proxyPolls(ctx, w, r)
  374. done <- true
  375. }(ctx)
  376. p := <-ctx.proxyPolls //manually unblock poll
  377. p.offerChannel <- nil
  378. <-done
  379. ctx.metrics.printMetrics()
  380. So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips CA=1\nsnowflake-ips-total 1\nsnowflake-idle-count 8\nclient-denied-count 0\nclient-snowflake-match-count 0\n")
  381. })
  382. //Test addition of client failures
  383. Convey("for no proxies available", func() {
  384. w := httptest.NewRecorder()
  385. data := bytes.NewReader([]byte("test"))
  386. r, err := http.NewRequest("POST", "snowflake.broker/client", data)
  387. So(err, ShouldBeNil)
  388. clientOffers(ctx, w, r)
  389. ctx.metrics.printMetrics()
  390. So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 8\nclient-snowflake-match-count 0\n")
  391. // Test reset
  392. buf.Reset()
  393. ctx.metrics.zeroMetrics()
  394. ctx.metrics.printMetrics()
  395. So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 0\nclient-snowflake-match-count 0\n")
  396. })
  397. //Test addition of client matches
  398. Convey("for client-proxy match", func() {
  399. w := httptest.NewRecorder()
  400. data := bytes.NewReader([]byte("test"))
  401. r, err := http.NewRequest("POST", "snowflake.broker/client", data)
  402. So(err, ShouldBeNil)
  403. // Prepare a fake proxy to respond with.
  404. snowflake := ctx.AddSnowflake("fake")
  405. go func() {
  406. clientOffers(ctx, w, r)
  407. done <- true
  408. }()
  409. offer := <-snowflake.offerChannel
  410. So(offer, ShouldResemble, []byte("test"))
  411. snowflake.answerChannel <- []byte("fake answer")
  412. <-done
  413. ctx.metrics.printMetrics()
  414. So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 0\nclient-snowflake-match-count 8\n")
  415. })
  416. //Test rounding boundary
  417. Convey("binning boundary", func() {
  418. w := httptest.NewRecorder()
  419. data := bytes.NewReader([]byte("test"))
  420. r, err := http.NewRequest("POST", "snowflake.broker/client", data)
  421. So(err, ShouldBeNil)
  422. clientOffers(ctx, w, r)
  423. clientOffers(ctx, w, r)
  424. clientOffers(ctx, w, r)
  425. clientOffers(ctx, w, r)
  426. clientOffers(ctx, w, r)
  427. clientOffers(ctx, w, r)
  428. clientOffers(ctx, w, r)
  429. clientOffers(ctx, w, r)
  430. ctx.metrics.printMetrics()
  431. So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 8\nclient-snowflake-match-count 0\n")
  432. clientOffers(ctx, w, r)
  433. buf.Reset()
  434. ctx.metrics.printMetrics()
  435. So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 16\nclient-snowflake-match-count 0\n")
  436. })
  437. //Test unique ip
  438. Convey("proxy counts by unique ip", func() {
  439. w := httptest.NewRecorder()
  440. data := bytes.NewReader([]byte("test"))
  441. r, err := http.NewRequest("POST", "snowflake.broker/proxy", data)
  442. r.Header.Set("X-Session-ID", "test")
  443. r.RemoteAddr = "129.97.208.23:8888" //CA geoip
  444. So(err, ShouldBeNil)
  445. go func(ctx *BrokerContext) {
  446. proxyPolls(ctx, w, r)
  447. done <- true
  448. }(ctx)
  449. p := <-ctx.proxyPolls //manually unblock poll
  450. p.offerChannel <- nil
  451. <-done
  452. data = bytes.NewReader([]byte("test"))
  453. r, err = http.NewRequest("POST", "snowflake.broker/proxy", data)
  454. r.Header.Set("X-Session-ID", "test")
  455. r.RemoteAddr = "129.97.208.23:8888" //CA geoip
  456. go func(ctx *BrokerContext) {
  457. proxyPolls(ctx, w, r)
  458. done <- true
  459. }(ctx)
  460. p = <-ctx.proxyPolls //manually unblock poll
  461. p.offerChannel <- nil
  462. <-done
  463. ctx.metrics.printMetrics()
  464. So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips CA=1\nsnowflake-ips-total 1\nsnowflake-idle-count 8\nclient-denied-count 0\nclient-snowflake-match-count 0\n")
  465. })
  466. })
  467. }