sqs_test.go 12 KB


  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "log"
  7. "strconv"
  8. "sync"
  9. "testing"
  10. "time"
  11. "github.com/aws/aws-sdk-go-v2/aws"
  12. "github.com/aws/aws-sdk-go-v2/service/sqs"
  13. "github.com/aws/aws-sdk-go-v2/service/sqs/types"
  14. "github.com/golang/mock/gomock"
  15. . "github.com/smartystreets/goconvey/convey"
  16. "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/sqsclient"
  17. )
  18. func TestSQS(t *testing.T) {
  19. Convey("Context", t, func() {
  20. buf := new(bytes.Buffer)
  21. ipcCtx := NewBrokerContext(log.New(buf, "", 0))
  22. i := &IPC{ipcCtx}
  23. var logBuffer bytes.Buffer
  24. log.SetOutput(&logBuffer)
  25. Convey("Responds to SQS client offers...", func() {
  26. ctrl := gomock.NewController(t)
  27. mockSQSClient := sqsclient.NewMockSQSClient(ctrl)
  28. brokerSQSQueueName := "example-name"
  29. responseQueueURL := aws.String("https://sqs.us-east-1.amazonaws.com/testing")
  30. runSQSHandler := func(sqsHandlerContext context.Context) {
  31. mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqs.CreateQueueInput{
  32. QueueName: aws.String(brokerSQSQueueName),
  33. Attributes: map[string]string{
  34. "MessageRetentionPeriod": strconv.FormatInt(int64((5 * time.Minute).Seconds()), 10),
  35. },
  36. }).Return(&sqs.CreateQueueOutput{
  37. QueueUrl: responseQueueURL,
  38. }, nil).Times(1)
  39. sqsHandler, err := newSQSHandler(sqsHandlerContext, mockSQSClient, brokerSQSQueueName, "example-region", i)
  40. So(err, ShouldBeNil)
  41. go sqsHandler.PollAndHandleMessages(sqsHandlerContext)
  42. }
  43. messageBody := aws.String("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}")
  44. receiptHandle := "fake-receipt-handle"
  45. sqsReceiveMessageInput := sqs.ReceiveMessageInput{
  46. QueueUrl: responseQueueURL,
  47. MaxNumberOfMessages: 10,
  48. WaitTimeSeconds: 15,
  49. MessageAttributeNames: []string{
  50. string(types.QueueAttributeNameAll),
  51. },
  52. }
  53. sqsDeleteMessageInput := sqs.DeleteMessageInput{
  54. QueueUrl: responseQueueURL,
  55. ReceiptHandle: &receiptHandle,
  56. }
  57. Convey("by ignoring it if no client id specified", func(c C) {
  58. var wg sync.WaitGroup
  59. wg.Add(1)
  60. sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
  61. defer sqsCancelFunc()
  62. defer wg.Wait()
  63. mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).MinTimes(1).DoAndReturn(
  64. func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
  65. return &sqs.ReceiveMessageOutput{
  66. Messages: []types.Message{
  67. {
  68. Body: messageBody,
  69. ReceiptHandle: &receiptHandle,
  70. },
  71. },
  72. }, nil
  73. },
  74. )
  75. mockSQSClient.EXPECT().DeleteMessage(sqsHandlerContext, &sqsDeleteMessageInput).Times(1).Do(
  76. func(ctx context.Context, input *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) {
  77. defer wg.Done()
  78. c.So(logBuffer.String(), ShouldContainSubstring, "SQSHandler: got SDP offer in SQS message with no client ID. ignoring this message.")
  79. mockSQSClient.EXPECT().DeleteMessage(sqsHandlerContext, &sqsDeleteMessageInput).AnyTimes()
  80. },
  81. )
  82. runSQSHandler(sqsHandlerContext)
  83. })
  84. Convey("by doing nothing if an error occurs upon receipt of the message", func(c C) {
  85. var wg sync.WaitGroup
  86. wg.Add(2)
  87. sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
  88. defer sqsCancelFunc()
  89. defer wg.Wait()
  90. numTimes := 0
  91. // When ReceiveMessage is called for the first time, the error has not had a chance to be logged yet.
  92. // Therefore, we opt to wait for the second call because we are guaranteed that the error was logged
  93. // by then.
  94. mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).MinTimes(2).DoAndReturn(
  95. func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
  96. numTimes += 1
  97. if numTimes <= 2 {
  98. wg.Done()
  99. if numTimes == 2 {
  100. c.So(logBuffer.String(), ShouldContainSubstring, "SQSHandler: encountered error while polling for messages: error")
  101. }
  102. }
  103. return nil, errors.New("error")
  104. },
  105. )
  106. runSQSHandler(sqsHandlerContext)
  107. })
  108. Convey("by attempting to create a new sqs queue...", func() {
  109. clientId := "fake-id"
  110. sqsCreateQueueInput := sqs.CreateQueueInput{
  111. QueueName: aws.String("snowflake-client-fake-id"),
  112. }
  113. expectReceiveMessageReturnsValidMessage := func(sqsHandlerContext context.Context) {
  114. mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).AnyTimes().DoAndReturn(
  115. func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
  116. return &sqs.ReceiveMessageOutput{
  117. Messages: []types.Message{
  118. {
  119. Body: messageBody,
  120. MessageAttributes: map[string]types.MessageAttributeValue{
  121. "ClientID": {StringValue: &clientId},
  122. },
  123. ReceiptHandle: &receiptHandle,
  124. },
  125. },
  126. }, nil
  127. },
  128. )
  129. }
  130. Convey("and does not attempt to send a message via SQS if queue creation fails.", func(c C) {
  131. var wg sync.WaitGroup
  132. wg.Add(2)
  133. sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
  134. defer sqsCancelFunc()
  135. defer wg.Wait()
  136. expectReceiveMessageReturnsValidMessage(sqsHandlerContext)
  137. mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqsCreateQueueInput).Return(nil, errors.New("error")).AnyTimes()
  138. numTimes := 0
  139. mockSQSClient.EXPECT().DeleteMessage(sqsHandlerContext, &sqsDeleteMessageInput).MinTimes(2).Do(
  140. func(ctx context.Context, input *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) {
  141. numTimes += 1
  142. if numTimes <= 2 {
  143. wg.Done()
  144. if numTimes == 2 {
  145. c.So(logBuffer.String(), ShouldContainSubstring, "SQSHandler: error encountered when creating answer queue for client fake-id: error")
  146. }
  147. }
  148. },
  149. )
  150. runSQSHandler(sqsHandlerContext)
  151. })
  152. Convey("and responds with a proxy answer if available.", func(c C) {
  153. var wg sync.WaitGroup
  154. wg.Add(1)
  155. sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
  156. defer sqsCancelFunc()
  157. defer wg.Wait()
  158. expectReceiveMessageReturnsValidMessage(sqsHandlerContext)
  159. mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqsCreateQueueInput).Return(&sqs.CreateQueueOutput{
  160. QueueUrl: responseQueueURL,
  161. }, nil).AnyTimes()
  162. mockSQSClient.EXPECT().DeleteMessage(sqsHandlerContext, &sqsDeleteMessageInput).AnyTimes()
  163. numTimes := 0
  164. mockSQSClient.EXPECT().SendMessage(sqsHandlerContext, gomock.Any()).MinTimes(1).DoAndReturn(
  165. func(ctx context.Context, input *sqs.SendMessageInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error) {
  166. numTimes += 1
  167. if numTimes == 1 {
  168. c.So(input.MessageBody, ShouldEqual, aws.String("{\"answer\":\"fake answer\"}"))
  169. // Ensure that match is correctly recorded in metrics
  170. ipcCtx.metrics.printMetrics()
  171. c.So(buf.String(), ShouldContainSubstring, `client-denied-count 0
  172. client-restricted-denied-count 0
  173. client-unrestricted-denied-count 0
  174. client-snowflake-match-count 8
  175. client-http-count 0
  176. client-http-ips
  177. client-ampcache-count 0
  178. client-ampcache-ips
  179. client-sqs-count 8
  180. client-sqs-ips ??=8
  181. `)
  182. wg.Done()
  183. }
  184. return &sqs.SendMessageOutput{}, nil
  185. },
  186. )
  187. runSQSHandler(sqsHandlerContext)
  188. snowflake := ipcCtx.AddSnowflake("fake", "", NATUnrestricted, 0)
  189. offer := <-snowflake.offerChannel
  190. So(offer.sdp, ShouldResemble, []byte("fake"))
  191. snowflake.answerChannel <- "fake answer"
  192. })
  193. })
  194. })
  195. Convey("Cleans up SQS client queues...", func() {
  196. brokerSQSQueueName := "example-name"
  197. responseQueueURL := aws.String("https://sqs.us-east-1.amazonaws.com/testing")
  198. ctrl := gomock.NewController(t)
  199. mockSQSClient := sqsclient.NewMockSQSClient(ctrl)
  200. runSQSHandler := func(sqsHandlerContext context.Context) {
  201. mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqs.CreateQueueInput{
  202. QueueName: aws.String(brokerSQSQueueName),
  203. Attributes: map[string]string{
  204. "MessageRetentionPeriod": strconv.FormatInt(int64((5 * time.Minute).Seconds()), 10),
  205. },
  206. }).Return(&sqs.CreateQueueOutput{
  207. QueueUrl: responseQueueURL,
  208. }, nil).Times(1)
  209. mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, gomock.Any()).AnyTimes().Return(
  210. &sqs.ReceiveMessageOutput{
  211. Messages: []types.Message{},
  212. }, nil,
  213. )
  214. sqsHandler, err := newSQSHandler(sqsHandlerContext, mockSQSClient, brokerSQSQueueName, "example-region", i)
  215. So(err, ShouldBeNil)
  216. // Set the cleanup interval to 1 ns so we can immediately test the cleanup logic
  217. sqsHandler.cleanupInterval = time.Nanosecond
  218. go sqsHandler.PollAndHandleMessages(sqsHandlerContext)
  219. }
  220. Convey("does nothing if there are no open queues.", func() {
  221. var wg sync.WaitGroup
  222. wg.Add(1)
  223. sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
  224. defer wg.Wait()
  225. mockSQSClient.EXPECT().ListQueues(sqsHandlerContext, &sqs.ListQueuesInput{
  226. QueueNamePrefix: aws.String("snowflake-client-"),
  227. MaxResults: aws.Int32(1000),
  228. NextToken: nil,
  229. }).DoAndReturn(func(ctx context.Context, input *sqs.ListQueuesInput, optFns ...func(*sqs.Options)) (*sqs.ListQueuesOutput, error) {
  230. wg.Done()
  231. // Cancel the handler context since we are only interested in testing one iteration of the cleanup
  232. sqsCancelFunc()
  233. return &sqs.ListQueuesOutput{
  234. QueueUrls: []string{},
  235. }, nil
  236. })
  237. runSQSHandler(sqsHandlerContext)
  238. })
  239. Convey("deletes open queue when there is one open queue.", func(c C) {
  240. var wg sync.WaitGroup
  241. wg.Add(1)
  242. sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
  243. clientQueueUrl1 := "https://sqs.us-east-1.amazonaws.com/snowflake-client-1"
  244. clientQueueUrl2 := "https://sqs.us-east-1.amazonaws.com/snowflake-client-2"
  245. gomock.InOrder(
  246. mockSQSClient.EXPECT().ListQueues(sqsHandlerContext, &sqs.ListQueuesInput{
  247. QueueNamePrefix: aws.String("snowflake-client-"),
  248. MaxResults: aws.Int32(1000),
  249. NextToken: nil,
  250. }).Times(1).Return(&sqs.ListQueuesOutput{
  251. QueueUrls: []string{
  252. clientQueueUrl1,
  253. clientQueueUrl2,
  254. },
  255. }, nil),
  256. mockSQSClient.EXPECT().ListQueues(sqsHandlerContext, &sqs.ListQueuesInput{
  257. QueueNamePrefix: aws.String("snowflake-client-"),
  258. MaxResults: aws.Int32(1000),
  259. NextToken: nil,
  260. }).Times(1).DoAndReturn(func(ctx context.Context, input *sqs.ListQueuesInput, optFns ...func(*sqs.Options)) (*sqs.ListQueuesOutput, error) {
  261. // Executed on second iteration of cleanupClientQueues loop. This means that one full iteration has completed and we can verify the results of that iteration
  262. wg.Done()
  263. sqsCancelFunc()
  264. c.So(logBuffer.String(), ShouldContainSubstring, "SQSHandler: finished running iteration of client queue cleanup. found and deleted 2 client queues.")
  265. return &sqs.ListQueuesOutput{
  266. QueueUrls: []string{},
  267. }, nil
  268. }),
  269. )
  270. gomock.InOrder(
  271. mockSQSClient.EXPECT().GetQueueAttributes(sqsHandlerContext, &sqs.GetQueueAttributesInput{
  272. QueueUrl: aws.String(clientQueueUrl1),
  273. AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp},
  274. }).Times(1).Return(&sqs.GetQueueAttributesOutput{
  275. Attributes: map[string]string{
  276. string(types.QueueAttributeNameLastModifiedTimestamp): "0",
  277. }}, nil),
  278. mockSQSClient.EXPECT().GetQueueAttributes(sqsHandlerContext, &sqs.GetQueueAttributesInput{
  279. QueueUrl: aws.String(clientQueueUrl2),
  280. AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp},
  281. }).Times(1).Return(&sqs.GetQueueAttributesOutput{
  282. Attributes: map[string]string{
  283. string(types.QueueAttributeNameLastModifiedTimestamp): "0",
  284. }}, nil),
  285. )
  286. gomock.InOrder(
  287. mockSQSClient.EXPECT().DeleteQueue(sqsHandlerContext, &sqs.DeleteQueueInput{
  288. QueueUrl: aws.String(clientQueueUrl1),
  289. }).Return(&sqs.DeleteQueueOutput{}, nil),
  290. mockSQSClient.EXPECT().DeleteQueue(sqsHandlerContext, &sqs.DeleteQueueInput{
  291. QueueUrl: aws.String(clientQueueUrl2),
  292. }).Return(&sqs.DeleteQueueOutput{}, nil),
  293. )
  294. runSQSHandler(sqsHandlerContext)
  295. wg.Wait()
  296. })
  297. })
  298. })
  299. }