Skip to content

strategy

django_spire.contrib.sync.django.storage.strategy

__all__ = ['BulkUpsertStrategy', 'DeleteStrategy', 'HardDeleteStrategy', 'SoftDeleteStrategy', 'StalenessGuardedUpsertStrategy', 'UpsertStrategy'] module-attribute

DeleteStrategy

Bases: Protocol

delete

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def delete(self, model: type[SyncableMixin], deletes: dict[str, int]) -> None: ...

HardDeleteStrategy

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def __init__(self, identity_field: str = 'id') -> None:
    self._identity_field = identity_field

delete

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def delete(
    self,
    model: type[SyncableMixin],
    deletes: dict[str, int],
) -> None:
    for key, tombstone_ts in deletes.items():
        staleness_filter = {
            self._identity_field: key,
            'sync_field_last_modified__lte': tombstone_ts,
        }

        model.objects.filter(**staleness_filter).delete()

SoftDeleteStrategy

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def __init__(self, identity_field: str = 'id') -> None:
    self._identity_field = identity_field

delete

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def delete(
    self,
    model: type[SyncableMixin],
    deletes: dict[str, int],
) -> None:
    identity_lookup = {f'{self._identity_field}__in': list(deletes.keys())}
    instances = list(model.objects.filter(**identity_lookup))

    pending = self._collect_pending(instances, deletes)

    if not pending:
        return

    with sync_bypass():
        for instance in pending:
            key = str(getattr(instance, self._identity_field))
            tombstone_ts = deletes[key]

            staleness_filter = {
                self._identity_field: key,
                'sync_field_last_modified__lte': tombstone_ts,
            }

            model.objects.filter(**staleness_filter).update(
                is_deleted=True,
                sync_field_timestamps=instance.sync_field_timestamps,
                sync_field_last_modified=instance.sync_field_last_modified,
            )

BulkUpsertStrategy

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def __init__(self, identity_field: str = 'id') -> None:
    self._identity_field = identity_field

apply_many

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def apply_many(
    self,
    model: type[SyncableMixin],
    records: dict[str, SyncRecord],
    deserialized: dict[str, dict[str, Any]],
) -> set[str]:
    if not records:
        return set()

    connection = self._connection(model)

    if connection.vendor not in _SUPPORTED_VENDORS:
        message = (
            f'BulkUpsertStrategy requires postgresql or sqlite, '
            f'got {connection.vendor!r}'
        )

        raise NotImplementedError(message)

    writable, skipped = self._filter_ghosts(records)

    if not writable:
        return skipped

    fields = self._writable_fields(model)
    sorted_keys = sorted(writable.keys())

    instances = [
        self._build_instance(
            model,
            key,
            writable[key],
            deserialized[key],
        )
        for key in sorted_keys
    ]

    identity = model._meta.get_field(self._identity_field)
    rows_per_batch = self._rows_per_batch(len(fields))
    applied: set[str] = set()

    with sync_bypass(), connection.cursor() as cursor:
        for offset in range(0, len(instances), rows_per_batch):
            batch = instances[offset:offset + rows_per_batch]

            params: list[Any] = []

            for instance in batch:
                params.extend(
                    self._build_row_params(
                        instance,
                        fields,
                        connection,
                    ),
                )

            sql = self._build_sql(
                model,
                fields,
                connection,
                len(batch),
            )

            cursor.execute(sql, params)

            applied.update(
                str(identity.to_python(row[0]))
                for row in cursor.fetchall()
            )

    for key in writable:
        if key not in applied:
            skipped.add(key)

    return skipped

StalenessGuardedUpsertStrategy

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def __init__(self, identity_field: str = 'id') -> None:
    self._identity_field = identity_field

apply_many

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def apply_many(
    self,
    model: type[SyncableMixin],
    records: dict[str, SyncRecord],
    deserialized: dict[str, dict[str, Any]],
) -> set[str]:
    skipped: set[str] = set()

    for key in sorted(records.keys()):
        sync_record = records[key]
        field_data = deserialized[key]

        applied = self._apply_one(
            model,
            key,
            sync_record,
            field_data,
        )

        if not applied:
            skipped.add(key)

    return skipped

UpsertStrategy

Bases: Protocol

apply_many

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def apply_many(
    self,
    model: type[SyncableMixin],
    records: dict[str, SyncRecord],
    deserialized: dict[str, dict[str, Any]],
) -> set[str]: ...