WebSocketServer.gd 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  class_name WebSocketServer
  extends Node

  ## Emitted by poll() for every packet read from a connected peer.
  signal message_received(peer_id: int, message: String)
  ## Emitted once a pending connection reaches the open WebSocket state.
  signal client_connected(peer_id: int)
  ## Emitted by poll() when a connected peer is no longer in the open state.
  signal client_disconnected(peer_id: int)

  ## Handshake headers copied onto every WebSocketPeer created for a client.
  @export var handshake_headers := PackedStringArray()
  ## Sub-protocols copied onto every WebSocketPeer created for a client.
  @export var supported_protocols := PackedStringArray()
  ## Milliseconds a pending connection may take to finish its handshake before
  ## poll() drops it. (The misspelled name is kept so existing scenes and
  ## callers that reference it keep working.)
  @export var handshake_timout := 3000
  ## When true, accepted TCP streams are wrapped in a TLS server stream first.
  @export var use_tls := false
  ## Certificate handed to TLSOptions.server() when use_tls is enabled.
  @export var tls_cert: X509Certificate
  ## Private key handed to TLSOptions.server() when use_tls is enabled.
  @export var tls_key: CryptoKey
  ## When true, poll() stops taking new connections from the TCP server.
  ## Switching it on also discards every connection that is still pending.
  @export var refuse_new_connections := false:
      set(refuse):
          # A custom setter must store the value itself; without this
          # assignment the property would stay at its default forever and
          # poll() would keep accepting connections.
          refuse_new_connections = refuse
          if refuse:
              pending_peers.clear()
  ## Bookkeeping for a TCP connection that has been accepted but has not yet
  ## been promoted to a connected WebSocket peer.
  class PendingPeer:
      # Raw TCP stream this pending peer was created from.
      var tcp: StreamPeerTCP
      # Stream to run the WebSocket on; starts out as the same object as `tcp`.
      var connection: StreamPeer
      # WebSocket peer; not set by the constructor.
      var ws: WebSocketPeer
      # Time.get_ticks_msec() value captured when this object was created.
      var connect_time: int

      func _init(p_tcp: StreamPeerTCP) -> void:
          connect_time = Time.get_ticks_msec()
          connection = p_tcp
          tcp = p_tcp
  ## Listening socket that hands out incoming TCP connections.
  var tcp_server: TCPServer = TCPServer.new()
  ## Connections that are still working through their handshake.
  var pending_peers: Array[PendingPeer] = []
  ## Connected clients, keyed by peer id (int) with WebSocketPeer values.
  var peers: Dictionary = {}
  ## Starts listening for TCP connections on `port`.
  ## Asserts that the server is not already listening.
  ## Returns the error code reported by TCPServer.listen().
  func listen(port: int) -> int:
      assert(not tcp_server.is_listening())
      var err := tcp_server.listen(port)
      return err
  ## Stops the TCP listener and forgets every pending and connected peer.
  func stop() -> void:
      peers.clear()
      pending_peers.clear()
      tcp_server.stop()
  ## Sends `message` as a text frame to one or several peers.
  ##
  ## `peer_id` > 0 targets that single peer, which must be connected.
  ## `peer_id` == 0 broadcasts to every connected peer.
  ## `peer_id` < 0 broadcasts to everyone except the peer with id `-peer_id`.
  ##
  ## Returns OK on success. For a single peer the result of
  ## WebSocketPeer.send_text() is returned. For a broadcast every peer is still
  ## attempted and the first error encountered (if any) is returned, instead of
  ## reporting OK unconditionally.
  func send(peer_id: int, message: String) -> int:
      # `message` is statically typed as String, so a runtime typeof() check can
      # never select a binary path; only text frames are ever sent from here.
      if peer_id <= 0:
          var skipped_id := -peer_id
          var first_error: int = OK
          for id: int in peers:
              if id == skipped_id:
                  continue
              var socket: WebSocketPeer = peers[id]
              var err: int = socket.send_text(message)
              # Keep going so one failing peer does not starve the others.
              if err != OK and first_error == OK:
                  first_error = err
          return first_error

      assert(peers.has(peer_id))
      var target: WebSocketPeer = peers[peer_id]
      return target.send_text(message)
  ## Pops the next packet queued for `peer_id` (which must be connected).
  ## Returns null when nothing is queued, a String for text packets, and the
  ## result of bytes_to_var() for any other packet.
  func get_message(peer_id: int) -> Variant:
      assert(peers.has(peer_id))
      var peer: WebSocketPeer = peers[peer_id]
      if peer.get_available_packet_count() >= 1:
          var data: PackedByteArray = peer.get_packet()
          if peer.was_string_packet():
              return data.get_string_from_utf8()
          return bytes_to_var(data)
      return null
  ## Returns true when at least one packet is waiting to be read from
  ## `peer_id` (which must be connected).
  func has_message(peer_id: int) -> bool:
      assert(peers.has(peer_id))
      var queued: int = peers[peer_id].get_available_packet_count()
      return queued >= 1
  ## Builds a WebSocketPeer preconfigured with this server's handshake headers
  ## and supported protocols.
  func _create_peer() -> WebSocketPeer:
      var peer := WebSocketPeer.new()
      peer.handshake_headers = handshake_headers
      peer.supported_protocols = supported_protocols
      return peer
  ## Drives the server for one step; does nothing unless the TCP server is
  ## listening.
  ##
  ## 1. Takes new TCP connections (unless refuse_new_connections is set).
  ## 2. Advances pending handshakes, dropping finished or timed-out ones.
  ## 3. Polls connected peers, emitting client_disconnected for peers that are
  ##    no longer open and message_received for each queued packet.
  func poll() -> void:
      if not tcp_server.is_listening():
          return

      # Step 1: queue freshly accepted TCP connections.
      while not refuse_new_connections and tcp_server.is_connection_available():
          var stream: StreamPeerTCP = tcp_server.take_connection()
          assert(stream != null)
          pending_peers.append(PendingPeer.new(stream))

      # Step 2: collect first and erase afterwards, so the array is never
      # modified while it is being iterated.
      var finished: Array[PendingPeer] = []
      for pending: PendingPeer in pending_peers:
          # Resolved (success or failure) entries leave the list; unresolved
          # ones leave only once the handshake timeout has elapsed.
          if _connect_pending(pending) or pending.connect_time + handshake_timout < Time.get_ticks_msec():
              finished.append(pending)
      for done: PendingPeer in finished:
          pending_peers.erase(done)

      # Step 3: service connected peers, again deferring removal.
      var closed_ids: Array[int] = []
      for id: int in peers:
          var socket: WebSocketPeer = peers[id]
          socket.poll()
          if socket.get_ready_state() == WebSocketPeer.STATE_OPEN:
              while socket.get_available_packet_count() > 0:
                  message_received.emit(id, get_message(id))
          else:
              client_disconnected.emit(id)
              closed_ids.append(id)
      for closed_id: int in closed_ids:
          peers.erase(closed_id)
  ## Advances the handshake of one pending connection by a single step.
  ##
  ## Returns true when `p` is finished and should be dropped from
  ## pending_peers (it was either promoted into `peers` or it failed), and
  ## false while a handshake is still in progress.
  func _connect_pending(p: PendingPeer) -> bool:
      if p.ws != null:
          # Poll websocket client if doing handshake.
          p.ws.poll()
          var state := p.ws.get_ready_state()
          if state == WebSocketPeer.STATE_OPEN:
              var id := randi_range(2, 1 << 30)
              # A random draw can repeat an id that is still in use; redraw so
              # an already-connected peer is never silently overwritten.
              while peers.has(id):
                  id = randi_range(2, 1 << 30)
              peers[id] = p.ws
              client_connected.emit(id)
              return true # Success.
          elif state != WebSocketPeer.STATE_CONNECTING:
              return true # Failure.
          return false # Still connecting.
      elif p.tcp.get_status() != StreamPeerTCP.STATUS_CONNECTED:
          return true # TCP disconnected.
      elif not use_tls:
          # TCP is ready, create WS peer.
          p.ws = _create_peer()
          p.ws.accept_stream(p.tcp)
          return false # WebSocketPeer connection is pending.
      else:
          if p.connection == p.tcp:
              # First visit on the TLS path: wrap the TCP stream in a TLS
              # server stream and remember it as the peer's connection.
              assert(tls_key != null and tls_cert != null)
              var tls := StreamPeerTLS.new()
              tls.accept_stream(p.tcp, TLSOptions.server(tls_key, tls_cert))
              p.connection = tls
          p.connection.poll()
          var status: StreamPeerTLS.Status = p.connection.get_status()
          if status == StreamPeerTLS.STATUS_CONNECTED:
              # TLS is ready, start the WebSocket handshake on top of it.
              p.ws = _create_peer()
              p.ws.accept_stream(p.connection)
              return false # WebSocketPeer connection is pending.
          if status != StreamPeerTLS.STATUS_HANDSHAKING:
              return true # Failure.
          return false # TLS handshake still in progress.
  ## Node processing callback: runs one poll() step each time it is invoked.
  ## The frame delta is not used.
  func _process(_delta: float) -> void:
      poll()