Skip to content

storage

django_spire.contrib.sync.django.storage

__all__ = ['DeleteStrategy', 'DjangoCheckpointStore', 'DjangoRecordReader', 'DjangoRecordWriter', 'DjangoSyncStorage', 'HardDeleteStrategy', 'ManyToManyApplier', 'SoftDeleteStrategy', 'StalenessGuardedUpsertStrategy', 'UpsertStrategy'] module-attribute

DjangoCheckpointStore

get_after_keys

Source code in django_spire/contrib/sync/django/storage/checkpoint.py
def get_after_keys(self, node_id: str) -> dict[str, Any]:
    """Return the ``after_keys`` mapping stored for ``node_id``.

    Looks up the first ``SyncCheckpoint`` row for the node. An empty
    dict is returned when no row exists or when the stored value is
    falsy (for example ``None``).
    """
    stored = SyncCheckpoint.objects.filter(node_id=node_id).first()

    if stored is not None:
        return stored.after_keys or {}

    return {}

get_checkpoint

Source code in django_spire/contrib/sync/django/storage/checkpoint.py
def get_checkpoint(self, node_id: str) -> int:
    """Return the timestamp stored for ``node_id``.

    Looks up the first ``SyncCheckpoint`` row for the node and returns
    its ``timestamp``; ``0`` is returned when the node has no row yet.
    """
    stored = SyncCheckpoint.objects.filter(node_id=node_id).first()

    if stored is not None:
        return stored.timestamp

    return 0

save_checkpoint

Source code in django_spire/contrib/sync/django/storage/checkpoint.py
def save_checkpoint(
    self,
    node_id: str,
    timestamp: int,
    after_keys: dict[str, Any] | None = None,
) -> None:
    """Create or update the ``SyncCheckpoint`` row for ``node_id``.

    The row is matched on ``node_id``; ``timestamp`` and ``after_keys``
    are written as the new values. A falsy ``after_keys`` (``None`` or
    an empty dict) is stored as an empty dict.
    """
    values = {
        'after_keys': after_keys if after_keys else {},
        'timestamp': timestamp,
    }

    SyncCheckpoint.objects.update_or_create(node_id=node_id, defaults=values)

DjangoSyncStorage

Source code in django_spire/contrib/sync/django/storage/facade.py
def __init__(
    self,
    models: list[type[SyncableMixin]],
    identity_field: str = 'id',
    batch_size_max: int = _BATCH_SIZE_MAX,
    checkpoint_store: CheckpointStore | None = None,
    record_reader: RecordReader | None = None,
    record_writer: RecordWriter | None = None,
) -> None:
    """Wire the storage facade to its three collaborators.

    Any collaborator that is not supplied (or is falsy) is replaced by
    the Django-backed default: ``DjangoCheckpointStore``,
    ``DjangoRecordReader`` and ``DjangoRecordWriter``. ``models`` and
    ``identity_field`` are forwarded to the default reader and writer;
    ``batch_size_max`` is forwarded to the default writer only.
    """
    if not checkpoint_store:
        checkpoint_store = DjangoCheckpointStore()

    self._checkpoint_store = checkpoint_store

    if not record_reader:
        record_reader = DjangoRecordReader(
            models=models,
            identity_field=identity_field,
        )

    self._record_reader = record_reader

    if not record_writer:
        record_writer = DjangoRecordWriter(
            models=models,
            identity_field=identity_field,
            batch_size_max=batch_size_max,
        )

    self._record_writer = record_writer

delete_many

Source code in django_spire/contrib/sync/django/storage/facade.py
def delete_many(
    self,
    model_label: str,
    deletes: dict[str, int],
) -> None:
    """Forward a batch of deletes for ``model_label`` to the record writer.

    ``deletes`` maps each record key to the tombstone timestamp of the
    delete.
    """
    writer = self._record_writer
    writer.delete_many(model_label, deletes)

get_after_keys

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_after_keys(self, node_id: str) -> dict[str, Any]:
    """Return the ``after_keys`` mapping the checkpoint store holds for ``node_id``."""
    store = self._checkpoint_store
    return store.get_after_keys(node_id)

get_changed_since

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_changed_since(
    self,
    model_label: str,
    timestamp: int,
    limit: int | None = None,
    after_key: str | None = None,
) -> dict[str, SyncRecord]:
    """Return records of ``model_label`` changed after ``timestamp``.

    Delegates to the record reader. ``limit`` and ``after_key`` are
    passed through as keyword arguments so the reader can page through
    the result.
    """
    reader = self._record_reader
    paging = {'limit': limit, 'after_key': after_key}

    return reader.get_changed_since(model_label, timestamp, **paging)

get_deletes_since

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_deletes_since(
    self,
    model_label: str,
    timestamp: int,
) -> dict[str, int]:
    """Return deletes of ``model_label`` recorded after ``timestamp``.

    Delegates to the record reader; the result maps record key to the
    timestamp of its delete.
    """
    reader = self._record_reader
    return reader.get_deletes_since(model_label, timestamp)

get_checkpoint

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_checkpoint(self, node_id: str) -> int:
    """Return the checkpoint timestamp the checkpoint store holds for ``node_id``."""
    store = self._checkpoint_store
    return store.get_checkpoint(node_id)

get_records

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_records(
    self,
    model_label: str,
    keys: set[str],
) -> dict[str, SyncRecord]:
    """Return the records of ``model_label`` whose keys are in ``keys``.

    Delegates to the record reader.
    """
    reader = self._record_reader
    return reader.get_records(model_label, keys)

get_syncable_models

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_syncable_models(self) -> list[str]:
    """Return the model labels reported by the record reader."""
    reader = self._record_reader
    return reader.get_syncable_models()

save_checkpoint

Source code in django_spire/contrib/sync/django/storage/facade.py
def save_checkpoint(
    self,
    node_id: str,
    timestamp: int,
    after_keys: dict[str, Any] | None = None,
) -> None:
    self._checkpoint_store.save_checkpoint(
        node_id,
        timestamp,
        after_keys=after_keys,
    )

upsert_many

Source code in django_spire/contrib/sync/django/storage/facade.py
def upsert_many(
    self,
    model_label: str,
    records: dict[str, SyncRecord],
) -> set[str]:
    """Forward ``records`` for ``model_label`` to the record writer.

    Returns the set of keys the writer reports back.
    """
    writer = self._record_writer
    return writer.upsert_many(model_label, records)

ManyToManyApplier

Source code in django_spire/contrib/sync/django/storage/many_to_many.py
def __init__(
    self,
    identity_field: str = 'id',
) -> None:
    """Remember which model field identifies a record when applying relations."""
    self._identity_field = identity_field

apply

Source code in django_spire/contrib/sync/django/storage/many_to_many.py
def apply(
    self,
    model: type[SyncableMixin],
    pending: dict[str, dict[str, list[Any]]],
) -> set[str]:
    """Set many-to-many relations on already-stored instances of ``model``.

    ``pending`` maps record key -> {field name -> values}. Instances are
    fetched in one query by the identity field, then each relation is
    replaced with ``.set(values)`` inside ``sync_bypass()``. Keys and
    field names are processed in sorted order.

    Returns the keys for which no instance was found (a warning is
    logged for each). An ``IntegrityError`` raised by ``.set`` is
    logged and re-raised.
    """
    skipped: set[str] = set()

    if not pending:
        return skipped

    identity = self._identity_field
    found = model.objects.filter(**{f'{identity}__in': list(pending)})

    instances_by_key = {}

    for candidate in found:
        candidate_key = str(getattr(candidate, identity))
        instances_by_key[candidate_key] = candidate

    with sync_bypass():
        for key in sorted(pending):
            relations = pending[key]
            instance = instances_by_key.get(key)

            if instance is not None:
                for field_name, values in sorted(relations.items()):
                    try:
                        getattr(instance, field_name).set(values)
                    except IntegrityError:
                        logger.exception(
                            'M2M set failed for %s:%s field=%s values=%s',
                            model._meta.label,
                            key,
                            field_name,
                            values,
                        )

                        raise

                continue

            # The row vanished (or was never written) between upsert and now.
            logger.warning(
                'Skipping M2M relations for %s key=%s: '
                'instance not found after upsert',
                model._meta.label,
                key,
            )

            skipped.add(key)

    return skipped

DjangoRecordReader

Source code in django_spire/contrib/sync/django/storage/reader.py
def __init__(
    self,
    models: list[type[SyncableMixin]],
    identity_field: str = 'id',
) -> None:
    """Index the given models and build one serializer per model.

    Both lookup tables are keyed by ``model._meta.label``.
    """
    self._identity_field = identity_field

    labelled = [(model._meta.label, model) for model in models]

    self._models: dict[str, type[SyncableMixin]] = dict(labelled)

    self._serializers: dict[str, SyncFieldSerializer] = {
        label: SyncFieldSerializer(model)
        for label, model in labelled
    }

get_changed_since

Source code in django_spire/contrib/sync/django/storage/reader.py
def get_changed_since(
    self,
    model_label: str,
    timestamp: int,
    limit: int | None = None,
    after_key: str | None = None,
) -> dict[str, SyncRecord]:
    """Return records of ``model_label`` modified after ``timestamp``.

    Rows are ordered by ``sync_field_last_modified`` and then by the
    identity field. When ``after_key`` is truthy, rows modified exactly
    at ``timestamp`` whose identity is greater than ``after_key`` are
    included as well, so a caller can resume inside a timestamp.
    ``limit`` caps the number of rows when it is not ``None``.

    The result maps ``str(identity)`` to the serialized record.
    """
    model = self._get_model(model_label)
    many_to_many_names = self._get_many_to_many_names(model)
    identity = self._identity_field

    if not after_key:
        changed = model.objects.filter(
            sync_field_last_modified__gt=timestamp,
        )
    else:
        newer = Q(sync_field_last_modified__gt=timestamp)
        same_timestamp_later_key = Q(
            sync_field_last_modified=timestamp,
            **{f'{identity}__gt': after_key},
        )

        changed = model.objects.filter(newer | same_timestamp_later_key)

    changed = changed.order_by('sync_field_last_modified', identity)

    if limit is not None:
        changed = changed[:limit]

    if many_to_many_names:
        changed = changed.prefetch_related(*many_to_many_names)

    records = {}

    for instance in changed:
        key = str(getattr(instance, identity))
        records[key] = self._instance_to_record(instance)

    return records

get_deletes_since

Source code in django_spire/contrib/sync/django/storage/reader.py
def get_deletes_since(
    self,
    model_label: str,
    timestamp: int,
) -> dict[str, int]:
    """Return tombstones of ``model_label`` recorded after ``timestamp``.

    The result maps each tombstone's ``record_key`` to its
    ``timestamp``. Only rows with ``timestamp`` strictly greater than
    the argument are returned.
    """
    from django_spire.contrib.sync.django.models.tombstone import SyncTombstone  # noqa: PLC0415

    rows = (
        SyncTombstone.objects
        .filter(model_label=model_label, timestamp__gt=timestamp)
        # Ascending order makes the mapping deterministic: should a key
        # ever have more than one tombstone row, dict() keeps the last
        # pair it sees, which is then the newest timestamp.
        .order_by('timestamp')
        .values_list('record_key', 'timestamp')
    )

    return dict(rows)

get_records

Source code in django_spire/contrib/sync/django/storage/reader.py
def get_records(
    self,
    model_label: str,
    keys: set[str],
) -> dict[str, SyncRecord]:
    """Return the serialized records of ``model_label`` for ``keys``.

    An empty ``keys`` returns an empty dict without resolving the
    model. Otherwise rows are matched with ``<identity>__in`` and the
    result maps ``str(identity)`` to the serialized record; keys with
    no matching row are absent from the result.
    """
    if not keys:
        return {}

    model = self._get_model(model_label)
    many_to_many_names = self._get_many_to_many_names(model)
    identity = self._identity_field

    matches = model.objects.filter(**{f'{identity}__in': keys})

    if many_to_many_names:
        matches = matches.prefetch_related(*many_to_many_names)

    records = {}

    for instance in matches:
        key = str(getattr(instance, identity))
        records[key] = self._instance_to_record(instance)

    return records

get_syncable_models

Source code in django_spire/contrib/sync/django/storage/reader.py
def get_syncable_models(self) -> list[str]:
    """Return the labels of all registered models in sorted order."""
    labels = list(self._models)
    labels.sort()

    return labels

DeleteStrategy

Bases: Protocol

delete

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def delete(
    self,
    model: type[SyncableMixin],
    deletes: dict[str, int],
) -> None:
    """Apply ``deletes`` (record key -> tombstone timestamp) to ``model``."""
    ...

HardDeleteStrategy

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def __init__(
    self,
    identity_field: str = 'id',
) -> None:
    """Remember which model field identifies the rows to delete."""
    self._identity_field = identity_field

delete

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def delete(
    self,
    model: type[SyncableMixin],
    deletes: dict[str, int],
) -> None:
    """Delete the rows of ``model`` named in ``deletes``.

    ``deletes`` maps record key -> tombstone timestamp. One delete
    query is issued per key, and it only matches when the row's
    ``sync_field_last_modified`` is not newer than the tombstone, so a
    row modified after the delete survives.
    """
    identity = self._identity_field

    for key in deletes:
        not_newer_than_tombstone = {
            identity: key,
            'sync_field_last_modified__lte': deletes[key],
        }

        model.objects.filter(**not_newer_than_tombstone).delete()

SoftDeleteStrategy

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def __init__(
    self,
    identity_field: str = 'id',
) -> None:
    """Remember which model field identifies the rows to soft-delete."""
    self._identity_field = identity_field

delete

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def delete(
    self,
    model: type[SyncableMixin],
    deletes: dict[str, int],
) -> None:
    """Mark the rows of ``model`` named in ``deletes`` as deleted.

    ``deletes`` maps record key -> tombstone timestamp. Candidate rows
    are loaded in one query and narrowed by ``_collect_pending``; each
    remaining row gets ``is_deleted=True`` via a guarded ``update`` that
    only matches while ``sync_field_last_modified`` is not newer than
    the tombstone. Updates run inside ``sync_bypass()``.
    """
    identity = self._identity_field

    candidates = list(
        model.objects.filter(**{f'{identity}__in': list(deletes)})
    )

    pending = self._collect_pending(candidates, deletes)

    if pending:
        with sync_bypass():
            for instance in pending:
                key = str(getattr(instance, identity))

                not_newer_than_tombstone = {
                    identity: key,
                    'sync_field_last_modified__lte': deletes[key],
                }

                # NOTE(review): the sync fields written here are read
                # from the in-memory instance; presumably
                # _collect_pending has set them - confirm.
                model.objects.filter(**not_newer_than_tombstone).update(
                    is_deleted=True,
                    sync_field_timestamps=instance.sync_field_timestamps,
                    sync_field_last_modified=instance.sync_field_last_modified,
                )

StalenessGuardedUpsertStrategy

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def __init__(
    self,
    identity_field: str = 'id',
) -> None:
    """Remember which model field identifies the rows to upsert."""
    self._identity_field = identity_field

apply_many

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def apply_many(
    self,
    model: type[SyncableMixin],
    records: dict[str, SyncRecord],
    deserialized: dict[str, dict[str, Any]],
) -> set[str]:
    """Apply each record individually and report the ones not applied.

    Keys are visited in sorted order. For every key, ``_apply_one``
    receives the model, the key, the sync record and its deserialized
    field data; a falsy result puts the key in the returned set.
    """
    return {
        key
        for key in sorted(records)
        if not self._apply_one(model, key, records[key], deserialized[key])
    }

UpsertStrategy

Bases: Protocol

apply_many

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def apply_many(
    self,
    model: type[SyncableMixin],
    records: dict[str, SyncRecord],
    deserialized: dict[str, dict[str, Any]],
) -> set[str]:
    """Apply ``records`` to ``model`` and return a set of record keys.

    ``deserialized`` carries the field data per record key. In the
    concrete strategy shown alongside this protocol, the returned keys
    are the records that were not applied.
    """
    ...

DjangoRecordWriter

Source code in django_spire/contrib/sync/django/storage/writer.py
def __init__(
    self,
    models: list[type[SyncableMixin]],
    identity_field: str = 'id',
    batch_size_max: int = _BATCH_SIZE_MAX,
    delete_strategies: dict[str, DeleteStrategy] | None = None,
    many_to_many_applier: ManyToManyApplier | None = None,
    upsert_strategy: UpsertStrategy | None = None,
) -> None:
    """Validate the configuration and wire up the write-side collaborators.

    Raises ``InvalidParameterError`` when ``models`` is empty,
    ``identity_field`` is empty, or ``batch_size_max`` is below 1.

    Models and their serializers are indexed by ``model._meta.label``.
    Any collaborator that is not supplied (or is falsy) falls back to
    a default: ``BulkUpsertStrategy``, ``ManyToManyApplier`` and the
    result of ``_build_delete_strategies(models)``.
    """
    problem = None

    if not models:
        problem = 'models must not be empty'
    elif not identity_field:
        problem = 'identity_field must be a non-empty string'
    elif batch_size_max < 1:
        problem = f'batch_size_max must be >= 1, got {batch_size_max}'

    if problem is not None:
        raise InvalidParameterError(problem)

    self._batch_size_max = batch_size_max
    self._identity_field = identity_field

    labelled = [(model._meta.label, model) for model in models]

    self._models: dict[str, type[SyncableMixin]] = dict(labelled)

    self._serializers: dict[str, SyncFieldSerializer] = {
        label: SyncFieldSerializer(model)
        for label, model in labelled
    }

    if not upsert_strategy:
        upsert_strategy = BulkUpsertStrategy(identity_field=identity_field)

    self._upsert_strategy = upsert_strategy

    if not many_to_many_applier:
        many_to_many_applier = ManyToManyApplier(identity_field=identity_field)

    self._many_to_many_applier = many_to_many_applier

    # Built last: the helper is a method and runs with the attributes
    # above already in place.
    if not delete_strategies:
        delete_strategies = self._build_delete_strategies(models)

    self._delete_strategies = delete_strategies

delete_many

Source code in django_spire/contrib/sync/django/storage/writer.py
def delete_many(
    self,
    model_label: str,
    deletes: dict[str, int],
) -> None:
    """Apply ``deletes`` to ``model_label`` in batches.

    ``deletes`` maps record key -> tombstone timestamp. Nothing happens
    for an empty mapping. Otherwise the keys are split into chunks of
    at most ``batch_size_max`` and each chunk is handed to the delete
    strategy registered for the model.

    Raises ``KeyError`` when no delete strategy is registered under
    ``model_label``.
    """
    if not deletes:
        return

    model = self._get_model(model_label)
    strategy = self._delete_strategies[model_label]

    # Sorted, as in upsert_many, so batch contents and the order rows
    # are touched do not depend on the caller's dict insertion order.
    keys = sorted(deletes)
    step = self._batch_size_max

    for start in range(0, len(keys), step):
        chunk = {key: deletes[key] for key in keys[start:start + step]}
        strategy.delete(model, chunk)

upsert_many

Source code in django_spire/contrib/sync/django/storage/writer.py
def upsert_many(
    self,
    model_label: str,
    records: dict[str, SyncRecord],
) -> set[str]:
    """Upsert ``records`` for ``model_label`` in batches.

    Keys are sorted and split into chunks of at most
    ``batch_size_max``; each chunk goes through ``_upsert_chunk``
    together with the full ``records`` mapping, the model's
    many-to-many names and its serializer. The union of the sets
    returned per chunk is the result; an empty ``records`` yields an
    empty set without resolving the model.
    """
    skipped: set[str] = set()

    if not records:
        return skipped

    model = self._get_model(model_label)
    many_to_many_names = self._get_many_to_many_names(model)
    serializer = self._serializers[model_label]

    ordered_keys = sorted(records)
    step = self._batch_size_max

    for offset in range(0, len(ordered_keys), step):
        skipped |= self._upsert_chunk(
            model,
            records,
            ordered_keys[offset:offset + step],
            many_to_many_names,
            serializer,
        )

    return skipped