dakmultiprocessing.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. #!/usr/bin/env python
  2. # vim:set et sw=4:
  3. """
  4. multiprocessing for DAK
  5. @contact: Debian FTP Master <ftpmaster@debian.org>
  6. @copyright: 2011 Ansgar Burchardt <ansgar@debian.org>
  7. @license: GNU General Public License version 2 or later
  8. """
  9. # This program is free software; you can redistribute it and/or modify
  10. # it under the terms of the GNU General Public License as published by
  11. # the Free Software Foundation; either version 2 of the License, or
  12. # (at your option) any later version.
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU General Public License for more details.
  17. # You should have received a copy of the GNU General Public License
  18. # along with this program; if not, write to the Free Software
  19. # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  20. ###############################################################################
  21. from multiprocessing.pool import Pool
  22. from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGALRM
  23. import traceback
  24. import sqlalchemy.orm.session
  25. __all__ = []
  26. PROC_STATUS_SUCCESS = 0 # Everything ok
  27. PROC_STATUS_EXCEPTION = 1 # An exception was caught
  28. PROC_STATUS_SIGNALRAISED = 2 # A signal was generated
  29. PROC_STATUS_MISCFAILURE = 3 # Process specific error; see message
  30. __all__.extend(['PROC_STATUS_SUCCESS', 'PROC_STATUS_EXCEPTION',
  31. 'PROC_STATUS_SIGNALRAISED', 'PROC_STATUS_MISCFAILURE'])
  32. class SignalException(Exception):
  33. def __init__(self, signum):
  34. self.signum = signum
  35. def __str__(self):
  36. return "<SignalException: %d>" % self.signum
  37. __all__.append('SignalException')
  38. def signal_handler(signum, info):
  39. raise SignalException(signum)
  40. def _func_wrapper(func, *args, **kwds):
  41. # We need to handle signals to avoid hanging
  42. signal(SIGHUP, signal_handler)
  43. signal(SIGTERM, signal_handler)
  44. signal(SIGPIPE, signal_handler)
  45. signal(SIGALRM, signal_handler)
  46. # We expect our callback function to return:
  47. # (status, messages)
  48. # Where:
  49. # status is one of PROC_STATUS_*
  50. # messages is a string used for logging
  51. try:
  52. return (func(*args, **kwds))
  53. except SignalException as e:
  54. return (PROC_STATUS_SIGNALRAISED, e.signum)
  55. except Exception as e:
  56. return (PROC_STATUS_EXCEPTION, "Exception: %s\n%s" % (e, traceback.format_exc()))
  57. finally:
  58. # Make sure connections are closed. We might die otherwise.
  59. sqlalchemy.orm.session.Session.close_all()
  60. class DakProcessPool(Pool):
  61. def __init__(self, *args, **kwds):
  62. Pool.__init__(self, *args, **kwds)
  63. self.results = []
  64. self.int_results = []
  65. def apply_async(self, func, args=(), kwds={}, callback=None):
  66. wrapper_args = list(args)
  67. wrapper_args.insert(0, func)
  68. self.int_results.append(Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback))
  69. def join(self):
  70. Pool.join(self)
  71. for r in self.int_results:
  72. # return values were already handled in the callbacks, but asking
  73. # for them might raise exceptions which would otherwise be lost
  74. self.results.append(r.get())
  75. def overall_status(self):
  76. # Return the highest of our status results
  77. # This basically allows us to do sys.exit(overall_status()) and have us
  78. # exit 0 if everything was good and non-zero if not
  79. status = 0
  80. for r in self.results:
  81. if r[0] > status:
  82. status = r[0]
  83. return status
  84. __all__.append('DakProcessPool')