snowflake-broker_test.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. package main
  2. import (
  3. "bytes"
  4. "container/heap"
  5. . "github.com/smartystreets/goconvey/convey"
  6. "io/ioutil"
  7. "log"
  8. "net"
  9. "net/http"
  10. "net/http/httptest"
  11. "os"
  12. "testing"
  13. )
  14. func NullLogger() *log.Logger {
  15. logger := log.New(os.Stdout, "", 0)
  16. logger.SetOutput(ioutil.Discard)
  17. return logger
  18. }
  19. func TestBroker(t *testing.T) {
  20. Convey("Context", t, func() {
  21. ctx := NewBrokerContext(NullLogger())
  22. Convey("Adds Snowflake", func() {
  23. So(ctx.snowflakes.Len(), ShouldEqual, 0)
  24. So(len(ctx.idToSnowflake), ShouldEqual, 0)
  25. ctx.AddSnowflake("foo")
  26. So(ctx.snowflakes.Len(), ShouldEqual, 1)
  27. So(len(ctx.idToSnowflake), ShouldEqual, 1)
  28. })
  29. Convey("Broker goroutine matches clients with proxies", func() {
  30. p := new(ProxyPoll)
  31. p.id = "test"
  32. p.offerChannel = make(chan []byte)
  33. go func(ctx *BrokerContext) {
  34. ctx.proxyPolls <- p
  35. close(ctx.proxyPolls)
  36. }(ctx)
  37. ctx.Broker()
  38. So(ctx.snowflakes.Len(), ShouldEqual, 1)
  39. snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
  40. snowflake.offerChannel <- []byte("test offer")
  41. offer := <-p.offerChannel
  42. So(ctx.idToSnowflake["test"], ShouldNotBeNil)
  43. So(offer, ShouldResemble, []byte("test offer"))
  44. So(ctx.snowflakes.Len(), ShouldEqual, 0)
  45. })
  46. Convey("Request an offer from the Snowflake Heap", func() {
  47. done := make(chan []byte)
  48. go func() {
  49. offer := ctx.RequestOffer("test")
  50. done <- offer
  51. }()
  52. request := <-ctx.proxyPolls
  53. request.offerChannel <- []byte("test offer")
  54. offer := <-done
  55. So(offer, ShouldResemble, []byte("test offer"))
  56. })
  57. Convey("Responds to client offers...", func() {
  58. w := httptest.NewRecorder()
  59. data := bytes.NewReader([]byte("test"))
  60. r, err := http.NewRequest("POST", "snowflake.broker/client", data)
  61. So(err, ShouldBeNil)
  62. Convey("with 503 when no snowflakes are available.", func() {
  63. clientOffers(ctx, w, r)
  64. So(w.Code, ShouldEqual, http.StatusServiceUnavailable)
  65. So(w.Body.String(), ShouldEqual, "")
  66. })
  67. Convey("with a proxy answer if available.", func() {
  68. done := make(chan bool)
  69. // Prepare a fake proxy to respond with.
  70. snowflake := ctx.AddSnowflake("fake")
  71. go func() {
  72. clientOffers(ctx, w, r)
  73. done <- true
  74. }()
  75. offer := <-snowflake.offerChannel
  76. So(offer, ShouldResemble, []byte("test"))
  77. snowflake.answerChannel <- []byte("fake answer")
  78. <-done
  79. So(w.Body.String(), ShouldEqual, "fake answer")
  80. So(w.Code, ShouldEqual, http.StatusOK)
  81. })
  82. Convey("Times out when no proxy responds.", func() {
  83. if testing.Short() {
  84. return
  85. }
  86. done := make(chan bool)
  87. snowflake := ctx.AddSnowflake("fake")
  88. go func() {
  89. clientOffers(ctx, w, r)
  90. // Takes a few seconds here...
  91. done <- true
  92. }()
  93. offer := <-snowflake.offerChannel
  94. So(offer, ShouldResemble, []byte("test"))
  95. <-done
  96. So(w.Code, ShouldEqual, http.StatusGatewayTimeout)
  97. })
  98. })
  99. Convey("Responds to proxy polls...", func() {
  100. done := make(chan bool)
  101. w := httptest.NewRecorder()
  102. data := bytes.NewReader([]byte("test"))
  103. r, err := http.NewRequest("POST", "snowflake.broker/proxy", data)
  104. r.Header.Set("X-Session-ID", "test")
  105. So(err, ShouldBeNil)
  106. Convey("with a client offer if available.", func() {
  107. go func(ctx *BrokerContext) {
  108. proxyPolls(ctx, w, r)
  109. done <- true
  110. }(ctx)
  111. // Pass a fake client offer to this proxy
  112. p := <-ctx.proxyPolls
  113. So(p.id, ShouldEqual, "test")
  114. p.offerChannel <- []byte("fake offer")
  115. <-done
  116. So(w.Code, ShouldEqual, http.StatusOK)
  117. So(w.Body.String(), ShouldEqual, "fake offer")
  118. })
  119. Convey("times out when no client offer is available.", func() {
  120. go func(ctx *BrokerContext) {
  121. proxyPolls(ctx, w, r)
  122. done <- true
  123. }(ctx)
  124. p := <-ctx.proxyPolls
  125. So(p.id, ShouldEqual, "test")
  126. // nil means timeout
  127. p.offerChannel <- nil
  128. <-done
  129. So(w.Body.String(), ShouldEqual, "")
  130. So(w.Code, ShouldEqual, http.StatusGatewayTimeout)
  131. })
  132. })
  133. Convey("Responds to proxy answers...", func() {
  134. s := ctx.AddSnowflake("test")
  135. w := httptest.NewRecorder()
  136. data := bytes.NewReader([]byte("fake answer"))
  137. Convey("by passing to the client if valid.", func() {
  138. r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
  139. So(err, ShouldBeNil)
  140. r.Header.Set("X-Session-ID", "test")
  141. go func(ctx *BrokerContext) {
  142. proxyAnswers(ctx, w, r)
  143. }(ctx)
  144. answer := <-s.answerChannel
  145. So(w.Code, ShouldEqual, http.StatusOK)
  146. So(answer, ShouldResemble, []byte("fake answer"))
  147. })
  148. Convey("with error if the proxy is not recognized", func() {
  149. r, err := http.NewRequest("POST", "snowflake.broker/answer", nil)
  150. So(err, ShouldBeNil)
  151. r.Header.Set("X-Session-ID", "invalid")
  152. proxyAnswers(ctx, w, r)
  153. So(w.Code, ShouldEqual, http.StatusGone)
  154. })
  155. Convey("with error if the proxy gives invalid answer", func() {
  156. data := bytes.NewReader(nil)
  157. r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
  158. r.Header.Set("X-Session-ID", "test")
  159. So(err, ShouldBeNil)
  160. proxyAnswers(ctx, w, r)
  161. So(w.Code, ShouldEqual, http.StatusBadRequest)
  162. })
  163. Convey("with error if the proxy writes too much data", func() {
  164. data := bytes.NewReader(make([]byte, 100001, 100001))
  165. r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
  166. r.Header.Set("X-Session-ID", "test")
  167. So(err, ShouldBeNil)
  168. proxyAnswers(ctx, w, r)
  169. So(w.Code, ShouldEqual, http.StatusBadRequest)
  170. })
  171. })
  172. })
  173. Convey("End-To-End", t, func() {
  174. done := make(chan bool)
  175. polled := make(chan bool)
  176. ctx := NewBrokerContext(NullLogger())
  177. // Proxy polls with its ID first...
  178. dataP := bytes.NewReader([]byte("test"))
  179. wP := httptest.NewRecorder()
  180. rP, err := http.NewRequest("POST", "snowflake.broker/proxy", dataP)
  181. So(err, ShouldBeNil)
  182. rP.Header.Set("X-Session-ID", "test")
  183. go func() {
  184. proxyPolls(ctx, wP, rP)
  185. polled <- true
  186. }()
  187. // Manually do the Broker goroutine action here for full control.
  188. p := <-ctx.proxyPolls
  189. So(p.id, ShouldEqual, "test")
  190. s := ctx.AddSnowflake(p.id)
  191. go func() {
  192. offer := <-s.offerChannel
  193. p.offerChannel <- offer
  194. }()
  195. So(ctx.idToSnowflake["test"], ShouldNotBeNil)
  196. // Client request blocks until proxy answer arrives.
  197. dataC := bytes.NewReader([]byte("fake offer"))
  198. wC := httptest.NewRecorder()
  199. rC, err := http.NewRequest("POST", "snowflake.broker/client", dataC)
  200. So(err, ShouldBeNil)
  201. go func() {
  202. clientOffers(ctx, wC, rC)
  203. done <- true
  204. }()
  205. <-polled
  206. So(wP.Code, ShouldEqual, http.StatusOK)
  207. So(wP.Body.String(), ShouldResemble, "fake offer")
  208. So(ctx.idToSnowflake["test"], ShouldNotBeNil)
  209. // Follow up with the answer request afterwards
  210. wA := httptest.NewRecorder()
  211. dataA := bytes.NewReader([]byte("fake answer"))
  212. rA, err := http.NewRequest("POST", "snowflake.broker/proxy", dataA)
  213. So(err, ShouldBeNil)
  214. rA.Header.Set("X-Session-ID", "test")
  215. proxyAnswers(ctx, wA, rA)
  216. So(wA.Code, ShouldEqual, http.StatusOK)
  217. <-done
  218. So(wC.Code, ShouldEqual, http.StatusOK)
  219. So(wC.Body.String(), ShouldEqual, "fake answer")
  220. })
  221. }
  222. func TestSnowflakeHeap(t *testing.T) {
  223. Convey("SnowflakeHeap", t, func() {
  224. h := new(SnowflakeHeap)
  225. heap.Init(h)
  226. So(h.Len(), ShouldEqual, 0)
  227. s1 := new(Snowflake)
  228. s2 := new(Snowflake)
  229. s3 := new(Snowflake)
  230. s4 := new(Snowflake)
  231. s1.clients = 4
  232. s2.clients = 5
  233. s3.clients = 3
  234. s4.clients = 1
  235. heap.Push(h, s1)
  236. So(h.Len(), ShouldEqual, 1)
  237. heap.Push(h, s2)
  238. So(h.Len(), ShouldEqual, 2)
  239. heap.Push(h, s3)
  240. So(h.Len(), ShouldEqual, 3)
  241. heap.Push(h, s4)
  242. So(h.Len(), ShouldEqual, 4)
  243. heap.Remove(h, 0)
  244. So(h.Len(), ShouldEqual, 3)
  245. r := heap.Pop(h).(*Snowflake)
  246. So(h.Len(), ShouldEqual, 2)
  247. So(r.clients, ShouldEqual, 3)
  248. So(r.index, ShouldEqual, -1)
  249. r = heap.Pop(h).(*Snowflake)
  250. So(h.Len(), ShouldEqual, 1)
  251. So(r.clients, ShouldEqual, 4)
  252. So(r.index, ShouldEqual, -1)
  253. r = heap.Pop(h).(*Snowflake)
  254. So(h.Len(), ShouldEqual, 0)
  255. So(r.clients, ShouldEqual, 5)
  256. So(r.index, ShouldEqual, -1)
  257. })
  258. }
  259. func TestGeoip(t *testing.T) {
  260. Convey("Geoip", t, func() {
  261. tv4 := new(GeoIPv4Table)
  262. err := GeoIPLoadFile(tv4, "test_geoip")
  263. So(err, ShouldEqual, nil)
  264. tv6 := new(GeoIPv6Table)
  265. err = GeoIPLoadFile(tv6, "test_geoip6")
  266. So(err, ShouldEqual, nil)
  267. Convey("IPv4 Country Mapping Tests", func() {
  268. for _, test := range []struct {
  269. addr, cc string
  270. ok bool
  271. }{
  272. {
  273. "129.97.208.23", //uwaterloo
  274. "CA",
  275. true,
  276. },
  277. {
  278. "127.0.0.1",
  279. "",
  280. false,
  281. },
  282. {
  283. "255.255.255.255",
  284. "",
  285. false,
  286. },
  287. {
  288. "0.0.0.0",
  289. "",
  290. false,
  291. },
  292. {
  293. "223.252.127.255", //test high end of range
  294. "JP",
  295. true,
  296. },
  297. {
  298. "223.252.127.255", //test low end of range
  299. "JP",
  300. true,
  301. },
  302. } {
  303. country, ok := GetCountryByAddr(tv4, net.ParseIP(test.addr))
  304. So(country, ShouldEqual, test.cc)
  305. So(ok, ShouldResemble, test.ok)
  306. }
  307. })
  308. Convey("IPv6 Country Mapping Tests", func() {
  309. for _, test := range []struct {
  310. addr, cc string
  311. ok bool
  312. }{
  313. {
  314. "2620:101:f000:0:250:56ff:fe80:168e", //uwaterloo
  315. "CA",
  316. true,
  317. },
  318. {
  319. "fd00:0:0:0:0:0:0:1",
  320. "",
  321. false,
  322. },
  323. {
  324. "0:0:0:0:0:0:0:0",
  325. "",
  326. false,
  327. },
  328. {
  329. "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
  330. "",
  331. false,
  332. },
  333. {
  334. "2a07:2e47:ffff:ffff:ffff:ffff:ffff:ffff", //test high end of range
  335. "FR",
  336. true,
  337. },
  338. {
  339. "2a07:2e40::", //test low end of range
  340. "FR",
  341. true,
  342. },
  343. } {
  344. country, ok := GetCountryByAddr(tv6, net.ParseIP(test.addr))
  345. So(country, ShouldEqual, test.cc)
  346. So(ok, ShouldResemble, test.ok)
  347. }
  348. })
  349. // Make sure things behave properly if geoip file fails to load
  350. ctx := NewBrokerContext(NullLogger())
  351. ctx.metrics.LoadGeoipDatabases("invalid_filename", "invalid_filename6")
  352. ctx.metrics.UpdateCountryStats("127.0.0.1")
  353. So(ctx.metrics.tablev4, ShouldEqual, nil)
  354. })
  355. }