Coverage for website/thaliawebsite/management/commands/migratemedia.py: 0.00%

41 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2025-08-14 10:31 +0000

1import logging 

2import os 

3 

4from django.conf import settings 

5from django.core.management.base import BaseCommand 

6 

7import boto3 

8from botocore.exceptions import ClientError 

9 

10 

11class Command(BaseCommand): 

12 def handle(self, *args, **options): 

13 if not settings.AWS_STORAGE_BUCKET_NAME: 

14 logging.error("No AWS settings found") 

15 return 

16 

17 # create session to s3 

18 s3_client = boto3.client( 

19 service_name="s3", 

20 aws_access_key_id=settings.AWS_ACCESS_KEY_ID, 

21 aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY, 

22 ) 

23 

24 # process migrate local media files to s3 

25 for full_path in self._get_all_media_file(): 

26 upload_path = self._split_path_to_upload(full_path) 

27 try: 

28 try: 

29 remote_file = s3_client.head_object( 

30 Bucket=settings.AWS_STORAGE_BUCKET_NAME, Key=upload_path 

31 ) 

32 except ClientError as e: 

33 if e.response["Error"]["Code"] == "404": 

34 remote_file = None 

35 else: 

36 raise e 

37 

38 if remote_file: 

39 # file already exists 

40 # note that this will not check if the file contents are the same 

41 print(f"file already exists {upload_path}") 

42 logging.info(f"file already exists {upload_path}") 

43 continue 

44 

45 s3_client.upload_file( 

46 full_path, settings.AWS_STORAGE_BUCKET_NAME, upload_path 

47 ) 

48 print(f"success upload {upload_path}") 

49 logging.info(f"success upload {upload_path}") 

50 except ClientError as e: 

51 print(f"failed upload {upload_path}") 

52 logging.error(f"{e}: {upload_path}") 

53 

54 def _get_all_media_file(self) -> [str]: 

55 files = [] 

56 for r, d, f in os.walk(settings.MEDIA_ROOT): 

57 for file in f: 

58 files.append(os.path.join(r, file)) 

59 return files 

60 

61 def _split_path_to_upload(self, full_path: str) -> str: 

62 media_root = settings.MEDIA_ROOT 

63 upload_path = full_path.split(media_root)[-1][1:] 

64 return upload_path