Coverage for website/utils/management/commands/remove_unused_media.py: 0.00%
98 statements
« prev ^ index » next coverage.py v7.6.7, created at 2025-08-14 10:31 +0000
« prev ^ index » next coverage.py v7.6.7, created at 2025-08-14 10:31 +0000
1import os
3from django.apps import apps
4from django.conf import settings
5from django.core.files.storage import DefaultStorage, storages
6from django.core.management.base import BaseCommand
7from django.core.validators import EMPTY_VALUES
8from django.db import models
9from django.utils import timezone
def get_file_fields():
    """Return every FileField found on the models returned by `apps.get_models()`.

    Each model's fields are taken from `model._meta.get_fields()`; anything
    that is an instance of `models.FileField` (subclasses included) is kept.
    """
    return [
        candidate
        for model in apps.get_models()
        for candidate in model._meta.get_fields()
        if isinstance(candidate, models.FileField)
    ]
def remove_empty_dirs(path=None):
    """Delete `path` if it contains no files, and report whether it was deleted.

    The directory tree below `path` is walked depth-first, so nested
    directories that hold nothing but other empty directories are removed
    as well. When `path` is falsy, `settings.MEDIA_ROOT` is used.

    Returns:
        True if `path` was a directory and has been removed, False otherwise
        (including when `path` is a regular file or does not exist).
    """
    target = path or settings.MEDIA_ROOT

    if not os.path.isdir(target):
        return False

    # Visit every entry, even after one turned out to be non-removable, so
    # that all empty subdirectories get cleaned up in a single pass.
    everything_removed = True
    for entry in os.listdir(target):
        if not remove_empty_dirs(os.path.join(target, entry)):
            everything_removed = False

    if not everything_removed:
        return False

    os.rmdir(target)
    return True
def get_used_media(storage):
    """Return the set of file names referenced by FileFields that use `storage`.

    Only fields whose `storage` attribute is an instance of the type of the
    given `storage` are considered. Rows where the field is NULL or the empty
    string are excluded in the query, and any remaining value found in
    `EMPTY_VALUES` is skipped.
    """
    storage_class = type(storage)
    referenced = set()

    for field in get_file_fields():
        if not isinstance(field.storage, storage_class):
            continue

        stored_names = (
            field.model._base_manager.values_list(field.name, flat=True)
            .exclude(**{f"{field.name}": ""})
            .exclude(**{f"{field.name}__isnull": True})
        )
        referenced.update(
            stored_name
            for stored_name in stored_names
            if stored_name not in EMPTY_VALUES
        )

    return referenced
def get_all_media(storage, minimum_file_age=None, folder=""):
    """Recursively collect the names of the files `storage` holds under `folder`.

    Args:
        storage: Storage object providing `exists`, `listdir` and
            `get_modified_time`.
        minimum_file_age: Minimum age in seconds. Files modified more recently
            than this are left out. A falsy value disables the age check.
        folder: Storage-relative folder to start from; "" is the storage root.

    Returns:
        A set of storage-relative file names. The set is empty when `folder`
        does not exist in the storage.
    """
    media = set()

    if not storage.exists(folder):
        # Always return a set: get_unused_media() subtracts a set from this
        # result, which raises TypeError if a list is returned here.
        return media

    initial_time = timezone.now()
    dirs, files = storage.listdir(folder)

    # Skip the storage locations themselves in case they are nested
    # (public is in the private media dir at the time of writing this code).
    nested_locations = (
        settings.PUBLIC_MEDIA_LOCATION,
        settings.PRIVATE_MEDIA_LOCATION,
    )
    dirs = [name for name in dirs if name not in nested_locations]

    for name in files:
        name = os.path.join(folder, name)
        if minimum_file_age:
            # NOTE(review): assumes get_modified_time() returns a datetime that
            # can be subtracted from timezone.now() - confirm for each storage.
            file_age = initial_time - storage.get_modified_time(name)
            if file_age.total_seconds() < minimum_file_age:
                # Too recent: the file may belong to an upload that has not
                # been saved to a model yet, so never report it.
                continue
        media.add(name)

    for name in dirs:
        media.update(
            get_all_media(storage, minimum_file_age, os.path.join(folder, name))
        )

    return media
def get_unused_media(storage, minimum_file_age=None):
    """Return the files present in `storage` that no FileField refers to.

    The result is the difference between `get_all_media(storage,
    minimum_file_age)` and `get_used_media(storage)`.
    """
    # List the storage before querying the database, as the original did.
    present = get_all_media(storage, minimum_file_age)
    referenced = get_used_media(storage)

    return present - referenced
class Command(BaseCommand):
    """Management command that deletes media files no model field refers to."""

    help = "Clean unused media files which have no reference in models"

    def add_arguments(self, parser):
        """Register the --noinput, --minimum-file-age and --dry-run options."""
        parser.add_argument(
            "--noinput",
            "--no-input",
            action="store_false",
            dest="interactive",
            default=True,
            help="Do not ask confirmation",
        )
        parser.add_argument(
            "--minimum-file-age",
            type=int,
            dest="minimum_file_age",
            default=180,
            help="Skip files younger this age (sec)",
        )
        parser.add_argument(
            "-n",
            "--dry-run",
            action="store_true",
            dest="dry_run",
            default=False,
            help="Dry run without any affect on your data",
        )

    def _show_files_to_delete(self, storage_type, unused_media, verbosity):
        """Print a per-storage summary, listing each file at verbosity >= 2."""
        if verbosity >= 2:
            self.stdout.writelines(f"Will remove {name}" for name in unused_media)

        self.stdout.write(
            f"Total {storage_type} files to be removed: {len(unused_media)}"
        )

    def handle(self, *args, **options):
        """Find unused files in both storages and delete them after confirmation.

        Nothing is deleted on a dry run, or when the interactive confirmation
        is answered with anything other than "y"/"Y".
        """
        verbosity = options.get("verbosity")
        minimum_file_age = options.get("minimum_file_age")

        private_storage = DefaultStorage()
        unused_private_media = get_unused_media(
            private_storage, minimum_file_age=minimum_file_age
        )

        public_storage = storages["public"]
        unused_public_media = get_unused_media(
            public_storage, minimum_file_age=minimum_file_age
        )

        if not (unused_public_media or unused_private_media):
            self.stdout.write("Nothing to delete. Exit")
            return

        # (label, storage, unused file names), private first and public second.
        batches = (
            ("private", private_storage, unused_private_media),
            ("public", public_storage, unused_public_media),
        )
        total = sum(len(names) for _label, _storage, names in batches)

        if options.get("dry_run"):
            for label, _storage, names in batches:
                self._show_files_to_delete(label, names, verbosity)
            self.stdout.write("Dry run. Exit.")
            return

        if options.get("interactive"):
            for label, _storage, names in batches:
                self._show_files_to_delete(label, names, verbosity)

            question = f"Are you sure you want to remove {total} unused files? (y/N)"
            answer = input(question)
            if answer.upper() != "Y":
                self.stdout.write("Interrupted by user. Exit.")
                return

        for _label, storage, names in batches:
            for name in names:
                storage.delete(name)

        # Clean up directories (under MEDIA_ROOT) left empty by the deletions.
        remove_empty_dirs()

        self.stdout.write(f"Done. Total files removed: {total}")