daksql.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. """SQLAlchemy extensions for dak
  2. @copyright: 2014, Ansgar Burchardt <ansgar@debian.org>
  3. @license: GPL-2+
  4. """
  5. # This program is free software; you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation; either version 2 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program; if not, write to the Free Software
  17. # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  18. from sqlalchemy.ext.compiler import compiles
  19. from sqlalchemy.sql.expression import ColumnElement, ClauseElement, ClauseList, literal
  20. from sqlalchemy.types import Text
  21. from sqlalchemy.util import to_list
  22. class array_agg(ColumnElement):
  23. def __init__(self, expr, order_by=None):
  24. self.expr = ClauseElement(expr)
  25. self.order_by = None
  26. if order_by is not None:
  27. self.order_by = ClauseList(*to_list(order_by))
  28. @compiles(array_agg)
  29. def compile_array_agg(element, compiler, **kw):
  30. if element.order_by is not None:
  31. return "ARRAY_AGG({0} ORDER BY {1})".format(compiler.process(element.expr), compiler.process(element.order_by))
  32. return "ARRAY_AGG({0})".format(compiler.process(element.expr))
  33. class string_agg(ColumnElement):
  34. type = Text()
  35. def __init__(self, column, seperator, order_by=None):
  36. self.column = ClauseList(*to_list(column))
  37. self.seperator = literal(seperator)
  38. self.order_by = None
  39. if order_by is not None:
  40. self.order_by = ClauseList(*to_list(order_by))
  41. @compiles(string_agg)
  42. def compile_string_agg(element, compiler, **kw):
  43. if element.order_by is not None:
  44. return "STRING_AGG({0}, {1} ORDER BY {2})".format(compiler.process(element.column), compiler.process(element.seperator), compiler.process(element.order_by))
  45. return "STRING_AGG({0}, {1})".format(compiler.process(element.column), compiler.process(element.seperator))