import asyncio
import collections
import os
import subprocess
import sys
import tempfile

import apt_pkg

from daklib.dakapt import DakHashes
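
# Layout of the HASH_FIELDS entries (derived from how they are used below):
#   (Index field name,
#    position in a history entry: 0 = base file, 1 = uncompressed patch, 2 = gzipped patch,
#    position in PDiffHashes: 1 = sha1, 2 = sha256,
#    extension appended to the patch name in the Index listing,
#    True if the field belongs to the primary (possibly merged) history)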
HASH_FIELDS = [
    ('SHA1-History', 0, 1, "", True),
    ('SHA256-History', 0, 2, "", True),
    ('SHA1-Patches', 1, 1, "", True),
    ('SHA256-Patches', 1, 2, "", True),
    ('SHA1-Download', 2, 1, ".gz", True),
    ('SHA256-Download', 2, 2, ".gz", True),
    ('X-Unmerged-SHA1-History', 0, 1, "", False),
    ('X-Unmerged-SHA256-History', 0, 2, "", False),
    ('X-Unmerged-SHA1-Patches', 1, 1, "", False),
    ('X-Unmerged-SHA256-Patches', 1, 2, "", False),
    ('X-Unmerged-SHA1-Download', 2, 1, ".gz", False),
    ('X-Unmerged-SHA256-Download', 2, 2, ".gz", False),
]

HASH_FIELDS_TABLE = {x[0]: (x[1], x[2], x[4]) for x in HASH_FIELDS}

_PDiffHashes = collections.namedtuple('_PDiffHashes', ['size', 'sha1', 'sha256'])


async def asyncio_check_call(*args, **kwargs):
    """async variant of subprocess.check_call

    Parameters reflect that of asyncio.create_subprocess_exec or
    (if "shell=True") that of asyncio.create_subprocess_shell
    with restore_signals=True being the default.
    """
    kwargs.setdefault('restore_signals', True)
    shell = kwargs.pop('shell', False)
    if shell:
        proc = await asyncio.create_subprocess_shell(*args, **kwargs)
    else:
        proc = await asyncio.create_subprocess_exec(*args, **kwargs)
    retcode = await proc.wait()
    if retcode != 0:
        raise subprocess.CalledProcessError(retcode, args[0])
    return 0


async def open_decompressed(file, named_temp_file=False):
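    """Open a file, transparently decompressing it if necessary

    Checks for `file` itself and then for .gz, .bz2, .xz and .zst
    compressed variants, decompressing via an external tool into a
    temporary file.  Returns an open file object positioned at the
    start, or None if no variant of the file exists.  With
    named_temp_file=True the temporary file has a filesystem name
    (needed when the caller passes it to an external command).
    """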
    async def call_decompressor(cmd, inpath):
        fh = tempfile.NamedTemporaryFile("w+") if named_temp_file \
            else tempfile.TemporaryFile("w+")
        with open(inpath, "rb") as rfd:
            await asyncio_check_call(
                *cmd,
                stdin=rfd,
                stdout=fh,
            )
        fh.seek(0)
        return fh

    if os.path.isfile(file):
        return open(file, "r")
    elif os.path.isfile("%s.gz" % file):
        return await call_decompressor(['zcat'], '{}.gz'.format(file))
    elif os.path.isfile("%s.bz2" % file):
        return await call_decompressor(['bzcat'], '{}.bz2'.format(file))
    elif os.path.isfile("%s.xz" % file):
        return await call_decompressor(['xzcat'], '{}.xz'.format(file))
    elif os.path.isfile(f"{file}.zst"):
        return await call_decompressor(['zstdcat'], f'{file}.zst')
    else:
        return None


async def _merge_pdiffs(patch_a, patch_b, resulting_patch_without_extension):
    """Merge two pdiffs into a single merged pdiff

    While rred supports merging more than two patches, we only need
    pairwise merges.  In the steady state, we have N merged patches plus
    1 new patch, and need to do N pairwise merges (i.e. merge two patches
    N times).  Therefore, supporting merges of 3+ patches does not help.

    The setup state looks like it could do with a bulk merge.  However,
    if you merge from "latest to earliest", you build the patches in the
    optimal order and still only need N-1 pairwise merges (rather than
    N-1 merges between N, N-1, N-2, ... 3, 2 patches).

    Combined, supporting pairwise merges is sufficient for our use case.
    """
    with await open_decompressed(patch_a, named_temp_file=True) as fd_a, \
            await open_decompressed(patch_b, named_temp_file=True) as fd_b:
        await asyncio_check_call(
            '/usr/lib/apt/methods/rred %s %s | gzip -9n > %s' % (fd_a.name, fd_b.name,
                                                                 resulting_patch_without_extension + ".gz"),
            shell=True,
        )


class PDiffHashes(_PDiffHashes):

    @classmethod
    def from_file(cls, fd):
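        """Compute the size, SHA1 and SHA256 hashes of an open file"""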
        size = os.fstat(fd.fileno())[6]
        hashes = DakHashes(fd)
        return cls(size, hashes.sha1, hashes.sha256)


async def _pdiff_hashes_from_patch(path_without_extension):
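    """Return PDiffHashes for the decompressed and the gzipped variant of a patch"""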
    with await open_decompressed(path_without_extension) as difff:
        hashes_decompressed = PDiffHashes.from_file(difff)

    with open(path_without_extension + ".gz", "r") as difffgz:
        hashes_compressed = PDiffHashes.from_file(difffgz)

    return hashes_decompressed, hashes_compressed


def _prune_history(order, history, maximum):
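    """Drop the oldest entries until at most `maximum` remain

    Removes pruned names from the `history` dict in place and returns
    the truncated order list.
    """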
    cnt = len(order)
    if cnt <= maximum:
        return order
    for h in order[:cnt - maximum]:
        del history[h]
    return order[cnt - maximum:]


def _read_hashes(history, history_order, ind, hashind, lines):
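    """Parse one hash field of an Index file into `history`

    Each line has the form "<hash> <size> <name>".  The parsed hashes
    are stored in history[name][ind], with hashind selecting between
    sha1 and sha256.  Returns the patch order extracted from the lines,
    reconciled against a previously seen `history_order` (None if the
    two cannot be aligned at all).
    """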
    current_order = []
    for line in lines:
        parts = line.split()
        fname = parts[2]
        if fname.endswith('.gz'):
            fname = fname[:-3]
        current_order.append(fname)
        if fname not in history:
            history[fname] = [None, None, None]
        if not history[fname][ind]:
            history[fname][ind] = PDiffHashes(int(parts[1]), None, None)
        if hashind == 1:
            history[fname][ind] = PDiffHashes(history[fname][ind].size,
                                              parts[0],
                                              history[fname][ind].sha256,
                                              )
        else:
            history[fname][ind] = PDiffHashes(history[fname][ind].size,
                                              history[fname][ind].sha1,
                                              parts[0],
                                              )

    # Common case: This is the first sequence we read, so we simply
    # adopt it.
    if not history_order:
        return current_order

    # Common case: The current order perfectly matches the existing one,
    # so we just stop here.
    if current_order == history_order:
        return history_order

    # Special case: the histories are not aligned.  This "should not
    # happen", but has done so in the past due to bugs.  Depending on
    # which field is out of sync, dak would either self-heal or be stuck
    # forever.  We realign the history to ensure we always end with
    # "self-heal".
    #
    # Typically, the patches are aligned from the end, as we always
    # append new patches to the end of the series.
    patches_from_the_end = 0
    for p1, p2 in zip(reversed(current_order), reversed(history_order)):
        if p1 == p2:
            patches_from_the_end += 1
        else:
            break

    if not patches_from_the_end:
        return None

    return current_order[-patches_from_the_end:]


class PDiffIndex:
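    """In-memory representation of a pdiff Index file and the patches it references

    The patches live in `patches_dir`; the Index file itself is
    `patches_dir`/Index.
    """
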
    def __init__(self, patches_dir, max=56, merge_pdiffs=False):
        self.can_path = None
        self._history = {}
        self._history_order = []
        self._unmerged_history = {}
        self._unmerged_history_order = []
        self._old_merged_patches_prefix = []
        self.max = max
        self.patches_dir = patches_dir
        self.filesizehashes = None
        self.wants_merged_pdiffs = merge_pdiffs
        self.has_merged_pdiffs = False
        self.index_path = os.path.join(patches_dir, 'Index')
        self.read_index_file(self.index_path)

    async def generate_and_add_patch_file(self, original_file, new_file_uncompressed, patch_name):
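        """Generate a pdiff between the old and the new file and record it

        Diffs `original_file` (possibly compressed) against
        `new_file_uncompressed`, stores the gzipped ed-style patch as
        `patch_name`.gz in the patches directory, and records its hashes
        in the (merged and unmerged) history.  Does nothing if the two
        files are identical.
        """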
        with await open_decompressed(original_file) as oldf:
            oldsizehashes = PDiffHashes.from_file(oldf)

            with open(new_file_uncompressed, "r") as newf:
                newsizehashes = PDiffHashes.from_file(newf)

            if newsizehashes == oldsizehashes:
                return

            if not os.path.isdir(self.patches_dir):
                os.mkdir(self.patches_dir)

            oldf.seek(0)
            patch_path = os.path.join(self.patches_dir, patch_name)
            with open("{}.gz".format(patch_path), "wb") as fh:
                await asyncio_check_call(
                    "diff --ed - {} | gzip --rsyncable --no-name -c -9".format(new_file_uncompressed),
                    shell=True,
                    stdin=oldf,
                    stdout=fh
                )

        difsizehashes, difgzsizehashes = await _pdiff_hashes_from_patch(patch_path)

        self.filesizehashes = newsizehashes
        self._unmerged_history[patch_name] = [oldsizehashes,
                                              difsizehashes,
                                              difgzsizehashes,
                                              ]
        self._unmerged_history_order.append(patch_name)

        if self.has_merged_pdiffs != self.wants_merged_pdiffs:
            # Convert patches
            if self.wants_merged_pdiffs:
                await self._convert_to_merged_patches()
            else:
                self._convert_to_unmerged()
            # Conversion also covers the newly added patch; accordingly,
            # the else branch below does not need to run in that case.
        else:
            second_patch_name = patch_name
            if self.wants_merged_pdiffs:
                await self._bump_merged_patches()
                second_patch_name = "T-%s-F-%s" % (patch_name, patch_name)
                os.link(os.path.join(self.patches_dir, patch_name + ".gz"),
                        os.path.join(self.patches_dir, second_patch_name + ".gz"))

            # Without merged PDiffs, keep _history and _unmerged_history aligned
            self._history[second_patch_name] = [oldsizehashes,
                                                difsizehashes,
                                                difgzsizehashes,
                                                ]
            self._history_order.append(second_patch_name)

    async def _bump_merged_patches(self):
        # When bumping patches, we need to "rewrite" all merged patches.  As
        # neither apt nor dak supports by-hash for pdiffs, we leave the old
        # versions of merged pdiffs behind.
        target_name = self._unmerged_history_order[-1]
        target_path = os.path.join(self.patches_dir, target_name)

        new_merged_order = []
        new_merged_history = {}
        for old_merged_patch_name in self._history_order:
            try:
                old_orig_name = old_merged_patch_name.split("-F-", 1)[1]
            except IndexError:
                old_orig_name = old_merged_patch_name
            new_merged_patch_name = "T-%s-F-%s" % (target_name, old_orig_name)
            old_merged_patch_path = os.path.join(self.patches_dir, old_merged_patch_name)
            new_merged_patch_path = os.path.join(self.patches_dir, new_merged_patch_name)
            await _merge_pdiffs(old_merged_patch_path, target_path, new_merged_patch_path)
            hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(new_merged_patch_path)
            new_merged_history[new_merged_patch_name] = [self._history[old_merged_patch_name][0],
                                                         hashes_decompressed,
                                                         hashes_compressed,
                                                         ]
            new_merged_order.append(new_merged_patch_name)

        self._history_order = new_merged_order
        self._history = new_merged_history

        self._old_merged_patches_prefix.append(self._unmerged_history_order[-1])

    def _convert_to_unmerged(self):
        if not self.has_merged_pdiffs:
            return
        # Converting from merged patches to unmerged patches is simple:
        # discard the merged patches.  Cleanup of the files is handled by
        # find_obsolete_patches.
        self._history = {k: v for k, v in self._unmerged_history.items()}
        self._history_order = list(self._unmerged_history_order)

        self._old_merged_patches_prefix = []
        self.has_merged_pdiffs = False

    async def _convert_to_merged_patches(self):
        if self.has_merged_pdiffs:
            return

        target_name = self._unmerged_history_order[-1]

        self._history = {}
        self._history_order = []

        new_patches = []

        # We merge from newest to oldest.
        #
        # Assume we have N unmerged patches (u1 .. uN) where, given s1,
        # you can apply u1 to get to s2.  From s2 you use u2 to move to
        # s3, and so on until you reach your target T (= sN+1).
        #
        # In the merged-patch world, we want N merged patches called
        # m1-N, m2-N, m3-N, ..., m(N-1)-N.  Here, you use sX + mX-N to
        # go to T directly, regardless of where you start.
        #
        # A noteworthy special case is that m(N-1)-N is identical to uN
        # content-wise.  This will be important in a moment.  For now,
        # let's start with looking at creating merged patches.
        #
        # We can get m1-N by merging u1 with m2-N, because u1 takes s1
        # to s2 and m2-N takes s2 to T.  By the same argument, we
        # generate m2-N by combining u2 with m3-N.  Rinse and repeat
        # until we get to the base case m(N-1)-N, which is uN.
        #
        # From this, we can conclude that generating the patches in
        # reverse order (i.e. m2-N is generated before m1-N) gets us
        # the desired result in N-1 pairwise merges, without having to
        # use all patches in one go.  (This is also optimal in the
        # sense that we need to update N-1 patches to preserve the
        # entire history.)
        #
        for patch_name in reversed(self._unmerged_history_order):
            merged_patch = "T-%s-F-%s" % (target_name, patch_name)
            merged_patch_path = os.path.join(self.patches_dir, merged_patch)

            if new_patches:
                oldest_patch = os.path.join(self.patches_dir, patch_name)
                previous_merged_patch = os.path.join(self.patches_dir, new_patches[-1])
                await _merge_pdiffs(oldest_patch, previous_merged_patch, merged_patch_path)
                hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(merged_patch_path)
                self._history[merged_patch] = [self._unmerged_history[patch_name][0],
                                               hashes_decompressed,
                                               hashes_compressed,
                                               ]
            else:
                # Special case: the latest patch is its own "merged" variant.
                os.link(os.path.join(self.patches_dir, patch_name + ".gz"), merged_patch_path + ".gz")
                self._history[merged_patch] = self._unmerged_history[patch_name]

            new_patches.append(merged_patch)

        self._history_order = list(reversed(new_patches))
        self._old_merged_patches_prefix.append(target_name)
        self.has_merged_pdiffs = True

    def read_index_file(self, index_file_path):
        try:
            with apt_pkg.TagFile(index_file_path) as index:
                index.step()
                section = index.section
                self.has_merged_pdiffs = section.get('X-Patch-Precedence') == 'merged'
                self._old_merged_patches_prefix = section.get('X-DAK-Older-Patches', '').split()

                for field in section.keys():
                    value = section[field]

                    if field in HASH_FIELDS_TABLE:
                        ind, hashind, primary_history = HASH_FIELDS_TABLE[field]
                        if primary_history:
                            history = self._history
                            history_order = self._history_order
                        else:
                            history = self._unmerged_history
                            history_order = self._unmerged_history_order

                        if history_order is None:
                            # History is already misaligned and we cannot find a common restore point.
                            continue

                        new_order = _read_hashes(history, history_order, ind, hashind, value.splitlines())
                        if primary_history:
                            self._history_order = new_order
                        else:
                            self._unmerged_history_order = new_order
                        continue

                    if field in ("Canonical-Name", "Canonical-Path"):
                        self.can_path = value
                        continue

                    if field not in ("SHA1-Current", "SHA256-Current"):
                        continue

                    l = value.split()

                    if len(l) != 2:
                        continue

                    if not self.filesizehashes:
                        self.filesizehashes = PDiffHashes(int(l[1]), None, None)

                    if field == "SHA1-Current":
                        self.filesizehashes = PDiffHashes(self.filesizehashes.size, l[0], self.filesizehashes.sha256)

                    if field == "SHA256-Current":
                        self.filesizehashes = PDiffHashes(self.filesizehashes.size, self.filesizehashes.sha1, l[0])

            # Ensure that the order lists are defined again.
            if self._history_order is None:
                self._history_order = []
            if self._unmerged_history_order is None:
                self._unmerged_history_order = []

            if not self.has_merged_pdiffs:
                # When X-Patch-Precedence != merged, the two histories are the same.
                self._unmerged_history = {k: v for k, v in self._history.items()}
                self._unmerged_history_order = list(self._history_order)
                self._old_merged_patches_prefix = []
        except (OSError, apt_pkg.Error):
            # On error, we ignore everything.  This causes the file to be
            # regenerated from scratch.  It forces everyone to download the
            # full file if they are behind, but it is self-healing provided
            # we generate valid files from here on.
            pass

    def prune_patch_history(self):
        # Truncate our history if necessary
        hs = self._history
        order = self._history_order
        unmerged_hs = self._unmerged_history
        unmerged_order = self._unmerged_history_order
        self._history_order = _prune_history(order, hs, self.max)
        self._unmerged_history_order = _prune_history(unmerged_order, unmerged_hs, self.max)

        prefix_cnt = len(self._old_merged_patches_prefix)
        if prefix_cnt > 3:
            self._old_merged_patches_prefix = self._old_merged_patches_prefix[prefix_cnt - 3:]

    def find_obsolete_patches(self):
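        """Yield the paths of patch files no longer referenced by the index"""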
        if not os.path.isdir(self.patches_dir):
            return

        hs = self._history
        unmerged_hs = self._unmerged_history

        keep_prefixes = tuple("T-%s-F-" % x for x in self._old_merged_patches_prefix)

        # Scan for obsolete patches.  While we could have computed these
        # from the history, this method has the advantage of cleaning up
        # old patches left around that we failed to remove previously
        # (e.g. if we had an index corruption, which happened in
        # fed7ada36b609 and was later fixed in a36f867acf029).
        for name in os.listdir(self.patches_dir):
            if name in ('Index', 'by-hash'):
                continue
            # We keep some old merged patches around (as neither apt nor
            # dak supports by-hash for pdiffs)
            if keep_prefixes and name.startswith(keep_prefixes):
                continue
            basename, ext = os.path.splitext(name)
            if ext in ('', '.gz') and (basename in hs or basename in unmerged_hs):
                continue
            path = os.path.join(self.patches_dir, name)
            if not os.path.isfile(path):
                # Non-files are probably not patches.
                continue
            # Unknown patch file; flag it as obsolete
            yield path

    def dump(self, out=sys.stdout):
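        """Write the Index file contents for this pdiff index to `out`"""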
        if self.can_path:
            out.write("Canonical-Path: %s\n" % self.can_path)

        if self.filesizehashes:
            if self.filesizehashes.sha1:
                out.write("SHA1-Current: %s %7d\n" % (self.filesizehashes.sha1, self.filesizehashes.size))
            if self.filesizehashes.sha256:
                out.write("SHA256-Current: %s %7d\n" % (self.filesizehashes.sha256, self.filesizehashes.size))

        for fieldname, ind, hashind, ext, primary_history in HASH_FIELDS:
            if primary_history:
                hs = self._history
                order = self._history_order
            elif self.has_merged_pdiffs:
                hs = self._unmerged_history
                order = self._unmerged_history_order
            else:
                continue

            out.write("%s:\n" % fieldname)
            for h in order:
                if hs[h][ind] and hs[h][ind][hashind]:
                    out.write(" %s %7d %s%s\n" % (hs[h][ind][hashind], hs[h][ind].size, h, ext))

        if self.has_merged_pdiffs:
            out.write("X-Patch-Precedence: merged\n")
            if self._old_merged_patches_prefix:
                out.write("X-DAK-Older-Patches: %s\n" % " ".join(self._old_merged_patches_prefix))

    def update_index(self, tmp_suffix=".new"):
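        """Atomically (re)write the Index file via a temporary file"""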
        if not os.path.isdir(self.patches_dir):
            # If there is no patch directory, then we have no patches.
            # It seems weird to have an Index of patches when we know
            # there are none.
            return
        tmp_path = self.index_path + tmp_suffix
        with open(tmp_path, "w") as f:
            self.dump(f)
        os.rename(tmp_path, self.index_path)
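

# Illustrative usage sketch (not part of dak itself; the paths, the patch
# name, and the max value below are hypothetical).  A typical flow is:
# create/refresh a patch, prune the history, remove obsolete files, then
# rewrite the Index.
#
#     async def refresh(packages_dir):
#         index = PDiffIndex(os.path.join(packages_dir, 'Packages.diff'),
#                            max=14, merge_pdiffs=True)
#         await index.generate_and_add_patch_file(
#             os.path.join(packages_dir, 'Packages.old'),  # previous version
#             os.path.join(packages_dir, 'Packages'),      # new, uncompressed
#             '2024-01-01-0000.00',                        # patch name
#         )
#         index.prune_patch_history()
#         for obsolete in index.find_obsolete_patches():
#             os.unlink(obsolete)
#         index.update_index()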