hardware_access_usb.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
"""
# TOP2049 Open Source programming suite
#
# Lowlevel USB hardware access.
#
# Copyright (c) 2012 Michael Buesch <m@bues.ch>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
  22. from util import *
  23. from command_queue import *
  24. try:
  25. import usb
  26. except (ImportError), e:
  27. print "Python USB support module not found. Please install python-usb."
  28. sys.exit(1)
  29. class FoundUSBDev(object):
  30. def __init__(self, usbdev, busNr, devNr):
  31. self.usbdev = usbdev
  32. self.busNr = busNr
  33. self.devNr = devNr
  34. class HardwareAccessUSB(CommandQueue):
  35. "Lowlevel USB hardware access"
  36. TIMEOUT_MSEC = 2000
  37. @classmethod
  38. def scan(cls, checkCallback):
  39. "Scan for devices. Returns a list of FoundUSBDev()."
  40. devices = []
  41. for bus in usb.busses():
  42. for dev in bus.devices:
  43. if not checkCallback(dev):
  44. continue
  45. try:
  46. busNr = int(bus.dirname, 10)
  47. devNr = int(dev.filename, 10)
  48. except (ValueError), e:
  49. continue
  50. devices.append(FoundUSBDev(dev, busNr, devNr))
  51. return devices
  52. def __init__(self, usbdev, maxPacketBytes, noQueue,
  53. doRawDump=False):
  54. CommandQueue.__init__(self,
  55. maxPacketBytes = maxPacketBytes,
  56. synchronous = noQueue)
  57. self.doRawDump = doRawDump
  58. self.usbdev = usbdev
  59. self.usbh = None
  60. self.__initUSB()
  61. def __initUSB(self):
  62. try:
  63. self.usbh = self.usbdev.open()
  64. config = self.usbdev.configurations[0]
  65. interface = config.interfaces[0][0]
  66. # Find the endpoints
  67. self.bulkOut = None
  68. self.bulkIn = None
  69. for ep in interface.endpoints:
  70. if not self.bulkIn and \
  71. ep.type == usb.ENDPOINT_TYPE_BULK and \
  72. (ep.address & (usb.ENDPOINT_IN | usb.ENDPOINT_OUT)) == usb.ENDPOINT_IN:
  73. self.bulkIn = ep
  74. if not self.bulkOut and \
  75. ep.type == usb.ENDPOINT_TYPE_BULK and \
  76. (ep.address & (usb.ENDPOINT_IN | usb.ENDPOINT_OUT)) == usb.ENDPOINT_OUT:
  77. self.bulkOut = ep
  78. if not self.bulkIn or not self.bulkOut:
  79. raise TOPException("Did not find all USB EPs")
  80. self.usbh.setConfiguration(config)
  81. self.usbh.claimInterface(interface)
  82. self.usbh.setAltInterface(interface)
  83. self.usbh.clearHalt(self.bulkOut.address)
  84. self.usbh.clearHalt(self.bulkIn.address)
  85. except (usb.USBError), e:
  86. self.usbh = None
  87. raise TOPException("USB error: " + str(e))
  88. def shutdown(self):
  89. "Shutdown the USB connection"
  90. try:
  91. if self.usbh:
  92. self.usbh.releaseInterface()
  93. self.usbh = None
  94. except (usb.USBError), e:
  95. raise TOPException("USB error: " + str(e))
  96. def send(self, data):
  97. try:
  98. assert(len(data) <= self.maxPacketBytes)
  99. if self.doRawDump:
  100. print("Sending command:")
  101. dumpMem(data)
  102. self.usbh.bulkWrite(self.bulkOut.address, data,
  103. self.TIMEOUT_MSEC)
  104. except (usb.USBError), e:
  105. raise TOPException("USB bulk write error: " + str(e))
  106. def receive(self, size):
  107. """Receive 'size' bytes on the bulk-in ep."""
  108. # If there are blocked commands in the queue, send them now.
  109. self.flushCommands()
  110. try:
  111. ep = self.bulkIn.address
  112. data = b"".join([ int2byte(b) for b in
  113. self.usbh.bulkRead(ep, size,
  114. self.TIMEOUT_MSEC) ])
  115. if len(data) != size:
  116. raise TOPException("USB bulk read error: Could not read the " +\
  117. "requested number of bytes (req %d, got %d)" % (size, len(data)))
  118. if self.doRawDump:
  119. print("Received data:")
  120. dumpMem(data)
  121. except (usb.USBError), e:
  122. raise TOPException("USB bulk read error: " + str(e))
  123. return data