Coverage for website/utils/management/commands/remove_unused_media.py: 0.00%

98 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2025-08-14 10:31 +0000

1import os 

2 

3from django.apps import apps 

4from django.conf import settings 

5from django.core.files.storage import DefaultStorage, storages 

6from django.core.management.base import BaseCommand 

7from django.core.validators import EMPTY_VALUES 

8from django.db import models 

9from django.utils import timezone 

10 

11 

12def get_file_fields(): 

13 """Get all FileFields of all models.""" 

14 all_models = apps.get_models() 

15 fields = [] 

16 for model in all_models: 

17 for field in model._meta.get_fields(): 

18 if isinstance(field, models.FileField): 

19 fields.append(field) 

20 return fields 

21 

22 

23def remove_empty_dirs(path=None): 

24 """Remove the directory `path` if it is emty, and return whether it was removed.""" 

25 if not path: 

26 path = settings.MEDIA_ROOT 

27 

28 if not os.path.isdir(path): 

29 return False 

30 

31 listdir = [os.path.join(path, filename) for filename in os.listdir(path)] 

32 

33 if all(list(map(remove_empty_dirs, listdir))): 

34 os.rmdir(path) 

35 return True 

36 return False 

37 

38 

39def get_used_media(storage): 

40 """Get the filenames of all files stored in a FileField of any model.""" 

41 media = set() 

42 

43 for field in get_file_fields(): 

44 is_null = { 

45 f"{field.name}__isnull": True, 

46 } 

47 is_empty = { 

48 f"{field.name}": "", 

49 } 

50 

51 if not isinstance(field.storage, type(storage)): 

52 continue 

53 

54 for value in ( 

55 field.model._base_manager.values_list(field.name, flat=True) 

56 .exclude(**is_empty) 

57 .exclude(**is_null) 

58 ): 

59 if value not in EMPTY_VALUES: 

60 media.add(value) 

61 

62 return media 

63 

64 

65def get_all_media(storage, minimum_file_age=None, folder=""): 

66 media = set() 

67 initial_time = timezone.now() 

68 

69 if not storage.exists(folder): 

70 return [] 

71 

72 dirs, files = storage.listdir(folder) 

73 

74 # Remove file locations in case they are nested 

75 # (public is in the private media dir at the time of writing this code) 

76 if settings.PUBLIC_MEDIA_LOCATION in dirs: 

77 dirs.remove(settings.PUBLIC_MEDIA_LOCATION) 

78 if settings.PRIVATE_MEDIA_LOCATION in dirs: 

79 dirs.remove(settings.PRIVATE_MEDIA_LOCATION) 

80 

81 for name in files: 

82 name = os.path.join(folder, name) 

83 if minimum_file_age: 

84 file_age = initial_time - storage.get_modified_time(name) 

85 

86 if file_age.total_seconds() < minimum_file_age: 

87 continue 

88 else: 

89 media.add(name) 

90 

91 for name in dirs: 

92 media.update( 

93 get_all_media(storage, minimum_file_age, os.path.join(folder, name)) 

94 ) 

95 

96 return media 

97 

98 

99def get_unused_media(storage, minimum_file_age=None): 

100 all_media = get_all_media(storage, minimum_file_age) 

101 used_media = get_used_media(storage) 

102 

103 return all_media - used_media 

104 

105 

106class Command(BaseCommand): 

107 help = "Clean unused media files which have no reference in models" 

108 

109 def add_arguments(self, parser): 

110 parser.add_argument( 

111 "--noinput", 

112 "--no-input", 

113 dest="interactive", 

114 action="store_false", 

115 default=True, 

116 help="Do not ask confirmation", 

117 ) 

118 

119 parser.add_argument( 

120 "--minimum-file-age", 

121 dest="minimum_file_age", 

122 default=180, 

123 type=int, 

124 help="Skip files younger this age (sec)", 

125 ) 

126 

127 parser.add_argument( 

128 "-n", 

129 "--dry-run", 

130 dest="dry_run", 

131 action="store_true", 

132 default=False, 

133 help="Dry run without any affect on your data", 

134 ) 

135 

136 def _show_files_to_delete(self, storage_type, unused_media, verbosity): 

137 if verbosity >= 2: 

138 self.stdout.writelines(map(lambda x: f"Will remove {x}", unused_media)) 

139 

140 self.stdout.write( 

141 f"Total {storage_type} files to be removed: {len(unused_media)}" 

142 ) 

143 

144 def handle(self, *args, **options): 

145 verbosity = options.get("verbosity") 

146 

147 private_storage = DefaultStorage() 

148 unused_private_media = get_unused_media( 

149 private_storage, 

150 minimum_file_age=options.get("minimum_file_age"), 

151 ) 

152 

153 public_storage = storages["public"] 

154 unused_public_media = get_unused_media( 

155 public_storage, 

156 minimum_file_age=options.get("minimum_file_age"), 

157 ) 

158 

159 if not unused_public_media and not unused_private_media: 

160 self.stdout.write("Nothing to delete. Exit") 

161 return 

162 

163 if options.get("dry_run"): 

164 self._show_files_to_delete("private", unused_private_media, verbosity) 

165 self._show_files_to_delete("public", unused_public_media, verbosity) 

166 self.stdout.write("Dry run. Exit.") 

167 return 

168 

169 if options.get("interactive"): 

170 self._show_files_to_delete("private", unused_private_media, verbosity) 

171 self._show_files_to_delete("public", unused_public_media, verbosity) 

172 

173 question = f"Are you sure you want to remove {len(unused_private_media) + len(unused_public_media)} unused files? (y/N)" 

174 

175 if input(question).upper() != "Y": 

176 self.stdout.write("Interrupted by user. Exit.") 

177 return 

178 

179 for f in unused_private_media: 

180 private_storage.delete(f) 

181 

182 for f in unused_public_media: 

183 public_storage.delete(f) 

184 

185 remove_empty_dirs() 

186 

187 self.stdout.write( 

188 f"Done. Total files removed: {len(unused_private_media) + len(unused_public_media)}" 

189 )