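"""pdiff.py - helpers for maintaining pdiff ("Packages diff") series in dak

This module keeps two patch histories for an index file (the "merged"
history advertised to apt and the unmerged "X-Unmerged-*" one), generates
new ed-style patches with diff(1), optionally merges them via apt's rred
method, prunes old patches, and reads/writes the accompanying "Index" file
in apt_pkg.TagFile format.
"""
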
import asyncio
import collections
import os
import subprocess
import sys
import tempfile

import apt_pkg

from daklib.dakapt import DakHashes

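# Each HASH_FIELDS entry is:
#   (Index field name,
#    index into a patch's [old file, patch, patch.gz] hash list,
#    index into PDiffHashes (1 = sha1, 2 = sha256),
#    filename extension emitted in the Index file,
#    whether the field belongs to the primary (merged) history rather than
#    the X-Unmerged-* one)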
HASH_FIELDS = [
    ('SHA1-History', 0, 1, "", True),
    ('SHA256-History', 0, 2, "", True),
    ('SHA1-Patches', 1, 1, "", True),
    ('SHA256-Patches', 1, 2, "", True),
    ('SHA1-Download', 2, 1, ".gz", True),
    ('SHA256-Download', 2, 2, ".gz", True),
    ('X-Unmerged-SHA1-History', 0, 1, "", False),
    ('X-Unmerged-SHA256-History', 0, 2, "", False),
    ('X-Unmerged-SHA1-Patches', 1, 1, "", False),
    ('X-Unmerged-SHA256-Patches', 1, 2, "", False),
    ('X-Unmerged-SHA1-Download', 2, 1, ".gz", False),
    ('X-Unmerged-SHA256-Download', 2, 2, ".gz", False),
]

HASH_FIELDS_TABLE = {x[0]: (x[1], x[2], x[4]) for x in HASH_FIELDS}

_PDiffHashes = collections.namedtuple('_PDiffHashes', ['size', 'sha1', 'sha256'])


async def asyncio_check_call(*args, **kwargs):
    """async variant of subprocess.check_call

    Parameters reflect that of asyncio.create_subprocess_exec or
    (if "shell=True") that of asyncio.create_subprocess_shell
    with restore_signals=True being the default.
    """
    kwargs.setdefault('restore_signals', True)
    shell = kwargs.pop('shell', False)
    if shell:
        proc = await asyncio.create_subprocess_shell(*args, **kwargs)
    else:
        proc = await asyncio.create_subprocess_exec(*args, **kwargs)
    retcode = await proc.wait()
    if retcode != 0:
        raise subprocess.CalledProcessError(retcode, args[0])
    return 0


async def open_decompressed(file, named_temp_file=False):
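    """Open `file` and return a file object with its decompressed content.

    Returns the plain file if it exists; otherwise a temporary file fed from
    zcat/bzcat/xzcat for the .gz/.bz2/.xz variant.  Returns None when no
    variant of the file exists.
    """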
    async def call_decompressor(cmd, inpath):
        fh = tempfile.NamedTemporaryFile("w+") if named_temp_file \
            else tempfile.TemporaryFile("w+")
        with open(inpath, "rb") as rfd:
            await asyncio_check_call(
                *cmd,
                stdin=rfd,
                stdout=fh,
            )
        fh.seek(0)
        return fh

    if os.path.isfile(file):
        return open(file, "r")
    elif os.path.isfile("%s.gz" % file):
        return await call_decompressor(['zcat'], '{}.gz'.format(file))
    elif os.path.isfile("%s.bz2" % file):
        return await call_decompressor(['bzcat'], '{}.bz2'.format(file))
    elif os.path.isfile("%s.xz" % file):
        return await call_decompressor(['xzcat'], '{}.xz'.format(file))
    else:
        return None


async def _merge_pdiffs(patch_a, patch_b, resulting_patch_without_extension):
    """Merge two pdiffs into one merged pdiff

    While rred supports merging more than 2, we only need support for merging two.
    In the steady state, we will have N merged patches plus 1 new patch. Here
    we need to do N pairwise merges (i.e. merge two patches N times).
    Therefore, supporting merging of 3+ patches does not help at all.

    The setup state looks like it could do with a bulk merging. However, if you
    merge from "latest to earliest" then you will be building in optimal order
    and still only need to do N-1 pairwise merges (rather than N-1 merges
    between N, N-1, N-2, ... 3, 2 patches).

    Combined, supporting pairwise merges is sufficient for our use case.
    """
    with await open_decompressed(patch_a, named_temp_file=True) as fd_a, \
            await open_decompressed(patch_b, named_temp_file=True) as fd_b:
        await asyncio_check_call(
            '/usr/lib/apt/methods/rred %s %s | gzip -9n > %s' % (fd_a.name, fd_b.name,
                                                                 resulting_patch_without_extension + ".gz"),
            shell=True,
        )


class PDiffHashes(_PDiffHashes):

    @classmethod
    def from_file(cls, fd):
        size = os.fstat(fd.fileno())[6]  # st_size
        hashes = DakHashes(fd)
        return cls(size, hashes.sha1, hashes.sha256)


async def _pdiff_hashes_from_patch(path_without_extension):
    with await open_decompressed(path_without_extension) as difff:
        hashes_decompressed = PDiffHashes.from_file(difff)

    with open(path_without_extension + ".gz", "r") as difffgz:
        hashes_compressed = PDiffHashes.from_file(difffgz)

    return hashes_decompressed, hashes_compressed


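# Drop the oldest patches from `history`/`order` so that at most `maximum`
# entries remain; returns the (possibly truncated) order list.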
def _prune_history(order, history, maximum):
    cnt = len(order)
    if cnt <= maximum:
        return order
    for h in order[:cnt - maximum]:
        del history[h]
    return order[cnt - maximum:]


def _read_hashes(history, history_order, ind, hashind, lines):
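    """Parse one hash field of an Index file into `history`.

    Each line is expected to look like "<checksum> <size> <patch name>" (a
    trailing ".gz" on the name is stripped).  `ind` selects which of the
    per-patch [old file, patch, patch.gz] entries the field describes and
    `hashind` whether the checksum is sha1 (1) or sha256 (2).

    Returns the patch order seen in this field, the established
    `history_order` if both agree, a realigned common suffix of the two, or
    None when the histories cannot be reconciled.
    """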
    current_order = []

    for line in lines:
        parts = line.split()
        fname = parts[2]

        if fname.endswith('.gz'):
            fname = fname[:-3]

        current_order.append(fname)

        if fname not in history:
            history[fname] = [None, None, None]

        if not history[fname][ind]:
            history[fname][ind] = PDiffHashes(int(parts[1]), None, None)

        if hashind == 1:
            history[fname][ind] = PDiffHashes(history[fname][ind].size,
                                              parts[0],
                                              history[fname][ind].sha256,
                                              )
        else:
            history[fname][ind] = PDiffHashes(history[fname][ind].size,
                                              history[fname][ind].sha1,
                                              parts[0],
                                              )

    # Common case: this is the first sequence we read and we simply adopt it.
    if not history_order:
        return current_order

    # Common case: The current history perfectly matches the existing, so
    # we just stop here.
    if current_order == history_order:
        return history_order

    # Special case: the histories are not aligned.  This "should not happen"
    # but has done so in the past due to bugs.  Depending on which field is
    # out of sync, dak would either self-heal or be stuck forever.  We
    # realign the history to ensure we always end with "self-heal".
    #
    # Typically, the patches are aligned from the end as we always add a
    # patch at the end of the series.
    patches_from_the_end = 0
    for p1, p2 in zip(reversed(current_order), reversed(history_order)):
        if p1 == p2:
            patches_from_the_end += 1
        else:
            break

    if not patches_from_the_end:
        return None

    return current_order[-patches_from_the_end:]


class PDiffIndex:
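    """State of a pdiff series for a single index file.

    `_history`/`_history_order` describe the patches advertised in the
    primary fields of the Index file (merged patches when merged pdiffs are
    enabled), while `_unmerged_history`/`_unmerged_history_order` always
    track the plain per-generation patches.  At most `max` patches are kept
    (see prune_patch_history) and the Index file lives at `patches_dir`/Index.
    """
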
    def __init__(self, patches_dir, max=56, merge_pdiffs=False):
        self.can_path = None
        self._history = {}
        self._history_order = []
        self._unmerged_history = {}
        self._unmerged_history_order = []
        self._old_merged_patches_prefix = []
        self.max = max
        self.patches_dir = patches_dir
        self.filesizehashes = None
        self.wants_merged_pdiffs = merge_pdiffs
        self.has_merged_pdiffs = False
        self.index_path = os.path.join(patches_dir, 'Index')
        self.read_index_file(self.index_path)

    async def generate_and_add_patch_file(self, original_file, new_file_uncompressed, patch_name):

        with await open_decompressed(original_file) as oldf:
            oldsizehashes = PDiffHashes.from_file(oldf)

            with open(new_file_uncompressed, "r") as newf:
                newsizehashes = PDiffHashes.from_file(newf)

            if newsizehashes == oldsizehashes:
                return

            if not os.path.isdir(self.patches_dir):
                os.mkdir(self.patches_dir)

            oldf.seek(0)
            patch_path = os.path.join(self.patches_dir, patch_name)
            with open("{}.gz".format(patch_path), "wb") as fh:
                await asyncio_check_call(
                    "diff --ed - {} | gzip --rsyncable --no-name -c -9".format(new_file_uncompressed),
                    shell=True,
                    stdin=oldf,
                    stdout=fh
                )

            difsizehashes, difgzsizehashes = await _pdiff_hashes_from_patch(patch_path)

        self.filesizehashes = newsizehashes
        self._unmerged_history[patch_name] = [oldsizehashes,
                                              difsizehashes,
                                              difgzsizehashes,
                                              ]
        self._unmerged_history_order.append(patch_name)

        if self.has_merged_pdiffs != self.wants_merged_pdiffs:
            # Convert patches
            if self.wants_merged_pdiffs:
                await self._convert_to_merged_patches()
            else:
                self._convert_to_unmerged()
            # Conversion also covers the newly added patch; accordingly, the
            # else branch below does not need to handle it again.
        else:
            second_patch_name = patch_name
            if self.wants_merged_pdiffs:
                await self._bump_merged_patches()
                second_patch_name = "T-%s-F-%s" % (patch_name, patch_name)
                os.link(os.path.join(self.patches_dir, patch_name + ".gz"),
                        os.path.join(self.patches_dir, second_patch_name + ".gz"))

            # Without merged PDiffs, keep _history and _unmerged_history aligned
            self._history[second_patch_name] = [oldsizehashes,
                                                difsizehashes,
                                                difgzsizehashes,
                                                ]
            self._history_order.append(second_patch_name)

    async def _bump_merged_patches(self):
        # When bumping patches, we need to "rewrite" all merged patches.  As
        # neither apt nor dak supports by-hash for pdiffs, we leave the old
        # versions of merged pdiffs behind.
        target_name = self._unmerged_history_order[-1]
        target_path = os.path.join(self.patches_dir, target_name)

        new_merged_order = []
        new_merged_history = {}
        for old_merged_patch_name in self._history_order:
            try:
                old_orig_name = old_merged_patch_name.split("-F-", 1)[1]
            except IndexError:
                old_orig_name = old_merged_patch_name
            new_merged_patch_name = "T-%s-F-%s" % (target_name, old_orig_name)
            old_merged_patch_path = os.path.join(self.patches_dir, old_merged_patch_name)
            new_merged_patch_path = os.path.join(self.patches_dir, new_merged_patch_name)
            await _merge_pdiffs(old_merged_patch_path, target_path, new_merged_patch_path)

            hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(new_merged_patch_path)

            new_merged_history[new_merged_patch_name] = [self._history[old_merged_patch_name][0],
                                                         hashes_decompressed,
                                                         hashes_compressed,
                                                         ]
            new_merged_order.append(new_merged_patch_name)

        self._history_order = new_merged_order
        self._history = new_merged_history

        self._old_merged_patches_prefix.append(self._unmerged_history_order[-1])

    def _convert_to_unmerged(self):
        if not self.has_merged_pdiffs:
            return
        # Converting from merged patches to unmerged patches is simple:
        # discard the merged patches.  Cleanup will be handled by
        # find_obsolete_patches.
        self._history = {k: v for k, v in self._unmerged_history.items()}
        self._history_order = list(self._unmerged_history_order)

        self._old_merged_patches_prefix = []
        self.has_merged_pdiffs = False

    async def _convert_to_merged_patches(self):
        if self.has_merged_pdiffs:
            return

        target_name = self._unmerged_history_order[-1]

        self._history = {}
        self._history_order = []

        new_patches = []

        # We merge from newest to oldest
        #
        # Assume we got N unmerged patches (u1 - uN) where given s1 then
        # you can apply u1 to get to s2. From s2 you use u2 to move to s3
        # and so on until you reach your target T (= sN+1).
        #
        # In the merged patch world, we want N merged patches called m1-N,
        # m2-N, m3-N ... m(N-1)-N. Here, you use sX + mX-N to go to
        # T directly regardless of where you start.
        #
        # A noteworthy special case is that m(N-1)-N is identical to uN
        # content-wise.  This will be important in a moment.  For now,
        # let's start by looking at creating merged patches.
        #
        # We can get m1-N by merging u1 with m2-N because u1 will take s1
        # to s2 and m2-N will take s2 to T.  By the same argument, we
        # generate m2-N by combining u2 with m3-N.  Rinse-and-repeat until
        # we get to the base-case m(N-1)-N - which is uN.
        #
        # From this, we can conclude that generating the patches in
        # reverse order (i.e. m2-N is generated before m1-N) will get
        # us the desired result in N-1 pair-wise merges without having
        # to use all patches in one go.  (This is also optimal in the
        # sense that we need to update N-1 patches to preserve the
        # entire history).
        #
        for patch_name in reversed(self._unmerged_history_order):
            merged_patch = "T-%s-F-%s" % (target_name, patch_name)
            merged_patch_path = os.path.join(self.patches_dir, merged_patch)

            if new_patches:
                oldest_patch = os.path.join(self.patches_dir, patch_name)
                previous_merged_patch = os.path.join(self.patches_dir, new_patches[-1])
                await _merge_pdiffs(oldest_patch, previous_merged_patch, merged_patch_path)

                hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(merged_patch_path)

                self._history[merged_patch] = [self._unmerged_history[patch_name][0],
                                               hashes_decompressed,
                                               hashes_compressed,
                                               ]
            else:
                # Special case: the latest patch is its own "merged" variant.
                os.link(os.path.join(self.patches_dir, patch_name + ".gz"), merged_patch_path + ".gz")
                self._history[merged_patch] = self._unmerged_history[patch_name]

            new_patches.append(merged_patch)

        self._history_order = list(reversed(new_patches))
        self._old_merged_patches_prefix.append(target_name)
        self.has_merged_pdiffs = True

    def read_index_file(self, index_file_path):
        try:
            with apt_pkg.TagFile(index_file_path) as index:
                index.step()
                section = index.section
                self.has_merged_pdiffs = section.get('X-Patch-Precedence') == 'merged'
                self._old_merged_patches_prefix = section.get('X-DAK-Older-Patches', '').split()

                for field in section.keys():
                    value = section[field]

                    if field in HASH_FIELDS_TABLE:
                        ind, hashind, primary_history = HASH_FIELDS_TABLE[field]
                        if primary_history:
                            history = self._history
                            history_order = self._history_order
                        else:
                            history = self._unmerged_history
                            history_order = self._unmerged_history_order

                        if history_order is None:
                            # History is already misaligned and we cannot find a common restore point.
                            continue

                        new_order = _read_hashes(history, history_order, ind, hashind, value.splitlines())
                        if primary_history:
                            self._history_order = new_order
                        else:
                            self._unmerged_history_order = new_order
                        continue

                    if field in ("Canonical-Name", "Canonical-Path"):
                        self.can_path = value
                        continue

                    if field not in ("SHA1-Current", "SHA256-Current"):
                        continue

                    l = value.split()

                    if len(l) != 2:
                        continue

                    if not self.filesizehashes:
                        self.filesizehashes = PDiffHashes(int(l[1]), None, None)

                    if field == "SHA1-Current":
                        self.filesizehashes = PDiffHashes(self.filesizehashes.size, l[0], self.filesizehashes.sha256)

                    if field == "SHA256-Current":
                        self.filesizehashes = PDiffHashes(self.filesizehashes.size, self.filesizehashes.sha1, l[0])

                # Ensure that the order lists are defined again.
                if self._history_order is None:
                    self._history_order = []
                if self._unmerged_history_order is None:
                    self._unmerged_history_order = []

                if not self.has_merged_pdiffs:
                    # When X-Patch-Precedence != merged, then the two histories are the same.
                    self._unmerged_history = {k: v for k, v in self._history.items()}
                    self._unmerged_history_order = list(self._history_order)
                    self._old_merged_patches_prefix = []

        except (OSError, apt_pkg.Error):
            # On error, we ignore everything.  This causes the file to be regenerated from scratch.
            # It forces everyone to download the full file if they are behind.
            # But it is self-healing provided that we generate valid files from here on.
            pass

    def prune_patch_history(self):
        # Truncate our history if necessary
        hs = self._history
        order = self._history_order
        unmerged_hs = self._unmerged_history
        unmerged_order = self._unmerged_history_order
        self._history_order = _prune_history(order, hs, self.max)
        self._unmerged_history_order = _prune_history(unmerged_order, unmerged_hs, self.max)

        prefix_cnt = len(self._old_merged_patches_prefix)
        if prefix_cnt > 3:
            self._old_merged_patches_prefix = self._old_merged_patches_prefix[prefix_cnt - 3:]

    def find_obsolete_patches(self):
        if not os.path.isdir(self.patches_dir):
            return

        hs = self._history
        unmerged_hs = self._unmerged_history
        keep_prefixes = tuple("T-%s-F-" % x for x in self._old_merged_patches_prefix)

        # Scan for obsolete patches.  While we could have computed these
        # from the history, this method has the advantage of cleaning up
        # old patches left that we failed to remove previously (e.g. if
        # we had an index corruption, which happened in fed7ada36b609 and
        # was later fixed in a36f867acf029)
        for name in os.listdir(self.patches_dir):
            if name in ('Index', 'by-hash'):
                continue
            # We keep some old merged patches around (as neither apt nor
            # dak supports by-hash for pdiffs)
            if keep_prefixes and name.startswith(keep_prefixes):
                continue
            basename, ext = os.path.splitext(name)

            if ext in ('', '.gz') and (basename in hs or basename in unmerged_hs):
                continue

            path = os.path.join(self.patches_dir, name)

            if not os.path.isfile(path):
                # Non-files are probably not patches.
                continue

            # Unknown patch file; flag it as obsolete
            yield path

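    # dump() writes the Index stanza, roughly of the form below (hashes,
    # sizes and patch names are placeholders; the X-* lines only appear
    # when merged pdiffs are in use):
    #
    #   Canonical-Path: <path>
    #   SHA1-Current: <sha1> <size>
    #   SHA256-Current: <sha256> <size>
    #   SHA1-History:
    #    <sha1> <size> <patch-name>
    #   SHA1-Download:
    #    <sha1> <size> <patch-name>.gz
    #   ...
    #   X-Patch-Precedence: merged
    #   X-DAK-Older-Patches: <name> <name>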
    def dump(self, out=sys.stdout):
        if self.can_path:
            out.write("Canonical-Path: %s\n" % self.can_path)

        if self.filesizehashes:
            if self.filesizehashes.sha1:
                out.write("SHA1-Current: %s %7d\n" % (self.filesizehashes.sha1, self.filesizehashes.size))
            if self.filesizehashes.sha256:
                out.write("SHA256-Current: %s %7d\n" % (self.filesizehashes.sha256, self.filesizehashes.size))

        for fieldname, ind, hashind, ext, primary_history in HASH_FIELDS:
            if primary_history:
                hs = self._history
                order = self._history_order
            elif self.has_merged_pdiffs:
                hs = self._unmerged_history
                order = self._unmerged_history_order
            else:
                continue

            out.write("%s:\n" % fieldname)

            for h in order:
                if hs[h][ind] and hs[h][ind][hashind]:
                    out.write(" %s %7d %s%s\n" % (hs[h][ind][hashind], hs[h][ind].size, h, ext))

        if self.has_merged_pdiffs:
            out.write("X-Patch-Precedence: merged\n")

        if self._old_merged_patches_prefix:
            out.write("X-DAK-Older-Patches: %s\n" % " ".join(self._old_merged_patches_prefix))

    def update_index(self, tmp_suffix=".new"):
        if not os.path.isdir(self.patches_dir):
            # If there is no patch directory, then we have no patches.
            # It seems weird to have an Index of patches when we know there are
            # none.
            return
        tmp_path = self.index_path + tmp_suffix
        with open(tmp_path, "w") as f:
            self.dump(f)
        os.rename(tmp_path, self.index_path)
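
# A rough usage sketch (the surrounding driver code is hypothetical; only the
# PDiffIndex methods it calls come from this module):
#
#   async def refresh_pdiffs(patches_dir, old_index_file, new_index_file, patch_name):
#       index = PDiffIndex(patches_dir, max=56, merge_pdiffs=True)
#       await index.generate_and_add_patch_file(old_index_file, new_index_file, patch_name)
#       index.prune_patch_history()
#       for obsolete in index.find_obsolete_patches():
#           os.unlink(obsolete)
#       index.update_index()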