util.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package lib
  2. import (
  3. "encoding/json"
  4. "log"
  5. "time"
  6. "github.com/pion/webrtc"
  7. )
  // LogTimeInterval is the traffic-logging period expressed in seconds. It is
  // an untyped constant; BytesSyncLogger.Log scales it by time.Second.
  const LogTimeInterval = 5
  // BytesLogger is the traffic-accounting hook implemented in this file by
  // BytesNullLogger (which ignores every call) and *BytesSyncLogger (which
  // aggregates the reported amounts and writes them to the log).
  type BytesLogger interface {
  	// AddOutbound reports an outbound amount, in bytes.
  	AddOutbound(int)
  	// AddInbound reports an inbound amount, in bytes.
  	AddInbound(int)
  	// Log runs the logger. For *BytesSyncLogger this call does not return.
  	Log()
  }
  // BytesNullLogger is the default BytesLogger. Every method is a no-op, so
  // byte counts handed to it are discarded.
  type BytesNullLogger struct{}

  // Log does nothing and returns immediately.
  func (BytesNullLogger) Log() {}

  // AddOutbound ignores amount.
  func (BytesNullLogger) AddOutbound(amount int) {}

  // AddInbound ignores amount.
  func (BytesNullLogger) AddInbound(amount int) {}
  21. // BytesSyncLogger uses channels to safely log from multiple sources with output
  22. // occuring at reasonable intervals.
  23. type BytesSyncLogger struct {
  24. OutboundChan chan int
  25. InboundChan chan int
  26. Outbound int
  27. Inbound int
  28. OutEvents int
  29. InEvents int
  30. IsLogging bool
  31. }
  32. func (b *BytesSyncLogger) Log() {
  33. b.IsLogging = true
  34. var amount int
  35. output := func() {
  36. log.Printf("Traffic Bytes (in|out): %d | %d -- (%d OnMessages, %d Sends)",
  37. b.Inbound, b.Outbound, b.InEvents, b.OutEvents)
  38. b.Outbound = 0
  39. b.OutEvents = 0
  40. b.Inbound = 0
  41. b.InEvents = 0
  42. }
  43. last := time.Now()
  44. for {
  45. select {
  46. case amount = <-b.OutboundChan:
  47. b.Outbound += amount
  48. b.OutEvents++
  49. if time.Since(last) > time.Second*LogTimeInterval {
  50. last = time.Now()
  51. output()
  52. }
  53. case amount = <-b.InboundChan:
  54. b.Inbound += amount
  55. b.InEvents++
  56. if time.Since(last) > time.Second*LogTimeInterval {
  57. last = time.Now()
  58. output()
  59. }
  60. case <-time.After(time.Second * LogTimeInterval):
  61. if b.InEvents > 0 || b.OutEvents > 0 {
  62. output()
  63. }
  64. }
  65. }
  66. }
  67. func (b *BytesSyncLogger) AddOutbound(amount int) {
  68. if !b.IsLogging {
  69. return
  70. }
  71. b.OutboundChan <- amount
  72. }
  73. func (b *BytesSyncLogger) AddInbound(amount int) {
  74. if !b.IsLogging {
  75. return
  76. }
  77. b.InboundChan <- amount
  78. }
  79. func deserializeSessionDescription(msg string) *webrtc.SessionDescription {
  80. var parsed map[string]interface{}
  81. err := json.Unmarshal([]byte(msg), &parsed)
  82. if nil != err {
  83. log.Println(err)
  84. return nil
  85. }
  86. if _, ok := parsed["type"]; !ok {
  87. log.Println("Cannot deserialize SessionDescription without type field.")
  88. return nil
  89. }
  90. if _, ok := parsed["sdp"]; !ok {
  91. log.Println("Cannot deserialize SessionDescription without sdp field.")
  92. return nil
  93. }
  94. var stype webrtc.SDPType
  95. switch parsed["type"].(string) {
  96. default:
  97. log.Println("Unknown SDP type")
  98. return nil
  99. case "offer":
  100. stype = webrtc.SDPTypeOffer
  101. case "pranswer":
  102. stype = webrtc.SDPTypePranswer
  103. case "answer":
  104. stype = webrtc.SDPTypeAnswer
  105. case "rollback":
  106. stype = webrtc.SDPTypeRollback
  107. }
  108. if err != nil {
  109. log.Println(err)
  110. return nil
  111. }
  112. return &webrtc.SessionDescription{
  113. Type: stype,
  114. SDP: parsed["sdp"].(string),
  115. }
  116. }
  117. func serializeSessionDescription(desc *webrtc.SessionDescription) string {
  118. bytes, err := json.Marshal(*desc)
  119. if nil != err {
  120. log.Println(err)
  121. return ""
  122. }
  123. return string(bytes)
  124. }