12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998 |
- #!/usr/bin/env python3
- """
- #
- # CNC-control - Host driver
- #
- # Copyright (C) 2011-2023 Michael Büsch <m@bues.ch>
- #
- """
- import sys
- import usb
- import errno
- import time
- from datetime import datetime, timedelta
# USB vendor/product ID used to find the device on the bus.
IDVENDOR = 0x6666
IDPRODUCT = 0xC8CC

# USB endpoint addresses: bulk reads (replies), bulk writes (messages)
# and interrupt reads (device events).
EP_IN = 0x82
EP_OUT = 0x02
EP_IRQ = 0x81

# Axis letters; the position of a letter in this string is its axis number.
ALL_AXES = "xyzuvwabc"
AXIS2NUMBER = {letter: number for number, letter in enumerate(ALL_AXES)}
NUMBER2AXIS = {number: letter for number, letter in enumerate(ALL_AXES)}
def equal(a, b, threshold=0.00001):
    """Compare two values, tolerating rounding noise when floats are involved.

    If either operand is a float (float subclasses included), both operands
    are converted to float and are considered equal when they differ by at
    most *threshold*.  Otherwise a plain ``==`` comparison is returned.
    """
    # isinstance() instead of an exact type() match, so that float
    # subclasses also get the tolerant comparison.
    if isinstance(a, float) or isinstance(b, float):
        return abs(float(a) - float(b)) <= threshold
    return a == b
def twos32(nr):
    """Convert a 32bit two's complement value to a Python int."""
    sign_bit_set = bool(nr & 0x80000000)
    if not sign_bit_set:
        return nr
    # Keep the low 32 bits and subtract 2**32 to obtain the negative value.
    return (nr & 0xFFFFFFFF) - 0x100000000
def crc8(crc, data):
    """Feed one data byte into a running 8 bit CRC and return the new CRC.

    The update is done bit by bit, least significant bit first, using the
    constant 0x8C as the feedback value.
    """
    value = crc ^ data
    for _ in range(8):
        carry = value & 0x01
        value >>= 1
        if carry:
            value ^= 0x8C
    return value & 0xFF
def crc8Buf(crc, iterable):
    """Run crc8() over every item of *iterable*, starting from *crc*.

    Returns the resulting CRC; an empty iterable returns *crc* unchanged.
    """
    checksum = crc
    for byte in iterable:
        checksum = crc8(checksum, byte)
    return checksum
class CNCCException(Exception):
    """Driver exception type, also used as the home of the message helpers.

    info() and warn() only print to stdout; error() raises an instance of
    the class it is called on, so subclasses raise their own type.
    """

    @classmethod
    def info(cls, message):
        """Print an informational message."""
        print("CNC-Control:", message)

    @classmethod
    def warn(cls, message):
        """Print a warning message."""
        print("CNC-Control WARNING:", message)

    @classmethod
    def error(cls, message):
        """Raise ``cls(message)``."""
        exception = cls(message)
        raise exception
class CNCCFatal(CNCCException):
    """Fatal variant of CNCCException (raised e.g. when the device is unplugged)."""
class FixPt:
    """32 bit fixed point number with 16 fractional bits.

    An instance carries two views of the same value:

    * ``raw``      -- the little endian byte tuple of the 32 bit two's
                      complement fixed point value,
    * ``floatval`` -- the value as a Python float.

    It can be constructed from another FixPt, a float, an int, or a
    sequence of at least four raw bytes (little endian).
    """

    FIXPT_FRAC_BITS = 16
    MIN_INT_LIMIT = -(1 << (FIXPT_FRAC_BITS - 1))
    MIN_LIMIT = float(MIN_INT_LIMIT) - 0.99999
    MAX_INT_LIMIT = (1 << (FIXPT_FRAC_BITS - 1)) - 1
    MAX_LIMIT = float(MAX_INT_LIMIT) + 0.99999

    def __init__(self, val):
        """Create a fixed point value from *val*.

        Raises CNCCException if a numeric *val* is not representable or if
        *val* is none of the supported input types.
        """
        if isinstance(val, FixPt):
            self.raw = val.raw
            self.floatval = val.floatval
            return
        if isinstance(val, float):
            self.assertRepresentable(val)
            # NOTE(review): the "+ 1" means FixPt(0.0) has raw value 1, not 0.
            # Callers in this file work around that by passing the int 0, so
            # the conversion is kept as-is -- confirm the intent.
            raw = (int(val * float(1 << self.FIXPT_FRAC_BITS)) + 1) & 0xFFFFFFFF
            self.raw = self.__int2raw(raw)
            self.floatval = val
            return
        # bool is an int subclass, but was never accepted as a number here.
        if isinstance(val, int) and not isinstance(val, bool):
            self.assertRepresentable(val)
            self.raw = self.__int2raw(val << self.FIXPT_FRAC_BITS)
            self.floatval = float(val)
            return
        try:  # Try 32bit LE two's complement
            self.raw = tuple(val)
            raw = twos32(val[0] | (val[1] << 8) |
                         (val[2] << 16) | (val[3] << 24))
            self.floatval = round(float(raw) / float(1 << self.FIXPT_FRAC_BITS), 4)
        except (TypeError, IndexError):
            CNCCException.error("FixPt TypeError")

    @classmethod
    def representable(cls, val):
        """Return True if the integer part of *val* lies strictly between
        MIN_INT_LIMIT and MAX_INT_LIMIT (the limits themselves are rejected)."""
        ival = int(val)
        if ival < 0:
            return ival > cls.MIN_INT_LIMIT
        return ival < cls.MAX_INT_LIMIT

    @classmethod
    def assertRepresentable(cls, val):
        """Raise CNCCException if *val* is not representable()."""
        if cls.representable(val):
            return
        CNCCException.error("FixPt: Value %s is not representable" %
                            str(val))

    @staticmethod
    def __int2raw(val):
        """Split the low 32 bits of *val* into a little endian byte tuple."""
        return (val & 0xFF, (val >> 8) & 0xFF,
                (val >> 16) & 0xFF, (val >> 24) & 0xFF)

    def __repr__(self):
        return "0x%02X%02X%02X%02X -> %f" %\
            (self.raw[3], self.raw[2], self.raw[1], self.raw[0],
             self.floatval)

    def getRaw(self):
        """Return the raw little endian byte tuple."""
        return self.raw

    # Equality is defined on the raw bytes only.  Comparing against a
    # foreign type returns NotImplemented, so Python falls back to its
    # default handling instead of raising AttributeError.
    # (Defining __eq__ leaves the class unhashable, as before.)
    def __eq__(self, other):
        if not isinstance(other, FixPt):
            return NotImplemented
        return self.raw == other.raw

    def __ne__(self, other):
        if not isinstance(other, FixPt):
            return NotImplemented
        return self.raw != other.raw

    def isNegative(self):
        """Return True if the sign bit of the raw value is set."""
        return bool(self.raw[3] & 0x80)
class ControlMsg:
    """Base class of all host -> device control messages.

    Every message starts with a four byte header:
    id, flags, reserved (always 0) and sequence number.
    """

    # IDs
    CONTROL_PING = 0
    CONTROL_RESET = 1
    CONTROL_DEVFLAGS = 2
    CONTROL_AXISUPDATE = 3
    CONTROL_SPINDLEUPDATE = 4
    CONTROL_FOUPDATE = 5
    CONTROL_AXISENABLE = 6
    CONTROL_ESTOPUPDATE = 7
    CONTROL_SETINCREMENT = 8
    CONTROL_ENTERBOOT = 0xA0
    CONTROL_EXITBOOT = 0xA1
    CONTROL_BOOT_WRITEBUF = 0xA2
    CONTROL_BOOT_FLASHPG = 0xA3

    # Flags
    CONTROL_FLG_BOOTLOADER = 0x80

    # Targets
    TARGET_CPU = 0
    TARGET_COPROC = 1

    def __init__(self, id, flags, seqno):
        self.id, self.flags, self.seqno = id, flags, seqno
        self.reserved = 0

    def setSeqno(self, seqno):
        """Set the sequence number put into the header."""
        self.seqno = seqno

    def getRaw(self):
        """Return the header as a new list of four byte values."""
        header = (self.id, self.flags, self.reserved, self.seqno)
        return [field & 0xFF for field in header]
class ControlMsgPing(ControlMsg):
    """PING message; header only, no payload."""

    def __init__(self, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlMsg.CONTROL_PING, hdrFlags, hdrSeqno)
class ControlMsgReset(ControlMsg):
    """RESET message; header only, no payload."""

    def __init__(self, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlMsg.CONTROL_RESET, hdrFlags, hdrSeqno)
class ControlMsgDevflags(ControlMsg):
    """DEVFLAGS message carrying a 16 bit flag mask and a 16 bit flag set.

    Both values are appended to the header in little endian byte order.
    """

    DEVICE_FLG_NODEBUG = (1 << 0)
    DEVICE_FLG_VERBOSEDBG = (1 << 1)
    DEVICE_FLG_ON = (1 << 2)
    DEVICE_FLG_TWOHANDEN = (1 << 3)
    DEVICE_FLG_USBLOGMSG = (1 << 4)
    DEVICE_FLG_G53COORDS = (1 << 5)

    def __init__(self, devFlagsMask, devFlagsSet, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlMsg.CONTROL_DEVFLAGS, hdrFlags, hdrSeqno)
        self.devFlagsMask = devFlagsMask
        self.devFlagsSet = devFlagsSet

    def getRaw(self):
        """Return header + mask (LE16) + set (LE16) as a list of bytes."""
        mask, flags = self.devFlagsMask, self.devFlagsSet
        return ControlMsg.getRaw(self) + [
            mask & 0xFF, (mask >> 8) & 0xFF,
            flags & 0xFF, (flags >> 8) & 0xFF,
        ]
class ControlMsgAxisupdate(ControlMsg):
    """AXISUPDATE message: a fixed point position plus the axis number."""

    def __init__(self, pos, axis, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlMsg.CONTROL_AXISUPDATE, hdrFlags, hdrSeqno)
        self.pos = FixPt(pos)
        # *axis* is the axis letter; the message carries its number.
        self.axis = AXIS2NUMBER[axis]

    def getRaw(self):
        """Return header + raw position bytes + axis number."""
        payload = list(self.pos.getRaw())
        payload.append(self.axis & 0xFF)
        return ControlMsg.getRaw(self) + payload
class ControlMsgSpindleupdate(ControlMsg):
    """SPINDLEUPDATE message carrying one spindle state byte."""

    SPINDLE_OFF = 0
    SPINDLE_CW = 1
    SPINDLE_CCW = 2

    def __init__(self, state, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlMsg.CONTROL_SPINDLEUPDATE, hdrFlags, hdrSeqno)
        self.state = state

    def getRaw(self):
        """Return header + state byte."""
        return ControlMsg.getRaw(self) + [self.state & 0xFF]
class ControlMsgFoupdate(ControlMsg):
    """FOUPDATE (feed override) message carrying one percentage byte."""

    def __init__(self, percent, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlMsg.CONTROL_FOUPDATE, hdrFlags, hdrSeqno)
        self.percent = percent

    def getRaw(self):
        """Return header + percent byte."""
        return ControlMsg.getRaw(self) + [self.percent & 0xFF]
class ControlMsgAxisenable(ControlMsg):
    """AXISENABLE message carrying a 16 bit axis bit mask (little endian)."""

    def __init__(self, mask, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlMsg.CONTROL_AXISENABLE, hdrFlags, hdrSeqno)
        self.mask = mask

    def getRaw(self):
        """Return header + mask (LE16)."""
        low = self.mask & 0xFF
        high = (self.mask >> 8) & 0xFF
        return ControlMsg.getRaw(self) + [low, high]
class ControlMsgEstopupdate(ControlMsg):
    """ESTOPUPDATE message carrying the estop state as a 0/1 byte."""

    def __init__(self, asserted, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlMsg.CONTROL_ESTOPUPDATE, hdrFlags, hdrSeqno)
        # Normalize any truthy/falsy value to 1/0.
        if asserted:
            self.asserted = 1
        else:
            self.asserted = 0

    def getRaw(self):
        """Return header + asserted byte."""
        return ControlMsg.getRaw(self) + [self.asserted & 0xFF]
class ControlMsgSetincrement(ControlMsg):
    """SETINCREMENT message: a fixed point JOG increment and its index.

    The constructor raises CNCCFatal for negative increments and for
    increments larger than MAX_INC_FLOAT.
    """

    MAX_INDEX = 5
    MAX_INC_FLOAT = 9.999

    def __init__(self, increment, index, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlMsg.CONTROL_SETINCREMENT, hdrFlags, hdrSeqno)
        self.increment = FixPt(increment)
        self.index = index
        value = self.increment.floatval
        limit = ControlMsgSetincrement.MAX_INC_FLOAT
        if self.increment.isNegative():
            CNCCFatal.error("Invalid negative JOG increment %f at index %d" %
                            (value, index))
        if value > limit:
            CNCCFatal.error("JOG increment %f at index %d is too big. Max = %f" %
                            (value, index, limit))

    def getRaw(self):
        """Return header + raw increment bytes + index byte."""
        payload = list(self.increment.getRaw())
        payload.append(self.index & 0xFF)
        return ControlMsg.getRaw(self) + payload
class ControlMsgEnterboot(ControlMsg):
    """ENTERBOOT message: two magic bytes followed by the target byte."""

    ENTERBOOT_MAGIC0 = 0xB0
    ENTERBOOT_MAGIC1 = 0x07

    def __init__(self, target, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlMsg.CONTROL_ENTERBOOT, hdrFlags, hdrSeqno)
        magic0 = ControlMsgEnterboot.ENTERBOOT_MAGIC0
        magic1 = ControlMsgEnterboot.ENTERBOOT_MAGIC1
        self.magic = (magic0, magic1)
        self.target = target

    def getRaw(self):
        """Return header + magic bytes + target byte."""
        payload = list(self.magic)
        payload.append(self.target & 0xFF)
        return ControlMsg.getRaw(self) + payload
class ControlMsgExitboot(ControlMsg):
    """EXITBOOT message carrying the target byte.

    The bootloader header flag is set by default.
    """

    def __init__(self, target,
                 hdrFlags=ControlMsg.CONTROL_FLG_BOOTLOADER, hdrSeqno=0):
        super().__init__(ControlMsg.CONTROL_EXITBOOT, hdrFlags, hdrSeqno)
        self.target = target

    def getRaw(self):
        """Return header + target byte."""
        return ControlMsg.getRaw(self) + [self.target & 0xFF]
class ControlMsgBootWritebuf(ControlMsg):
    """BOOT_WRITEBUF message: one chunk of data for the bootloader buffer.

    Payload layout: offset (LE16), number of valid data bytes, inverted
    CRC-8 over the valid data bytes, then the data zero-padded to
    DATA_MAX_BYTES.  The bootloader header flag is set by default.
    """

    DATA_MAX_BYTES = 32

    def __init__(self, offset, data,
                 hdrFlags=ControlMsg.CONTROL_FLG_BOOTLOADER, hdrSeqno=0):
        """Build the message from *offset* and *data*.

        *data* may be any sized iterable of byte values (list, tuple,
        bytes, ...) and is never modified.  Raises CNCCException if it
        holds more than DATA_MAX_BYTES bytes.
        """
        ControlMsg.__init__(self, ControlMsg.CONTROL_BOOT_WRITEBUF,
                            hdrFlags, hdrSeqno)
        nrPadding = ControlMsgBootWritebuf.DATA_MAX_BYTES - len(data)
        if nrPadding < 0:
            CNCCException.error("ControlMsg-BootWritebuf: invalid data length %d" %
                                (len(data)))
        self.offset = offset
        self.size = len(data)
        # The checksum covers the real data only, not the padding.
        self.crc = crc8Buf(0, data) ^ 0xFF
        # Pad a private copy; extending the caller's list in place would
        # silently change the caller's data.
        self.data = list(data) + [0] * nrPadding

    def getRaw(self):
        """Return header + offset (LE16) + size + crc + padded data."""
        raw = ControlMsg.getRaw(self)
        raw.extend([self.offset & 0xFF, (self.offset >> 8) & 0xFF])
        raw.append(self.size & 0xFF)
        raw.append(self.crc & 0xFF)
        raw.extend(self.data)
        return raw
class ControlMsgBootFlashpg(ControlMsg):
    """BOOT_FLASHPG message: a 16 bit address (little endian) and a target.

    The bootloader header flag is set by default.
    """

    def __init__(self, address, target,
                 hdrFlags=ControlMsg.CONTROL_FLG_BOOTLOADER, hdrSeqno=0):
        super().__init__(ControlMsg.CONTROL_BOOT_FLASHPG, hdrFlags, hdrSeqno)
        self.address = address
        self.target = target

    def getRaw(self):
        """Return header + address (LE16) + target byte."""
        addressLow = self.address & 0xFF
        addressHigh = (self.address >> 8) & 0xFF
        return ControlMsg.getRaw(self) + [addressLow, addressHigh,
                                          self.target & 0xFF]
class ControlReply:
    """Base class of the device -> host replies to control messages.

    A reply starts with the same four byte header as a control message
    (id, flags, reserved, seqno), optionally followed by a payload.
    """

    # Maximum reply size in bytes, as passed to the bulk read.
    MAX_SIZE = 6

    # IDs
    REPLY_OK = 0
    REPLY_ERROR = 1
    REPLY_VAL16 = 2

    def __init__(self, id, flags, seqno):
        self.id = id
        self.flags = flags
        self.reserved = 0
        self.seqno = seqno

    @staticmethod
    def parseRaw(raw):
        """Parse the raw reply bytes into the matching ControlReply subclass.

        Raises CNCCException if the reply ID is unknown or if *raw* is too
        short for the reply type.
        """
        try:
            replyId = raw[0]
            flags = raw[1]
            # raw[2] is the reserved header byte.
            seqno = raw[3]
            # Keep *raw* intact, so that the error message below reports
            # the size of the whole reply rather than of the payload only.
            payload = raw[4:]
            if replyId == ControlReply.REPLY_OK:
                return ControlReplyOk(hdrFlags=flags, hdrSeqno=seqno)
            elif replyId == ControlReply.REPLY_ERROR:
                return ControlReplyError(payload[0],
                                         hdrFlags=flags, hdrSeqno=seqno)
            elif replyId == ControlReply.REPLY_VAL16:
                return ControlReplyVal16(payload[0] | (payload[1] << 8),
                                         hdrFlags=flags, hdrSeqno=seqno)
            else:
                CNCCException.error("Unknown ControlReply ID: %d" % replyId)
        except (IndexError, KeyError):
            CNCCException.error("Failed to parse ControlReply (%d bytes)" % len(raw))

    def __repr__(self):
        return "Unknown reply code"

    def isOK(self):
        """Return True if the reply signals success; False in this base class."""
        return False
class ControlReplyOk(ControlReply):
    """Success reply without payload."""

    def __init__(self, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlReply.REPLY_OK, hdrFlags, hdrSeqno)

    def __repr__(self):
        return "Ok"

    def isOK(self):
        """An OK reply always signals success."""
        return True
class ControlReplyError(ControlReply):
    """Error reply carrying a one byte error code.

    isOK() is inherited from ControlReply and therefore returns False.
    """

    # Error codes
    CTLERR_UNDEFINED = 0
    CTLERR_COMMAND = 1
    CTLERR_SIZE = 2
    CTLERR_BUSY = 3
    CTLERR_PERMISSION = 4
    CTLERR_INVAL = 5
    CTLERR_CONTEXT = 6
    CTLERR_CHECKSUM = 7
    CTLERR_CMDFAIL = 8

    def __init__(self, code, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlReply.REPLY_ERROR, hdrFlags, hdrSeqno)
        self.code = code

    def __repr__(self):
        """Return the description of the error code."""
        descriptions = {
            self.CTLERR_UNDEFINED: "Undefined error",
            self.CTLERR_COMMAND: "Command not implemented",
            self.CTLERR_SIZE: "Payload size error",
            self.CTLERR_BUSY: "Device is busy",
            self.CTLERR_PERMISSION: "Permission denied",
            self.CTLERR_INVAL: "Invalid parameters",
            self.CTLERR_CONTEXT: "Not supported in this context",
            self.CTLERR_CHECKSUM: "Data checksum error",
            self.CTLERR_CMDFAIL: "Command failed",
        }
        return descriptions.get(self.code, "Unknown error")
class ControlReplyVal16(ControlReply):
    """Success reply carrying a 16 bit value."""

    def __init__(self, value, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlReply.REPLY_VAL16, hdrFlags, hdrSeqno)
        self.value = value

    def __repr__(self):
        return "0x%04X" % self.value

    def isOK(self):
        """A value reply counts as success."""
        return True
class ControlIrq:
    """Base class of the interrupt messages sent by the device.

    An interrupt message starts with a four byte header (id, flags,
    reserved, seqno), optionally followed by a payload.
    """

    # Maximum interrupt message size in bytes, as passed to the interrupt read.
    MAX_SIZE = 14

    # IDs
    IRQ_JOG = 0
    IRQ_JOG_KEEPALIFE = 1
    IRQ_SPINDLE = 2
    IRQ_FEEDOVERRIDE = 3
    IRQ_DEVFLAGS = 4
    IRQ_HALT = 5
    IRQ_LOGMSG = 6

    # Flags
    IRQ_FLG_TXQOVR = (1 << 0)
    IRQ_FLG_PRIO = (1 << 1)
    IRQ_FLG_DROPPABLE = (1 << 2)

    def __init__(self, id, flags, seqno):
        self.id = id
        self.flags = flags
        self.reserved = 0
        self.seqno = seqno

    @staticmethod
    def parseRaw(raw):
        """Parse the raw interrupt bytes into the matching ControlIrq subclass.

        Raises CNCCException if the interrupt ID is unknown or if the
        message cannot be parsed (too short, unknown axis number).
        """
        try:
            irqId = raw[0]
            flags = raw[1]
            # raw[2] is the reserved header byte.
            seqno = raw[3]
            # Keep *raw* intact, so that the error message below reports
            # the size of the whole message rather than of the payload only.
            payload = raw[4:]
            if irqId == ControlIrq.IRQ_JOG:
                return ControlIrqJog(payload[0:4], payload[4:8],
                                     payload[8], payload[9],
                                     hdrFlags=flags, hdrSeqno=seqno)
            elif irqId == ControlIrq.IRQ_JOG_KEEPALIFE:
                return ControlIrqJogKeepalife(hdrFlags=flags, hdrSeqno=seqno)
            elif irqId == ControlIrq.IRQ_SPINDLE:
                return ControlIrqSpindle(payload[0],
                                         hdrFlags=flags, hdrSeqno=seqno)
            elif irqId == ControlIrq.IRQ_FEEDOVERRIDE:
                return ControlIrqFeedoverride(payload[0],
                                              hdrFlags=flags, hdrSeqno=seqno)
            elif irqId == ControlIrq.IRQ_DEVFLAGS:
                return ControlIrqDevflags(payload[0:2],
                                          hdrFlags=flags, hdrSeqno=seqno)
            elif irqId == ControlIrq.IRQ_HALT:
                return ControlIrqHalt(hdrFlags=flags, hdrSeqno=seqno)
            elif irqId == ControlIrq.IRQ_LOGMSG:
                return ControlIrqLogmsg(payload[0:10],
                                        hdrFlags=flags, hdrSeqno=seqno)
            else:
                CNCCException.error("Unknown ControlIrq ID: %d" % irqId)
        except (IndexError, KeyError):
            CNCCException.error("Failed to parse ControlIrq (%d bytes)" % len(raw))

    def __repr__(self):
        return "Unknown interrupt"
class ControlIrqJog(ControlIrq):
    """JOG interrupt: increment, velocity, axis and jog flags."""

    IRQ_JOG_CONTINUOUS = (1 << 0)
    IRQ_JOG_RAPID = (1 << 1)

    def __init__(self, increment, velocity, axis, flags,
                 hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlIrq.IRQ_JOG, hdrFlags, hdrSeqno)
        self.increment = FixPt(increment)
        self.velocity = FixPt(velocity)
        # *axis* is the axis number; it is stored as the axis letter.
        self.axis = NUMBER2AXIS[axis]
        self.jogFlags = flags

    def __repr__(self):
        fields = (self.seqno, str(self.increment),
                  str(self.velocity), self.axis, self.jogFlags)
        return "JOG interrupt (nr%d): %s, %s, %s, 0x%X" % fields
class ControlIrqJogKeepalife(ControlIrq):
    """JOG keepalife interrupt; header only, no payload."""

    def __init__(self, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlIrq.IRQ_JOG_KEEPALIFE, hdrFlags, hdrSeqno)

    def __repr__(self):
        return "JOG KEEPALIFE interrupt (nr%d)" % (self.seqno)
class ControlIrqSpindle(ControlIrq):
    """SPINDLE interrupt carrying one state byte."""

    def __init__(self, state, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlIrq.IRQ_SPINDLE, hdrFlags, hdrSeqno)
        self.state = state

    def __repr__(self):
        fields = (self.seqno, self.state)
        return "SPINDLE interrupt (nr%d): %d" % fields
class ControlIrqFeedoverride(ControlIrq):
    """FEEDOVERRIDE interrupt carrying one state byte."""

    def __init__(self, state, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlIrq.IRQ_FEEDOVERRIDE, hdrFlags, hdrSeqno)
        self.state = state

    def __repr__(self):
        fields = (self.seqno, self.state)
        return "FEEDOVERRIDE interrupt (nr%d): %d" % fields
class ControlIrqDevflags(ControlIrq):
    """DEVFLAGS interrupt carrying the 16 bit device flags."""

    def __init__(self, devFlags, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlIrq.IRQ_DEVFLAGS, hdrFlags, hdrSeqno)
        # *devFlags* holds the two raw bytes, low byte first.
        low = devFlags[0]
        high = devFlags[1]
        self.devFlags = low | (high << 8)

    def __repr__(self):
        fields = (self.seqno, self.devFlags)
        return "DEVFLAGS interrupt (nr%d): %04X" % fields
class ControlIrqHalt(ControlIrq):
    """HALT interrupt; header only, no payload."""

    def __init__(self, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlIrq.IRQ_HALT, hdrFlags, hdrSeqno)

    def __repr__(self):
        return "HALT interrupt (nr%d)" % (self.seqno)
class ControlIrqLogmsg(ControlIrq):
    """LOGMSG interrupt carrying a fragment of a device log message."""

    def __init__(self, msg, hdrFlags=0, hdrSeqno=0):
        super().__init__(ControlIrq.IRQ_LOGMSG, hdrFlags, hdrSeqno)
        # Items that are already str are taken as they are; everything
        # else is treated as a character code.
        chars = []
        for item in msg:
            chars.append(item if type(item) is str else chr(item))
        self.msg = "".join(chars)

    def __repr__(self):
        return "LOGMSG interrupt (nr%d)" % (self.seqno)
class JogState:
    """Jog request state of one axis, guarded by a keepalife timeout.

    The state is the tuple (direction, incremental, velocity).  A moving
    state (nonzero direction) falls back to STOPDATA if it is read after
    the keepalife deadline has passed.
    """

    # Seconds a jog request stays valid without being refreshed.
    KEEPALIFE_TIMEOUT = 0.3
    # The "not jogging" state.
    STOPDATA = (FixPt(0.0), False, FixPt(0.0))

    def __init__(self):
        self.reset()

    def get(self):
        """Return (direction, incremental, velocity), honoring the timeout."""
        direction = self.__direction
        moving = not equal(direction.floatval, 0.0)
        if moving and datetime.now() > self.__timeout:
            CNCCException.warn("Jog keepalife timer expired")
            self.reset()
            return self.STOPDATA
        return (direction, self.__incremental, self.__velocity)

    def set(self, direction, incremental, velocity):
        """Store a new state and restart the keepalife timer."""
        self.__direction = direction
        self.__incremental = incremental
        self.__velocity = velocity
        self.keepAlife()

    def reset(self):
        """Fall back to the stop state."""
        self.set(*self.STOPDATA)

    def keepAlife(self):
        """Move the keepalife deadline KEEPALIFE_TIMEOUT seconds into the future."""
        lifetime = timedelta(seconds=self.KEEPALIFE_TIMEOUT)
        self.__timeout = datetime.now() + lifetime
class CNCControl:
    """Host side driver object for the CNC-control USB device.

    probe() finds and opens the device.  Control messages are written to
    the bulk OUT endpoint and answered on the bulk IN endpoint; device
    events arrive on the interrupt endpoint and are folded into the cached
    state by eventWait().

    Most public methods raise CNCCFatal if the device is not available and
    CNCCException if the device rejects a request.
    """

    def __init__(self, verbose=False):
        self.deviceAvailable = False
        self.verbose = verbose

    @staticmethod
    def __haveEndpoint(interface, epAddress):
        """Return True if *interface* has an endpoint with address *epAddress*."""
        return any(ep.address == epAddress for ep in interface.endpoints)

    def __epClearHalt(self, interface, epAddress):
        """Clear the halt condition of the endpoint, if the interface has it."""
        if self.__haveEndpoint(interface, epAddress):
            self.usbh.clearHalt(epAddress)

    def deviceRunsBootloader(self):
        """Return True if the bootloader answers the ping, False if the
        application does.

        Raises CNCCException if neither answers properly.
        """
        # Ping the application first, then the bootloader.
        probes = (
            (0, False, "application"),
            (ControlMsg.CONTROL_FLG_BOOTLOADER, True, "bootloader"),
        )
        for hdrFlags, inBootloader, what in probes:
            reply = self.controlMsgSyncReply(ControlMsgPing(hdrFlags=hdrFlags))
            if reply.isOK():
                return inBootloader
            # A "wrong context" error just means that the other code is
            # running.  Any other error is a real failure.
            if reply.id == ControlReply.REPLY_ERROR and\
               reply.code != ControlReplyError.CTLERR_CONTEXT:
                CNCCException.error("Failed to ping the %s: %s" %
                                    (what, str(reply)))
        CNCCException.error("Unknown PING error")

    def probe(self):
        """Find and open the device.

        Returns True if the device is (already) available and False if it
        was not found.  USB errors raise CNCCFatal.
        """
        if self.deviceAvailable:
            return True
        self.__initializeData()
        try:
            self.usbdev = self.__findDevice(IDVENDOR, IDPRODUCT)
            if not self.usbdev:
                return False
            time.sleep(0.1)
            self.usbh = self.usbdev.open()
            self.usbh.reset()
            time.sleep(0.2)
            config = self.usbdev.configurations[0]
            interface = config.interfaces[0][0]
            self.usbh.setConfiguration(config)
            self.usbh.claimInterface(interface)
            self.usbh.setAltInterface(interface)
            for epAddress in (EP_IN, EP_OUT, EP_IRQ):
                self.__epClearHalt(interface, epAddress)
        except usb.USBError as e:
            self.__usbError(e, fatal=True, origin="init")
        self.__devicePlug()
        return True

    def deviceReset(self):
        """Reset the cached and the device state and re-read the device flags."""
        self.__initializeData()
        reply = self.controlMsgSyncReply(ControlMsgReset())
        if not reply.isOK():
            CNCCException.error("Failed to reset the device state")
        # An empty mask changes no flag; the reply carries the current flags.
        reply = self.controlMsgSyncReply(ControlMsgDevflags(0, 0))
        if not reply.isOK():
            CNCCException.error("Failed to read device flags")
        self.__interpretDevFlags(reply.value)

    def deviceAppPing(self):
        """Return True if the application answers a ping with success."""
        return self.controlMsgSyncReply(ControlMsgPing()).isOK()

    def reconnect(self, timeoutSec=15):
        """Drop the connection and probe for the device until it is back.

        Returns True on success.  Returns False if the device was not
        available in the first place or did not come back within
        *timeoutSec* seconds.
        """
        if not self.deviceAvailable:
            return False
        timeout = datetime.now() + timedelta(seconds=timeoutSec)
        self.__deviceUnplug()
        while timeout > datetime.now():
            try:
                if self.probe():
                    return True
            except CNCCException:
                # Best effort: the device may still be re-enumerating.
                # Just try again until the timeout expires.
                pass
            time.sleep(0.05)
        return False

    def __initializeData(self):
        """(Re-)initialize all cached state to its defaults."""
        self.messageSequenceNumber = 0
        self.deviceIsOn = False
        self.g53coords = False
        self.estop = False
        self.motionHaltRequest = False
        self.axisPositions = {ax: FixPt(0.0) for ax in ALL_AXES}
        self.axisPosUpdatePending = {ax: False for ax in ALL_AXES}
        self.lastAxisPosUpdate = {ax: datetime(1970, 1, 1) for ax in ALL_AXES}
        self.jogStates = {ax: JogState() for ax in ALL_AXES}
        self.foState = 0
        self.spindleCommand = 0
        self.spindleState = 0
        self.feedOverridePercent = 0
        self.logMsgBuf = ""

    def __interpretDevFlags(self, devFlags):
        """Update the cached on/off and G53 state from the device flags."""
        isOn = bool(devFlags & ControlMsgDevflags.DEVICE_FLG_ON)
        # Only report actual transitions.
        if isOn and not self.deviceIsOn:
            CNCCException.info("turned ON")
        elif not isOn and self.deviceIsOn:
            CNCCException.info("turned OFF")
        self.deviceIsOn = isOn
        self.g53coords = bool(devFlags & ControlMsgDevflags.DEVICE_FLG_G53COORDS)

    @staticmethod
    def __findDevice(idVendor, idProduct):
        """Return the first USB device with matching IDs, or None."""
        for bus in usb.busses():
            for device in bus.devices:
                if device.idVendor == idVendor and\
                   device.idProduct == idProduct:
                    return device
        return None

    def __devicePlug(self):
        """Mark the device as available."""
        self.deviceAvailable = True
        if self.verbose:
            CNCCException.info("device connected")

    def __deviceUnplug(self):
        """Mark the device as unavailable (reported once per disconnect)."""
        if self.deviceAvailable:
            self.deviceAvailable = False
            CNCCException.info("device disconnected")

    def __deviceUnplugException(self, message="Device is not available"):
        """Mark the device as unavailable and raise CNCCFatal.

        *message* has a default, because the availability guards of the
        public methods call this without an argument.
        """
        self.__deviceUnplug()
        CNCCFatal.error(message)

    def __usbError(self, usbException, fatal=False, origin=None):
        """Convert a USB error into a driver exception.  Always raises.

        A "no such device" error is handled as an unplug event (CNCCFatal).
        Any other error raises CNCCFatal if *fatal* is set and
        CNCCException otherwise.
        """
        if usbException.errno == errno.ENODEV or\
           "no such device" in str(usbException).lower():
            self.__deviceUnplugException("Unplug exception")
        cls = CNCCFatal if fatal else CNCCException
        cls.error("USB error%s: %s" % (
                  ((" (%s)" % origin) if origin else ""),
                  str(usbException)))

    def eventWait(self, timeoutMs=30):
        """Wait up to *timeoutMs* for one interrupt message and handle it.

        Returns False on timeout and True otherwise.
        """
        if not self.deviceAvailable:
            self.__deviceUnplugException()
        try:
            data = self.usbh.interruptRead(EP_IRQ, ControlIrq.MAX_SIZE,
                                           timeoutMs)
        except usb.USBError as e:
            if not e.errno:
                return False  # Timeout. No event.
            # Always raises, so 'data' is bound whenever we get past here.
            self.__usbError(e, origin="eventWait")
        if data:
            self.__handleInterrupt(data)
        return True

    def __handleInterrupt(self, rawData):
        """Parse one raw interrupt message and update the cached state."""
        irq = ControlIrq.parseRaw(rawData)
        if irq.flags & ControlIrq.IRQ_FLG_TXQOVR:
            CNCCException.warn("Interrupt queue overflow detected")
        if irq.id == ControlIrq.IRQ_JOG:
            cont = bool(irq.jogFlags & ControlIrqJog.IRQ_JOG_CONTINUOUS)
            velocity = irq.velocity
            if irq.jogFlags & ControlIrqJog.IRQ_JOG_RAPID:
                # A rapid jog is passed on as the velocity -1.
                velocity = FixPt(-1.0)
            self.jogStates[irq.axis].set(direction=irq.increment,
                                         incremental=not cont,
                                         velocity=velocity)
        elif irq.id == ControlIrq.IRQ_JOG_KEEPALIFE:
            for jogState in self.jogStates.values():
                jogState.keepAlife()
        elif irq.id == ControlIrq.IRQ_SPINDLE:
            irq2direction = {
                ControlMsgSpindleupdate.SPINDLE_OFF: 0,
                ControlMsgSpindleupdate.SPINDLE_CW: 1,
                ControlMsgSpindleupdate.SPINDLE_CCW: -1,
            }
            self.spindleCommand = irq2direction[irq.state]
        elif irq.id == ControlIrq.IRQ_FEEDOVERRIDE:
            self.foState = irq.state
        elif irq.id == ControlIrq.IRQ_DEVFLAGS:
            self.__interpretDevFlags(irq.devFlags)
        elif irq.id == ControlIrq.IRQ_HALT:
            self.motionHaltRequest = True
            for jogState in self.jogStates.values():
                jogState.reset()
            self.spindleCommand = 0
        elif irq.id == ControlIrq.IRQ_LOGMSG:
            # Log text arrives in fragments.  Print all completed lines and
            # keep the unterminated rest for the next fragment.
            lines = (self.logMsgBuf + irq.msg).split('\n')
            for line in lines[:-1]:
                print("[dev debug]:", line)
            self.logMsgBuf = lines[-1]
        else:
            CNCCException.warn("Unhandled IRQ: " + str(irq))

    def controlMsg(self, msg, timeoutMs=300):
        """Assign the next sequence number to *msg* and send it to the device."""
        try:
            msg.setSeqno(self.messageSequenceNumber)
            self.messageSequenceNumber = (self.messageSequenceNumber + 1) & 0xFF
            rawData = msg.getRaw()
            size = self.usbh.bulkWrite(EP_OUT, rawData, timeoutMs)
            if len(rawData) != size:
                CNCCException.error("Only wrote %d bytes of %d bytes "
                                    "bulk write" % (size, len(rawData)))
        except usb.USBError as e:
            self.__usbError(e, origin="controlMsg")

    def controlReply(self, timeoutMs=300):
        """Read one reply from the device and return it parsed."""
        try:
            data = self.usbh.bulkRead(EP_IN, ControlReply.MAX_SIZE, timeoutMs)
        except usb.USBError as e:
            # Always raises, so 'data' is bound whenever we get past here.
            self.__usbError(e, origin="controlReply")
        return ControlReply.parseRaw(data)

    def controlMsgSyncReply(self, msg, timeoutMs=300):
        """Send *msg* and return its reply.

        Raises CNCCException if the reply carries a different sequence number.
        """
        self.controlMsg(msg, timeoutMs)
        reply = self.controlReply(timeoutMs)
        if msg.seqno != reply.seqno:
            CNCCException.error("Got invalid reply sequence number: %d vs %d" %
                                (msg.seqno, reply.seqno))
        return reply

    def setTwohandEnabled(self, enable):
        """Set or clear the TWOHANDEN device flag."""
        if not self.deviceAvailable:
            self.__deviceUnplugException()
        flg = 0
        if enable:
            flg = ControlMsgDevflags.DEVICE_FLG_TWOHANDEN
        msg = ControlMsgDevflags(ControlMsgDevflags.DEVICE_FLG_TWOHANDEN, flg)
        reply = self.controlMsgSyncReply(msg)
        if not reply.isOK():
            CNCCException.error("Failed to set Twohand flag")

    def setIncrementAtIndex(self, index, increment):
        """Send the JOG increment for slot *index* to the device."""
        if not self.deviceAvailable:
            self.__deviceUnplugException()
        if equal(increment, 0.0):
            # An int zero yields an exact zero raw value.
            increment = FixPt(0)
        msg = ControlMsgSetincrement(increment, index)
        reply = self.controlMsgSyncReply(msg)
        if not reply.isOK():
            # 'increment' may be a FixPt here, which '%f' cannot format.
            # Use the float value stored in the message.
            CNCCException.error("Failed to set increment %f at index %d: %s" %
                                (msg.increment.floatval, index, str(reply)))

    def setDebugging(self, debugLevel, usbMessages):
        """Configure the device debugging flags.

        debugLevel: 0 => disabled, 1 => enabled, 2 => verbose
        usbMessages: if true, the USBLOGMSG flag is set as well.
        """
        if not self.deviceAvailable:
            self.__deviceUnplugException()
        flg = ControlMsgDevflags.DEVICE_FLG_NODEBUG
        if debugLevel >= 1:
            flg &= ~ControlMsgDevflags.DEVICE_FLG_NODEBUG
        if debugLevel >= 2:
            flg |= ControlMsgDevflags.DEVICE_FLG_VERBOSEDBG
        if usbMessages:
            flg |= ControlMsgDevflags.DEVICE_FLG_USBLOGMSG
        msg = ControlMsgDevflags(ControlMsgDevflags.DEVICE_FLG_NODEBUG |
                                 ControlMsgDevflags.DEVICE_FLG_VERBOSEDBG |
                                 ControlMsgDevflags.DEVICE_FLG_USBLOGMSG,
                                 flg)
        reply = self.controlMsgSyncReply(msg)
        if not reply.isOK():
            CNCCException.error("Failed to set debugging flags")

    def setEstopState(self, asserted):
        """Send the estop state to the device, if it changed."""
        if not self.deviceAvailable:
            self.__deviceUnplugException()
        if asserted == self.estop:
            return  # No change
        msg = ControlMsgEstopupdate(asserted)
        reply = self.controlMsgSyncReply(msg)
        if not reply.isOK():
            CNCCException.error("Failed to send ESTOP update")
        self.estop = asserted

    def deviceIsTurnedOn(self):
        """Return the cached ON state of the device."""
        if not self.deviceAvailable:
            self.__deviceUnplugException()
        return self.deviceIsOn

    def haveMotionHaltRequest(self):
        """Return whether a halt was requested and clear the request."""
        if not self.deviceAvailable:
            self.__deviceUnplugException()
        halt = self.motionHaltRequest
        self.motionHaltRequest = False
        return halt

    def getSpindleCommand(self):
        """Return -1, 0 or 1 for reverse, stop or forward."""
        if not self.deviceAvailable:
            self.__deviceUnplugException()
        return self.spindleCommand

    def setSpindleState(self, direction):
        """Send the spindle state (-1, 0 or 1) to the device, if it changed."""
        if not self.deviceAvailable:
            self.__deviceUnplugException()
        if self.spindleState == direction:
            return
        # NOTE(review): the new state is cached before the update is sent,
        # so a failed update is not retried -- confirm that this is intended.
        self.spindleState = direction
        direction2state = {
            0: ControlMsgSpindleupdate.SPINDLE_OFF,
            1: ControlMsgSpindleupdate.SPINDLE_CW,
            -1: ControlMsgSpindleupdate.SPINDLE_CCW,
        }
        msg = ControlMsgSpindleupdate(direction2state[direction])
        reply = self.controlMsgSyncReply(msg)
        if not reply.isOK():
            CNCCException.error("Failed to send spindle update")

    def getFeedOverrideState(self, minValue, maxValue):
        """Return the feed override state scaled into minValue..maxValue (float)."""
        if not self.deviceAvailable:
            self.__deviceUnplugException()
        inRange = 256
        outRange = maxValue - minValue
        mult = outRange / inRange
        return self.foState * mult + minValue

    def setFeedOverrideState(self, percent):
        """Send the current feed override percentage to the device, if it changed."""
        if not self.deviceAvailable:
            self.__deviceUnplugException()
        if self.feedOverridePercent == percent:
            return  # No change
        msg = ControlMsgFoupdate(int(round(percent)))
        reply = self.controlMsgSyncReply(msg)
        if not reply.isOK():
            CNCCException.error("Failed to send feed override state")
        self.feedOverridePercent = percent

    def setAxisPosition(self, axis, position):
        """Update the axis position on the device.

        Updates are rate limited to one per 0.1 seconds and axis.  A
        position that could not be sent yet stays pending and is sent by a
        later call.
        """
        if not self.deviceAvailable:
            self.__deviceUnplugException()
        pos = FixPt(position)
        if pos != self.axisPositions[axis]:
            self.axisPositions[axis] = pos
            self.axisPosUpdatePending[axis] = True
        if not self.axisPosUpdatePending[axis]:
            return
        now = datetime.now()
        if now < self.lastAxisPosUpdate[axis] + timedelta(seconds=0.1):
            return  # Not yet
        msg = ControlMsgAxisupdate(pos, axis)
        reply = self.controlMsgSyncReply(msg)
        if not reply.isOK():
            CNCCException.error("Axis update failed: %s" % str(reply))
        self.axisPosUpdatePending[axis] = False
        self.lastAxisPosUpdate[axis] = now

    def wantG53Coords(self):
        """Return the cached G53COORDS device flag."""
        return self.g53coords

    def setEnabledAxes(self, axes):
        """Send the set of enabled axes (an iterable of axis letters)."""
        if not self.deviceAvailable:
            self.__deviceUnplugException()
        mask = 0
        for ax in axes:
            mask |= (1 << AXIS2NUMBER[ax])
        msg = ControlMsgAxisenable(mask)
        reply = self.controlMsgSyncReply(msg)
        if not reply.isOK():
            CNCCException.error("Failed to set axis mask: %s" % str(reply))

    def getJogState(self, axis):
        """Return (direction, incremental, velocity) of the axis.

        direction and velocity are returned as floats.
        """
        if not self.deviceAvailable:
            self.__deviceUnplugException()
        state = self.jogStates[axis]
        (direction, incremental, velocity) = state.get()
        retval = (direction.floatval,
                  incremental,
                  velocity.floatval)
        if incremental:
            # An incremental jog is handed out only once.
            state.reset()
        return retval
|