dbtest_archive.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
#! /usr/bin/env python
#
# Copyright (C) 2018, Margarita Manterola <marga@debian.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  18. from db_test import DBDakTestCase, fixture
  19. from daklib import archive, upload, dbconn, policy
  20. from unittest import main
  21. class ArchiveTestCase(DBDakTestCase):
  22. def setup_keys(self):
  23. ''' Inserts the fingerprints in the keyring into the database.'''
  24. if 'keyring' in self.__dict__:
  25. return
  26. self.keyring = dbconn.Keyring()
  27. self.keyring.keyring_id = 1
  28. self.keyring.load_keys(fixture('packages/gpg/pubring.gpg'))
  29. _, _ = self.keyring.generate_users_from_keyring('%s', self.session)
  30. for key in self.keyring.keys.values():
  31. fpr = key['fingerprints'][0]
  32. fp = dbconn.get_or_set_fingerprint(fpr, self.session)
  33. fp.keyring_id = self.keyring.keyring_id
  34. fp.uid_id = dbconn.get_or_set_uid(key['uid'], self.session).uid_id
  35. self.session.add(fp)
  36. self.session.commit()
  37. def setup_srcformats(self):
  38. '''Add all source formats to the supported suites.'''
  39. for suite_name in self.suite:
  40. self.suite[suite_name].srcformats = self.session.query(
  41. dbconn.SrcFormat).all()
  42. self.session.add(self.suite[suite_name])
  43. self.session.commit()
  44. def attempt_upload(self, changes):
  45. '''Return an ArchiveUpload for the received changes.'''
  46. return archive.ArchiveUpload(
  47. fixture('packages'), changes, [fixture('packages/gpg/pubring.gpg')])
  48. def test_upload_rejects(self):
  49. # Parse the changes file
  50. changes = upload.Changes(fixture('packages'),
  51. 'linux_42.0-1_amd64.changes',
  52. [fixture('packages/gpg/pubring.gpg')],
  53. True)
  54. # Insert the fingerprint, but without associating it to a keyring
  55. dbconn.get_or_set_fingerprint(
  56. changes.primary_fingerprint, self.session)
  57. self.session.commit()
  58. # Try to upload, it should fail with the key being unknown
  59. with self.attempt_upload(changes) as attempt:
  60. result = attempt.check()
  61. self.assertFalse(result)
  62. self.assertEquals(attempt.reject_reasons,
  63. [u'.changes signed by unknown key.'])
  64. # Import the keyring
  65. self.setup_keys()
  66. # New attempt, it should fail with missing suite
  67. with self.attempt_upload(changes) as attempt:
  68. result = attempt.check()
  69. self.assertFalse(result)
  70. self.assertEquals(attempt.reject_reasons,
  71. [u'No target suite found. Please check your target distribution and that you uploaded to the right archive.'])
  72. # Add the missing suite
  73. self.setup_suites([('unstable', 'sid')])
  74. self.session.commit()
  75. # New attempt, it should fail with missing srcformat
  76. with self.attempt_upload(changes) as attempt:
  77. result = attempt.check()
  78. self.assertFalse(result)
  79. self.assertEquals(attempt.reject_reasons,
  80. [u'source format 3.0 (quilt) is not allowed in suite unstable'])
  81. # Add the missing format
  82. self.setup_srcformats()
  83. # New attempt, it should fail with missing architecture
  84. with self.attempt_upload(changes) as attempt:
  85. result = attempt.check()
  86. self.assertFalse(result)
  87. self.assertEquals(attempt.reject_reasons,
  88. [u'Architecture source is not allowed in suite unstable'])
  89. # Add the missing architecture / suites connection
  90. self.setup_architectures()
  91. self.session.commit()
  92. # New attempt, it should succeed and be a new package.
  93. with self.attempt_upload(changes) as attempt:
  94. result = attempt.check()
  95. self.assertTrue(result)
  96. self.assertTrue(attempt.new)
  97. self.assertEquals(attempt.reject_reasons, [])
  98. def test_upload_new(self):
  99. # Parse the changes file
  100. changes = upload.Changes(fixture('packages'),
  101. 'linux_42.0-1_amd64.changes',
  102. [fixture('packages/gpg/pubring.gpg')],
  103. True)
  104. # Do all the setup needed for the upload to be accepted
  105. dbconn.get_or_set_fingerprint(
  106. changes.primary_fingerprint, self.session)
  107. self.setup_keys()
  108. self.setup_suites([('unstable', 'sid')])
  109. self.setup_srcformats()
  110. self.setup_architectures()
  111. self.session.commit()
  112. # First attempt, it should succeed and be a NEW package
  113. first_upload = self.attempt_upload(changes)
  114. with first_upload as attempt:
  115. result = attempt.check()
  116. self.assertTrue(result)
  117. self.assertTrue(attempt.new)
  118. self.assertEquals(attempt.reject_reasons, [])
  119. # Add the override for the source package
  120. override = {}
  121. override['package'] = changes.source_name
  122. override['priority'] = 'optional'
  123. override['section'] = 'kernel'
  124. override['component'] = changes.source.component
  125. override['type'] = 'dsc'
  126. queue = policy.PolicyQueueUploadHandler(changes, self.session)
  127. queue.add_overrides([override], self.suite['unstable'])
  128. self.session.commit()
  129. # Second attempt, it should succeed but be NEW because of new binaries
  130. with self.attempt_upload(changes) as attempt:
  131. result = attempt.check()
  132. self.assertTrue(result)
  133. self.assertTrue(attempt.new)
  134. self.assertEquals(attempt.reject_reasons, [])
  135. # Add the binary overrides
  136. overrides = []
  137. for binary in changes.binaries:
  138. override = {}
  139. override['package'] = binary.name
  140. override['priority'] = 'optional'
  141. override['section'] = 'kernel'
  142. override['component'] = binary.component
  143. override['type'] = binary.type
  144. overrides.append(override)
  145. queue.add_overrides(overrides, self.suite['unstable'])
  146. self.session.commit()
  147. # Third attempt, this time it should succeed and not be NEW
  148. with self.attempt_upload(changes) as attempt:
  149. result = attempt.check()
  150. self.assertTrue(result)
  151. self.assertFalse(attempt.new)
  152. self.assertEquals(attempt.reject_reasons, [])
  153. def classes_to_clean(self):
  154. for object_ in self.session.query(dbconn.Override):
  155. self.session.delete(object_)
  156. if 'suite' in self.__dict__:
  157. self.clean_suites()
  158. return [dbconn.Fingerprint, dbconn.Uid]
  159. if __name__ == '__main__':
  160. main()