Bases: BaseNotificationProcessor
process
Source code in django_spire/notification/app/processor.py
def process(self, notification: Notification):
    """
    Validate a single notification and persist the outcome on it.

    When the notification is of type APP and has a ``user_id``, its status
    is set to SENT, ``sent_datetime`` is set from ``now()``, and it is saved.
    When validation fails, its status is set to ERRORED, the error text is
    stored in ``status_message``, it is saved, and the error is re-raised.

    Raises:
        NotificationException: ``notification.type`` is not APP.
        AppNotificationException: ``notification.user_id`` is None.
    """
    try:
        if notification.type != NotificationTypeChoices.APP:
            raise NotificationException(
                f'AppNotificationProcessor only processes '
                f'App notifications. Was provided {notification.type}'
            )

        if notification.user_id is None:
            raise AppNotificationException(
                'AppNotifications must have a user associated with them'
            )
    except Exception as e:
        # Record the failure on the row before letting the caller see it.
        notification.status = NotificationStatusChoices.ERRORED
        notification.status_message = str(e)
        notification.save()
        # Bare ``raise`` re-raises the active exception without adding a
        # redundant traceback entry for this line.
        raise

    notification.status = NotificationStatusChoices.SENT
    notification.sent_datetime = now()
    notification.save()
|
process_list
Source code in django_spire/notification/app/processor.py
def process_list(self, notifications: list):
    """
    Validate a batch of notifications and persist the results in bulk.

    The batch is first handed to ``_update_notifications_to_processing``.
    Each notification is then validated in order: valid ones are marked
    SENT with ``sent_datetime`` set from ``now()``. On the first failure
    the offending notification is marked ERRORED with the error text in
    ``status_message``, the whole batch is written with ``bulk_update``,
    and the error is re-raised. When every notification validates, the
    batch is written once at the end.

    Raises:
        NotificationException: a notification's ``type`` is not APP.
        AppNotificationException: a notification's ``user_id`` is None.
    """
    # Single source of truth for the columns written by both bulk updates.
    updated_fields = ['status', 'sent_datetime', 'status_message']

    self._update_notifications_to_processing(notifications)

    for notification in notifications:
        try:
            if notification.type != NotificationTypeChoices.APP:
                raise NotificationException(
                    f'AppNotificationProcessor only processes '
                    f'App notifications. Was provided {notification.type}'
                )

            if notification.user_id is None:
                raise AppNotificationException(
                    'AppNotifications must have a user associated with them'
                )

            notification.status = NotificationStatusChoices.SENT
            notification.sent_datetime = now()
        except Exception as e:
            notification.status = NotificationStatusChoices.ERRORED
            notification.status_message = str(e)
            # Persist what has been decided so far before aborting.
            # NOTE(review): notifications after this one are never validated;
            # they are written with whatever state they currently hold.
            Notification.objects.bulk_update(notifications, updated_fields)
            # Bare ``raise`` re-raises the active exception without adding a
            # redundant traceback entry for this line.
            raise

    Notification.objects.bulk_update(notifications, updated_fields)
|
process_ready
Source code in django_spire/notification/app/processor.py
def process_ready(self):
    """
    Run ``process_list`` over the notifications selected by
    ``app_notifications().ready_to_send().active()``.
    """
    ready_notifications = (
        Notification.objects
        .app_notifications()
        .ready_to_send()
        .active()
    )
    self.process_list(ready_notifications)
|
process_errored
Source code in django_spire/notification/app/processor.py
def process_errored(self):
    """
    Run ``process_list`` over the notifications selected by
    ``app_notifications().errored().active()``.
    """
    errored_notifications = (
        Notification.objects
        .app_notifications()
        .errored()
        .active()
    )
    self.process_list(errored_notifications)
|