sqs.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package main
  2. import (
  3. "context"
  4. "log"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/aws/aws-sdk-go-v2/aws"
  9. "github.com/aws/aws-sdk-go-v2/service/sqs"
  10. "github.com/aws/aws-sdk-go-v2/service/sqs/types"
  11. "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/messages"
  12. "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/sqsclient"
  13. "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/util"
  14. )
  15. const (
  16. cleanupThreshold = -2 * time.Minute
  17. )
  18. type sqsHandler struct {
  19. SQSClient sqsclient.SQSClient
  20. SQSQueueURL *string
  21. IPC *IPC
  22. cleanupInterval time.Duration
  23. }
  24. func (r *sqsHandler) pollMessages(ctx context.Context, chn chan<- *types.Message) {
  25. for {
  26. select {
  27. case <-ctx.Done():
  28. // if context is cancelled
  29. return
  30. default:
  31. res, err := r.SQSClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
  32. QueueUrl: r.SQSQueueURL,
  33. MaxNumberOfMessages: 10,
  34. WaitTimeSeconds: 15,
  35. MessageAttributeNames: []string{
  36. string(types.QueueAttributeNameAll),
  37. },
  38. })
  39. if err != nil {
  40. log.Printf("SQSHandler: encountered error while polling for messages: %v\n", err)
  41. continue
  42. }
  43. for _, message := range res.Messages {
  44. chn <- &message
  45. }
  46. }
  47. }
  48. }
  49. func (r *sqsHandler) cleanupClientQueues(ctx context.Context) {
  50. for range time.NewTicker(r.cleanupInterval).C {
  51. // Runs at fixed intervals to clean up any client queues that were last changed more than 2 minutes ago
  52. select {
  53. case <-ctx.Done():
  54. // if context is cancelled
  55. return
  56. default:
  57. queueURLsList := []string{}
  58. var nextToken *string
  59. for {
  60. res, err := r.SQSClient.ListQueues(ctx, &sqs.ListQueuesInput{
  61. QueueNamePrefix: aws.String("snowflake-client-"),
  62. MaxResults: aws.Int32(1000),
  63. NextToken: nextToken,
  64. })
  65. if err != nil {
  66. log.Printf("SQSHandler: encountered error while retrieving client queues to clean up: %v\n", err)
  67. // client queues will be cleaned up the next time the cleanup operation is triggered automatically
  68. break
  69. }
  70. queueURLsList = append(queueURLsList, res.QueueUrls...)
  71. if res.NextToken == nil {
  72. break
  73. } else {
  74. nextToken = res.NextToken
  75. }
  76. }
  77. numDeleted := 0
  78. cleanupCutoff := time.Now().Add(cleanupThreshold)
  79. for _, queueURL := range queueURLsList {
  80. if !strings.Contains(queueURL, "snowflake-client-") {
  81. continue
  82. }
  83. res, err := r.SQSClient.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
  84. QueueUrl: aws.String(queueURL),
  85. AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp},
  86. })
  87. if err != nil {
  88. // According to the AWS SQS docs, the deletion process for a queue can take up to 60 seconds. So the queue
  89. // can be in the process of being deleted, but will still be returned by the ListQueues operation, but
  90. // fail when we try to GetQueueAttributes for the queue
  91. log.Printf("SQSHandler: encountered error while getting attribute of client queue %s. queue may already be deleted.\n", queueURL)
  92. continue
  93. }
  94. lastModifiedInt64, err := strconv.ParseInt(res.Attributes[string(types.QueueAttributeNameLastModifiedTimestamp)], 10, 64)
  95. if err != nil {
  96. log.Printf("SQSHandler: encountered invalid lastModifiedTimetamp value from client queue %s: %v\n", queueURL, err)
  97. continue
  98. }
  99. lastModified := time.Unix(lastModifiedInt64, 0)
  100. if lastModified.Before(cleanupCutoff) {
  101. _, err := r.SQSClient.DeleteQueue(ctx, &sqs.DeleteQueueInput{
  102. QueueUrl: aws.String(queueURL),
  103. })
  104. if err != nil {
  105. log.Printf("SQSHandler: encountered error when deleting client queue %s: %v\n", queueURL, err)
  106. continue
  107. } else {
  108. numDeleted += 1
  109. }
  110. }
  111. }
  112. log.Printf("SQSHandler: finished running iteration of client queue cleanup. found and deleted %d client queues.\n", numDeleted)
  113. }
  114. }
  115. }
  116. func (r *sqsHandler) handleMessage(context context.Context, message *types.Message) {
  117. var encPollReq []byte
  118. var response []byte
  119. var err error
  120. clientID := message.MessageAttributes["ClientID"].StringValue
  121. if clientID == nil {
  122. log.Println("SQSHandler: got SDP offer in SQS message with no client ID. ignoring this message.")
  123. return
  124. }
  125. res, err := r.SQSClient.CreateQueue(context, &sqs.CreateQueueInput{
  126. QueueName: aws.String("snowflake-client-" + *clientID),
  127. })
  128. if err != nil {
  129. log.Printf("SQSHandler: error encountered when creating answer queue for client %s: %v\n", *clientID, err)
  130. return
  131. }
  132. answerSQSURL := res.QueueUrl
  133. encPollReq = []byte(*message.Body)
  134. // Get best guess Client IP for geolocating
  135. remoteAddr := ""
  136. req, err := messages.DecodeClientPollRequest(encPollReq)
  137. if err != nil {
  138. log.Printf("SQSHandler: error encounted when decoding client poll request %s: %v\n", *clientID, err)
  139. } else {
  140. sdp, err := util.DeserializeSessionDescription(req.Offer)
  141. if err != nil {
  142. log.Printf("SQSHandler: error encounted when deserializing session desc %s: %v\n", *clientID, err)
  143. } else {
  144. candidateAddrs := util.GetCandidateAddrs(sdp.SDP)
  145. if len(candidateAddrs) > 0 {
  146. remoteAddr = candidateAddrs[0].String()
  147. }
  148. }
  149. }
  150. arg := messages.Arg{
  151. Body: encPollReq,
  152. RemoteAddr: remoteAddr,
  153. RendezvousMethod: messages.RendezvousSqs,
  154. }
  155. err = r.IPC.ClientOffers(arg, &response)
  156. if err != nil {
  157. log.Printf("SQSHandler: error encountered when handling message: %v\n", err)
  158. return
  159. }
  160. r.SQSClient.SendMessage(context, &sqs.SendMessageInput{
  161. QueueUrl: answerSQSURL,
  162. MessageBody: aws.String(string(response)),
  163. })
  164. }
  165. func (r *sqsHandler) deleteMessage(context context.Context, message *types.Message) {
  166. r.SQSClient.DeleteMessage(context, &sqs.DeleteMessageInput{
  167. QueueUrl: r.SQSQueueURL,
  168. ReceiptHandle: message.ReceiptHandle,
  169. })
  170. }
  171. func newSQSHandler(context context.Context, client sqsclient.SQSClient, sqsQueueName string, region string, i *IPC) (*sqsHandler, error) {
  172. // Creates the queue if a queue with the same name doesn't exist. If a queue with the same name and attributes
  173. // already exists, then nothing will happen. If a queue with the same name, but different attributes exists, then
  174. // an error will be returned
  175. res, err := client.CreateQueue(context, &sqs.CreateQueueInput{
  176. QueueName: aws.String(sqsQueueName),
  177. Attributes: map[string]string{
  178. "MessageRetentionPeriod": strconv.FormatInt(int64((5 * time.Minute).Seconds()), 10),
  179. },
  180. })
  181. if err != nil {
  182. return nil, err
  183. }
  184. return &sqsHandler{
  185. SQSClient: client,
  186. SQSQueueURL: res.QueueUrl,
  187. IPC: i,
  188. cleanupInterval: time.Second * 30,
  189. }, nil
  190. }
  191. func (r *sqsHandler) PollAndHandleMessages(ctx context.Context) {
  192. log.Println("SQSHandler: Starting to poll for messages at: " + *r.SQSQueueURL)
  193. messagesChn := make(chan *types.Message, 2)
  194. go r.pollMessages(ctx, messagesChn)
  195. go r.cleanupClientQueues(ctx)
  196. for message := range messagesChn {
  197. select {
  198. case <-ctx.Done():
  199. // if context is cancelled
  200. return
  201. default:
  202. r.handleMessage(ctx, message)
  203. r.deleteMessage(ctx, message)
  204. }
  205. }
  206. }