12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789 |
- #!/usr/bin/env python3
- """
- # Copyright (C) 2008-2019 Michael Buesch <m@bues.ch>
- #
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License version 3
- # as published by the Free Software Foundation.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
- """
- import getopt
- import sys
- try:
- from serial import *
- except ImportError:
- print("ERROR: pyserial module not available.")
- print("On Debian Linux please do: apt install python3-serial")
- sys.exit(1)
- from PyQt5.QtCore import *
- from PyQt5.QtGui import *
- from PyQt5.QtWidgets import *
# Serial communication port configuration.
# These four values are handed, in this order, to pyserial's Serial()
# constructor right after the port name.
CONFIG_BAUDRATE = 115200        # Line speed in baud
CONFIG_BYTESIZE = 8             # Data bits per character
CONFIG_PARITY = PARITY_NONE     # No parity bit (constant from pyserial)
CONFIG_STOPBITS = 2             # Stop bits per character
# Wire format of one message:
#   1 header byte (message ID | flags) + payload + 1 CRC byte
MSG_SIZE = 6
MSG_PAYLOAD_SIZE = MSG_SIZE - 2

# Message IDs. The ID lives in the low six bits of the header byte.
MSG_ID_MASK = (1 << 6) - 1
MSG_INVALID = 0
MSG_ERROR = 1                   # Reply carrying a MSG_ERR_xxx code
MSG_LOGMESSAGE = 2              # Payload is a chunk of device log text
MSG_PING = 3
MSG_PONG = 4                    # Reply to MSG_PING
MSG_GET_CURRENT_PRESSURE = 5
MSG_CURRENT_PRESSURE = 6
MSG_GET_DESIRED_PRESSURE = 7
MSG_DESIRED_PRESSURE = 8
MSG_SET_DESIRED_PRESSURE = 9
MSG_GET_HYSTERESIS = 10
MSG_HYSTERESIS = 11
MSG_SET_HYSTERESIS = 12
MSG_GET_CONFIG_FLAGS = 13
MSG_CONFIG_FLAGS = 14
MSG_SET_CONFIG_FLAGS = 15
MSG_SET_VALVE = 16
MSG_RESTARTED = 17
MSG_SHUTDOWN = 18
MSG_TURNON = 19
MSG_GET_MAXIMA = 20
MSG_MAXIMA = 21

# Message flags. They occupy the two header bits above the ID.
MSG_FLAG_QOVERFLOW = 1 << 6     # TX queue overflow on the device
MSG_FLAG_REQ_ERRCODE = 1 << 7   # Request a MSG_ERROR reply

# Message error codes
MSG_ERR_NONE = 0                # No error
MSG_ERR_CHKSUM = 1              # Checksum error
MSG_ERR_NOCMD = 2               # Unknown command
MSG_ERR_BUSY = 3                # Busy
MSG_ERR_INVAL = 4               # Invalid argument
MSG_ERR_NOREPLY = -1            # internal. Not sent over wire.

# Config flags (bit numbers, not masks)
CFG_FLAG_AUTOADJUST_ENABLE = 0
def usage():
    """Print the command line help text to stdout."""
    helpLines = (
        "Pressure control - remote configuration",
        "",
        "Usage: pctl-remote [OPTIONS] /dev/ttyS0",
        "",
        "-h|--help Print this help text",
        "-p|--noping Don't initially ping the device",
        "-f|--nofetch Don't initially fetch the device state",
        "-k|--noka Don't send keep-alive pings",
        "-l|--log FILE Log status information to FILE",
    )
    for line in helpLines:
        print(line)
def parseArgs():
    """Parse the command line and publish the result as module globals.

    Sets:
        opt_ttyfile -- serial device name (the single positional argument)
        opt_noping  -- 1 if the initial ping shall be skipped, else 0
        opt_nofetch -- 1 if the initial state fetch shall be skipped, else 0
        opt_noka    -- 1 if keep-alive pings shall be disabled, else 0
        opt_logfile -- log file name, or None if logging is disabled

    Prints the usage text and exits with status 1 on malformed options or
    if the device name is missing; exits with status 0 after -h/--help.
    """
    global opt_ttyfile
    global opt_noping
    global opt_nofetch
    global opt_noka
    global opt_logfile

    opt_ttyfile = None
    opt_noping = 0
    opt_nofetch = 0
    opt_noka = 0
    opt_logfile = None

    # Let getopt see the complete argument list. Blindly treating the
    # last argument as the device name would swallow a lone "-h" (or any
    # other single option) and try to open it as a serial port.
    try:
        (opts, args) = getopt.getopt(sys.argv[1:],
                                     "hpfkl:",
                                     ["help", "noping", "nofetch", "noka", "log="])
    except getopt.GetoptError:
        usage()
        sys.exit(1)

    for (o, v) in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit(0)
        if o in ("-p", "--noping"):
            opt_noping = 1
        if o in ("-f", "--nofetch"):
            opt_nofetch = 1
        if o in ("-k", "--noka"):
            opt_noka = 1
        if o in ("-l", "--log"):
            opt_logfile = v

    # Exactly one positional argument is expected: the serial device.
    if len(args) != 1:
        usage()
        sys.exit(1)
    opt_ttyfile = args[0]
class LogFile(QObject):
    """Optional CSV log of pressure readings and their thresholds.

    If no file name is given the object is inert: every write is a no-op.
    """

    def __init__(self, logfileName):
        """Open (and truncate) the log file and write the CSV header.

        logfileName: Path of the log file. A false value disables logging.
        Exits the process with status 1 if the file cannot be opened.
        """
        super().__init__()
        self.fd = None
        if logfileName:
            try:
                self.fd = open(logfileName, "w+b")
            except IOError as e:
                print("Failed to open logfile %s: %s" % (logfileName, e.strerror))
                sys.exit(1)
            header = ("X/Y,X/Y lower threshold,X/Y upper threshold,"
                      "Z,Z lower threshold,Z upper threshold,\n")
            self.write(header)
            print("Logging to: %s" % logfileName)

    def write(self, message):
        """Append message to the log file and flush it immediately.

        str messages are encoded as UTF-8 (unencodable characters are
        dropped); other objects are passed to the file as they are.
        """
        if self.fd:
            data = message
            if isinstance(data, str):
                data = data.encode("UTF-8", "ignore")
            self.fd.write(data)
            self.fd.flush()

    def logPressure(self, xy, xy_desired, xy_hyst, z, z_desired, z_hyst):
        """Write one CSV row: value, lower and upper threshold per island.

        The thresholds are the desired pressure minus/plus the hysteresis.
        """
        row = (
            xy, xy_desired - xy_hyst, xy_desired + xy_hyst,
            z, z_desired - z_hyst, z_desired + z_hyst,
        )
        self.write("%d,%d,%d,%d,%d,%d,\n" % row)
class RemoteProtocol(QObject):
    """Serial line protocol handler for the pressure control device.

    A message is MSG_SIZE bytes long: one header byte (message ID in the
    bits covered by MSG_ID_MASK, flags above them), MSG_PAYLOAD_SIZE
    payload bytes and one trailing CRC-8 byte.

    The instance publishes itself as the module global ``remote`` and
    reports to the user through the module global ``mainwnd``.
    """

    def __init__(self, ttyfile):
        """Open the serial port, start the timers and optionally ping.

        ttyfile: Serial device name handed to pyserial's Serial().

        Exits the process with status 1 if the port cannot be set up.
        """
        QObject.__init__(self)
        global remote
        try:
            # Publish the instance before any message is exchanged:
            # handlers in the main widget use the global while the
            # initial ping below is still in progress.
            remote = self
            self.serial = Serial(ttyfile, CONFIG_BAUDRATE,
                                 CONFIG_BYTESIZE, CONFIG_PARITY,
                                 CONFIG_STOPBITS)
            self.__setResetLine(False)
            QThread.msleep(100)
            self.serial.flushInput()
            self.devRestarted = False

            self.pollTimer = QTimer(self)
            self.pollTimer.timeout.connect(self.__poll)
            self.pollTimer.start(50)

            self.keepAliveTimer = QTimer(self)
            self.keepAliveTimer.timeout.connect(self.__keepAlive)
            if not opt_noka:
                self.keepAliveTimer.start(1000)

            if not opt_noping:
                reply = self.sendMessageSyncReply(MSG_PING, 0, b"", MSG_PONG)
                if reply:
                    mainwnd.centralWidget().log.hostLog(
                        "PING->PONG success. Device is alife.\n")
                else:
                    mainwnd.centralWidget().log.hostLog(
                        "Communication with device failed. "
                        "No reply to PING request.\n")
        except (SerialException, OSError, IOError) as e:
            print(e)
            sys.exit(1)

    def __setResetLine(self, resetOn):
        """Drive the RTS line; failures are shown in the status bar."""
        # RTS is connected to the microcontroller reset line.
        # If RTS is logic high, the reset line is pulled low
        # and the microcontroller is put into reset.
        # If RTS is logic low, the microcontroller is in
        # normal operation.
        try:
            self.serial.setRTS(bool(resetOn))
        except (SerialException, OSError, IOError) as e:
            mainwnd.statusBar().showMessage("Failed to toggle reset. %s" % e)

    def rebootDevice(self):
        """Reboot the microcontroller by pulsing its reset line."""
        self.__setResetLine(True)
        QThread.msleep(50)
        self.__setResetLine(False)
        QThread.msleep(50)

    def stopKeepAliveTimer(self):
        """Stop sending periodic keep-alive pings."""
        self.keepAliveTimer.stop()

    def __keepAlive(self):
        """Timer slot: ping the device; reboot and re-fetch on silence."""
        reply = self.sendMessageSyncReply(MSG_PING, 0, b"", MSG_PONG)
        if not reply:
            mainwnd.centralWidget().log.hostLog(self.tr(
                "Keep-alife: Device did not reply to ping "
                "request. Rebooting device...\n"))
            self.rebootDevice()
            if not mainwnd.centralWidget().fetchState():
                mainwnd.centralWidget().log.hostLog(self.tr(
                    "Keep-alife: Failed to fetch configuration\n"))

    def __poll(self):
        """Timer slot: handle one pending message and a pending restart."""
        try:
            if self.serial.inWaiting() >= MSG_SIZE:
                self.parseMessage(self.__readMessage())
        except (SerialException, OSError, IOError) as e:
            mainwnd.statusBar().showMessage("Failed to poll message. %s" % e)
            return
        if self.devRestarted:
            # The flag stays set until the state fetch succeeds, so the
            # fetch is retried on the next poll.
            if mainwnd.centralWidget().fetchState():
                mainwnd.centralWidget().log.hostLog(self.tr("Device rebooted\n"))
                mainwnd.centralWidget().turnOnDevice()
                self.devRestarted = False

    def checksumMessage(self, msg):
        """Return True if the trailing CRC byte of msg is correct.

        On a mismatch the error is logged and the serial input buffer is
        flushed (best effort).
        """
        calc_crc = self.__crc8_update_buffer(0xFF, msg[0:-1])
        calc_crc ^= 0xFF
        want_crc = msg[-1]
        if calc_crc != want_crc:
            text = self.tr("ERROR: message CRC mismatch\n")
            mainwnd.centralWidget().log.hostLog(text)
            try:
                self.serial.flushInput()
            except (SerialException, OSError, IOError):
                # Deliberately best effort: the CRC error has already
                # been reported and the message is rejected either way.
                pass
            return False
        return True

    def __readMessage(self):
        """Read one raw message from the serial port and return it.

        Logs a warning if the message carries MSG_FLAG_QOVERFLOW.
        """
        msg = self.serial.read(MSG_SIZE)
        flags = msg[0] & ~MSG_ID_MASK
        if flags & MSG_FLAG_QOVERFLOW:
            # Log entries are concatenated without a separator, so the
            # text has to carry its own line terminator.
            mainwnd.centralWidget().log.hostLog(
                "Warning: TX queue overflow on the device\n")
        return msg

    def parseMessage(self, msg):
        """Verify the checksum of msg and dispatch it by message ID.

        Handles MSG_LOGMESSAGE, MSG_CURRENT_PRESSURE and MSG_RESTARTED;
        every other ID is silently ignored.
        """
        if not self.checksumMessage(msg):
            return
        msgId = msg[0] & MSG_ID_MASK
        if msgId == MSG_LOGMESSAGE:
            text = self.getPayload(msg).rstrip(b'\0').decode("UTF-8", "ignore")
            mainwnd.centralWidget().log.devLog(text)
        if msgId == MSG_CURRENT_PRESSURE:
            mainwnd.centralWidget().parseCurrentPressureMsg(msg)
        if msgId == MSG_RESTARTED:
            self.devRestarted = True

    def getPayload(self, msg):
        """Return msg without its header byte and its CRC byte."""
        return msg[1:-1]

    def sendMessage(self, id, flags, payload):
        """Send a message.

        id:      Message ID (MSG_xxx).
        flags:   Message flags (MSG_FLAG_xxx), or'ed into the header byte.
        payload: bytes, at most MSG_PAYLOAD_SIZE long; zero-padded.

        Serial write errors are shown in the status bar, not raised.
        """
        assert(len(payload) <= MSG_PAYLOAD_SIZE)
        # Create the header
        msg = b"%c" % (id | flags)
        # Add the payload
        msg += payload
        # Pad the payload up to the constant size
        msg += b'\0' * (MSG_PAYLOAD_SIZE - len(payload))
        # Calculate the CRC
        crc = self.__crc8_update_buffer(0xFF, msg)
        crc ^= 0xFF
        # Add the CRC to the message
        msg += b"%c" % crc
        # Send the message
        assert(len(msg) == MSG_SIZE)
        try:
            self.serial.write(msg)
        except (SerialException, OSError, IOError) as e:
            mainwnd.statusBar().showMessage("Failed to send message. %s" % e)

    def sendMessageSyncReply(self, id, flags, payload, replyId):
        """Send a message and synchronously wait for the reply.

        Messages with a different ID that arrive while waiting are
        dispatched through parseMessage().

        Returns the raw reply message, or None if no valid reply with ID
        replyId arrived within 500 ms or if the serial port failed.
        """
        # Polling is suspended so the poll slot cannot consume the reply.
        self.pollTimer.stop()
        msg = None
        try:
            self.sendMessage(id, flags, payload)
            timeout = QDateTime.currentDateTime().addMSecs(500)
            while True:
                if QDateTime.currentDateTime() >= timeout:
                    msg = None
                    break
                if self.serial.inWaiting() < MSG_SIZE:
                    QThread.msleep(1)
                    continue
                msg = self.__readMessage()
                if not self.checksumMessage(msg):
                    continue
                if (msg[0] & MSG_ID_MASK) == replyId:
                    break
                # This is not a reply to our message.
                self.parseMessage(msg)
        except (SerialException, OSError, IOError) as e:
            mainwnd.statusBar().showMessage("Failed to fetch reply. %s" % e)
            # Report "no reply". A fabricated all-zero message would be
            # truthy and would decode as MSG_ERR_NONE, i.e. as success.
            msg = None
        finally:
            self.pollTimer.start()
        return msg

    def sendMessageSyncError(self, id, flags, payload):
        """Sends a message and synchronously waits for the MSG_ERROR reply.

        Returns the MSG_ERR_xxx code from the reply, or MSG_ERR_NOREPLY
        if no reply was received.
        """
        flags |= MSG_FLAG_REQ_ERRCODE
        reply = self.sendMessageSyncReply(id, flags, payload, MSG_ERROR)
        if not reply:
            return MSG_ERR_NOREPLY
        return self.getPayload(reply)[0]

    def configFlagsFetch(self):
        """Fetch the config flags of both islands.

        Returns a tuple (xy_flags, z_flags), or None if there was no reply.
        """
        reply = self.sendMessageSyncReply(MSG_GET_CONFIG_FLAGS, 0, b"",
                                          MSG_CONFIG_FLAGS)
        if not reply:
            return None
        payload = self.getPayload(reply)
        return (payload[0], payload[1])

    def configFlagsSet(self, island, flags):
        """Set the config flags of one island. Returns a MSG_ERR_xxx code."""
        data = b"%c%c" % (island, flags)
        return self.sendMessageSyncError(MSG_SET_CONFIG_FLAGS, 0, data)

    def setValve(self, islandId, valveNr, state):
        """Switch one valve on (state != 0) or off.

        The request is tried up to five times. Returns the MSG_ERR_xxx
        code of the last attempt.
        """
        data = b"%c%c%c" % (islandId, valveNr, (state != 0))
        for _ in range(5):
            err = self.sendMessageSyncError(MSG_SET_VALVE, 0, data)
            if err == MSG_ERR_NONE:
                break
        return err

    def __crc8_update_buffer(self, crc, buf):
        """Feed the bytes of buf into an LSB-first CRC-8 (constant 0x8C).

        crc: Start value. Returns the updated 8 bit CRC.
        """
        for c in buf:
            crc ^= c
            for _ in range(8):
                if crc & 1:
                    crc = (crc >> 1) ^ 0x8C
                else:
                    crc >>= 1
        return crc & 0xFF
class StatusBar(QStatusBar):
    """Status bar that echoes every message to stdout as well."""

    def showMessage(self, msg):
        """Print msg and show it in the status bar for three seconds."""
        displayMsecs = 3000
        print(msg)
        super().showMessage(msg, displayMsecs)
class LogBrowser(QTextEdit):
    """Read-only text view showing the most recent host and device log lines."""

    def __init__(self, parent=None):
        """Create the empty log view and add the start-up entry."""
        super().__init__(parent)
        self.msgs = []          # Committed log entries, oldest first
        self.curDevMsg = ""     # Device log line that is still incomplete
        self.setReadOnly(True)
        self.hostLog(self.tr("Pressure Control logging started\n"))

    def __commit(self):
        """Trim the history to 100 entries, redraw and scroll to the end."""
        limit = 100
        # Entries are committed one at a time, so at most one entry
        # has to be dropped here.
        if len(self.msgs) > limit:
            self.msgs = self.msgs[1:]
            assert(len(self.msgs) == limit)
        self.setPlainText("".join(self.msgs))
        # Scroll to the end of the log
        scrollBar = self.verticalScrollBar()
        scrollBar.setValue(scrollBar.maximum())

    def __dateString(self):
        """Return the current time formatted as "[hh:mm:ss] "."""
        return str(QDateTime.currentDateTime().toString("[hh:mm:ss] "))

    def devLog(self, text):
        """Append a fragment of device log text.

        Fragments are collected until one ends in CR or LF; only then
        the complete line is committed to the view.
        """
        fragment = str(text) # to python string
        if not fragment:
            return
        if not self.curDevMsg:
            # First fragment of a new line gets the prefix.
            fragment = self.__dateString() + "Dev: " + fragment
        self.curDevMsg += fragment
        if fragment[-1] in "\r\n":
            self.msgs.append(self.curDevMsg)
            self.curDevMsg = ""
            self.__commit()

    def hostLog(self, text):
        """Append a host-side log entry and redraw immediately."""
        entry = self.__dateString() + "Host: " + str(text)
        self.msgs.append(entry)
        self.__commit()
class PressureGauge(QWidget):
    """Read-only dial with a title above and a numeric read-out below."""

    def __init__(self, name, min, max, units, parent):
        """Build the gauge.

        name:     Title shown above the dial.
        min, max: Value range; values outside are clamped by setValue().
        units:    Unit string appended to the numeric read-out.
        parent:   Parent widget.
        """
        super().__init__(parent)
        self.min = min
        self.max = max
        self.units = units

        box = QVBoxLayout()
        self.setLayout(box)

        self.title = QLabel(name, self)
        self.title.setAlignment(Qt.AlignHCenter)
        box.addWidget(self.title)

        # The dial works on integers, so it is fed value * 1000.
        dial = QDial(self)
        dial.setNotchesVisible(True)
        dial.setEnabled(False)
        dial.setSingleStep(100)
        dial.setPageStep(1000)
        dial.setNotchTarget(2)
        dial.setMinimum(int(min * 1000))
        dial.setMaximum(int(max * 1000))
        dial.setFixedSize(100, 100)
        self.dial = dial
        box.addWidget(dial)

        self.num = QLabel(self)
        self.num.setAlignment(Qt.AlignHCenter)
        box.addWidget(self.num)

        box.addStretch()
        self.setValue(0)

    def setValue(self, value):
        """Show value, clamped to the range given to the constructor."""
        if value < self.min:
            shown = self.min
        else:
            shown = value
        if shown > self.max:
            shown = self.max
        self.num.setText("%.2f %s" % (float(shown), self.units))
        self.dial.setValue(int(shown * 1000))
class ValveIslandWidget(QGroupBox):
    """Control panel for one valve island.

    Shows the current pressure and lets the user edit the desired
    pressure, the hysteresis and the auto-adjust flag, or operate the
    IN/OUT valves by hand. The parent widget must provide the attributes
    ``initialized`` and ``log``.
    """

    def __init__(self, name, islandId, parent):
        """Build the panel.

        name:     Group box title.
        islandId: Island number used in device messages.
        parent:   Parent widget (see class description).
        """
        QGroupBox.__init__(self, name, parent)
        self.islandId = islandId

        self.setLayout(QGridLayout())

        self.gauge = PressureGauge("Current pressure", 0, 10, "Bar", self)
        self.layout().addWidget(self.gauge, 0, 0, 3, 1)

        h = QHBoxLayout()
        h.addStretch()
        self.autoCheckbox = QCheckBox(self.tr("Automatically adjust pressure"), self)
        self.autoCheckbox.stateChanged.connect(self.autoadjustChanged)
        h.addWidget(self.autoCheckbox)
        self.layout().addLayout(h, 0, 1)

        h = QHBoxLayout()
        h.addStretch()
        label = QLabel(self.tr("Desired pressure:"), self)
        h.addWidget(label)
        self.pressureSpin = QDoubleSpinBox(self)
        self.pressureSpin.setMinimum(0.5)
        self.pressureSpin.setMaximum(8)
        self.pressureSpin.setSingleStep(0.1)
        self.pressureSpin.setSuffix(self.tr(" Bar"))
        self.pressureSpin.valueChanged.connect(self.desiredPressureChanged)
        h.addWidget(self.pressureSpin)
        self.layout().addLayout(h, 1, 1)

        h = QHBoxLayout()
        h.addStretch()
        label = QLabel(self.tr("Hysteresis:"), self)
        h.addWidget(label)
        self.hystSpin = QDoubleSpinBox(self)
        self.hystSpin.setMinimum(0.05)
        self.hystSpin.setMaximum(8)
        self.hystSpin.setSingleStep(0.05)
        self.hystSpin.setSuffix(self.tr(" Bar"))
        self.hystSpin.valueChanged.connect(self.desiredHysteresisChanged)
        h.addWidget(self.hystSpin)
        self.layout().addLayout(h, 2, 1)

        h = QHBoxLayout()
        self.inButton = QPushButton(self.tr("IN-Valve"), self)
        self.inButton.pressed.connect(self.inValvePressed)
        self.inButton.released.connect(self.inValveReleased)
        h.addWidget(self.inButton)
        self.outButton = QPushButton(self.tr("OUT-Valve"), self)
        h.addWidget(self.outButton)
        self.outButton.pressed.connect(self.outValvePressed)
        self.outButton.released.connect(self.outValveReleased)
        self.layout().addLayout(h, 3, 0, 1, 2)

        # Bring the valve buttons into the state matching the
        # (initially unchecked) auto-adjust checkbox.
        self.autoadjustChanged(Qt.Unchecked)

    @staticmethod
    def __barToMbar(value):
        """Convert Bar (float) to the nearest whole number of millibar."""
        # Round instead of truncating: binary floating point makes e.g.
        # 2.01 * 1000 come out just below 2010, which int() alone would
        # turn into 2009.
        return int(round(value * 1000))

    def __sendMbarValue(self, msgId, value):
        """Send a Bar value as 16 bit little-endian millibar for this island.

        Returns the MSG_ERR_xxx code of the request.
        """
        mbar = self.__barToMbar(value)
        data = b"%c%c%c" % (self.islandId, (mbar & 0xFF), ((mbar >> 8) & 0xFF))
        return remote.sendMessageSyncError(msgId, 0, data)

    def __switchValve(self, valveNr, state, failText):
        """Switch a valve of this island; log failText if that fails."""
        err = remote.setValve(self.islandId, valveNr, state)
        if err != MSG_ERR_NONE:
            self.parent().log.hostLog(failText)

    def desiredPressureChanged(self, value):
        """Slot: send the new desired pressure (in Bar) to the device."""
        if not self.parent().initialized:
            return
        mainwnd.centralWidget().pokeUiTimer()
        err = self.__sendMbarValue(MSG_SET_DESIRED_PRESSURE, value)
        if err != MSG_ERR_NONE:
            # Format after tr() so the lookup sees the constant source text.
            self.parent().log.hostLog(
                self.tr("Failed to change pressure. Error=%u\n") % err)

    def getDesiredPressure(self):
        """Return the desired pressure shown in the spin box, in millibar."""
        return self.__barToMbar(self.pressureSpin.value())

    def desiredHysteresisChanged(self, value):
        """Slot: send the new hysteresis (in Bar) to the device."""
        if not self.parent().initialized:
            return
        mainwnd.centralWidget().pokeUiTimer()
        err = self.__sendMbarValue(MSG_SET_HYSTERESIS, value)
        if err != MSG_ERR_NONE:
            self.parent().log.hostLog(
                self.tr("Failed to change hysteresis. Error=%u\n") % err)

    def getHysteresis(self):
        """Return the hysteresis shown in the spin box, in millibar."""
        return self.__barToMbar(self.hystSpin.value())

    def autoadjustChanged(self, state):
        """Slot: enable manual valve buttons and update the device flag.

        The valve buttons are usable only while auto-adjust is off. The
        device is updated only once the parent is initialized.
        """
        manual = (state == Qt.Unchecked)
        self.inButton.setEnabled(manual)
        self.outButton.setEnabled(manual)
        if not self.parent().initialized:
            return
        mainwnd.centralWidget().pokeUiTimer()
        flags = remote.configFlagsFetch()
        if flags is None:
            self.parent().log.hostLog(self.tr("Failed to fetch config flags\n"))
            return
        # Read-modify-write so the other flag bits are preserved.
        flags = flags[self.islandId]
        if state == Qt.Checked:
            flags |= (1 << CFG_FLAG_AUTOADJUST_ENABLE)
        else:
            flags &= ~(1 << CFG_FLAG_AUTOADJUST_ENABLE)
        err = remote.configFlagsSet(self.islandId, flags)
        if err != MSG_ERR_NONE:
            self.parent().log.hostLog(self.tr("Failed to set config flags\n"))

    def inValvePressed(self):
        """Slot: open valve 0; the UI lock timer pauses while held."""
        mainwnd.centralWidget().stopUiTimer()
        self.__switchValve(0, 1, self.tr("Failed to switch valve 0 ON\n"))

    def inValveReleased(self):
        """Slot: close valve 0 and restart the UI lock timer."""
        mainwnd.centralWidget().startUiTimer()
        self.__switchValve(0, 0, self.tr("Failed to switch valve 0 OFF\n"))

    def outValvePressed(self):
        """Slot: open valve 1; the UI lock timer pauses while held."""
        mainwnd.centralWidget().stopUiTimer()
        self.__switchValve(1, 1, self.tr("Failed to switch valve 1 ON\n"))

    def outValveReleased(self):
        """Slot: close valve 1 and restart the UI lock timer."""
        mainwnd.centralWidget().startUiTimer()
        self.__switchValve(1, 0, self.tr("Failed to switch valve 1 OFF\n"))
class MainWidget(QWidget):
    """Central widget: UI lock checkbox, both valve island panels and the log.

    The island panels are disabled until the "User interface enabled"
    checkbox is ticked; the checkbox is cleared again by a timer that is
    restarted by every user action (see pokeUiTimer()).
    """

    def __init__(self, parent=None):
        """Build the widget tree with the island panels disabled."""
        QWidget.__init__(self, parent)
        # Must exist before the island panels are created: their
        # constructors already read parent().initialized.
        self.initialized = False

        self.setLayout(QVBoxLayout())

        self.uiLock = QCheckBox(self.tr("User interface enabled"), self)
        self.uiLock.setCheckState(Qt.Unchecked)
        self.uiLock.stateChanged.connect(self.__uiLockChanged)
        self.layout().addWidget(self.uiLock)

        self.uiLockTimer = QTimer(self)
        self.uiLockTimer.timeout.connect(self.__uiLockTimerExpired)

        self.xy = ValveIslandWidget("X/Y joints", 0, self)
        self.layout().addWidget(self.xy)
        self.z = ValveIslandWidget("Z joint", 1, self)
        self.layout().addWidget(self.z)

        self.log = LogBrowser(self)
        self.layout().addWidget(self.log)

        self.__uiEnable(False)

    def __uiEnable(self, enabled):
        """Enable or disable both island panels."""
        self.xy.setEnabled(enabled)
        self.z.setEnabled(enabled)

    def stopUiTimer(self):
        """Stop the UI lock timer."""
        self.uiLockTimer.stop()

    def startUiTimer(self):
        """Start the UI lock timer with a ten second interval."""
        self.uiLockTimer.start(10000)

    def pokeUiTimer(self):
        """Restart the UI lock timer after a user action."""
        self.stopUiTimer()
        self.startUiTimer()

    def __uiLockChanged(self, state):
        """Slot: follow the UI lock checkbox."""
        enabled = (state != Qt.Unchecked)
        self.__uiEnable(enabled)
        if enabled:
            self.pokeUiTimer()
        else:
            self.stopUiTimer()

    def __uiLockTimerExpired(self):
        """Slot: lock the UI again by clearing the checkbox."""
        self.uiLock.setCheckState(Qt.Unchecked)

    def initializeState(self):
        """Fetch the device state (unless disabled) and turn the device on."""
        if not opt_nofetch:
            if not self.fetchState():
                self.log.hostLog(
                    "Failed to fetch active configuration "
                    "from device.\n")
        self.turnOnDevice()
        self.initialized = True

    def turnOnDevice(self):
        """Send MSG_TURNON and log the outcome."""
        error = remote.sendMessageSyncError(MSG_TURNON, 0, b"")
        if error:
            self.log.hostLog("Failed to turn on device\n")
        else:
            self.log.hostLog("Device turned on\n")

    def shutdown(self):
        """Stop the keep-alive pings and send MSG_SHUTDOWN.

        The shutdown message is sent only if initializeState() has run.
        A failure is reported in a message box.
        """
        remote.stopKeepAliveTimer()
        if self.initialized:
            error = remote.sendMessageSyncError(MSG_SHUTDOWN, 0, b"")
            if error != MSG_ERR_NONE:
                QMessageBox.critical(self,
                                     "Pressure Control",
                                     "Failed to shutdown device")

    @staticmethod
    def __unpackPair(payload):
        """Decode two 16 bit little-endian values from a 4 byte payload."""
        return (payload[0] | (payload[1] << 8),
                payload[2] | (payload[3] << 8))

    def __fetchMaxima(self, island, label):
        """Fetch the maxima of one island and apply them to its spin boxes.

        island: The ValveIslandWidget to update.
        label:  Island name used in the failure message.
        Returns False if the device did not reply.
        """
        reply = remote.sendMessageSyncReply(MSG_GET_MAXIMA, 0,
                                            b"%c" % island.islandId,
                                            MSG_MAXIMA)
        if not reply:
            print("Failed to fetch %s maxima. No reply." % label)
            return False
        (pressureMbar, hysteresisMbar) = self.__unpackPair(
            remote.getPayload(reply))
        island.pressureSpin.setMaximum(float(pressureMbar) / 1000)
        # The hysteresis limit is the second value of the reply, not the
        # pressure limit.
        island.hystSpin.setMaximum(float(hysteresisMbar) / 1000)
        return True

    def fetchState(self):
        """Read the complete device state into the widgets.

        Fetches current pressure, maxima, desired pressure, hysteresis
        and config flags, in that order. Returns False as soon as one
        request gets no reply, True otherwise.
        """
        # Get the current pressure
        reply = remote.sendMessageSyncReply(MSG_GET_CURRENT_PRESSURE, 0, b"",
                                            MSG_CURRENT_PRESSURE)
        if not reply:
            print("Failed to fetch current pressure. No reply.")
            return False
        self.parseCurrentPressureMsg(reply)

        # Get the X/Y maxima
        if not self.__fetchMaxima(self.xy, "X/Y"):
            return False

        # Get the Z maxima
        if not self.__fetchMaxima(self.z, "Z"):
            return False

        # Get the desired pressure
        reply = remote.sendMessageSyncReply(MSG_GET_DESIRED_PRESSURE, 0, b"",
                                            MSG_DESIRED_PRESSURE)
        if not reply:
            print("Failed to fetch desired pressure. No reply.")
            return False
        (xy_mbar, z_mbar) = self.__unpackPair(remote.getPayload(reply))
        self.xy.pressureSpin.setValue(float(xy_mbar) / 1000)
        self.z.pressureSpin.setValue(float(z_mbar) / 1000)

        # Get the hysteresis
        reply = remote.sendMessageSyncReply(MSG_GET_HYSTERESIS, 0, b"",
                                            MSG_HYSTERESIS)
        if not reply:
            print("Failed to fetch hysteresis. No reply.")
            return False
        (xy_mbar, z_mbar) = self.__unpackPair(remote.getPayload(reply))
        self.xy.hystSpin.setValue(float(xy_mbar) / 1000)
        self.z.hystSpin.setValue(float(z_mbar) / 1000)

        # Get the config flags
        flags = remote.configFlagsFetch()
        if flags is None:
            print("Failed to fetch config flags. No reply.")
            return False
        if flags[0] & (1 << CFG_FLAG_AUTOADJUST_ENABLE):
            self.xy.autoCheckbox.setCheckState(Qt.Checked)
        if flags[1] & (1 << CFG_FLAG_AUTOADJUST_ENABLE):
            self.z.autoCheckbox.setCheckState(Qt.Checked)

        return True

    def parseCurrentPressureMsg(self, msg):
        """Show the pressures from a MSG_CURRENT_PRESSURE message and log them."""
        (xy_mbar, z_mbar) = self.__unpackPair(remote.getPayload(msg))
        self.xy.gauge.setValue(float(xy_mbar) / 1000)
        self.z.gauge.setValue(float(z_mbar) / 1000)
        logfile.logPressure(xy_mbar, self.xy.getDesiredPressure(),
                            self.xy.getHysteresis(),
                            z_mbar, self.z.getDesiredPressure(),
                            self.z.getHysteresis())
class MainWindow(QMainWindow):
    """Top level window: menu bar, status bar and the main widget."""

    def __init__(self, parent=None):
        """Assemble the menus, the status bar and the central widget."""
        super().__init__(parent)
        self.setWindowTitle(self.tr("Pneumatic pressure control"))

        menuBar = QMenuBar(self)

        controlMenu = QMenu(self.tr("Control"), menuBar)
        controlMenu.addAction(self.tr("Exit"), self.close)
        menuBar.addMenu(controlMenu)

        helpMenu = QMenu(self.tr("Help"), menuBar)
        helpMenu.addAction(self.tr("About"), self.about)
        menuBar.addMenu(helpMenu)

        self.setMenuBar(menuBar)
        self.setStatusBar(StatusBar())
        self.setCentralWidget(MainWidget())

        self.resize(400, 350)

    def initializeState(self):
        """Forward to the central widget's initializeState()."""
        widget = self.centralWidget()
        widget.initializeState()

    def shutdown(self):
        """Forward to the central widget's shutdown()."""
        widget = self.centralWidget()
        widget.shutdown()

    def about(self):
        """Show the "About" message box."""
        aboutText = self.tr("Pneumatic pressure control\n"
                            "Copyright (c) 2008-2019 Michael Buesch")
        QMessageBox.information(self, self.tr("About"), aboutText)
def main():
    """Program entry point: build the GUI, connect to the device, run Qt.

    Creates the module globals ``app``, ``logfile``, ``mainwnd`` and
    ``remote`` that the rest of the program relies on, runs the Qt event
    loop, shuts the device down and exits with the event loop's result.
    """
    global remote
    global mainwnd
    global app
    global logfile

    mainwnd = None
    app = QApplication(sys.argv)
    parseArgs()
    logfile = LogFile(opt_logfile)
    # The window has to exist before the protocol object is created,
    # because the protocol reports through mainwnd while connecting.
    mainwnd = MainWindow()
    remote = RemoteProtocol(opt_ttyfile)
    mainwnd.initializeState()
    mainwnd.show()
    result = app.exec_()
    mainwnd.shutdown()
    # sys.exit() rather than the exit() helper: the latter is injected by
    # the site module for interactive use and is not guaranteed to exist.
    sys.exit(result)

if __name__ == "__main__":
    main()
|