sso_auth.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. # Copyright 2013 The Distro Tracker Developers
  2. # See the COPYRIGHT file at the top-level directory of this distribution and
  3. # at http://deb.li/DTAuthors
  4. #
  5. # This file is part of Distro Tracker. It is subject to the license terms
  6. # in the LICENSE file found in the top-level directory of this
  7. # distribution and at http://deb.li/DTLicense. No part of Distro Tracker,
  8. # including this file, may be copied, modified, propagated, or distributed
  9. # except according to the terms contained in the LICENSE file.
  10. from __future__ import unicode_literals
  11. from django.contrib.auth.middleware import RemoteUserMiddleware
  12. from django.contrib.auth.backends import RemoteUserBackend
  13. from django.contrib import auth
  14. from django.core.exceptions import ImproperlyConfigured
  15. from distro_tracker.accounts.models import UserEmail
  16. from distro_tracker.accounts.models import User
  17. try:
  18. import ldap
  19. except ImportError:
  20. ldap = None
  21. class DebianSsoUserMiddleware(RemoteUserMiddleware):
  22. """
  23. Middleware that initiates user authentication based on the REMOTE_USER
  24. field provided by Debian's SSO system, or based on the SSL_CLIENT_S_DN_CN
  25. field provided by the validation of the SSL client certificate generated
  26. by sso.debian.org.
  27. If the currently logged in user is a DD (as identified by having a
  28. @debian.org address), he is forcefully logged out if the header is no longer
  29. found or is invalid.
  30. """
  31. dacs_header = 'REMOTE_USER'
  32. cert_header = 'SSL_CLIENT_S_DN_CN'
  33. @staticmethod
  34. def dacs_user_to_email(username):
  35. parts = [part for part in username.split(':') if part]
  36. federation, jurisdiction = parts[:2]
  37. if (federation, jurisdiction) != ('DEBIANORG', 'DEBIAN'):
  38. return
  39. username = parts[-1]
  40. if '@' in username:
  41. return username # Full email already
  42. return username + '@debian.org'
  43. @staticmethod
  44. def is_debian_member(user):
  45. return any(
  46. email.email.endswith('@debian.org')
  47. for email in user.emails.all()
  48. )
  49. def process_request(self, request):
  50. # AuthenticationMiddleware is required so that request.user exists.
  51. if not hasattr(request, 'user'):
  52. raise ImproperlyConfigured(
  53. "The Django remote user auth middleware requires the"
  54. " authentication middleware to be installed. Edit your"
  55. " MIDDLEWARE_CLASSES setting to insert"
  56. " 'django.contrib.auth.middleware.AuthenticationMiddleware'"
  57. " before the DebianSsoUserMiddleware class.")
  58. dacs_user = request.META.get(self.dacs_header)
  59. cert_user = request.META.get(self.cert_header)
  60. if cert_user is not None:
  61. remote_user = cert_user
  62. elif dacs_user is not None:
  63. remote_user = self.dacs_user_to_email(dacs_user)
  64. else:
  65. # Debian developers can only authenticate via SSO/SSL certs
  66. # so log them out now if they no longer have the proper META
  67. # variable
  68. if request.user.is_authenticated():
  69. if self.is_debian_member(request.user):
  70. auth.logout(request)
  71. return
  72. if request.user.is_authenticated():
  73. if request.user.emails.filter(email=remote_user).exists():
  74. # The currently logged in user matches the one given by the
  75. # headers.
  76. return
  77. # This will create the user if it doesn't exist
  78. user = auth.authenticate(remote_user=remote_user)
  79. if user:
  80. # User is valid. Set request.user and persist user in the session
  81. # by logging the user in.
  82. request.user = user
  83. auth.login(request, user)
  84. class DebianSsoUserBackend(RemoteUserBackend):
  85. """
  86. The authentication backend which authenticates the provided remote user
  87. (identified by his @debian.org email) in Distro Tracker. If a matching User
  88. model instance does not exist, one is automatically created. In that case
  89. the DDs first and last name are pulled from Debian's LDAP.
  90. """
  91. def authenticate(self, remote_user):
  92. if not remote_user:
  93. return
  94. email = remote_user
  95. user_email, _ = UserEmail.objects.get_or_create(email=email)
  96. if not user_email.user:
  97. kwargs = {}
  98. names = self.get_user_details(remote_user)
  99. if names:
  100. kwargs.update(names)
  101. user = User.objects.create_user(main_email=email, **kwargs)
  102. else:
  103. user = User.objects.get(pk=user_email.user.id)
  104. return user
  105. @staticmethod
  106. def get_uid(remote_user):
  107. # Strips off the @debian.org part of the email leaving the uid
  108. if remote_user.endswith('@debian.org'):
  109. return remote_user[:-11]
  110. return remote_user
  111. def get_user_details(self, remote_user):
  112. """
  113. Gets the details of the given user from the Debian LDAP.
  114. :return: Dict with the keys ``first_name``, ``last_name``
  115. ``None`` if the LDAP lookup did not return anything.
  116. """
  117. if ldap is None:
  118. return None
  119. if not remote_user.endswith('@debian.org'):
  120. # We only know how to extract data for DD via LDAP
  121. return None
  122. l = ldap.initialize('ldap://db.debian.org')
  123. result_set = l.search_s(
  124. 'dc=debian,dc=org',
  125. ldap.SCOPE_SUBTREE,
  126. 'uid={}'.format(self.get_uid(remote_user)),
  127. None)
  128. if not result_set:
  129. return None
  130. result = result_set[0]
  131. return {
  132. 'first_name': result[1]['cn'][0].decode('utf-8'),
  133. 'last_name': result[1]['sn'][0].decode('utf-8'),
  134. }