Coverage for website/activemembers/gsuite.py: 42.86%

43 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2025-08-14 10:31 +0000

1import hashlib 

2import logging 

3import secrets 

4import string 

5 

6from django.conf import settings 

7from django.utils.translation import gettext_lazy as _ 

8 

9from googleapiclient.errors import HttpError 

10 

11from members.models import Member 

12from utils.google_api import get_directory_api 

13 

14logger = logging.getLogger(__name__) 

15 

16 

17class GSuiteUserService: 

18 def __init__(self, directory_api=None): 

19 self._directory_api = directory_api 

20 

21 @property 

22 def directory_api(self): 

23 if self._directory_api is not None: 

24 return self._directory_api 

25 return get_directory_api() 

26 

27 def _generate_password(self, member: Member): 

28 alphabet = string.ascii_letters + string.digits + "!@#$%^&*-_=+?" 

29 return "".join(secrets.choice(alphabet) for i in range(15)) 

30 

31 def create_user(self, member: Member): 

32 """Create a new GSuite user based on the provided data. 

33 

34 :param member: The member that gets an account 

35 :return returns a tuple with the password and id of the created user 

36 """ 

37 plain_password = self._generate_password(member) 

38 

39 # Google only supports sha-1, md5 or crypt as hash functions[0] for the initial password. 

40 # Because this password should be changed on first login and is safely sent to Google over 

41 # https, we just use sha-1 for simplicity. GitHub code scanning gave a warning about this 

42 # but we have set it to ignore the 'problem'. 

43 # [0]: https://developers.google.com/admin-sdk/directory/reference/rest/v1/users#User.FIELDS.hash_function 

44 digest_password = hashlib.sha1(plain_password.encode("utf-8")).hexdigest() 

45 

46 try: 

47 response = ( 

48 self.directory_api.users() 

49 .insert( 

50 body={ 

51 "name": { 

52 "familyName": member.last_name, 

53 "givenName": member.first_name, 

54 }, 

55 "primaryEmail": f"{member.username}@{settings.GSUITE_MEMBERS_DOMAIN}", 

56 "password": digest_password, 

57 "hashFunction": "SHA-1", 

58 "changePasswordAtNextLogin": "true", 

59 "externalIds": [{"value": f"{member.pk}", "type": "login_id"}], 

60 "includeInGlobalAddressList": "false", 

61 "orgUnitPath": "/", 

62 }, 

63 ) 

64 .execute() 

65 ) 

66 except HttpError as e: 

67 if e.resp.status == 409: 

68 return self.update_user(member, member.username) 

69 raise e 

70 

71 return response["primaryEmail"], plain_password 

72 

73 def update_user(self, member: Member, username: str): 

74 response = ( 

75 self.directory_api.users() 

76 .patch( 

77 body={ 

78 "suspended": "false", 

79 "primaryEmail": f"{member.username}@{settings.GSUITE_MEMBERS_DOMAIN}", 

80 }, 

81 userKey=f"{username}@{settings.GSUITE_MEMBERS_DOMAIN}", 

82 ) 

83 .execute() 

84 ) 

85 

86 if username != member.username: 

87 self.directory_api.users().aliases().delete( 

88 userKey=f"{member.username}@{settings.GSUITE_MEMBERS_DOMAIN}", 

89 alias=f"{username}@{settings.GSUITE_MEMBERS_DOMAIN}", 

90 ).execute() 

91 

92 return response["primaryEmail"], _("known by the user") 

93 

94 def suspend_user(self, username): 

95 """Suspend the user in GSuite. 

96 

97 :param username: username of the user 

98 """ 

99 self.directory_api.users().patch( 

100 body={ 

101 "suspended": "true", 

102 }, 

103 userKey=f"{username}@{settings.GSUITE_MEMBERS_DOMAIN}", 

104 ).execute() 

105 

106 def delete_user(self, email): 

107 """Delete the user from GSuite. 

108 

109 :param email: primary email of the user 

110 """ 

111 self.directory_api.users().delete(userKey=email).execute() 

112 

113 def get_suspended_users(self): 

114 """Get all the suspended users.""" 

115 response = ( 

116 self.directory_api.users() 

117 .list(domain=settings.GSUITE_MEMBERS_DOMAIN, query="isSuspended=true") 

118 .execute() 

119 ) 

120 return response.get("users", [])