sqs_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "log"
  7. "strconv"
  8. "sync"
  9. "sync/atomic"
  10. "testing"
  11. "time"
  12. "github.com/aws/aws-sdk-go-v2/aws"
  13. "github.com/aws/aws-sdk-go-v2/service/sqs"
  14. "github.com/aws/aws-sdk-go-v2/service/sqs/types"
  15. "github.com/golang/mock/gomock"
  16. . "github.com/smartystreets/goconvey/convey"
  17. "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/sqsclient"
  18. )
  19. func TestSQS(t *testing.T) {
  20. Convey("Context", t, func() {
  21. buf := new(bytes.Buffer)
  22. ipcCtx := NewBrokerContext(log.New(buf, "", 0), "")
  23. i := &IPC{ipcCtx}
  24. Convey("Responds to SQS client offers...", func() {
  25. ctrl := gomock.NewController(t)
  26. mockSQSClient := sqsclient.NewMockSQSClient(ctrl)
  27. brokerSQSQueueName := "example-name"
  28. responseQueueURL := aws.String("https://sqs.us-east-1.amazonaws.com/testing")
  29. runSQSHandler := func(sqsHandlerContext context.Context) {
  30. mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqs.CreateQueueInput{
  31. QueueName: aws.String(brokerSQSQueueName),
  32. Attributes: map[string]string{
  33. "MessageRetentionPeriod": strconv.FormatInt(int64((5 * time.Minute).Seconds()), 10),
  34. },
  35. }).Return(&sqs.CreateQueueOutput{
  36. QueueUrl: responseQueueURL,
  37. }, nil).Times(1)
  38. sqsHandler, err := newSQSHandler(sqsHandlerContext, mockSQSClient, brokerSQSQueueName, "example-region", i)
  39. So(err, ShouldBeNil)
  40. go sqsHandler.PollAndHandleMessages(sqsHandlerContext)
  41. }
  42. messageBody := aws.String("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}")
  43. receiptHandle := "fake-receipt-handle"
  44. sqsReceiveMessageInput := sqs.ReceiveMessageInput{
  45. QueueUrl: responseQueueURL,
  46. MaxNumberOfMessages: 10,
  47. WaitTimeSeconds: 15,
  48. MessageAttributeNames: []string{
  49. string(types.QueueAttributeNameAll),
  50. },
  51. }
  52. sqsDeleteMessageInput := sqs.DeleteMessageInput{
  53. QueueUrl: responseQueueURL,
  54. ReceiptHandle: &receiptHandle,
  55. }
  56. Convey("by ignoring it if no client id specified", func(c C) {
  57. sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
  58. mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).MinTimes(1).DoAndReturn(
  59. func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
  60. return &sqs.ReceiveMessageOutput{
  61. Messages: []types.Message{
  62. {
  63. Body: messageBody,
  64. ReceiptHandle: &receiptHandle,
  65. },
  66. },
  67. }, nil
  68. },
  69. )
  70. mockSQSClient.EXPECT().DeleteMessage(sqsHandlerContext, &sqsDeleteMessageInput).MinTimes(1).Do(
  71. func(ctx context.Context, input *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) {
  72. sqsCancelFunc()
  73. },
  74. )
  75. // We expect no queues to be created
  76. mockSQSClient.EXPECT().CreateQueue(gomock.Any(), gomock.Any()).Times(0)
  77. runSQSHandler(sqsHandlerContext)
  78. <-sqsHandlerContext.Done()
  79. })
  80. Convey("by doing nothing if an error occurs upon receipt of the message", func(c C) {
  81. sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
  82. mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).MinTimes(1).DoAndReturn(
  83. func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
  84. sqsCancelFunc()
  85. return nil, errors.New("error")
  86. },
  87. )
  88. // We expect no queues to be created or deleted
  89. mockSQSClient.EXPECT().CreateQueue(gomock.Any(), gomock.Any()).Times(0)
  90. mockSQSClient.EXPECT().DeleteMessage(gomock.Any(), gomock.Any()).Times(0)
  91. runSQSHandler(sqsHandlerContext)
  92. <-sqsHandlerContext.Done()
  93. })
  94. Convey("by attempting to create a new sqs queue...", func() {
  95. clientId := "fake-id"
  96. sqsCreateQueueInput := sqs.CreateQueueInput{
  97. QueueName: aws.String("snowflake-client-fake-id"),
  98. }
  99. validMessage := &sqs.ReceiveMessageOutput{
  100. Messages: []types.Message{
  101. {
  102. Body: messageBody,
  103. MessageAttributes: map[string]types.MessageAttributeValue{
  104. "ClientID": {StringValue: &clientId},
  105. },
  106. ReceiptHandle: &receiptHandle,
  107. },
  108. },
  109. }
  110. Convey("and does not attempt to send a message via SQS if queue creation fails.", func(c C) {
  111. sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
  112. mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).AnyTimes().DoAndReturn(
  113. func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
  114. sqsCancelFunc()
  115. return validMessage, nil
  116. })
  117. mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqsCreateQueueInput).Return(nil, errors.New("error")).AnyTimes()
  118. mockSQSClient.EXPECT().DeleteMessage(sqsHandlerContext, &sqsDeleteMessageInput).AnyTimes()
  119. runSQSHandler(sqsHandlerContext)
  120. <-sqsHandlerContext.Done()
  121. })
  122. Convey("and responds with a proxy answer if available.", func(c C) {
  123. sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
  124. var numTimes atomic.Uint32
  125. mockSQSClient.EXPECT().ReceiveMessage(gomock.Any(), &sqsReceiveMessageInput).AnyTimes().DoAndReturn(
  126. func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
  127. n := numTimes.Add(1)
  128. if n == 1 {
  129. snowflake := ipcCtx.AddSnowflake("fake", "", NATUnrestricted, 0)
  130. go func(c C) {
  131. <-snowflake.offerChannel
  132. snowflake.answerChannel <- "fake answer"
  133. }(c)
  134. return validMessage, nil
  135. }
  136. return nil, errors.New("error")
  137. })
  138. mockSQSClient.EXPECT().CreateQueue(gomock.Any(), &sqsCreateQueueInput).Return(&sqs.CreateQueueOutput{
  139. QueueUrl: responseQueueURL,
  140. }, nil).AnyTimes()
  141. mockSQSClient.EXPECT().DeleteMessage(gomock.Any(), gomock.Any()).AnyTimes()
  142. mockSQSClient.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(1).DoAndReturn(
  143. func(ctx context.Context, input *sqs.SendMessageInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error) {
  144. c.So(input.MessageBody, ShouldEqual, aws.String("{\"answer\":\"fake answer\"}"))
  145. // Ensure that match is correctly recorded in metrics
  146. ipcCtx.metrics.printMetrics()
  147. c.So(buf.String(), ShouldContainSubstring, `client-denied-count 0
  148. client-restricted-denied-count 0
  149. client-unrestricted-denied-count 0
  150. client-snowflake-match-count 8
  151. client-snowflake-timeout-count 0
  152. client-http-count 0
  153. client-http-ips
  154. client-ampcache-count 0
  155. client-ampcache-ips
  156. client-sqs-count 8
  157. client-sqs-ips ??=8
  158. `)
  159. sqsCancelFunc()
  160. return &sqs.SendMessageOutput{}, nil
  161. },
  162. )
  163. runSQSHandler(sqsHandlerContext)
  164. <-sqsHandlerContext.Done()
  165. })
  166. })
  167. })
  168. Convey("Cleans up SQS client queues...", func() {
  169. brokerSQSQueueName := "example-name"
  170. responseQueueURL := aws.String("https://sqs.us-east-1.amazonaws.com/testing")
  171. ctrl := gomock.NewController(t)
  172. mockSQSClient := sqsclient.NewMockSQSClient(ctrl)
  173. runSQSHandler := func(sqsHandlerContext context.Context) {
  174. mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqs.CreateQueueInput{
  175. QueueName: aws.String(brokerSQSQueueName),
  176. Attributes: map[string]string{
  177. "MessageRetentionPeriod": strconv.FormatInt(int64((5 * time.Minute).Seconds()), 10),
  178. },
  179. }).Return(&sqs.CreateQueueOutput{
  180. QueueUrl: responseQueueURL,
  181. }, nil).Times(1)
  182. mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, gomock.Any()).AnyTimes().Return(
  183. &sqs.ReceiveMessageOutput{
  184. Messages: []types.Message{},
  185. }, nil,
  186. )
  187. sqsHandler, err := newSQSHandler(sqsHandlerContext, mockSQSClient, brokerSQSQueueName, "example-region", i)
  188. So(err, ShouldBeNil)
  189. // Set the cleanup interval to 1 ns so we can immediately test the cleanup logic
  190. sqsHandler.cleanupInterval = time.Nanosecond
  191. go sqsHandler.PollAndHandleMessages(sqsHandlerContext)
  192. }
  193. Convey("does nothing if there are no open queues.", func() {
  194. var wg sync.WaitGroup
  195. wg.Add(1)
  196. sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
  197. defer wg.Wait()
  198. mockSQSClient.EXPECT().ListQueues(sqsHandlerContext, &sqs.ListQueuesInput{
  199. QueueNamePrefix: aws.String("snowflake-client-"),
  200. MaxResults: aws.Int32(1000),
  201. NextToken: nil,
  202. }).DoAndReturn(func(ctx context.Context, input *sqs.ListQueuesInput, optFns ...func(*sqs.Options)) (*sqs.ListQueuesOutput, error) {
  203. wg.Done()
  204. // Cancel the handler context since we are only interested in testing one iteration of the cleanup
  205. sqsCancelFunc()
  206. return &sqs.ListQueuesOutput{
  207. QueueUrls: []string{},
  208. }, nil
  209. })
  210. runSQSHandler(sqsHandlerContext)
  211. })
  212. Convey("deletes open queue when there is one open queue.", func(c C) {
  213. var wg sync.WaitGroup
  214. wg.Add(1)
  215. sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
  216. clientQueueUrl1 := "https://sqs.us-east-1.amazonaws.com/snowflake-client-1"
  217. clientQueueUrl2 := "https://sqs.us-east-1.amazonaws.com/snowflake-client-2"
  218. gomock.InOrder(
  219. mockSQSClient.EXPECT().ListQueues(sqsHandlerContext, &sqs.ListQueuesInput{
  220. QueueNamePrefix: aws.String("snowflake-client-"),
  221. MaxResults: aws.Int32(1000),
  222. NextToken: nil,
  223. }).Times(1).Return(&sqs.ListQueuesOutput{
  224. QueueUrls: []string{
  225. clientQueueUrl1,
  226. clientQueueUrl2,
  227. },
  228. }, nil),
  229. mockSQSClient.EXPECT().ListQueues(sqsHandlerContext, &sqs.ListQueuesInput{
  230. QueueNamePrefix: aws.String("snowflake-client-"),
  231. MaxResults: aws.Int32(1000),
  232. NextToken: nil,
  233. }).Times(1).DoAndReturn(func(ctx context.Context, input *sqs.ListQueuesInput, optFns ...func(*sqs.Options)) (*sqs.ListQueuesOutput, error) {
  234. // Executed on second iteration of cleanupClientQueues loop. This means that one full iteration has completed and we can verify the results of that iteration
  235. wg.Done()
  236. sqsCancelFunc()
  237. return &sqs.ListQueuesOutput{
  238. QueueUrls: []string{},
  239. }, nil
  240. }),
  241. )
  242. gomock.InOrder(
  243. mockSQSClient.EXPECT().GetQueueAttributes(sqsHandlerContext, &sqs.GetQueueAttributesInput{
  244. QueueUrl: aws.String(clientQueueUrl1),
  245. AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp},
  246. }).Times(1).Return(&sqs.GetQueueAttributesOutput{
  247. Attributes: map[string]string{
  248. string(types.QueueAttributeNameLastModifiedTimestamp): "0",
  249. }}, nil),
  250. mockSQSClient.EXPECT().GetQueueAttributes(sqsHandlerContext, &sqs.GetQueueAttributesInput{
  251. QueueUrl: aws.String(clientQueueUrl2),
  252. AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp},
  253. }).Times(1).Return(&sqs.GetQueueAttributesOutput{
  254. Attributes: map[string]string{
  255. string(types.QueueAttributeNameLastModifiedTimestamp): "0",
  256. }}, nil),
  257. )
  258. gomock.InOrder(
  259. mockSQSClient.EXPECT().DeleteQueue(sqsHandlerContext, &sqs.DeleteQueueInput{
  260. QueueUrl: aws.String(clientQueueUrl1),
  261. }).Return(&sqs.DeleteQueueOutput{}, nil),
  262. mockSQSClient.EXPECT().DeleteQueue(sqsHandlerContext, &sqs.DeleteQueueInput{
  263. QueueUrl: aws.String(clientQueueUrl2),
  264. }).Return(&sqs.DeleteQueueOutput{}, nil),
  265. )
  266. runSQSHandler(sqsHandlerContext)
  267. wg.Wait()
  268. })
  269. })
  270. })
  271. }