gpg.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. """Utilities for signed files
  2. @contact: Debian FTP Master <ftpmaster@debian.org>
  3. @copyright: 2011-2018 Ansgar Burchardt <ansgar@debian.org>
  4. @license: GNU General Public License version 2 or later
  5. """
  6. # This program is free software; you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation; either version 2 of the License, or
  9. # (at your option) any later version.
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. # You should have received a copy of the GNU General Public License
  15. # along with this program; if not, write to the Free Software
  16. # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  17. import apt_pkg
  18. import datetime
  19. import fcntl
  20. import os
  21. import select
  22. import subprocess
  23. from collections.abc import Iterable
  24. try:
  25. _MAXFD = os.sysconf("SC_OPEN_MAX")
  26. except:
  27. _MAXFD = 256
  28. class GpgException(Exception):
  29. pass
  30. class _Pipe:
  31. """context manager for pipes
  32. Note: When the pipe is closed by other means than the close_r and close_w
  33. methods, you have to set self.r (self.w) to None.
  34. """
  35. def __enter__(self):
  36. (self.r, self.w) = os.pipe()
  37. return self
  38. def __exit__(self, type, value, traceback):
  39. self.close_w()
  40. self.close_r()
  41. return False
  42. def close_r(self):
  43. """close reading side of the pipe"""
  44. if self.r:
  45. os.close(self.r)
  46. self.r = None
  47. def close_w(self):
  48. """close writing part of the pipe"""
  49. if self.w:
  50. os.close(self.w)
  51. self.w = None
  52. # Can be replaced with `os.waitstatus_to_exitcode` (Python 3.9)
  53. def waitstatus_to_exitcode(status):
  54. if os.WIFEXITED(status):
  55. return os.WEXITSTATUS(status)
  56. elif os.WIFSIGNALED(status):
  57. return -os.WTERMSIG(status)
  58. else:
  59. raise ValueError(f"Unexpected status code '{status}'")
  60. class SignedFile:
  61. """handle files signed with PGP
  62. The following attributes are available:
  63. contents - byte-string with the content (after removing PGP armor)
  64. valid - Boolean indicating a valid signature was found
  65. weak_signature - signature uses a weak algorithm (e.g. SHA-1)
  66. fingerprint - fingerprint of the key used for signing
  67. primary_fingerprint - fingerprint of the primary key associated to the key used for signing
  68. """
  69. def __init__(self, data: bytes, keyrings: Iterable[str], require_signature: bool = True, gpg: str = "/usr/bin/gpg"):
  70. """
  71. :param data: byte-string containing the message
  72. :param keyrings: sequence of keyrings
  73. :param require_signature: if True (the default), will raise an exception if no valid signature was found
  74. :param gpg: location of the gpg binary
  75. """
  76. self.gpg = gpg
  77. self.keyrings = keyrings
  78. self.valid: bool = False #: valid signature
  79. self.expired = False
  80. self.invalid = False
  81. self.weak_signature = False
  82. self.fingerprints: list[str] = []
  83. self.primary_fingerprints: list[str] = []
  84. self.signature_ids: list[str] = []
  85. self._verify(data, require_signature)
  86. @property
  87. def fingerprint(self) -> str:
  88. """fingerprint of the (sub)key used for the signature"""
  89. assert len(self.fingerprints) == 1
  90. return self.fingerprints[0]
  91. @property
  92. def primary_fingerprint(self) -> str:
  93. """fingerprint of the primary key used for the signature"""
  94. assert len(self.primary_fingerprints) == 1
  95. return self.primary_fingerprints[0]
  96. @property
  97. def signature_id(self):
  98. assert len(self.signature_ids) == 1
  99. return self.signature_ids[0]
  100. def _verify(self, data, require_signature):
  101. with _Pipe() as stdin, \
  102. _Pipe() as contents, \
  103. _Pipe() as status, \
  104. _Pipe() as stderr:
  105. pid = os.fork()
  106. if pid == 0:
  107. self._exec_gpg(stdin.r, contents.w, stderr.w, status.w)
  108. else:
  109. stdin.close_r()
  110. contents.close_w()
  111. stderr.close_w()
  112. status.close_w()
  113. read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data})
  114. stdin.w = None # was closed by _do_io
  115. (pid_, status_code, usage_) = os.wait4(pid, 0)
  116. if pid_ != pid:
  117. raise Exception(f"wait4() waited for pid {pid_}, but we expected {pid}")
  118. exit_code = waitstatus_to_exitcode(status_code)
  119. self.contents = read[contents.r]
  120. self.status = read[status.r]
  121. self.stderr = read[stderr.r]
  122. if self.status == b"":
  123. stderr = self.stderr.decode('ascii', errors='replace')
  124. raise GpgException("No status output from GPG. (GPG exited with status code %s)\n%s" % (exit_code, stderr))
  125. # gpg exits with 0 (no error), 1 (at least one invalid sig),
  126. # or anything else (fatal error). Even though status 2
  127. # indicates a fatal error, we still allow it as it is also
  128. # returned when the public key is not known.
  129. if exit_code not in (0, 1, 2):
  130. stderr = self.stderr.decode('ascii', errors='replace')
  131. raise GpgException(f"GPG exited with a fatal error (exit code {exit_code})\n{stderr}")
  132. for line in self.status.splitlines():
  133. self._parse_status(line)
  134. if self.invalid:
  135. self.valid = False
  136. if require_signature and not self.valid:
  137. stderr = self.stderr.decode('ascii', errors='replace')
  138. raise GpgException("No valid signature found. (GPG exited with status code %s)\n%s" % (exit_code, stderr))
  139. assert len(self.fingerprints) == len(self.primary_fingerprints)
  140. assert len(self.fingerprints) == len(self.signature_ids)
  141. def _do_io(self, read, write):
  142. for fd in write:
  143. old = fcntl.fcntl(fd, fcntl.F_GETFL)
  144. fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK)
  145. read_lines = dict((fd, []) for fd in read)
  146. write_pos = dict((fd, 0) for fd in write)
  147. read_set = list(read)
  148. write_set = list(write)
  149. while len(read_set) > 0 or len(write_set) > 0:
  150. r, w, x_ = select.select(read_set, write_set, ())
  151. for fd in r:
  152. data = os.read(fd, 4096)
  153. if len(data) == 0:
  154. read_set.remove(fd)
  155. else:
  156. read_lines[fd].append(data)
  157. for fd in w:
  158. data = write[fd][write_pos[fd]:]
  159. if len(data) == 0:
  160. os.close(fd)
  161. write_set.remove(fd)
  162. else:
  163. bytes_written = os.write(fd, data)
  164. write_pos[fd] += bytes_written
  165. return dict((fd, b"".join(read_lines[fd])) for fd in read_lines)
  166. def _parse_timestamp(self, timestamp, datestring=None) -> datetime.datetime:
  167. """parse timestamp in GnuPG's format
  168. :return: datetime object for the given timestamp
  169. """
  170. # The old implementation did only return the date. As we already
  171. # used this for replay production, return the legacy value for
  172. # old signatures.
  173. if datestring is not None:
  174. year, month, day = datestring.split(b'-')
  175. date = datetime.date(int(year), int(month), int(day))
  176. time = datetime.time(0, 0)
  177. if date < datetime.date(2014, 8, 4):
  178. return datetime.datetime.combine(date, time)
  179. if b'T' in timestamp:
  180. raise Exception('No support for ISO 8601 timestamps.')
  181. return datetime.datetime.utcfromtimestamp(int(timestamp))
  182. def _parse_status(self, line):
  183. fields = line.split()
  184. if fields[0] != b"[GNUPG:]":
  185. raise GpgException("Unexpected output on status-fd: %s" % line)
  186. # VALIDSIG <fingerprint in hex> <sig_creation_date> <sig-timestamp>
  187. # <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
  188. # <hash-algo> <sig-class> <primary-key-fpr>
  189. if fields[1] == b"VALIDSIG":
  190. # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20,
  191. # which Debian 8 does not yet include. We want to make sure
  192. # to not accept uploads covered by a MD5-based signature.
  193. # RFC 4880, table 9.4:
  194. # 1 - MD5
  195. # 2 - SHA-1
  196. # 3 - RIPE-MD/160
  197. if fields[9] == b"1":
  198. raise GpgException("Digest algorithm MD5 is not trusted.")
  199. if fields[9] in (b"2", b"3"):
  200. self.weak_signature = True
  201. self.valid = True
  202. self.fingerprints.append(fields[2].decode('ascii'))
  203. self.primary_fingerprints.append(fields[11].decode('ascii'))
  204. self.signature_timestamp = self._parse_timestamp(fields[4], fields[3])
  205. elif fields[1] == b"BADARMOR":
  206. raise GpgException("Bad armor.")
  207. elif fields[1] == b"NODATA":
  208. raise GpgException("No data.")
  209. elif fields[1] == b"DECRYPTION_FAILED":
  210. raise GpgException("Decryption failed.")
  211. elif fields[1] == b"ERROR":
  212. f2 = fields[2].decode('ascii', errors='replace')
  213. f3 = fields[3].decode('ascii', errors='replace')
  214. raise GpgException("Other error: %s %s" % (f2, f3))
  215. elif fields[1] == b"SIG_ID":
  216. self.signature_ids.append(fields[2])
  217. elif fields[1] in (b'PLAINTEXT', b'GOODSIG', b'KEY_CONSIDERED',
  218. b'NEWSIG', b'NOTATION_NAME', b'NOTATION_FLAGS',
  219. b'NOTATION_DATA', b'SIGEXPIRED', b'KEYEXPIRED',
  220. b'POLICY_URL', b'PROGRESS', b'VERIFICATION_COMPLIANCE_MODE'):
  221. pass
  222. elif fields[1] in (b'EXPSIG', b'EXPKEYSIG'):
  223. self.expired = True
  224. self.invalid = True
  225. elif fields[1] in (b'REVKEYSIG', b'BADSIG', b'ERRSIG', b'KEYREVOKED', b'NO_PUBKEY'):
  226. self.invalid = True
  227. else:
  228. field = fields[1].decode('ascii', errors='replace')
  229. raise GpgException("Keyword '{0}' from GnuPG was not expected.".format(field))
  230. def _exec_gpg(self, stdin, stdout, stderr, statusfd):
  231. try:
  232. if stdin != 0:
  233. os.dup2(stdin, 0)
  234. if stdout != 1:
  235. os.dup2(stdout, 1)
  236. if stderr != 2:
  237. os.dup2(stderr, 2)
  238. if statusfd != 3:
  239. os.dup2(statusfd, 3)
  240. for fd in range(4):
  241. old = fcntl.fcntl(fd, fcntl.F_GETFD)
  242. fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
  243. os.closerange(4, _MAXFD)
  244. args = [self.gpg,
  245. "--status-fd=3",
  246. "--no-default-keyring",
  247. "--batch",
  248. "--no-tty",
  249. "--trust-model", "always",
  250. "--fixed-list-mode"]
  251. for k in self.keyrings:
  252. args.extend(["--keyring", k])
  253. args.extend(["--decrypt", "-"])
  254. os.execvp(self.gpg, args)
  255. finally:
  256. try:
  257. print("Failed to execute gpg.", file=sys.stderr)
  258. sys.stderr.flush()
  259. except:
  260. # Ignore errors, we want to reach the `exit` call below.
  261. pass
  262. os._exit(3)
  263. @property
  264. def contents_sha1(self) -> str:
  265. return apt_pkg.sha1sum(self.contents) # type: ignore[attr-defined]
  266. def sign(infile, outfile=None, keyids=[], inline=False, pubring=None, secring=None, homedir=None, passphrase_file=None, *, digest_algorithm="SHA256"):
  267. args = [
  268. '/usr/bin/gpg',
  269. '--no-options', '--no-tty', '--batch', '--armour',
  270. '--personal-digest-preferences', digest_algorithm,
  271. ]
  272. for keyid in keyids:
  273. args.extend(['--local-user', keyid])
  274. if pubring is not None:
  275. args.extend(['--keyring', pubring])
  276. if secring is not None:
  277. args.extend(['--secret-keyring', secring])
  278. if homedir is not None:
  279. args.extend(['--homedir', homedir])
  280. if passphrase_file is not None:
  281. args.extend(['--pinentry-mode', 'loopback',
  282. '--passphrase-file', passphrase_file])
  283. args.append('--clearsign' if inline else '--detach-sign')
  284. kwargs = {}
  285. if isinstance(infile, str):
  286. infile = infile.encode('utf-8')
  287. if isinstance(infile, bytes):
  288. kwargs['input'] = infile
  289. else:
  290. kwargs['stdin'] = infile
  291. if outfile is None:
  292. kwargs['stdout'] = subprocess.PIPE
  293. else:
  294. kwargs['stdout'] = outfile
  295. result = subprocess.run(args, check=True, **kwargs)
  296. if outfile is None:
  297. return result.stdout
  298. # vim: set sw=4 et: