Coverage for website/payments/services.py: 100.00%

104 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2025-08-14 10:31 +0000

1import datetime 

2 

3from django.conf import settings 

4from django.core import mail 

5from django.db import transaction 

6from django.db.models import Model, Q, QuerySet, Sum 

7from django.urls import reverse 

8from django.utils import timezone 

9from django.utils.translation import gettext_lazy as _ 

10 

11from members.models import Member 

12from utils.snippets import send_email 

13 

14from .exceptions import PaymentError 

15from .models import BankAccount, Payment, PaymentUser 

16from .payables import Payable, payables 

17from .signals import processed_batch 

18 

19 

def create_payment(
    model_payable: Model | Payable,
    processed_by: Member,
    pay_type: str,
) -> Payment | None:
    """Register a payment for a payable object and store it on that object.

    If the payable already has a payment, that payment is overwritten in place;
    otherwise a new ``Payment`` is created. The payable's model is saved
    afterwards, all within a single database transaction.

    :param model_payable: the payable, or the model instance behind it
    :param processed_by: member processing the payment
    :param pay_type: one of ``Payment.CASH``, ``Payment.CARD``,
        ``Payment.WIRE`` or ``Payment.TPAY``
    :return: the payment set on the payable after saving; this can be None if
        saving the payable model resets its payment field
    :raises PaymentError: for an invalid payment type, insufficient permissions,
        a zero amount, a payer without Thalia Pay enabled, a payable that may
        not be paid, or a failure while saving the payable model
    """
    accepted_types = (Payment.CASH, Payment.CARD, Payment.WIRE, Payment.TPAY)
    if pay_type not in accepted_types:
        raise PaymentError("Invalid payment type")

    with transaction.atomic():
        if isinstance(model_payable, Payable):
            model_payable = model_payable.model

        try:
            # Re-fetch the payable under a row lock that is held until the
            # payment exists: this gives us fresh data and prevents the same
            # payable from being paid twice concurrently.
            locked_rows = model_payable._meta.model.objects.filter(
                pk=model_payable.pk
            ).select_for_update(of=("self",))
            model_payable = locked_rows.get()
        except AttributeError:
            # Mock models used in tests cannot be re-fetched; use them as-is.
            if isinstance(model_payable, Payable):
                model_payable = model_payable.model

        payable = payables.get_payable(model_payable)

        payer = None
        if payable.payment_payer:
            payer = PaymentUser.objects.get(pk=payable.payment_payer.pk)

        # Thalia Pay payments may only be made by the payer themselves; every
        # other payment type requires permission to manage the payment.
        is_authorised = (
            payer and payer == processed_by and pay_type == Payment.TPAY
        ) or (payable.can_manage_payment(processed_by) and pay_type != Payment.TPAY)
        if not is_authorised:
            raise PaymentError(
                _("User processing payment does not have the right permissions")
            )

        if payable.payment_amount == 0:
            raise PaymentError(_("Payment amount 0 is not accepted"))

        if pay_type == Payment.TPAY and not payer.tpay_enabled:
            raise PaymentError(_("This user does not have Thalia Pay enabled"))

        if not payable.paying_allowed:
            raise PaymentError(_("Payment restricted"))

        existing_payment = payable.payment
        if existing_payment is None:
            payable.payment = Payment.objects.create(
                processed_by=processed_by,
                amount=payable.payment_amount,
                notes=payable.payment_notes,
                topic=payable.payment_topic,
                paid_by=payer,
                type=pay_type,
            )
        else:
            # Reuse the payment that is already attached to the payable.
            existing_payment.amount = payable.payment_amount
            existing_payment.notes = payable.payment_notes
            existing_payment.topic = payable.payment_topic
            existing_payment.paid_by = payer
            existing_payment.processed_by = processed_by
            existing_payment.type = pay_type
            existing_payment.save()

        try:
            payable.model.save()
        except Exception as e:
            raise PaymentError(f"Something went wrong saving the payable: {e}") from e

        return payable.payment

104 

105 

def delete_payment(
    model: Model, member: Member | None = None, ignore_change_window=False
):
    """Remove the payment from a payable object.

    :param model: Payable or Model object
    :param member: member deleting the payment; when omitted, no permission
        check is performed
    :param ignore_change_window: ignore the payment change window
    :return: None
    :raises PaymentError: if the member may not manage the payment, if the
        payable has no payment, if the payment is older than
        ``settings.PAYMENT_CHANGE_WINDOW`` seconds (unless ignored), or if the
        payment belongs to a processed batch
    """
    payable = payables.get_payable(model)

    if member and not payable.can_manage_payment(member):
        raise PaymentError(
            _("User deleting payment does not have the right permissions.")
        )

    payment = payable.payment
    if payment is None:
        # Report a domain error instead of failing with an AttributeError on
        # the attribute accesses below.
        raise PaymentError(_("This payable does not have a payment to delete."))

    if not ignore_change_window:
        change_window = datetime.timedelta(seconds=settings.PAYMENT_CHANGE_WINDOW)
        if payment.created_at < timezone.now() - change_window:
            raise PaymentError(_("This payment cannot be deleted anymore."))

    if payment.batch and payment.batch.processed:
        raise PaymentError(
            _("This payment has already been processed and hence cannot be deleted.")
        )

    # Detach and delete atomically so the payable never points at a payment
    # that no longer exists.
    with transaction.atomic():
        payable.payment = None
        payment.delete()
        payable.model.save()

137 

138 

def update_last_used(queryset: QuerySet, date: datetime.date | None = None) -> int:
    """Update the last used field of the valid BankAccounts in a queryset.

    Only accounts that are currently valid are touched: those without an end
    date, and those for which the current moment lies between ``valid_from``
    (inclusive) and ``valid_until`` (exclusive).

    :param queryset: Queryset of BankAccounts
    :param date: date to set last_used to; defaults to today
    :return: number of affected rows
    """
    now = timezone.now()
    if date is None:
        date = now.date()

    # The bounds must be "started on or before now" and "ends after now". The
    # reverse comparison (valid_from >= now and valid_until < now) can never
    # match an account whose validity range is well-formed.
    currently_valid = Q(valid_from__lte=now, valid_until__gt=now) | Q(
        valid_until=None
    )
    return queryset.filter(currently_valid).update(last_used=date)

154 

155 

def revoke_old_mandates() -> int:
    """Revoke every mandate that was last used 36 months or longer ago.

    A month is counted as 30 days. Revoking means setting ``valid_until`` to
    today's date.

    :return: number of affected rows
    """
    unused_since = timezone.now() - timezone.timedelta(days=36 * 30)
    stale_accounts = BankAccount.objects.filter(last_used__lte=unused_since)
    return stale_accounts.update(valid_until=timezone.now().date())

164 

165 

def process_batch(batch):
    """Mark a Thalia Pay batch as processed and notify its payers.

    For every payment in the batch, the payer's last bank account gets its
    ``last_used`` set to the batch's withdrawal date. Afterwards the batch is
    saved, the ``processed_batch`` signal is sent and the withdrawal notice
    emails go out.

    :param batch: the batch to be processed
    :return: None
    """
    batch.processed = True

    for payment in batch.payments_set.select_related("paid_by"):
        account = payment.paid_by.bank_accounts.last()
        if not account:  # pragma: no cover
            # Not expected to ever happen. If it does, do not crash: take the
            # payment out of the batch so that it stays unprocessed.
            payment.batch = None
            payment.save()
            continue

        account.last_used = batch.withdrawal_date
        account.save(update_fields=["last_used"])

    batch.save()
    processed_batch.send(sender=None, instance=batch)

    send_tpay_batch_processing_emails(batch)

190 

191 

def derive_next_mandate_no(member) -> str:
    """Return the next mandate number in the default format for a member.

    Only the member's bank accounts whose mandate number matches
    ``BankAccount.MANDATE_NO_DEFAULT_REGEX`` are considered. The number after
    the first dash is taken from each of them; the result is the member's pk,
    a dash, and one more than the highest such number (or 1 if there is none).

    :param member: the member to derive a mandate number for
    :return: the new mandate number, formatted as ``<member pk>-<number>``
    """
    owner = PaymentUser.objects.get(pk=member.pk)
    numbered_accounts = (
        BankAccount.objects.filter(owner=owner)
        .exclude(mandate_no=None)
        .filter(mandate_no__regex=BankAccount.MANDATE_NO_DEFAULT_REGEX)
    )
    sequence_numbers = [
        int(account.mandate_no.split("-")[1]) for account in numbered_accounts
    ]
    next_number = max(sequence_numbers, default=0) + 1
    return f"{member.pk}-{next_number}"

202 

203 

def send_tpay_batch_processing_emails(batch):
    """Email a Thalia Pay withdrawal notice to every payer in a batch.

    Payments are grouped per payer and each payer receives a single email with
    the sum of their payments. All emails share one mail connection.

    :param batch: the batch whose payers are notified
    :return: the number of per-payer rows in the batch
    """
    totals_per_payer = batch.payments_set.values("paid_by").annotate(
        total=Sum("amount")
    )

    with mail.get_connection() as connection:
        for row in totals_per_payer:
            payer = PaymentUser.objects.get(pk=row["paid_by"])
            context = {
                "name": payer.get_full_name(),
                "batch": batch,
                # The payer's last bank account that carries a mandate.
                "bank_account": payer.bank_accounts.filter(
                    mandate_no__isnull=False
                ).last(),
                "creditor_id": settings.SEPA_CREDITOR_ID,
                "payments": batch.payments_set.filter(paid_by=payer),
                "total_amount": row["total"],
                "payments_url": settings.BASE_URL + reverse("payments:payment-list"),
            }
            send_email(
                to=[payer.email],
                subject="Thalia Pay withdrawal notice",
                txt_template="payments/email/tpay_withdrawal_notice_mail.txt",
                html_template="payments/email/tpay_withdrawal_notice_mail.html",
                connection=connection,
                context=context,
            )

    return len(totals_per_payer)

236 

237 

def execute_data_minimisation(dry_run=False):
    """Anonymise payments older than 7 years and clean up bank accounts.

    Three things are selected:

    * payments created 7 years or longer ago that still have a payer; these
      get ``paid_by`` and ``processed_by`` cleared;
    * bank accounts that are no longer valid and not needed for recent Thalia
      Pay payments; these are deleted;
    * open-ended mandates of users whose profile is minimised; these are
      revoked by setting ``valid_until``.

    :param dry_run: when True, only build the querysets and change nothing
    :return: the queryset of payments selected for anonymisation
    """
    # Leap years are ignored on purpose: anonymising a day or two early is
    # preferable to doing it late.
    payment_cutoff = timezone.now().date() - datetime.timedelta(days=365 * 7)
    # 13 months is the period for which mandates have to be kept.
    bankaccount_cutoff = timezone.now() - datetime.timedelta(days=31 * 13)

    old_payments = Payment.objects.filter(created_at__lte=payment_cutoff).exclude(
        paid_by__isnull=True
    )

    # Conditions under which a bank account must be kept: its owner has a
    # Thalia Pay payment, AND a payment that is either in no batch, in an
    # unprocessed batch, or in a batch processed less than 13 months ago.
    has_tpay_payment = Q(owner__paid_payment_set__type=Payment.TPAY)
    not_in_batch = Q(owner__paid_payment_set__batch__isnull=True)
    in_unprocessed_batch = Q(owner__paid_payment_set__batch__processed=False)
    in_recent_batch = Q(
        owner__paid_payment_set__batch__processing_date__gt=bankaccount_cutoff
    )

    # Valid bank accounts are always kept, so only accounts whose validity has
    # ended (valid_until < now) are candidates for deletion.
    expired_accounts = BankAccount.objects.all().filter(
        valid_until__lt=timezone.now()
    )
    removable_accounts = expired_accounts.exclude(
        has_tpay_payment,
        not_in_batch | in_unprocessed_batch | in_recent_batch,
    )

    minimised_mandates = BankAccount.objects.filter(
        mandate_no__isnull=False,
        valid_until=None,
        owner__profile__is_minimized=True,
    )

    if dry_run:
        return old_payments

    old_payments.update(paid_by=None, processed_by=None)
    removable_accounts.delete()
    minimised_mandates.update(valid_until=timezone.now())

    return old_payments