gpg.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. """Utilities for signed files
  2. @contact: Debian FTP Master <ftpmaster@debian.org>
  3. @copyright: 2011-2018 Ansgar Burchardt <ansgar@debian.org>
  4. @license: GNU General Public License version 2 or later
  5. """
  6. # This program is free software; you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation; either version 2 of the License, or
  9. # (at your option) any later version.
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. # You should have received a copy of the GNU General Public License
  15. # along with this program; if not, write to the Free Software
  16. # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  17. import apt_pkg
  18. import datetime
  19. import fcntl
  20. import os
  21. import select
  22. import subprocess
  23. try:
  24. _MAXFD = os.sysconf("SC_OPEN_MAX")
  25. except:
  26. _MAXFD = 256
  27. class GpgException(Exception):
  28. pass
  29. class _Pipe(object):
  30. """context manager for pipes
  31. Note: When the pipe is closed by other means than the close_r and close_w
  32. methods, you have to set self.r (self.w) to None.
  33. """
  34. def __enter__(self):
  35. (self.r, self.w) = os.pipe()
  36. return self
  37. def __exit__(self, type, value, traceback):
  38. self.close_w()
  39. self.close_r()
  40. return False
  41. def close_r(self):
  42. """close reading side of the pipe"""
  43. if self.r:
  44. os.close(self.r)
  45. self.r = None
  46. def close_w(self):
  47. """close writing part of the pipe"""
  48. if self.w:
  49. os.close(self.w)
  50. self.w = None
  51. class SignedFile(object):
  52. """handle files signed with PGP
  53. The following attributes are available:
  54. contents - byte-string with the content (after removing PGP armor)
  55. valid - Boolean indicating a valid signature was found
  56. weak_signature - signature uses a weak algorithm (e.g. SHA-1)
  57. fingerprint - fingerprint of the key used for signing
  58. primary_fingerprint - fingerprint of the primary key associated to the key used for signing
  59. """
  60. def __init__(self, data, keyrings, require_signature=True, gpg="/usr/bin/gpg"):
  61. """
  62. @param data: byte-string containing the message
  63. @param keyrings: sequence of keyrings
  64. @param require_signature: if True (the default), will raise an exception if no valid signature was found
  65. @param gpg: location of the gpg binary
  66. """
  67. self.gpg = gpg
  68. self.keyrings = keyrings
  69. self.valid = False
  70. self.expired = False
  71. self.invalid = False
  72. self.weak_signature = False
  73. self.fingerprints = []
  74. self.primary_fingerprints = []
  75. self.signature_ids = []
  76. self._verify(data, require_signature)
  77. @property
  78. def fingerprint(self):
  79. assert len(self.fingerprints) == 1
  80. return self.fingerprints[0]
  81. @property
  82. def primary_fingerprint(self):
  83. assert len(self.primary_fingerprints) == 1
  84. return self.primary_fingerprints[0]
  85. @property
  86. def signature_id(self):
  87. assert len(self.signature_ids) == 1
  88. return self.signature_ids[0]
  89. def _verify(self, data, require_signature):
  90. with _Pipe() as stdin, \
  91. _Pipe() as contents, \
  92. _Pipe() as status, \
  93. _Pipe() as stderr:
  94. pid = os.fork()
  95. if pid == 0:
  96. self._exec_gpg(stdin.r, contents.w, stderr.w, status.w)
  97. else:
  98. stdin.close_r()
  99. contents.close_w()
  100. stderr.close_w()
  101. status.close_w()
  102. read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data})
  103. stdin.w = None # was closed by _do_io
  104. (pid_, exit_code, usage_) = os.wait4(pid, 0)
  105. self.contents = read[contents.r]
  106. self.status = read[status.r]
  107. self.stderr = read[stderr.r]
  108. if self.status == b"":
  109. stderr = self.stderr.decode('ascii', errors='replace')
  110. raise GpgException("No status output from GPG. (GPG exited with status code %s)\n%s" % (exit_code, stderr))
  111. for line in self.status.splitlines():
  112. self._parse_status(line)
  113. if self.invalid:
  114. self.valid = False
  115. if require_signature and not self.valid:
  116. stderr = self.stderr.decode('ascii', errors='replace')
  117. raise GpgException("No valid signature found. (GPG exited with status code %s)\n%s" % (exit_code, stderr))
  118. assert len(self.fingerprints) == len(self.primary_fingerprints)
  119. assert len(self.fingerprints) == len(self.signature_ids)
  120. def _do_io(self, read, write):
  121. for fd in write:
  122. old = fcntl.fcntl(fd, fcntl.F_GETFL)
  123. fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK)
  124. read_lines = dict((fd, []) for fd in read)
  125. write_pos = dict((fd, 0) for fd in write)
  126. read_set = list(read)
  127. write_set = list(write)
  128. while len(read_set) > 0 or len(write_set) > 0:
  129. r, w, x_ = select.select(read_set, write_set, ())
  130. for fd in r:
  131. data = os.read(fd, 4096)
  132. if len(data) == 0:
  133. read_set.remove(fd)
  134. else:
  135. read_lines[fd].append(data)
  136. for fd in w:
  137. data = write[fd][write_pos[fd]:]
  138. if len(data) == 0:
  139. os.close(fd)
  140. write_set.remove(fd)
  141. else:
  142. bytes_written = os.write(fd, data)
  143. write_pos[fd] += bytes_written
  144. return dict((fd, b"".join(read_lines[fd])) for fd in read_lines)
  145. def _parse_timestamp(self, timestamp, datestring=None):
  146. """parse timestamp in GnuPG's format
  147. @rtype: L{datetime.datetime}
  148. @returns: datetime object for the given timestamp
  149. """
  150. # The old implementation did only return the date. As we already
  151. # used this for replay production, return the legacy value for
  152. # old signatures.
  153. if datestring is not None:
  154. year, month, day = datestring.split(b'-')
  155. date = datetime.date(int(year), int(month), int(day))
  156. time = datetime.time(0, 0)
  157. if date < datetime.date(2014, 8, 4):
  158. return datetime.datetime.combine(date, time)
  159. if b'T' in timestamp:
  160. raise Exception('No support for ISO 8601 timestamps.')
  161. return datetime.datetime.utcfromtimestamp(int(timestamp))
  162. def _parse_status(self, line):
  163. fields = line.split()
  164. if fields[0] != b"[GNUPG:]":
  165. raise GpgException("Unexpected output on status-fd: %s" % line)
  166. # VALIDSIG <fingerprint in hex> <sig_creation_date> <sig-timestamp>
  167. # <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
  168. # <hash-algo> <sig-class> <primary-key-fpr>
  169. if fields[1] == b"VALIDSIG":
  170. # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20,
  171. # which Debian 8 does not yet include. We want to make sure
  172. # to not accept uploads covered by a MD5-based signature.
  173. # RFC 4880, table 9.4:
  174. # 1 - MD5
  175. # 2 - SHA-1
  176. # 3 - RIPE-MD/160
  177. if fields[9] == b"1":
  178. raise GpgException("Digest algorithm MD5 is not trusted.")
  179. if fields[9] in (b"2", b"3"):
  180. self.weak_signature = True
  181. self.valid = True
  182. self.fingerprints.append(fields[2].decode('ascii'))
  183. self.primary_fingerprints.append(fields[11].decode('ascii'))
  184. self.signature_timestamp = self._parse_timestamp(fields[4], fields[3])
  185. elif fields[1] == b"BADARMOR":
  186. raise GpgException("Bad armor.")
  187. elif fields[1] == b"NODATA":
  188. raise GpgException("No data.")
  189. elif fields[1] == b"DECRYPTION_FAILED":
  190. raise GpgException("Decryption failed.")
  191. elif fields[1] == b"ERROR":
  192. f2 = fields[2].decode('ascii', errors='replace')
  193. f3 = fields[3].decode('ascii', errors='replace')
  194. raise GpgException("Other error: %s %s" % (f2, f3))
  195. elif fields[1] == b"SIG_ID":
  196. self.signature_ids.append(fields[2])
  197. elif fields[1] in (b'PLAINTEXT', b'GOODSIG', b'KEY_CONSIDERED',
  198. b'NEWSIG', b'NOTATION_NAME', b'NOTATION_FLAGS',
  199. b'NOTATION_DATA', b'SIGEXPIRED', b'KEYEXPIRED',
  200. b'POLICY_URL', b'PROGRESS', b'VERIFICATION_COMPLIANCE_MODE'):
  201. pass
  202. elif fields[1] in (b'EXPSIG', b'EXPKEYSIG'):
  203. self.expired = True
  204. self.invalid = True
  205. elif fields[1] in (b'REVKEYSIG', b'BADSIG', b'ERRSIG', b'KEYREVOKED', b'NO_PUBKEY'):
  206. self.invalid = True
  207. else:
  208. field = fields[1].decode('ascii', errors='replace')
  209. raise GpgException("Keyword '{0}' from GnuPG was not expected.".format(field))
  210. def _exec_gpg(self, stdin, stdout, stderr, statusfd):
  211. try:
  212. if stdin != 0:
  213. os.dup2(stdin, 0)
  214. if stdout != 1:
  215. os.dup2(stdout, 1)
  216. if stderr != 2:
  217. os.dup2(stderr, 2)
  218. if statusfd != 3:
  219. os.dup2(statusfd, 3)
  220. for fd in range(4):
  221. old = fcntl.fcntl(fd, fcntl.F_GETFD)
  222. fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
  223. os.closerange(4, _MAXFD)
  224. args = [self.gpg,
  225. "--status-fd=3",
  226. "--no-default-keyring",
  227. "--batch",
  228. "--no-tty",
  229. "--trust-model", "always",
  230. "--fixed-list-mode"]
  231. for k in self.keyrings:
  232. args.extend(["--keyring", k])
  233. args.extend(["--decrypt", "-"])
  234. os.execvp(self.gpg, args)
  235. finally:
  236. os._exit(1)
  237. def contents_sha1(self):
  238. return apt_pkg.sha1sum(self.contents)
  239. def sign(infile, outfile, keyids=[], inline=False, pubring=None, secring=None, homedir=None, passphrase_file=None):
  240. args = [
  241. '/usr/bin/gpg',
  242. '--no-options', '--no-tty', '--batch', '--armour',
  243. '--personal-digest-preferences', 'SHA256',
  244. ]
  245. for keyid in keyids:
  246. args.extend(['--local-user', keyid])
  247. if pubring is not None:
  248. args.extend(['--keyring', pubring])
  249. if secring is not None:
  250. args.extend(['--secret-keyring', secring])
  251. if homedir is not None:
  252. args.extend(['--homedir', homedir])
  253. if passphrase_file is not None:
  254. args.extend(['--pinentry-mode', 'loopback',
  255. '--passphrase-file', passphrase_file])
  256. args.append('--clearsign' if inline else '--detach-sign')
  257. subprocess.check_call(args, stdin=infile, stdout=outfile)
  258. # vim: set sw=4 et: