Skip to content

processor

django_spire.notification.app.processor

AppNotificationProcessor

Bases: BaseNotificationProcessor

process

Source code in django_spire/notification/app/processor.py
def process(self, notification: Notification):
    try:
        if notification.type != NotificationTypeChoices.APP:
            raise NotificationException(
                f'AppNotificationProcessor only processes '
                f'App notifications. Was provided {notification.type}'
            )

        if notification.user_id is None:
            raise AppNotificationException(
                'AppNotifications must have a user associated with them'
            )

    except Exception as e:
        notification.status = NotificationStatusChoices.ERRORED
        notification.status_message = str(e)
        notification.save()
        raise e

    notification.status = NotificationStatusChoices.SENT
    notification.sent_datetime = now()

    notification.save()

process_list

Source code in django_spire/notification/app/processor.py
def process_list(self, notifications: list):
    self._update_notifications_to_processing(notifications)

    for notification in notifications:
        try:
            if notification.type != NotificationTypeChoices.APP:
                raise NotificationException(
                    f'AppNotificationProcessor only processes '
                    f'App notifications. Was provided {notification.type}'
                )

            if notification.user_id is None:
                raise AppNotificationException(
                    'AppNotifications must have a user associated with them'
                )

            notification.status = NotificationStatusChoices.SENT
            notification.sent_datetime = now()

        except Exception as e:
            notification.status = NotificationStatusChoices.ERRORED
            notification.status_message = str(e)

            Notification.objects.bulk_update(
                notifications,
                ['status', 'sent_datetime', 'status_message']
            )
            raise e

    Notification.objects.bulk_update(
        notifications,
        ['status', 'sent_datetime', 'status_message']
    )

process_ready

Source code in django_spire/notification/app/processor.py
def process_ready(self):
    self.process_list(Notification.objects.app_notifications().ready_to_send().active())

process_errored

Source code in django_spire/notification/app/processor.py
def process_errored(self):
    self.process_list(Notification.objects.app_notifications().errored().active())