Coverage for website/photos/services.py: 39.82%
83 statements
« prev ^ index » next coverage.py v7.6.7, created at 2025-08-14 10:31 +0000
« prev ^ index » next coverage.py v7.6.7, created at 2025-08-14 10:31 +0000
1import logging
2import os
3import tarfile
4from zipfile import ZipFile, is_zipfile
6from django.core.files import File
7from django.db import transaction
8from django.db.models import BooleanField, Case, ExpressionWrapper, Q, Value, When
9from django.forms import ValidationError
10from django.http import Http404
11from django.utils.translation import gettext_lazy as _
13from PIL import UnidentifiedImageError
15from photos.models import Photo
17logger = logging.getLogger(__name__)
def check_shared_album_token(album, token):
    """Raise a 404 if the token does not match the album token.

    String tokens are compared with ``hmac.compare_digest`` so that the time
    the comparison takes does not reveal how much of a guessed token was
    correct. Values of any other type fall back to plain equality.

    :param album: object exposing the expected token as ``access_token``
    :param token: the token presented by the caller
    :raises Http404: when the two tokens differ
    """
    # Function-scope import: keeps this block self-contained.
    from hmac import compare_digest

    expected = album.access_token
    if isinstance(token, str) and isinstance(expected, str):
        # compare_digest rejects non-ASCII str objects, so compare the encoded
        # bytes; "surrogatepass" keeps odd input from raising while encoding.
        matches = compare_digest(
            token.encode("utf-8", "surrogatepass"),
            expected.encode("utf-8", "surrogatepass"),
        )
    else:
        matches = token == expected

    if not matches:
        raise Http404("Invalid token.")
def is_album_accessible(request, album):
    """Check if the request user can access an album.

    A request without a member never has access. A member with an active
    membership can access every album. A member without an active membership
    can only access an album whose date falls inside one of their memberships.

    :param request: request object carrying a ``member`` attribute (may be falsy)
    :param album: album whose ``date`` is matched against the memberships
    :return: whether the album may be accessed
    """
    member = request.member
    if not member:
        return False

    # Evaluated exactly once; it decides between the two member cases below.
    if member.has_active_membership():
        return True

    # Not currently a member: the album is accessible only if its date lies
    # within the since/until range of one of this person's memberships.
    return member.membership_set.filter(
        since__lte=album.date, until__gte=album.date
    ).exists()
def get_annotated_accessible_albums(request, albums):
    """Annotate the albums which are accessible by the user.

    Every album in the returned queryset gets a boolean ``accessible``
    annotation:

    * no member on the request: always ``False``;
    * member with an active membership: always ``True``;
    * member without an active membership: ``True`` only for albums whose
      date lies within the since/until range of one of their memberships.

    :param request: request object carrying a ``member`` attribute (may be falsy)
    :param albums: album queryset to annotate
    :return: the annotated queryset
    """
    member = request.member
    if not member:
        accessible = ExpressionWrapper(Value(False), output_field=BooleanField())
    elif member.has_active_membership():
        # The membership status is evaluated once, here.
        accessible = ExpressionWrapper(Value(True), output_field=BooleanField())
    else:
        # Start from a condition that matches no album, then OR in one date
        # range per membership this person has held.
        albums_filter = Q(pk__in=[])
        for membership in member.membership_set.all():
            albums_filter |= Q(date__gte=membership.since) & Q(
                date__lte=membership.until
            )

        accessible = Case(
            When(albums_filter, then=Value(True)),
            default=Value(False),
            output_field=BooleanField(),
        )

    return albums.annotate(accessible=accessible)
def extract_archive(album, archive) -> tuple[dict[str, str], int]:
    """Extract zip and tar files.

    Every regular file in the archive that has a photo extension is saved to
    ``album``. Entries are handled in sorted name order.

    :param album: album the extracted photos are added to
    :param archive: seekable binary file object holding a zip or tar archive
    :return: a ``(warnings, count)`` tuple, where ``warnings`` maps an entry
        name to the reason it was skipped and ``count`` is the number of
        photos saved
    :raises ValueError: if the file is not a zip file and cannot be read as a
        tar file either
    """
    if is_zipfile(archive):
        # is_zipfile moved the file position; rewind before reading.
        archive.seek(0)
        with ZipFile(archive) as zip_file:
            return _save_archive_entries(
                album,
                zip_file.namelist(),
                lambda name: not zip_file.getinfo(name).is_dir(),
                zip_file.open,
            )

    archive.seek(0)
    # is_tarfile only supports filenames, so we cannot use that
    try:
        with tarfile.open(fileobj=archive) as tar_file:
            return _save_archive_entries(
                album,
                tar_file.getnames(),
                lambda name: tar_file.getmember(name).isfile(),
                tar_file.extractfile,
            )
    except tarfile.ReadError as e:
        raise ValueError(_("The uploaded file is not a zip or tar file.")) from e


def _save_archive_entries(album, names, is_file, open_entry):
    """Save each photo entry of an archive, whatever the archive format.

    :param album: album the photos are added to
    :param names: names of all entries in the archive
    :param is_file: callable telling whether the named entry should be
        considered at all (entries for which it is false are skipped silently)
    :param open_entry: callable returning a context-managed file object for
        the named entry
    :return: ``(warnings, count)`` as described on ``extract_archive``
    """
    warnings, count = {}, 0
    for name in sorted(names):
        if not is_file(name):
            continue

        if not _has_photo_extension(name):
            warnings[name] = "has an unknown extension."
            continue

        with open_entry(name) as file:
            if warning := _try_save_photo(album, file, name):
                warnings[name] = warning
            else:
                count += 1
    return warnings, count
110def _has_photo_extension(filename):
111 """Check if the filename has a photo extension."""
112 __, extension = os.path.splitext(filename)
113 return extension.lower() in (".jpg", ".jpeg", ".png", ".webp")
def _try_save_photo(album, file, filename) -> str | None:
    """Try to save a photo to an album.

    Returns None, or a string describing a reason for failure.

    :param album: album the new photo belongs to
    :param file: open file object with the photo's contents
    :param filename: name to store the file under; also used in the log message
    """
    instance = Photo(album=album)
    instance.file = File(file, filename)
    try:
        # Validate and save atomically, so a failure part-way through rolls
        # back any database changes made for this photo.
        with transaction.atomic():
            instance.full_clean()
            instance.save()
    except (ValidationError, UnidentifiedImageError, OSError) as e:
        # All three failure modes are logged and reported to the caller in
        # exactly the same way.
        logger.warning(f"Photo '{filename}' could not be read: {e}", exc_info=e)
        return "could not be read."
    return None