serial.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. from pyprofibus.compat import *
  2. import time
  # Parity selectors; Serial.open() compares Serial.parity against PARITY_EVEN.
  PARITY_EVEN, PARITY_ODD = "E", "O"

  # Stop bit counts; Serial.open() compares Serial.stopbits against STOPBITS_ONE.
  STOPBITS_ONE, STOPBITS_TWO = 1, 2
  class SerialException(Exception):
      """Error raised for serial port failures (invalid port, open/close/read/write errors)."""
  9. class Serial(object):
  10. def __init__(self):
  11. self.__isMicropython = isMicropython
  12. self.port = "/dev/ttyS0"
  13. self.__portNum = None
  14. self.baudrate = 9600
  15. self.bytesize = 8
  16. self.parity = PARITY_EVEN
  17. self.stopbits = STOPBITS_ONE
  18. self.timeout = 0
  19. self.xonxoff = False
  20. self.rtscts = False
  21. self.dsrdtr = False
  22. self.__lowlevel = None
  23. def open(self):
  24. if self.__isMicropython:
  25. port = self.port
  26. for sub in ("/dev/ttyS", "/dev/ttyUSB", "/dev/ttyACM", "COM", "UART", ):
  27. port = port.replace(sub, "")
  28. try:
  29. self.__portNum = int(port.strip())
  30. except ValueError:
  31. raise SerialException("Invalid port: %s" % self.port)
  32. try:
  33. import machine
  34. self.__lowlevel = machine.UART(
  35. self.__portNum,
  36. self.baudrate,
  37. self.bytesize,
  38. 0 if self.parity == PARITY_EVEN else 1,
  39. 1 if self.stopbits == STOPBITS_ONE else 2)
  40. print("Opened machine.UART(%d)" % self.__portNum)
  41. except Exception as e:
  42. raise SerialException("UART%d: Failed to open:\n%s" % (
  43. self.__portNum, str(e)))
  44. return
  45. raise NotImplementedError
  46. def close(self):
  47. if self.__isMicropython:
  48. try:
  49. if self.__lowlevel is not None:
  50. self.__lowlevel.deinit()
  51. self.__lowlevel = None
  52. print("Closed machine.UART(%d)" % self.__portNum)
  53. except Exception as e:
  54. raise SerialException("UART%d: Failed to close:\n%s" % (
  55. self.__portNum, str(e)))
  56. return
  57. raise NotImplementedError
  58. def write(self, data):
  59. if self.__isMicropython:
  60. try:
  61. self.__lowlevel.write(data)
  62. except Exception as e:
  63. raise SerialException("UART%d write(%d bytes) failed: %s" % (
  64. self.__portNum, len(data), str(e)))
  65. return
  66. raise NotImplementedError
  67. def read(self, size=1):
  68. if self.__isMicropython:
  69. try:
  70. data = self.__lowlevel.read(size)
  71. if data is None:
  72. return b""
  73. return data
  74. except Exception as e:
  75. raise SerialException("UART%d read(%d bytes) failed: %s" % (
  76. self.__portNum, size, str(e)))
  77. raise NotImplementedError
  78. def flushInput(self):
  79. if self.__isMicropython:
  80. while self.__lowlevel.any():
  81. self.__lowlevel.read()
  82. return
  83. raise NotImplementedError
  84. def flushOutput(self):
  85. if self.__isMicropython:
  86. time.sleep(0.01)
  87. return
  88. raise NotImplementedError