Skip to content

processor

django_spire.notification.sms.processor

SMSNotificationProcessor

Bases: BaseNotificationProcessor

process

Source code in django_spire/notification/sms/processor.py
def process(self, notification: Notification):
    notification.status = NotificationStatusChoices.PROCESSING
    notification.save()

    try:
        if notification.type != NotificationTypeChoices.SMS:
            raise NotificationException(
                f'SMSNotificationProcessor only processes '
                f'SMS notifications. Was provided {notification.type}'
            )

        twilio_sms_client = Client(settings.TWILIO_ACCOUNT_SID, settings.TWILIO_AUTH_TOKEN)
        TwilioSMSHelper(notification, twilio_sms_client).send()

        notification.status = NotificationStatusChoices.SENT
        notification.sent_datetime = now()
    except Exception as e:
        # Requeue notification as api concurrency is full
        if isinstance(e, TwilioAPIConcurrentException):
            notification.status = NotificationStatusChoices.PENDING
            notification.save()
            return

        notification.status_message = str(e)

        if isinstance(e, TwilioException):
            notification.status = NotificationStatusChoices.ERRORED
        else:
            notification.status = NotificationStatusChoices.FAILED
            raise e
    finally:
        notification.save()

process_list

Source code in django_spire/notification/sms/processor.py
def process_list(self, notifications: list):
    twilio_sms_client = Client(settings.TWILIO_ACCOUNT_SID, settings.TWILIO_AUTH_TOKEN)
    for notification in notifications:
        if notification.type != NotificationTypeChoices.SMS:
            raise NotificationException(
                f'SMSNotificationProcessor only processes '
                f'SMS notifications. Was provided {notification.type}'
            )

    try:
        BulkTwilioSMSHelper(notifications, twilio_sms_client).send_notifications()
    except Exception as e:
        raise e
    finally:
        Notification.objects.bulk_update(
            notifications,
            ['status', 'sent_datetime', 'status_message']
        )

process_ready

Source code in django_spire/notification/sms/processor.py
def process_ready(self):
    self.process_list(Notification.objects.sms_notifications().ready_to_send().active())

process_errored

Source code in django_spire/notification/sms/processor.py
def process_errored(self):
    self.process_list(Notification.objects.sms_notifications().errored().active())