1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- from pyprofibus.compat import *
- import time
- PARITY_EVEN = "E"
- PARITY_ODD = "O"
- STOPBITS_ONE = 1
- STOPBITS_TWO = 2
- class SerialException(Exception):
- pass
- class Serial(object):
- def __init__(self):
- self.__isMicropython = isMicropython
- self.port = "/dev/ttyS0"
- self.__portNum = None
- self.baudrate = 9600
- self.bytesize = 8
- self.parity = PARITY_EVEN
- self.stopbits = STOPBITS_ONE
- self.timeout = 0
- self.xonxoff = False
- self.rtscts = False
- self.dsrdtr = False
- self.__lowlevel = None
- def open(self):
- if self.__isMicropython:
- port = self.port
- for sub in ("/dev/ttyS", "/dev/ttyUSB", "/dev/ttyACM", "COM", "UART", ):
- port = port.replace(sub, "")
- try:
- self.__portNum = int(port.strip())
- except ValueError:
- raise SerialException("Invalid port: %s" % self.port)
- try:
- import machine
- self.__lowlevel = machine.UART(
- self.__portNum,
- self.baudrate,
- self.bytesize,
- 0 if self.parity == PARITY_EVEN else 1,
- 1 if self.stopbits == STOPBITS_ONE else 2)
- print("Opened machine.UART(%d)" % self.__portNum)
- except Exception as e:
- raise SerialException("UART%d: Failed to open:\n%s" % (
- self.__portNum, str(e)))
- return
- raise NotImplementedError
- def close(self):
- if self.__isMicropython:
- try:
- if self.__lowlevel is not None:
- self.__lowlevel.deinit()
- self.__lowlevel = None
- print("Closed machine.UART(%d)" % self.__portNum)
- except Exception as e:
- raise SerialException("UART%d: Failed to close:\n%s" % (
- self.__portNum, str(e)))
- return
- raise NotImplementedError
- def write(self, data):
- if self.__isMicropython:
- try:
- self.__lowlevel.write(data)
- except Exception as e:
- raise SerialException("UART%d write(%d bytes) failed: %s" % (
- self.__portNum, len(data), str(e)))
- return
- raise NotImplementedError
- def read(self, size=1):
- if self.__isMicropython:
- try:
- data = self.__lowlevel.read(size)
- if data is None:
- return b""
- return data
- except Exception as e:
- raise SerialException("UART%d read(%d bytes) failed: %s" % (
- self.__portNum, size, str(e)))
- raise NotImplementedError
- def flushInput(self):
- if self.__isMicropython:
- while self.__lowlevel.any():
- self.__lowlevel.read()
- return
- raise NotImplementedError
- def flushOutput(self):
- if self.__isMicropython:
- time.sleep(0.01)
- return
- raise NotImplementedError
|