gpg.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. """Utilities for signed files
  2. @contact: Debian FTP Master <ftpmaster@debian.org>
  3. @copyright: 2011-2018 Ansgar Burchardt <ansgar@debian.org>
  4. @license: GNU General Public License version 2 or later
  5. """
  6. # This program is free software; you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation; either version 2 of the License, or
  9. # (at your option) any later version.
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. # You should have received a copy of the GNU General Public License
  15. # along with this program; if not, write to the Free Software
  16. # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  17. import apt_pkg
  18. import datetime
  19. import fcntl
  20. import os
  21. import select
  22. import daklib.daksubprocess
  23. try:
  24. _MAXFD = os.sysconf("SC_OPEN_MAX")
  25. except:
  26. _MAXFD = 256
  27. class GpgException(Exception):
  28. pass
  29. class _Pipe(object):
  30. """context manager for pipes
  31. Note: When the pipe is closed by other means than the close_r and close_w
  32. methods, you have to set self.r (self.w) to None.
  33. """
  34. def __enter__(self):
  35. (self.r, self.w) = os.pipe()
  36. return self
  37. def __exit__(self, type, value, traceback):
  38. self.close_w()
  39. self.close_r()
  40. return False
  41. def close_r(self):
  42. """close reading side of the pipe"""
  43. if self.r:
  44. os.close(self.r)
  45. self.r = None
  46. def close_w(self):
  47. """close writing part of the pipe"""
  48. if self.w:
  49. os.close(self.w)
  50. self.w = None
  51. class SignedFile(object):
  52. """handle files signed with PGP
  53. The following attributes are available:
  54. contents - string with the content (after removing PGP armor)
  55. valid - Boolean indicating a valid signature was found
  56. weak_signature - signature uses a weak algorithm (e.g. SHA-1)
  57. fingerprint - fingerprint of the key used for signing
  58. primary_fingerprint - fingerprint of the primary key associated to the key used for signing
  59. """
  60. def __init__(self, data, keyrings, require_signature=True, gpg="/usr/bin/gpg"):
  61. """
  62. @param data: string containing the message
  63. @param keyrings: sequence of keyrings
  64. @param require_signature: if True (the default), will raise an exception if no valid signature was found
  65. @param gpg: location of the gpg binary
  66. """
  67. self.gpg = gpg
  68. self.keyrings = keyrings
  69. self.valid = False
  70. self.expired = False
  71. self.invalid = False
  72. self.weak_signature = False
  73. self.fingerprints = []
  74. self.primary_fingerprints = []
  75. self.signature_ids = []
  76. self._verify(data, require_signature)
  77. @property
  78. def fingerprint(self):
  79. assert len(self.fingerprints) == 1
  80. return self.fingerprints[0]
  81. @property
  82. def primary_fingerprint(self):
  83. assert len(self.primary_fingerprints) == 1
  84. return self.primary_fingerprints[0]
  85. @property
  86. def signature_id(self):
  87. assert len(self.signature_ids) == 1
  88. return self.signature_ids[0]
  89. def _verify(self, data, require_signature):
  90. with _Pipe() as stdin, \
  91. _Pipe() as contents, \
  92. _Pipe() as status, \
  93. _Pipe() as stderr:
  94. pid = os.fork()
  95. if pid == 0:
  96. self._exec_gpg(stdin.r, contents.w, stderr.w, status.w)
  97. else:
  98. stdin.close_r()
  99. contents.close_w()
  100. stderr.close_w()
  101. status.close_w()
  102. read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data})
  103. stdin.w = None # was closed by _do_io
  104. (pid_, exit_code, usage_) = os.wait4(pid, 0)
  105. self.contents = read[contents.r]
  106. self.status = read[status.r]
  107. self.stderr = read[stderr.r]
  108. if self.status == "":
  109. raise GpgException("No status output from GPG. (GPG exited with status code %s)\n%s" % (exit_code, self.stderr))
  110. for line in self.status.splitlines():
  111. self._parse_status(line)
  112. if self.invalid:
  113. self.valid = False
  114. if require_signature and not self.valid:
  115. raise GpgException("No valid signature found. (GPG exited with status code %s)\n%s" % (exit_code, self.stderr))
  116. assert len(self.fingerprints) == len(self.primary_fingerprints)
  117. assert len(self.fingerprints) == len(self.signature_ids)
  118. def _do_io(self, read, write):
  119. for fd in write.keys():
  120. old = fcntl.fcntl(fd, fcntl.F_GETFL)
  121. fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK)
  122. read_lines = dict((fd, []) for fd in read)
  123. write_pos = dict((fd, 0) for fd in write)
  124. read_set = list(read)
  125. write_set = write.keys()
  126. while len(read_set) > 0 or len(write_set) > 0:
  127. r, w, x_ = select.select(read_set, write_set, ())
  128. for fd in r:
  129. data = os.read(fd, 4096)
  130. if data == "":
  131. read_set.remove(fd)
  132. read_lines[fd].append(data)
  133. for fd in w:
  134. data = write[fd][write_pos[fd]:]
  135. if data == "":
  136. os.close(fd)
  137. write_set.remove(fd)
  138. else:
  139. bytes_written = os.write(fd, data)
  140. write_pos[fd] += bytes_written
  141. return dict((fd, "".join(read_lines[fd])) for fd in read_lines.keys())
  142. def _parse_timestamp(self, timestamp, datestring=None):
  143. """parse timestamp in GnuPG's format
  144. @rtype: L{datetime.datetime}
  145. @returns: datetime object for the given timestamp
  146. """
  147. # The old implementation did only return the date. As we already
  148. # used this for replay production, return the legacy value for
  149. # old signatures.
  150. if datestring is not None:
  151. year, month, day = datestring.split('-')
  152. date = datetime.date(int(year), int(month), int(day))
  153. time = datetime.time(0, 0)
  154. if date < datetime.date(2014, 8, 4):
  155. return datetime.datetime.combine(date, time)
  156. if 'T' in timestamp:
  157. raise Exception('No support for ISO 8601 timestamps.')
  158. return datetime.datetime.utcfromtimestamp(long(timestamp))
  159. def _parse_status(self, line):
  160. fields = line.split()
  161. if fields[0] != "[GNUPG:]":
  162. raise GpgException("Unexpected output on status-fd: %s" % line)
  163. # VALIDSIG <fingerprint in hex> <sig_creation_date> <sig-timestamp>
  164. # <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
  165. # <hash-algo> <sig-class> <primary-key-fpr>
  166. if fields[1] == "VALIDSIG":
  167. # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20,
  168. # which Debian 8 does not yet include. We want to make sure
  169. # to not accept uploads covered by a MD5-based signature.
  170. # RFC 4880, table 9.4:
  171. # 1 - MD5
  172. # 2 - SHA-1
  173. # 3 - RIPE-MD/160
  174. if fields[9] == "1":
  175. raise GpgException("Digest algorithm MD5 is not trusted.")
  176. if fields[9] in ("2", "3"):
  177. self.weak_signature = True
  178. self.valid = True
  179. self.fingerprints.append(fields[2])
  180. self.primary_fingerprints.append(fields[11])
  181. self.signature_timestamp = self._parse_timestamp(fields[4], fields[3])
  182. elif fields[1] == "BADARMOR":
  183. raise GpgException("Bad armor.")
  184. elif fields[1] == "NODATA":
  185. raise GpgException("No data.")
  186. elif fields[1] == "DECRYPTION_FAILED":
  187. raise GpgException("Decryption failed.")
  188. elif fields[1] == "ERROR":
  189. raise GpgException("Other error: %s %s" % (fields[2], fields[3]))
  190. elif fields[1] == "SIG_ID":
  191. self.signature_ids.append(fields[2])
  192. elif fields[1] in ('PLAINTEXT', 'GOODSIG', 'KEY_CONSIDERED',
  193. 'NEWSIG', 'NOTATION_NAME', 'NOTATION_FLAGS',
  194. 'NOTATION_DATA', 'SIGEXPIRED', 'KEYEXPIRED',
  195. 'POLICY_URL', 'PROGRESS'):
  196. pass
  197. elif fields[1] in ('EXPSIG', 'EXPKEYSIG'):
  198. self.expired = True
  199. self.invalid = True
  200. elif fields[1] in ('REVKEYSIG', 'BADSIG', 'ERRSIG', 'KEYREVOKED', 'NO_PUBKEY'):
  201. self.invalid = True
  202. else:
  203. raise GpgException("Keyword '{0}' from GnuPG was not expected.".format(fields[1]))
  204. def _exec_gpg(self, stdin, stdout, stderr, statusfd):
  205. try:
  206. if stdin != 0:
  207. os.dup2(stdin, 0)
  208. if stdout != 1:
  209. os.dup2(stdout, 1)
  210. if stderr != 2:
  211. os.dup2(stderr, 2)
  212. if statusfd != 3:
  213. os.dup2(statusfd, 3)
  214. for fd in range(4):
  215. old = fcntl.fcntl(fd, fcntl.F_GETFD)
  216. fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
  217. os.closerange(4, _MAXFD)
  218. args = [self.gpg,
  219. "--status-fd=3",
  220. "--no-default-keyring",
  221. "--batch",
  222. "--no-tty",
  223. "--trust-model", "always",
  224. "--fixed-list-mode"]
  225. for k in self.keyrings:
  226. args.extend(["--keyring", k])
  227. args.extend(["--decrypt", "-"])
  228. os.execvp(self.gpg, args)
  229. finally:
  230. os._exit(1)
  231. def contents_sha1(self):
  232. return apt_pkg.sha1sum(self.contents)
  233. def sign(infile, outfile, keyids=[], inline=False, pubring=None, secring=None, homedir=None, passphrase_file=None):
  234. args = [
  235. '/usr/bin/gpg',
  236. '--no-options', '--no-tty', '--batch', '--armour',
  237. '--personal-digest-preferences', 'SHA256',
  238. ]
  239. for keyid in keyids:
  240. args.extend(['--local-user', keyid])
  241. if pubring is not None:
  242. args.extend(['--keyring', pubring])
  243. if secring is not None:
  244. args.extend(['--secret-keyring', secring])
  245. if homedir is not None:
  246. args.extend(['--homedir', homedir])
  247. if passphrase_file is not None:
  248. args.extend(['--pinentry-mode', 'loopback',
  249. '--passphrase-file', passphrase_file])
  250. args.append('--clearsign' if inline else '--detach-sign')
  251. daklib.daksubprocess.check_call(args, stdin=infile, stdout=outfile)
  252. # vim: set sw=4 et: