filewriter.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. """
  2. Helper code for file writing with optional compression.
  3. @contact: Debian FTPMaster <ftpmaster@debian.org>
  4. @copyright: 2011 Torsten Werner <twerner@debian.org>
  5. @license: GNU General Public License version 2 or later
  6. """
  7. ################################################################################
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU General Public License as published by
  10. # the Free Software Foundation; either version 2 of the License, or
  11. # (at your option) any later version.
  12. # This program is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. # GNU General Public License for more details.
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program; if not, write to the Free Software
  18. # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  19. ################################################################################
  20. import errno
  21. import os
  22. import os.path
  23. import subprocess
  24. from dataclasses import dataclass
  25. from typing import Optional, TextIO
@dataclass
class CompressionMethod:
    '''
    Description of one output variant a file writer can produce.
    '''
    # Name used to select this method; writers compare it against the
    # entries of their ``compression`` list.
    keyword: str
    # Suffix appended to the output path (for example '.gz'); the empty
    # string for the uncompressed variant.
    extension: str
    # Argument list of the program the data is piped through (stdin to
    # stdout), or None when the data is published without compression.
    command: Optional[list[str]]
  31. _compression_methods = (
  32. CompressionMethod('bzip2', '.bz2', ['bzip2', '-9']),
  33. CompressionMethod('gzip', '.gz', ['gzip', '-9cn', '--rsyncable', '--no-name']),
  34. CompressionMethod('xz', '.xz', ['xz', '-c', '-e', '-T0']),
  35. CompressionMethod('zstd', '.zst', ['zstd', '--compress']),
  36. # 'none' must be the last compression method as BaseFileWriter
  37. # handling it will remove the input file for other compressions
  38. CompressionMethod('none', '', None),
  39. )
  40. class BaseFileWriter:
  41. '''
  42. Base class for compressed and uncompressed file writing.
  43. '''
  44. def __init__(self, template, **keywords):
  45. '''
  46. The template argument is a string template like
  47. "dists/%(suite)s/%(component)s/Contents-%(architecture)s.gz" that
  48. should be relative to the archive's root directory. The keywords
  49. include strings for suite, component, architecture and booleans
  50. uncompressed, gzip, bzip2.
  51. '''
  52. self.compression = keywords.get('compression', ['none'])
  53. self.path = template % keywords
  54. def open(self) -> TextIO:
  55. '''
  56. Returns a file object for writing.
  57. '''
  58. # create missing directories
  59. try:
  60. os.makedirs(os.path.dirname(self.path))
  61. except:
  62. pass
  63. self.file = open(self.path + '.new', 'w')
  64. return self.file
  65. # internal helper function
  66. def rename(self, filename: str) -> None:
  67. tempfilename = filename + '.new'
  68. os.chmod(tempfilename, 0o644)
  69. os.rename(tempfilename, filename)
  70. # internal helper function to compress output
  71. def compress(self, cmd, suffix, path) -> None:
  72. in_filename = "{0}.new".format(path)
  73. out_filename = "{0}{1}.new".format(path, suffix)
  74. if cmd is not None:
  75. with open(in_filename, 'r') as in_fh, open(out_filename, 'w') as out_fh:
  76. subprocess.check_call(cmd, stdin=in_fh, stdout=out_fh, close_fds=True)
  77. self.rename("{0}{1}".format(path, suffix))
  78. def close(self) -> None:
  79. '''
  80. Closes the file object and does the compression and rename work.
  81. '''
  82. self.file.close()
  83. for method in _compression_methods:
  84. if method.keyword in self.compression:
  85. self.compress(method.command, method.extension, self.path)
  86. else:
  87. # Try removing the file that would be generated.
  88. # It's not an error if it does not exist.
  89. try:
  90. os.unlink("{0}{1}".format(self.path, method.extension))
  91. except OSError as e:
  92. if e.errno != errno.ENOENT:
  93. raise
  94. else:
  95. os.unlink(self.path + '.new')
  96. class BinaryContentsFileWriter(BaseFileWriter):
  97. def __init__(self, **keywords):
  98. '''
  99. The value of the keywords suite, component, and architecture are
  100. strings. The value of component may be omitted if not applicable.
  101. Output files are gzip compressed only.
  102. '''
  103. flags = {
  104. 'compression': ['gzip'],
  105. }
  106. flags.update(keywords)
  107. if flags['debtype'] == 'deb':
  108. template = "%(archive)s/dists/%(suite)s/%(component)s/Contents-%(architecture)s"
  109. else: # udeb
  110. template = "%(archive)s/dists/%(suite)s/%(component)s/Contents-udeb-%(architecture)s"
  111. BaseFileWriter.__init__(self, template, **flags)
  112. class SourceContentsFileWriter(BaseFileWriter):
  113. def __init__(self, **keywords):
  114. '''
  115. The value of the keywords suite and component are strings.
  116. Output files are gzip compressed only.
  117. '''
  118. flags = {
  119. 'compression': ['gzip'],
  120. }
  121. flags.update(keywords)
  122. template = "%(archive)s/dists/%(suite)s/%(component)s/Contents-source"
  123. BaseFileWriter.__init__(self, template, **flags)
  124. class PackagesFileWriter(BaseFileWriter):
  125. def __init__(self, **keywords):
  126. '''
  127. The value of the keywords suite, component, debtype and architecture
  128. are strings. Output files are gzip compressed only.
  129. '''
  130. flags = {
  131. 'compression': ['gzip', 'xz'],
  132. }
  133. flags.update(keywords)
  134. if flags['debtype'] == 'deb':
  135. template = "%(archive)s/dists/%(suite)s/%(component)s/binary-%(architecture)s/Packages"
  136. else: # udeb
  137. template = "%(archive)s/dists/%(suite)s/%(component)s/debian-installer/binary-%(architecture)s/Packages"
  138. BaseFileWriter.__init__(self, template, **flags)
  139. class SourcesFileWriter(BaseFileWriter):
  140. def __init__(self, **keywords):
  141. '''
  142. The value of the keywords suite and component are strings. Output
  143. files are gzip compressed only.
  144. '''
  145. flags = {
  146. 'compression': ['gzip', 'xz'],
  147. }
  148. flags.update(keywords)
  149. template = "%(archive)s/dists/%(suite)s/%(component)s/source/Sources"
  150. BaseFileWriter.__init__(self, template, **flags)
  151. class TranslationFileWriter(BaseFileWriter):
  152. def __init__(self, **keywords):
  153. '''
  154. The value of the keywords suite, component and language are strings.
  155. Output files are bzip2 compressed only.
  156. '''
  157. flags = {
  158. 'compression': ['bzip2'],
  159. 'language': 'en',
  160. }
  161. flags.update(keywords)
  162. template = "%(archive)s/dists/%(suite)s/%(component)s/i18n/Translation-%(language)s"
  163. super(TranslationFileWriter, self).__init__(template, **flags)