dbtest_multiproc.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. #! /usr/bin/env python3
  2. from db_test import DBDakTestCase
  3. from daklib.dbconn import DBConn
  4. from multiprocessing import Pool
  5. from time import sleep
  6. import sqlalchemy.sql as sql
  7. import unittest
  8. def read_number():
  9. session = DBConn().session()
  10. result = session.query(sql.column('foo')).from_statement(sql.text('select 7 as foo')).scalar()
  11. sleep(0.1)
  12. session.close()
  13. return result
  14. class MultiProcTestCase(DBDakTestCase):
  15. """
  16. This TestCase checks that DBConn works with multiprocessing.
  17. """
  18. def save_result(self, result):
  19. self.result += result
  20. def test_seven(self):
  21. '''
  22. Test apply_async() with a database session.
  23. '''
  24. self.result = 0
  25. pool = Pool()
  26. pool.apply_async(read_number, (), callback=self.save_result)
  27. pool.apply_async(read_number, (), callback=self.save_result)
  28. pool.apply_async(read_number, (), callback=self.save_result)
  29. pool.apply_async(read_number, (), callback=self.save_result)
  30. pool.apply_async(read_number, (), callback=self.save_result)
  31. pool.close()
  32. pool.join()
  33. self.assertEqual(5 * 7, self.result)
  34. if __name__ == '__main__':
  35. unittest.main()