ntp_packet.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. import struct
  2. import time
  3. class NTPShort(object):
  4. seconds = 0
  5. fraction = 0
  6. def __init__(self, seconds, fraction):
  7. self.seconds = seconds
  8. self.fraction = fraction
  9. @staticmethod
  10. def from_bytes(data):
  11. if len(data) != 4:
  12. raise ValueError('data must contain 8 bytes')
  13. seconds, fraction = struct.unpack(b'>HH', data)
  14. return NTPShort(seconds, fraction)
  15. @staticmethod
  16. def from_float(value):
  17. seconds = int(value)
  18. fraction = int(round( (value - seconds) * 0x10000 ))
  19. if fraction > 0xffff:
  20. seconds += 1
  21. fraction = 0
  22. return NTPShort(seconds, fraction)
  23. def to_bytes(self):
  24. return struct.pack(b'>HH', self.seconds, self.fraction)
  25. def to_float(self):
  26. return self.seconds + float(self.fraction) / 65536.0
  27. def __repr__(self):
  28. return 'NTPShort({seconds}, {fraction})'.format(seconds=repr(self.seconds), fraction=repr(self.fraction))
  29. def __str__(self):
  30. return str(self.to_float())
  31. NTPShort.ZERO = NTPShort(0, 0)
  32. class NTPTimestamp(object):
  33. seconds = 0
  34. fraction = 0
  35. def __init__(self, seconds, fraction):
  36. self.seconds = int(seconds)
  37. self.fraction = int(fraction)
  38. @staticmethod
  39. def from_bytes(data):
  40. if len(data) != 8:
  41. raise ValueError('data must contain 8 bytes')
  42. seconds, fraction = struct.unpack(b'>II', data)
  43. return NTPTimestamp(seconds, fraction)
  44. @staticmethod
  45. def from_unix_timestamp(timestamp):
  46. seconds = int(timestamp)
  47. fraction = int(round( (timestamp - seconds) * 0x100000000 ))
  48. if fraction > 0xffffffff:
  49. fraction = 0xffffffff
  50. seconds += 0x83aa7e80
  51. seconds = (seconds & 0xffffffff)
  52. return NTPTimestamp(seconds, fraction)
  53. def to_bytes(self):
  54. return struct.pack(b'>II', self.seconds, self.fraction)
  55. def to_unix_timestamp(self):
  56. return self.seconds - 0x83aa7e80 + float(self.fraction) / 0x100000000
  57. def __repr__(self):
  58. return 'NTPTimestamp({seconds}, {fraction})'.format(seconds=repr(self.seconds), fraction=repr(self.fraction))
  59. def __str__(self):
  60. ts = self.to_unix_timestamp()
  61. secs = int(ts)
  62. frac = int(round( (ts - secs) * 1000000 ))
  63. if frac >= 1000000:
  64. secs += 1
  65. frac = 0
  66. return time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime(secs)) + '.{:06d}Z'.format(frac)
  67. NTPTimestamp.ZERO = NTPTimestamp(0, 0)
  68. class NTPPacket(object):
  69. leap_indicator = 0
  70. version = 3
  71. mode = 3
  72. stratum = 0
  73. poll = 0
  74. precision = 0
  75. root_delay = NTPShort.ZERO
  76. root_dispersion = NTPShort.ZERO
  77. reference_identifier = b"\x00\x00\x00\x00"
  78. reference_timestamp = NTPTimestamp.ZERO
  79. origin_timestamp = NTPTimestamp.ZERO
  80. receive_timestamp = NTPTimestamp.ZERO
  81. transmit_timestamp = NTPTimestamp.ZERO
  82. @staticmethod
  83. def from_bytes(data):
  84. if len(data) < 48 and len(data) != 68:
  85. raise ValueError('packet too short')
  86. packet = NTPPacket()
  87. (b1, packet.stratum, packet.poll, packet.precision) = struct.unpack(b'>BBbb', data[0:4])
  88. packet.leap_indicator = (b1 >> 6) & 0x3
  89. packet.version = (b1 >> 3) & 0x7
  90. packet.mode = b1 & 0x7
  91. packet.root_delay = NTPShort.from_bytes(data[4:8])
  92. packet.root_dispersion = NTPShort.from_bytes(data[8:12])
  93. packet.reference_identifier = data[12:16]
  94. packet.reference_timestamp = NTPTimestamp.from_bytes(data[16:24])
  95. packet.origin_timestamp = NTPTimestamp.from_bytes(data[24:32])
  96. packet.receive_timestamp = NTPTimestamp.from_bytes(data[32:40])
  97. packet.transmit_timestamp = NTPTimestamp.from_bytes(data[40:48])
  98. return packet
  99. def to_bytes(self):
  100. b1 = self.leap_indicator << 6 | self.version << 3 | self.mode
  101. data = struct.pack(b'>BBbb', b1, self.stratum, self.poll, self.precision)
  102. data += self.root_delay.to_bytes()
  103. data += self.root_dispersion.to_bytes()
  104. data += self.reference_identifier
  105. data += self.reference_timestamp.to_bytes()
  106. data += self.origin_timestamp.to_bytes()
  107. data += self.receive_timestamp.to_bytes()
  108. data += self.transmit_timestamp.to_bytes()
  109. return data
  110. def __repr__(self):
  111. fields = (
  112. 'leap_indicator',
  113. 'version',
  114. 'mode',
  115. 'stratum',
  116. 'poll',
  117. 'precision',
  118. 'root_delay',
  119. 'root_dispersion',
  120. 'reference_identifier',
  121. 'reference_timestamp',
  122. 'origin_timestamp',
  123. 'receive_timestamp',
  124. 'transmit_timestamp',
  125. )
  126. return 'NTPPacket(' + ', '.join([ field + '=' + repr(getattr(self, field)) for field in fields ]) + ')'