Coverage for website/activemembers/gsuite.py: 42.86%
43 statements
« prev ^ index » next coverage.py v7.6.7, created at 2025-08-14 10:31 +0000
« prev ^ index » next coverage.py v7.6.7, created at 2025-08-14 10:31 +0000
1import hashlib
2import logging
3import secrets
4import string
6from django.conf import settings
7from django.utils.translation import gettext_lazy as _
9from googleapiclient.errors import HttpError
11from members.models import Member
12from utils.google_api import get_directory_api
14logger = logging.getLogger(__name__)
class GSuiteUserService:
    """Manage member accounts in the GSuite members domain.

    All operations go through the Google Directory API client exposed by
    :attr:`directory_api`; addresses are built as
    ``<username>@<settings.GSUITE_MEMBERS_DOMAIN>``.
    """

    def __init__(self, directory_api=None):
        """Create the service.

        :param directory_api: optional Directory API client to use instead of
            the one returned by ``get_directory_api()``
        """
        self._directory_api = directory_api

    @property
    def directory_api(self):
        """Return the injected Directory API client, or ``get_directory_api()`` if none was given."""
        if self._directory_api is not None:
            return self._directory_api
        return get_directory_api()

    def _generate_password(self, member: "Member"):
        """Generate a random 15-character initial password.

        :param member: the member the password is for (not used by the generator)
        :return: the plain-text password
        """
        alphabet = string.ascii_letters + string.digits + "!@#$%^&*-_=+?"
        # `secrets` (not `random`) because this value is a credential.
        return "".join(secrets.choice(alphabet) for _i in range(15))

    def create_user(self, member: "Member"):
        """Create a new GSuite user based on the provided data.

        If the insert is rejected with HTTP 409 (conflict), the existing
        account is updated through :meth:`update_user` instead and that
        method's result is returned.

        :param member: The member that gets an account
        :return: a tuple with the primary email of the account and the
            plain-text initial password (or, after a 409, the placeholder
            returned by :meth:`update_user`)
        :raises HttpError: for any API error other than a 409
        """
        plain_password = self._generate_password(member)

        # Google only supports sha-1, md5 or crypt as hash functions[0] for the initial password.
        # Because this password should be changed on first login and is safely sent to Google over
        # https, we just use sha-1 for simplicity. GitHub code scanning gave a warning about this
        # but we have set it to ignore the 'problem'.
        # [0]: https://developers.google.com/admin-sdk/directory/reference/rest/v1/users#User.FIELDS.hash_function
        digest_password = hashlib.sha1(plain_password.encode("utf-8")).hexdigest()

        try:
            response = (
                self.directory_api.users()
                .insert(
                    body={
                        "name": {
                            "familyName": member.last_name,
                            "givenName": member.first_name,
                        },
                        "primaryEmail": f"{member.username}@{settings.GSUITE_MEMBERS_DOMAIN}",
                        "password": digest_password,
                        "hashFunction": "SHA-1",
                        "changePasswordAtNextLogin": "true",
                        "externalIds": [{"value": f"{member.pk}", "type": "login_id"}],
                        "includeInGlobalAddressList": "false",
                        "orgUnitPath": "/",
                    },
                )
                .execute()
            )
        except HttpError as e:
            if e.resp.status == 409:
                # Conflict: presumably the account already exists, so
                # re-activate it instead of failing.
                return self.update_user(member, member.username)
            # Bare raise keeps the original traceback intact.
            raise

        return response["primaryEmail"], plain_password

    def update_user(self, member: "Member", username: str):
        """Un-suspend an existing GSuite user and sync its primary email.

        The account is looked up by ``username`` and its primary email is set
        to the address derived from ``member.username``. When the two differ,
        the address of the old username is removed from the account's aliases
        afterwards (presumably Google keeps the previous primary address as an
        alias after a rename).

        :param member: the member that owns the account
        :param username: the username the account is currently known by
        :return: a tuple with the primary email of the account and a
            translatable placeholder in place of the password, which is not
            changed here
        """
        domain = settings.GSUITE_MEMBERS_DOMAIN
        response = (
            self.directory_api.users()
            .patch(
                body={
                    "suspended": "false",
                    "primaryEmail": f"{member.username}@{domain}",
                },
                userKey=f"{username}@{domain}",
            )
            .execute()
        )

        if username != member.username:
            self.directory_api.users().aliases().delete(
                userKey=f"{member.username}@{domain}",
                alias=f"{username}@{domain}",
            ).execute()

        return response["primaryEmail"], _("known by the user")

    def suspend_user(self, username):
        """Suspend the user in GSuite.

        :param username: username of the user
        """
        self.directory_api.users().patch(
            body={
                "suspended": "true",
            },
            userKey=f"{username}@{settings.GSUITE_MEMBERS_DOMAIN}",
        ).execute()

    def delete_user(self, email):
        """Delete the user from GSuite.

        :param email: primary email of the user
        """
        self.directory_api.users().delete(userKey=email).execute()

    def get_suspended_users(self):
        """Get all the suspended users.

        The listing is paginated by the API, so every page is fetched by
        following ``nextPageToken`` until it is absent.

        :return: a list with the user resources of all suspended users in the
            members domain
        """
        users_resource = self.directory_api.users()
        list_kwargs = {
            "domain": settings.GSUITE_MEMBERS_DOMAIN,
            "query": "isSuspended=true",
        }
        suspended = []
        while True:
            response = users_resource.list(**list_kwargs).execute()
            suspended.extend(response.get("users", []))
            page_token = response.get("nextPageToken")
            if not page_token:
                return suspended
            # Only follow-up requests carry a page token; the first request
            # is identical to a plain, unpaginated listing.
            list_kwargs["pageToken"] = page_token