Coverage for website/photos/services.py: 39.82%

83 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2025-08-14 10:31 +0000

1import logging 

2import os 

3import tarfile 

4from zipfile import ZipFile, is_zipfile 

5 

6from django.core.files import File 

7from django.db import transaction 

8from django.db.models import BooleanField, Case, ExpressionWrapper, Q, Value, When 

9from django.forms import ValidationError 

10from django.http import Http404 

11from django.utils.translation import gettext_lazy as _ 

12 

13from PIL import UnidentifiedImageError 

14 

15from photos.models import Photo 

16 

17logger = logging.getLogger(__name__) 

18 

19 

20def check_shared_album_token(album, token): 

21 """Return a 404 if the token does not match the album token.""" 

22 if token != album.access_token: 22 ↛ 23line 22 didn't jump to line 23 because the condition on line 22 was never true

23 raise Http404("Invalid token.") 

24 

25 

26def is_album_accessible(request, album): 

27 """Check if the request user can access an album.""" 

28 if request.member and request.member.has_active_membership(): 

29 return True 

30 if request.member and not request.member.has_active_membership(): 

31 # This user is currently not a member, so need to check if he/she 

32 # can view this album by checking the membership 

33 return request.member.membership_set.filter( 

34 Q(since__lte=album.date) & Q(until__gte=album.date) 

35 ).exists() 

36 return False 

37 

38 

39def get_annotated_accessible_albums(request, albums): 

40 """Annotate the albums which are accessible by the user.""" 

41 if request.member and request.member.has_active_membership(): 

42 albums = albums.annotate( 

43 accessible=ExpressionWrapper(Value(True), output_field=BooleanField()) 

44 ) 

45 elif request.member and not request.member.has_active_membership(): 

46 albums_filter = Q(pk__in=[]) 

47 for membership in request.member.membership_set.all(): 

48 albums_filter |= Q(date__gte=membership.since) & Q( 

49 date__lte=membership.until 

50 ) 

51 

52 albums = albums.annotate( 

53 accessible=Case( 

54 When(albums_filter, then=Value(True)), 

55 default=Value(False), 

56 output_field=BooleanField(), 

57 ) 

58 ) 

59 else: 

60 albums = albums.annotate( 

61 accessible=ExpressionWrapper(Value(False), output_field=BooleanField()) 

62 ) 

63 

64 return albums 

65 

66 

67def extract_archive(album, archive) -> tuple[dict[str, str], int]: 

68 """Extract zip and tar files.""" 

69 warnings, count = {}, 0 

70 if is_zipfile(archive): 

71 archive.seek(0) 

72 with ZipFile(archive) as zip_file: 

73 for photo in sorted(zip_file.namelist()): 

74 if zip_file.getinfo(photo).is_dir(): 

75 continue 

76 if not _has_photo_extension(photo): 

77 warnings[photo] = "has an unknown extension." 

78 continue 

79 

80 with zip_file.open(photo) as file: 

81 if warning := _try_save_photo(album, file, photo): 

82 warnings[photo] = warning 

83 else: 

84 count += 1 

85 return warnings, count 

86 

87 archive.seek(0) 

88 # is_tarfile only supports filenames, so we cannot use that 

89 try: 

90 with tarfile.open(fileobj=archive) as tar_file: 

91 for photo in sorted(tar_file.getnames()): 

92 if not tar_file.getmember(photo).isfile(): 

93 continue 

94 

95 if not _has_photo_extension(photo): 

96 warnings[photo] = "has an unknown extension." 

97 continue 

98 

99 with tar_file.extractfile(photo) as file: 

100 if warning := _try_save_photo(album, file, photo): 

101 warnings[photo] = warning 

102 else: 

103 count += 1 

104 except tarfile.ReadError as e: 

105 raise ValueError(_("The uploaded file is not a zip or tar file.")) from e 

106 

107 return warnings, count 

108 

109 

110def _has_photo_extension(filename): 

111 """Check if the filename has a photo extension.""" 

112 __, extension = os.path.splitext(filename) 

113 return extension.lower() in (".jpg", ".jpeg", ".png", ".webp") 

114 

115 

116def _try_save_photo(album, file, filename) -> str | None: 

117 """Try to save a photo to an album. 

118 

119 Returns None, or a string describing a reason for failure. 

120 """ 

121 instance = Photo(album=album) 

122 instance.file = File(file, filename) 

123 try: 

124 with transaction.atomic(): 

125 instance.full_clean() 

126 instance.save() 

127 except ValidationError as e: 

128 logger.warning(f"Photo '{filename}' could not be read: {e}", exc_info=e) 

129 return "could not be read." 

130 except UnidentifiedImageError as e: 

131 logger.warning(f"Photo '{filename}' could not be read: {e}", exc_info=e) 

132 return "could not be read." 

133 except OSError as e: 

134 logger.warning(f"Photo '{filename}' could not be read: {e}", exc_info=e) 

135 return "could not be read."