Coverage for website/photos/services.py: 38.79%

86 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2026-06-21 23:59 +0000

1import logging 

2import os 

3import tarfile 

4from zipfile import ZipFile, is_zipfile 

5 

6from django.core.files import File 

7from django.db import transaction 

8from django.db.models import BooleanField, Case, ExpressionWrapper, Q, Value, When 

9from django.forms import ValidationError 

10from django.http import Http404 

11from django.utils.translation import gettext_lazy as _ 

12 

13from PIL import UnidentifiedImageError 

14 

15from photos.models import Photo 

16 

17logger = logging.getLogger(__name__) 

18 

19 

def check_shared_album_token(album, token):
    """Raise ``Http404`` unless ``token`` equals the album's access token.

    Returns ``None`` when the supplied token matches ``album.access_token``.
    """
    if token == album.access_token:
        return
    raise Http404("Invalid token.")

24 

25 

def is_album_accessible(request, album):
    """Check if the request user can access an album.

    - No member on the request: not accessible.
    - Member with an active membership: always accessible.
    - Member without an active membership: accessible only if one of their
      memberships spans the album date (``since <= album.date <= until``).

    The active-membership check is evaluated once per call.
    """
    member = request.member
    if not member:
        return False
    if member.has_active_membership():
        return True

    # This user is currently not a member, so the album is only visible if
    # it dates from a period in which they did have a membership.
    return member.membership_set.filter(
        Q(since__lte=album.date) & Q(until__gte=album.date)
    ).exists()

37 

38 

def get_annotated_accessible_albums(request, albums):
    """Annotate the albums with a boolean ``accessible`` field for the user.

    - No member on the request: every album is annotated ``False``.
    - Member with an active membership: every album is annotated ``True``.
    - Member without an active membership: an album is annotated ``True``
      when its date lies within one of the member's memberships
      (``since <= date <= until``), otherwise ``False``.

    The active-membership check is evaluated once per call. Returns the
    annotated ``albums`` queryset.
    """
    member = request.member
    if not member:
        return albums.annotate(
            accessible=ExpressionWrapper(Value(False), output_field=BooleanField())
        )

    if member.has_active_membership():
        return albums.annotate(
            accessible=ExpressionWrapper(Value(True), output_field=BooleanField())
        )

    # Start from a filter that matches nothing and OR in one date range per
    # membership the member has ever had.
    albums_filter = Q(pk__in=[])
    for membership in member.membership_set.all():
        albums_filter |= Q(date__gte=membership.since) & Q(
            date__lte=membership.until
        )

    return albums.annotate(
        accessible=Case(
            When(albums_filter, then=Value(True)),
            default=Value(False),
            output_field=BooleanField(),
        )
    )

65 

66 

def extract_archive(album, archive) -> tuple[dict[str, str], int]:
    """Extract the photos from a zip or tar archive into an album.

    Archive members are handled in sorted name order. Directories (zip) and
    non-regular members (tar) are skipped silently. Members without a known
    photo extension are skipped and reported in the warnings. Every other
    member is passed to ``_try_save_photo`` together with a running position
    number, which is advanced after each successful save.

    Returns a ``(warnings, count)`` tuple: ``warnings`` maps a member name to
    the reason it was not saved, ``count`` is the number of saved photos.

    Raises ``ValueError`` when the file is neither a zip nor a tar archive.
    """
    warnings, count = {}, 0
    pos = 1
    if is_zipfile(archive):
        archive.seek(0)
        with ZipFile(archive) as zip_file:
            for photo in sorted(zip_file.namelist()):
                if zip_file.getinfo(photo).is_dir():
                    continue
                if not _has_photo_extension(photo):
                    warnings[photo] = "has an unknown extension."
                    continue

                with zip_file.open(photo) as file:
                    if warning := _try_save_photo(album, file, photo, pos):
                        warnings[photo] = warning
                    else:
                        count += 1
                        pos += 1
        return warnings, count

    archive.seek(0)
    # Not a zip: try to open it as a tar archive. tarfile.open raises
    # ReadError for input it cannot read, which serves as the format check.
    try:
        with tarfile.open(fileobj=archive) as tar_file:
            for photo in sorted(tar_file.getnames()):
                if not tar_file.getmember(photo).isfile():
                    continue

                if not _has_photo_extension(photo):
                    warnings[photo] = "has an unknown extension."
                    continue

                with tar_file.extractfile(photo) as file:
                    # Pass the position just like the zip branch does;
                    # _try_save_photo requires it.
                    if warning := _try_save_photo(album, file, photo, pos):
                        warnings[photo] = warning
                    else:
                        count += 1
                        pos += 1
    except tarfile.ReadError as e:
        raise ValueError(_("The uploaded file is not a zip or tar file.")) from e

    return warnings, count

110 

111 

def _has_photo_extension(filename):
    """Return whether the filename ends in a supported photo extension.

    The comparison is case-insensitive; supported extensions are
    ``.jpg``, ``.jpeg``, ``.png`` and ``.webp``.
    """
    suffix = os.path.splitext(filename)[1].lower()
    for known in (".jpg", ".jpeg", ".png", ".webp"):
        if suffix == known:
            return True
    return False

116 

117 

def _try_save_photo(album, file, filename, pos) -> str | None:
    """Attempt to store one file as a photo in the given album.

    The stored file name is the original name prefixed with ``pos`` and a
    dash. The photo is validated and saved inside a single transaction.

    Returns None when the photo was saved, or a short message describing
    the failure when validation or reading the image failed.
    """
    stored_name = f"{pos}-{filename}"
    filename = stored_name
    photo = Photo(album=album)
    photo.file = File(file, filename)
    try:
        with transaction.atomic():
            photo.full_clean()
            photo.save()
    except (ValidationError, UnidentifiedImageError, OSError) as e:
        # All three failure modes are reported to the caller the same way.
        logger.warning(f"Photo '{filename}' could not be read: {e}", exc_info=e)
        return "could not be read."
    return None

136 except OSError as e: 

137 logger.warning(f"Photo '{filename}' could not be read: {e}", exc_info=e) 

138 return "could not be read."