# dakmultiprocessing.py
# vim:set et sw=4:

"""
multiprocessing for DAK

@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2011 Ansgar Burchardt <ansgar@debian.org>
@license: GNU General Public License version 2 or later
"""
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

###############################################################################
  20. from multiprocessing.pool import Pool
  21. from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGALRM
  22. import traceback
  23. import sqlalchemy.orm.session
  24. __all__ = []
  25. PROC_STATUS_SUCCESS = 0 # Everything ok
  26. PROC_STATUS_EXCEPTION = 1 # An exception was caught
  27. PROC_STATUS_SIGNALRAISED = 2 # A signal was generated
  28. PROC_STATUS_MISCFAILURE = 3 # Process specific error; see message
  29. __all__.extend(['PROC_STATUS_SUCCESS', 'PROC_STATUS_EXCEPTION',
  30. 'PROC_STATUS_SIGNALRAISED', 'PROC_STATUS_MISCFAILURE'])
  31. class SignalException(Exception):
  32. def __init__(self, signum):
  33. self.signum = signum
  34. def __str__(self):
  35. return "<SignalException: %d>" % self.signum
  36. __all__.append('SignalException')
  37. def signal_handler(signum, info):
  38. raise SignalException(signum)
  39. def _func_wrapper(func, *args, **kwds):
  40. # We need to handle signals to avoid hanging
  41. signal(SIGHUP, signal_handler)
  42. signal(SIGTERM, signal_handler)
  43. signal(SIGPIPE, signal_handler)
  44. signal(SIGALRM, signal_handler)
  45. # We expect our callback function to return:
  46. # (status, messages)
  47. # Where:
  48. # status is one of PROC_STATUS_*
  49. # messages is a string used for logging
  50. try:
  51. return (func(*args, **kwds))
  52. except SignalException as e:
  53. return (PROC_STATUS_SIGNALRAISED, e.signum)
  54. except Exception as e:
  55. return (PROC_STATUS_EXCEPTION, "Exception: %s\n%s" % (e, traceback.format_exc()))
  56. finally:
  57. # Make sure connections are closed. We might die otherwise.
  58. sqlalchemy.orm.session.Session.close_all()
  59. class DakProcessPool(Pool):
  60. def __init__(self, *args, **kwds):
  61. Pool.__init__(self, *args, **kwds)
  62. self.results = []
  63. self.int_results = []
  64. def apply_async(self, func, args=(), kwds={}, callback=None):
  65. wrapper_args = list(args)
  66. wrapper_args.insert(0, func)
  67. self.int_results.append(Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback))
  68. def join(self):
  69. Pool.join(self)
  70. for r in self.int_results:
  71. # return values were already handled in the callbacks, but asking
  72. # for them might raise exceptions which would otherwise be lost
  73. self.results.append(r.get())
  74. def overall_status(self):
  75. # Return the highest of our status results
  76. # This basically allows us to do sys.exit(overall_status()) and have us
  77. # exit 0 if everything was good and non-zero if not
  78. status = 0
  79. for r in self.results:
  80. if r[0] > status:
  81. status = r[0]
  82. return status
  83. __all__.append('DakProcessPool')