Skip to content

strategy

django_spire.contrib.sync.django.storage.strategy

__all__ = ['BulkUpsertStrategy', 'DeleteStrategy', 'HardDeleteStrategy', 'SoftDeleteStrategy', 'StalenessGuardedUpsertStrategy', 'UpsertStrategy'] module-attribute

DeleteStrategy

Bases: Protocol

delete

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def delete(
    self,
    model: type[SyncableMixin],
    deletes: dict[str, int],
    origin_node: str,
) -> None: ...

HardDeleteStrategy

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def __init__(self, identity_field: str = 'id') -> None:
    self._identity_field = identity_field

delete

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def delete(
    self,
    model: type[SyncableMixin],
    deletes: dict[str, int],
    origin_node: str,
) -> None:
    if not deletes:
        return

    from django_spire.contrib.sync.django.sequence import (  # noqa: PLC0415
        SyncSequenceAllocator,
    )

    keys = sorted(deletes.keys())
    sequence_first = SyncSequenceAllocator().allocate(len(keys)).value_first
    model_label = model._meta.label

    with sync_bypass():
        for index, key in enumerate(keys):
            tombstone_timestamp = deletes[key]

            staleness_filter = {
                self._identity_field: key,
                'sync_field_last_modified__lte': tombstone_timestamp,
            }

            model.objects.filter(**staleness_filter).delete()

            _record_tombstone(
                model_label,
                key,
                tombstone_timestamp,
                sequence_first + index,
                origin_node,
            )

SoftDeleteStrategy

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def __init__(self, identity_field: str = 'id') -> None:
    self._identity_field = identity_field

delete

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def delete(
    self,
    model: type[SyncableMixin],
    deletes: dict[str, int],
    origin_node: str,
) -> None:
    if not deletes:
        return

    from django_spire.contrib.sync.django.sequence import (  # noqa: PLC0415
        SyncSequenceAllocator,
    )

    identity_lookup = {f'{self._identity_field}__in': list(deletes.keys())}
    instances = list(model.objects.filter(**identity_lookup))

    pending = self._collect_pending(instances, deletes)

    if not pending:
        return

    sequence_first = SyncSequenceAllocator().allocate(len(pending)).value_first
    model_label = model._meta.label

    with sync_bypass():
        for index, instance in enumerate(pending):
            key = str(getattr(instance, self._identity_field))
            tombstone_timestamp = deletes[key]
            local_sequence = sequence_first + index

            staleness_filter = {
                self._identity_field: key,
                'sync_field_last_modified__lte': tombstone_timestamp,
            }

            model.objects.filter(**staleness_filter).update(
                is_deleted=True,
                sync_field_last_modified=instance.sync_field_last_modified,
                sync_field_origin_node=origin_node,
                sync_field_sequence=local_sequence,
                sync_field_timestamps=instance.sync_field_timestamps,
            )

            _record_tombstone(
                model_label,
                key,
                tombstone_timestamp,
                local_sequence,
                origin_node,
            )

BulkUpsertStrategy

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def __init__(self, identity_field: str = 'id') -> None:
    self._identity_field = identity_field

apply_many

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def apply_many(
    self,
    model: type[SyncableMixin],
    records: dict[str, SyncRecord],
    deserialized: dict[str, dict[str, Any]],
    origin_node: str,
) -> UpsertResult:
    if not records:
        return UpsertResult()

    connection = self._connection(model)

    if connection.vendor not in _SUPPORTED_VENDORS:
        message = (
            f'BulkUpsertStrategy requires postgresql or sqlite, '
            f'got {connection.vendor!r}'
        )

        raise NotImplementedError(message)

    model_label = model._meta.label
    writable, errors = self._validate_records(model_label, records)

    if not writable:
        return UpsertResult(skipped=set(), errors=errors)

    from django_spire.contrib.sync.django.sequence import (  # noqa: PLC0415
        SyncSequenceAllocator,
    )

    fields = self._writable_fields(model)
    sorted_keys = sorted(writable.keys())

    sequence_first = SyncSequenceAllocator().allocate(len(sorted_keys)).value_first

    instances = [
        self._build_instance(RecordContext(
            model=model,
            key=key,
            sync_record=writable[key],
            field_data=deserialized[key],
            sequence=sequence_first + index,
            origin_node=origin_node,
        ))
        for index, key in enumerate(sorted_keys)
    ]

    identity = model._meta.get_field(self._identity_field)
    rows_per_batch = self._rows_per_batch(len(fields))
    applied: set[str] = set()

    with sync_bypass(), connection.cursor() as cursor:
        for offset in range(0, len(instances), rows_per_batch):
            batch = instances[offset:offset + rows_per_batch]

            params: list[Any] = []

            for instance in batch:
                params.extend(
                    self._build_row_params(
                        instance,
                        fields,
                        connection,
                    ),
                )

            sql = self._build_upsert_sql(
                model,
                fields,
                connection,
                len(batch),
            )

            cursor.execute(sql, params)

            applied.update(
                str(identity.to_python(row[0]))
                for row in cursor.fetchall()
            )

    skipped: set[str] = set()

    for key in writable:
        if key not in applied:
            skipped.add(key)

    return UpsertResult(skipped=skipped, errors=errors)

StalenessGuardedUpsertStrategy

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def __init__(self, identity_field: str = 'id') -> None:
    self._identity_field = identity_field

apply_many

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def apply_many(
    self,
    model: type[SyncableMixin],
    records: dict[str, SyncRecord],
    deserialized: dict[str, dict[str, Any]],
    origin_node: str,
) -> UpsertResult:
    from django_spire.contrib.sync.django.sequence import (  # noqa: PLC0415
        SyncSequenceAllocator,
    )

    skipped: set[str] = set()
    errors: list[Error] = []
    sorted_keys = sorted(records.keys())

    if not sorted_keys:
        return UpsertResult()

    sequence_first = SyncSequenceAllocator().allocate(len(sorted_keys)).value_first

    for index, key in enumerate(sorted_keys):
        sync_record = records[key]
        field_data = deserialized[key]

        if sync_record.sync_field_last_modified == 0:
            message = (
                f'Ghost record for {model._meta.label} key={key}: '
                f'sync_field_last_modified=0 indicates record was '
                f'never properly stamped '
                f'(timestamps={sync_record.timestamps!r}, '
                f'received_at={sync_record.received_at}). '
                f'Run stamp_unstamped_records on the source node '
                f'or check that AppConfig.ready() runs.'
            )

            logger.error(message)
            errors.append(Error(key=key, message=message))
            continue

        ctx = RecordContext(
            model=model,
            key=key,
            sync_record=sync_record,
            field_data=field_data,
            sequence=sequence_first + index,
            origin_node=origin_node,
        )

        if not self._apply_one(ctx):
            skipped.add(key)

    return UpsertResult(skipped=skipped, errors=errors)

UpsertStrategy

Bases: Protocol

apply_many

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def apply_many(
    self,
    model: type[SyncableMixin],
    records: dict[str, SyncRecord],
    deserialized: dict[str, dict[str, Any]],
    origin_node: str,
) -> UpsertResult: ...