gpg.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. """Utilities for signed files
  2. @contact: Debian FTP Master <ftpmaster@debian.org>
  3. @copyright: 2011-2018 Ansgar Burchardt <ansgar@debian.org>
  4. @license: GNU General Public License version 2 or later
  5. """
  6. # This program is free software; you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation; either version 2 of the License, or
  9. # (at your option) any later version.
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. # You should have received a copy of the GNU General Public License
  15. # along with this program; if not, write to the Free Software
  16. # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  17. import apt_pkg
  18. import datetime
  19. import fcntl
  20. import os
  21. import select
  22. import subprocess
  23. try:
  24. _MAXFD = os.sysconf("SC_OPEN_MAX")
  25. except:
  26. _MAXFD = 256
  27. class GpgException(Exception):
  28. pass
  29. class _Pipe:
  30. """context manager for pipes
  31. Note: When the pipe is closed by other means than the close_r and close_w
  32. methods, you have to set self.r (self.w) to None.
  33. """
  34. def __enter__(self):
  35. (self.r, self.w) = os.pipe()
  36. return self
  37. def __exit__(self, type, value, traceback):
  38. self.close_w()
  39. self.close_r()
  40. return False
  41. def close_r(self):
  42. """close reading side of the pipe"""
  43. if self.r:
  44. os.close(self.r)
  45. self.r = None
  46. def close_w(self):
  47. """close writing part of the pipe"""
  48. if self.w:
  49. os.close(self.w)
  50. self.w = None
  51. # Can be replaced with `os.waitstatus_to_exitcode` (Python 3.9)
  52. def waitstatus_to_exitcode(status):
  53. if os.WIFEXITED(status):
  54. return os.WEXITSTATUS(status)
  55. elif os.WIFSIGNALED(status):
  56. return -os.WTERMSIG(status)
  57. else:
  58. raise ValueError(f"Unexpected status code '{status}'")
  59. class SignedFile:
  60. """handle files signed with PGP
  61. The following attributes are available:
  62. contents - byte-string with the content (after removing PGP armor)
  63. valid - Boolean indicating a valid signature was found
  64. weak_signature - signature uses a weak algorithm (e.g. SHA-1)
  65. fingerprint - fingerprint of the key used for signing
  66. primary_fingerprint - fingerprint of the primary key associated to the key used for signing
  67. """
  68. def __init__(self, data, keyrings, require_signature=True, gpg="/usr/bin/gpg"):
  69. """
  70. @param data: byte-string containing the message
  71. @param keyrings: sequence of keyrings
  72. @param require_signature: if True (the default), will raise an exception if no valid signature was found
  73. @param gpg: location of the gpg binary
  74. """
  75. self.gpg = gpg
  76. self.keyrings = keyrings
  77. self.valid = False
  78. self.expired = False
  79. self.invalid = False
  80. self.weak_signature = False
  81. self.fingerprints = []
  82. self.primary_fingerprints = []
  83. self.signature_ids = []
  84. self._verify(data, require_signature)
  85. @property
  86. def fingerprint(self):
  87. assert len(self.fingerprints) == 1
  88. return self.fingerprints[0]
  89. @property
  90. def primary_fingerprint(self):
  91. assert len(self.primary_fingerprints) == 1
  92. return self.primary_fingerprints[0]
  93. @property
  94. def signature_id(self):
  95. assert len(self.signature_ids) == 1
  96. return self.signature_ids[0]
  97. def _verify(self, data, require_signature):
  98. with _Pipe() as stdin, \
  99. _Pipe() as contents, \
  100. _Pipe() as status, \
  101. _Pipe() as stderr:
  102. pid = os.fork()
  103. if pid == 0:
  104. self._exec_gpg(stdin.r, contents.w, stderr.w, status.w)
  105. else:
  106. stdin.close_r()
  107. contents.close_w()
  108. stderr.close_w()
  109. status.close_w()
  110. read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data})
  111. stdin.w = None # was closed by _do_io
  112. (pid_, status_code, usage_) = os.wait4(pid, 0)
  113. if pid_ != pid:
  114. raise Exception(f"wait4() waited for pid {pid_}, but we expected {pid}")
  115. exit_code = waitstatus_to_exitcode(status_code)
  116. self.contents = read[contents.r]
  117. self.status = read[status.r]
  118. self.stderr = read[stderr.r]
  119. if self.status == b"":
  120. stderr = self.stderr.decode('ascii', errors='replace')
  121. raise GpgException("No status output from GPG. (GPG exited with status code %s)\n%s" % (exit_code, stderr))
  122. # gpg exits with 0 (no error), 1 (at least one invalid sig),
  123. # or anything else (fatal error). Even though status 2
  124. # indicates a fatal error, we still allow it as it is also
  125. # returned when the public key is not known.
  126. if exit_code not in (0, 1, 2):
  127. stderr = self.stderr.decode('ascii', errors='replace')
  128. raise GpgException(f"GPG exited with a fatal error (exit code {exit_code})\n{stderr}")
  129. for line in self.status.splitlines():
  130. self._parse_status(line)
  131. if self.invalid:
  132. self.valid = False
  133. if require_signature and not self.valid:
  134. stderr = self.stderr.decode('ascii', errors='replace')
  135. raise GpgException("No valid signature found. (GPG exited with status code %s)\n%s" % (exit_code, stderr))
  136. assert len(self.fingerprints) == len(self.primary_fingerprints)
  137. assert len(self.fingerprints) == len(self.signature_ids)
  138. def _do_io(self, read, write):
  139. for fd in write:
  140. old = fcntl.fcntl(fd, fcntl.F_GETFL)
  141. fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK)
  142. read_lines = dict((fd, []) for fd in read)
  143. write_pos = dict((fd, 0) for fd in write)
  144. read_set = list(read)
  145. write_set = list(write)
  146. while len(read_set) > 0 or len(write_set) > 0:
  147. r, w, x_ = select.select(read_set, write_set, ())
  148. for fd in r:
  149. data = os.read(fd, 4096)
  150. if len(data) == 0:
  151. read_set.remove(fd)
  152. else:
  153. read_lines[fd].append(data)
  154. for fd in w:
  155. data = write[fd][write_pos[fd]:]
  156. if len(data) == 0:
  157. os.close(fd)
  158. write_set.remove(fd)
  159. else:
  160. bytes_written = os.write(fd, data)
  161. write_pos[fd] += bytes_written
  162. return dict((fd, b"".join(read_lines[fd])) for fd in read_lines)
  163. def _parse_timestamp(self, timestamp, datestring=None):
  164. """parse timestamp in GnuPG's format
  165. @rtype: L{datetime.datetime}
  166. @returns: datetime object for the given timestamp
  167. """
  168. # The old implementation did only return the date. As we already
  169. # used this for replay production, return the legacy value for
  170. # old signatures.
  171. if datestring is not None:
  172. year, month, day = datestring.split(b'-')
  173. date = datetime.date(int(year), int(month), int(day))
  174. time = datetime.time(0, 0)
  175. if date < datetime.date(2014, 8, 4):
  176. return datetime.datetime.combine(date, time)
  177. if b'T' in timestamp:
  178. raise Exception('No support for ISO 8601 timestamps.')
  179. return datetime.datetime.utcfromtimestamp(int(timestamp))
  180. def _parse_status(self, line):
  181. fields = line.split()
  182. if fields[0] != b"[GNUPG:]":
  183. raise GpgException("Unexpected output on status-fd: %s" % line)
  184. # VALIDSIG <fingerprint in hex> <sig_creation_date> <sig-timestamp>
  185. # <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
  186. # <hash-algo> <sig-class> <primary-key-fpr>
  187. if fields[1] == b"VALIDSIG":
  188. # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20,
  189. # which Debian 8 does not yet include. We want to make sure
  190. # to not accept uploads covered by a MD5-based signature.
  191. # RFC 4880, table 9.4:
  192. # 1 - MD5
  193. # 2 - SHA-1
  194. # 3 - RIPE-MD/160
  195. if fields[9] == b"1":
  196. raise GpgException("Digest algorithm MD5 is not trusted.")
  197. if fields[9] in (b"2", b"3"):
  198. self.weak_signature = True
  199. self.valid = True
  200. self.fingerprints.append(fields[2].decode('ascii'))
  201. self.primary_fingerprints.append(fields[11].decode('ascii'))
  202. self.signature_timestamp = self._parse_timestamp(fields[4], fields[3])
  203. elif fields[1] == b"BADARMOR":
  204. raise GpgException("Bad armor.")
  205. elif fields[1] == b"NODATA":
  206. raise GpgException("No data.")
  207. elif fields[1] == b"DECRYPTION_FAILED":
  208. raise GpgException("Decryption failed.")
  209. elif fields[1] == b"ERROR":
  210. f2 = fields[2].decode('ascii', errors='replace')
  211. f3 = fields[3].decode('ascii', errors='replace')
  212. raise GpgException("Other error: %s %s" % (f2, f3))
  213. elif fields[1] == b"SIG_ID":
  214. self.signature_ids.append(fields[2])
  215. elif fields[1] in (b'PLAINTEXT', b'GOODSIG', b'KEY_CONSIDERED',
  216. b'NEWSIG', b'NOTATION_NAME', b'NOTATION_FLAGS',
  217. b'NOTATION_DATA', b'SIGEXPIRED', b'KEYEXPIRED',
  218. b'POLICY_URL', b'PROGRESS', b'VERIFICATION_COMPLIANCE_MODE'):
  219. pass
  220. elif fields[1] in (b'EXPSIG', b'EXPKEYSIG'):
  221. self.expired = True
  222. self.invalid = True
  223. elif fields[1] in (b'REVKEYSIG', b'BADSIG', b'ERRSIG', b'KEYREVOKED', b'NO_PUBKEY'):
  224. self.invalid = True
  225. else:
  226. field = fields[1].decode('ascii', errors='replace')
  227. raise GpgException("Keyword '{0}' from GnuPG was not expected.".format(field))
  228. def _exec_gpg(self, stdin, stdout, stderr, statusfd):
  229. try:
  230. if stdin != 0:
  231. os.dup2(stdin, 0)
  232. if stdout != 1:
  233. os.dup2(stdout, 1)
  234. if stderr != 2:
  235. os.dup2(stderr, 2)
  236. if statusfd != 3:
  237. os.dup2(statusfd, 3)
  238. for fd in range(4):
  239. old = fcntl.fcntl(fd, fcntl.F_GETFD)
  240. fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
  241. os.closerange(4, _MAXFD)
  242. args = [self.gpg,
  243. "--status-fd=3",
  244. "--no-default-keyring",
  245. "--batch",
  246. "--no-tty",
  247. "--trust-model", "always",
  248. "--fixed-list-mode"]
  249. for k in self.keyrings:
  250. args.extend(["--keyring", k])
  251. args.extend(["--decrypt", "-"])
  252. os.execvp(self.gpg, args)
  253. finally:
  254. try:
  255. print("Failed to execute gpg.", file=sys.stderr)
  256. sys.stderr.flush()
  257. except:
  258. # Ignore errors, we want to reach the `exit` call below.
  259. pass
  260. os._exit(3)
  261. def contents_sha1(self):
  262. return apt_pkg.sha1sum(self.contents)
  263. def sign(infile, outfile, keyids=[], inline=False, pubring=None, secring=None, homedir=None, passphrase_file=None):
  264. args = [
  265. '/usr/bin/gpg',
  266. '--no-options', '--no-tty', '--batch', '--armour',
  267. '--personal-digest-preferences', 'SHA256',
  268. ]
  269. for keyid in keyids:
  270. args.extend(['--local-user', keyid])
  271. if pubring is not None:
  272. args.extend(['--keyring', pubring])
  273. if secring is not None:
  274. args.extend(['--secret-keyring', secring])
  275. if homedir is not None:
  276. args.extend(['--homedir', homedir])
  277. if passphrase_file is not None:
  278. args.extend(['--pinentry-mode', 'loopback',
  279. '--passphrase-file', passphrase_file])
  280. args.append('--clearsign' if inline else '--detach-sign')
  281. subprocess.check_call(args, stdin=infile, stdout=outfile)
  282. # vim: set sw=4 et: