wpa_supplicant.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3. import dbus
  4. WPAS_DBUS_OPATH = "/fi/epitest/hostap/WPASupplicant"
  5. WPAS_DBUS_INTERFACES_OPATH = "/fi/epitest/hostap/WPASupplicant/Interfaces"
  6. WPAS_DBUS_SERVICE = "fi.epitest.hostap.WPASupplicant"
  7. WPAS_DBUS_INTERFACE = "fi.epitest.hostap.WPASupplicant"
  8. WPAS_DBUS_INTERFACES_INTERFACE = "fi.epitest.hostap.WPASupplicant.Interface"
  9. WPAS_DBUS_NETWORK_INTERFACE = "fi.epitest.hostap.WPASupplicant.Network"
  10. WPAS_DBUS_BSSID_INTERFACE = "fi.epitest.hostap.WPASupplicant.BSSID"
  11. TIMEOUT = 60
  12. class PasswordLengthError(Exception):
  13. pass
  14. class WPA_Supplicant_Network:
  15. def __init__(self, bus, path):
  16. self.bus = bus
  17. self.path = path
  18. self.net_obj = self.bus.get_object(WPAS_DBUS_SERVICE, path)
  19. self.net = dbus.Interface(self.net_obj, WPAS_DBUS_NETWORK_INTERFACE)
  20. # dict keys: ssid, bssid, key_mgmt, psk, scan_ssid, pairwise, group, eap, identity,
  21. # anonymous_identity, ca_cert, ca_cert2, client_cert, client_cert2, private_key, private_key2,
  22. # private_key_passwd, private_key2_passwd, phase1, phase2, eapol_flags
  23. #
  24. # Example: setNetwork({"ssid":dbus.String("MySSID", variant_level=1)),
  25. # "psk":dbus.String("MyPassword", variant_level=1))})
  26. def setNetwork(self, options):
  27. self.net.set(options)
  28. def enableNetwork(self):
  29. self.net.enable()
  30. def disableNetwork(self):
  31. self.net.disable()
  32. class WPA_Supplicant_Interface:
  33. def __init__(self, bus, ifname, path):
  34. self.bus = bus
  35. self.ifname = ifname
  36. self.path = path
  37. self.if_obj = self.bus.get_object(WPAS_DBUS_SERVICE, path)
  38. self.iface = dbus.Interface(self. if_obj, WPAS_DBUS_INTERFACES_INTERFACE)
  39. def scan(self):
  40. self.iface.scan()
  41. def scanResults(self):
  42. return self.iface.scanResults()
  43. def addNetwork(self):
  44. path = self.iface.addNetwork()
  45. return self.getNetwork(path)
  46. def removeNetwork(self, network):
  47. self.iface.removeNetwork(network)
  48. def selectNetwork(self, network):
  49. self.iface.selectNetwork(network)
  50. def getNetworkPath(self, network_id):
  51. return "%s/Networks/%d" % (self.path, network_id)
  52. def getNetworkById(self, network_id):
  53. return WPA_Supplicant_Network(self.bus, self.getNetworkPath(network_id))
  54. def getNetwork(self, network):
  55. return WPA_Supplicant_Network(self.bus, network)
  56. # mode should be between 0 and 2
  57. def setAPScan(self, mode):
  58. self.iface.setAPScan(dbus.UInt32(mode))
  59. def disconnect(self):
  60. self.iface.disconnect()
  61. def getState(self):
  62. return self.iface.state()
  63. def getCapabilities(self):
  64. return self.iface.capabilities()
  65. class WPA_Supplicant:
  66. def __init__(self):
  67. self.bus = dbus.SystemBus()
  68. self.wpas_obj = self.bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_OPATH)
  69. self.wpas = dbus.Interface(self.wpas_obj, WPAS_DBUS_INTERFACE)
  70. def getInterface(self, ifname):
  71. path = self.wpas.getInterface(ifname)
  72. return WPA_Supplicant_Interface(self.bus, ifname, path)
  73. # driver=wext, hostap, prism54, madwifi, atmel, ndiswrapper, ipw, wired
  74. def addInterface(self, ifname, driver):
  75. self.wpas.addInterface(ifname, {'driver': dbus.String(driver, variant_level=1)})
  76. return self.getInterface(ifname)
  77. def removeInterface(self, ifname):
  78. try:
  79. iface_path = self.wpas.getInterface(ifname)
  80. except dbus.DBusException:
  81. iface_path = None
  82. if iface_path:
  83. self.wpas.removeInterface(dbus.ObjectPath(iface_path))
  84. def detectWpaDriver(ifname):
  85. import os
  86. import sys
  87. import pardus.netutils
  88. # if device is an ethernet device, driver is "wired"
  89. device = pardus.netutils.IF(ifname)
  90. if (device.isEthernet()) and (not device.isWireless()):
  91. return "wired"
  92. path = os.path.join("/sys/class/net/", ifname, "device/driver/module")
  93. modname = None
  94. if os.path.exists(path):
  95. modname = os.readlink(path).split("/")[-1]
  96. if "hostap" in modname:
  97. return "hostap"
  98. if "prism54" in modname:
  99. return "prism54"
  100. if "atmel" in modname:
  101. return "atmel"
  102. # we fallback to wext
  103. # wext is the generic driver for ipw2100, ipw2200, ndiswrapper > 1.12, madwifi etc...
  104. return "wext"
  105. def getWpaInterface(ifname):
  106. wpa = WPA_Supplicant()
  107. try:
  108. iface = wpa.getInterface(ifname)
  109. except dbus.DBusException:
  110. driver = detectWpaDriver(ifname)
  111. iface = wpa.addInterface(ifname, driver)
  112. return iface
  113. def waitForAuthenticationComplete(iface, timeout, wait = 0.1):
  114. import time
  115. while timeout > 0:
  116. if iface.getState() == "COMPLETED":
  117. return True
  118. else:
  119. timeout -= wait
  120. time.sleep(wait)
  121. return False
  122. def checkServiceState(serviceName):
  123. bus = dbus.SystemBus()
  124. obj = bus.get_object("tr.org.pardus.comar", "/package/%s" % serviceName)
  125. state = obj.info(dbus_interface="tr.org.pardus.comar.System.Service")[2]
  126. return state in ("on", "started")
  127. def isDBusServiceActive():
  128. return True
  129. def isWpaServiceActive():
  130. return checkServiceState("wpa_supplicant")
  131. def isWpaServiceUsable():
  132. return isDBusServiceActive() and isWpaServiceActive()
  133. def startWpaService():
  134. if not isWpaServiceActive():
  135. bus = dbus.SystemBus()
  136. obj = bus.get_object("tr.org.pardus.comar", "/package/wpa_supplicant")
  137. obj.start(dbus_interface="tr.org.pardus.comar.System.Service")
  138. import time
  139. timeout = 10
  140. while timeout > 0:
  141. try:
  142. bus = dbus.SystemBus()
  143. net_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_OPATH)
  144. net = dbus.Interface(net_obj, WPAS_DBUS_NETWORK_INTERFACE)
  145. except dbus.DBusException:
  146. time.sleep(0.1)
  147. timeout -= 0.2
  148. continue
  149. return True
  150. return False
  151. def setWpaAuthentication(ifname, ssid, password, timeout = TIMEOUT):
  152. password_length = len(password)
  153. if (password_length < 8) or (password_length > 63):
  154. raise PasswordLengthError("Password length should be between 8 and 63")
  155. iface = getWpaInterface(ifname)
  156. network = iface.addNetwork()
  157. network.setNetwork({"ssid": dbus.String(ssid, variant_level=1), "psk": dbus.String(password, variant_level=1)})
  158. iface.selectNetwork(network.path)
  159. authentication = waitForAuthenticationComplete(iface, timeout)
  160. if not authentication:
  161. disableAuthentication(ifname)
  162. return authentication
  163. def disableAuthentication(ifname):
  164. wpa = WPA_Supplicant()
  165. wpa.removeInterface(ifname)
  166. bus = dbus.SystemBus()
  167. obj = bus.get_object("tr.org.pardus.comar", "/package/wpa_supplicant")
  168. obj.stop(dbus_interface="tr.org.pardus.comar.System.Service")
  169. class Wpa_EAP:
  170. ssid = ""
  171. phase1 = ""
  172. phase2 = ""
  173. key_mgmt = "IEEE8021X"
  174. eap = "PEAP"
  175. anonymous_identity = ""
  176. ca_cert = ""
  177. client_cert = ""
  178. private_key = ""
  179. private_key_passwd = ""
  180. def __init__(self, ifname):
  181. self.ifname = ifname
  182. self.iface = getWpaInterface(ifname)
  183. self.network = self.iface.addNetwork()
  184. def authenticate(self, username, password, timeout = TIMEOUT):
  185. basic = {"ssid": dbus.String(self.ssid, variant_level=1),
  186. "key_mgmt": dbus.String(self.key_mgmt, variant_level=1),
  187. "eap": dbus.String(self.eap, variant_level=1),
  188. "identity": dbus.String(username, variant_level=1)}
  189. if self.client_cert:
  190. basic["client_cert"] = dbus.String(self.client_cert, variant_level=1)
  191. if self.ca_cert:
  192. basic["ca_cert"] = dbus.String(self.ca_cert, variant_level=1)
  193. if self.private_key:
  194. basic["private_key"] = dbus.String(self.private_key, variant_level=1)
  195. if self.private_key_passwd:
  196. basic["private_key_passwd"] = dbus.String(self.private_key_passwd, variant_level=1)
  197. if self.phase2:
  198. basic["phase2"] = dbus.String("auth=%s"%self.phase2, variant_level=1)
  199. if password:
  200. basic["password"] = dbus.String(password, variant_level=1)
  201. if self.anonymous_identity:
  202. basic["anonymous_identity"] = dbus.String(self.anonymous_identity, variant_level=1)
  203. self.network.setNetwork(basic)
  204. self.iface.selectNetwork(self.network.path)
  205. authentication = waitForAuthenticationComplete(self.iface, timeout)
  206. if not authentication:
  207. disableAuthentication(self.ifname)
  208. return authentication