Skip to content

facade

django_spire.contrib.sync.django.storage.facade

DjangoSyncStorage

Source code in django_spire/contrib/sync/django/storage/facade.py
def __init__(
    self,
    models: list[type[SyncableMixin]],
    identity_field: str = 'id',
    batch_size_max: int = _BATCH_SIZE_MAX,
    checkpoint_store: CheckpointStore | None = None,
    record_reader: RecordReader | None = None,
    record_writer: RecordWriter | None = None,
) -> None:
    """Wire up the checkpoint store, record reader and record writer.

    Each collaborator may be injected; any that is left as ``None`` is
    replaced by its Django-backed default.

    Args:
        models: Passed to the default reader and writer.
        identity_field: Passed to the default reader and writer.
        batch_size_max: Passed to the default writer only.
        checkpoint_store: Used as-is when given, otherwise a
            ``DjangoCheckpointStore`` is created.
        record_reader: Used as-is when given, otherwise a
            ``DjangoRecordReader`` is created.
        record_writer: Used as-is when given, otherwise a
            ``DjangoRecordWriter`` is created.
    """
    # Test against None explicitly instead of relying on truthiness: an
    # injected collaborator that happens to be falsy (for example one that
    # defines ``__len__`` and is currently empty) must not be discarded in
    # favour of the default.
    if checkpoint_store is None:
        checkpoint_store = DjangoCheckpointStore()

    if record_reader is None:
        record_reader = DjangoRecordReader(
            models=models,
            identity_field=identity_field,
        )

    if record_writer is None:
        record_writer = DjangoRecordWriter(
            models=models,
            identity_field=identity_field,
            batch_size_max=batch_size_max,
        )

    self._checkpoint_store = checkpoint_store
    self._record_reader = record_reader
    self._record_writer = record_writer

delete_many

Source code in django_spire/contrib/sync/django/storage/facade.py
def delete_many(
    self,
    model_label: str,
    deletes: dict[str, int],
) -> None:
    """Forward a batch of deletes to the record writer.

    Both arguments are handed to the writer's ``delete_many`` unchanged;
    whatever the writer returns is discarded.
    """
    writer = self._record_writer
    writer.delete_many(model_label, deletes)

get_after_keys

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_after_keys(self, node_id: str) -> dict[str, Any]:
    """Return the checkpoint store's ``get_after_keys`` result for ``node_id``."""
    store = self._checkpoint_store
    after_keys = store.get_after_keys(node_id)
    return after_keys

get_changed_since

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_changed_since(
    self,
    model_label: str,
    timestamp: int,
    limit: int | None = None,
    after_key: str | None = None,
) -> dict[str, SyncRecord]:
    """Ask the record reader for records changed since ``timestamp``.

    ``model_label`` and ``timestamp`` are passed positionally, while
    ``limit`` and ``after_key`` are passed by keyword. The reader's result
    is returned untouched.
    """
    # Kept as keyword arguments so the reader receives them by name.
    paging = {'limit': limit, 'after_key': after_key}
    reader = self._record_reader
    return reader.get_changed_since(model_label, timestamp, **paging)

get_deletes_since

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_deletes_since(
    self,
    model_label: str,
    timestamp: int,
) -> dict[str, int]:
    """Return the record reader's ``get_deletes_since`` result.

    ``model_label`` and ``timestamp`` are forwarded positionally, in that
    order.
    """
    reader = self._record_reader
    deletes = reader.get_deletes_since(model_label, timestamp)
    return deletes

get_checkpoint

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_checkpoint(self, node_id: str) -> int:
    """Return the checkpoint store's ``get_checkpoint`` result for ``node_id``."""
    store = self._checkpoint_store
    checkpoint = store.get_checkpoint(node_id)
    return checkpoint

get_records

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_records(
    self,
    model_label: str,
    keys: set[str],
) -> dict[str, SyncRecord]:
    """Look up records through the record reader.

    ``model_label`` and ``keys`` are forwarded positionally to the
    reader's ``get_records``, and its result is returned as-is.
    """
    reader = self._record_reader
    records = reader.get_records(model_label, keys)
    return records

get_syncable_models

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_syncable_models(self) -> list[str]:
    """Return whatever the record reader's ``get_syncable_models`` reports."""
    reader = self._record_reader
    labels = reader.get_syncable_models()
    return labels

save_checkpoint

Source code in django_spire/contrib/sync/django/storage/facade.py
def save_checkpoint(
    self,
    node_id: str,
    timestamp: int,
    after_keys: dict[str, Any] | None = None,
) -> None:
    """Hand a checkpoint to the checkpoint store.

    ``node_id`` and ``timestamp`` are passed positionally and
    ``after_keys`` by keyword; whatever the store returns is discarded.
    """
    store = self._checkpoint_store
    store.save_checkpoint(node_id, timestamp, after_keys=after_keys)

upsert_many

Source code in django_spire/contrib/sync/django/storage/facade.py
def upsert_many(
    self,
    model_label: str,
    records: dict[str, SyncRecord],
) -> set[str]:
    """Forward a batch of records to the record writer.

    Returns the writer's ``upsert_many`` result unchanged.
    """
    writer = self._record_writer
    outcome = writer.upsert_many(model_label, records)
    return outcome