Skip to content

reader

django_spire.contrib.sync.django.storage.reader

DjangoRecordReader

Source code in django_spire/contrib/sync/django/storage/reader.py
def __init__(
    self,
    models: list[type[SyncableMixin]],
    identity_field: str = 'id',
) -> None:
    """Index the given models and their serializers by model label.

    Args:
        models: Model classes to expose through this reader. Each is
            registered under its ``_meta.label``; if two entries share a
            label, the later one replaces the earlier one.
        identity_field: Name of the model attribute used as the record
            key in lookups and in returned mappings. Defaults to ``'id'``.
    """
    self._identity_field = identity_field

    # Label -> model class lookup.
    model_by_label: dict[str, type[SyncableMixin]] = {}
    for model_class in models:
        model_by_label[model_class._meta.label] = model_class
    self._models = model_by_label

    # Label -> serializer lookup, built in a second pass so that every
    # label has been registered before any serializer is constructed.
    serializer_by_label: dict[str, SyncFieldSerializer] = {}
    for model_class in models:
        serializer_by_label[model_class._meta.label] = SyncFieldSerializer(model_class)
    self._serializers = serializer_by_label

get_changed_since

Source code in django_spire/contrib/sync/django/storage/reader.py
def get_changed_since(
    self,
    model_label: str,
    timestamp: int,
    limit: int | None = None,
    after_key: str | None = None,
) -> dict[str, SyncRecord]:
    """Return records of one model modified after ``timestamp``.

    Rows are ordered by ``sync_field_last_modified`` and then by the
    identity field, so a caller can page through results by passing the
    last seen timestamp and key back in.

    Args:
        model_label: Label of the model to query; resolved through
            ``self._get_model``.
        timestamp: Only rows whose ``sync_field_last_modified`` is
            strictly greater than this value are returned (plus the
            tie-break rows described under ``after_key``).
        limit: Maximum number of rows to return; ``None`` means no cap.
        after_key: When truthy, rows whose ``sync_field_last_modified``
            equals ``timestamp`` are also included, provided their
            identity value is greater than ``after_key``. A falsy value
            (``None`` or an empty string) disables this tie-break.

    Returns:
        Mapping of stringified identity value to the record produced by
        ``self._instance_to_record``, in query order.
    """
    model = self._get_model(model_label)
    many_to_many_names = self._get_many_to_many_names(model)

    if not after_key:
        matches = model.objects.filter(
            sync_field_last_modified__gt=timestamp,
        )
    else:
        # Keyset continuation: same timestamp, but a later identity value.
        later_key = {f'{self._identity_field}__gt': after_key}
        strictly_newer = Q(sync_field_last_modified__gt=timestamp)
        same_stamp_later_key = Q(
            sync_field_last_modified=timestamp,
            **later_key,
        )
        matches = model.objects.filter(strictly_newer | same_stamp_later_key)

    ordering = ('sync_field_last_modified', self._identity_field)
    matches = matches.order_by(*ordering)

    if limit is not None:
        matches = matches[:limit]

    if many_to_many_names:
        matches = matches.prefetch_related(*many_to_many_names)

    records: dict[str, SyncRecord] = {}
    for instance in matches:
        record_key = str(getattr(instance, self._identity_field))
        records[record_key] = self._instance_to_record(instance)

    return records

get_deletes_since

Source code in django_spire/contrib/sync/django/storage/reader.py
def get_deletes_since(
    self,
    model_label: str,
    timestamp: int,
) -> dict[str, int]:
    """Return deletions recorded for a model after ``timestamp``.

    Args:
        model_label: Value matched against ``SyncTombstone.model_label``.
        timestamp: Only tombstones whose ``timestamp`` is strictly
            greater than this value are returned.

    Returns:
        Mapping of tombstone ``record_key`` to tombstone ``timestamp``.
        If several tombstones share a ``record_key``, the one yielded
        last by the query wins; no explicit ordering is applied here.
    """
    # Function-scope import (presumably to avoid an import cycle or an
    # early model import -- confirm before moving to module level).
    from django_spire.contrib.sync.django.models.tombstone import SyncTombstone  # noqa: PLC0415

    tombstones = SyncTombstone.objects.filter(
        model_label=model_label,
        timestamp__gt=timestamp,
    )
    key_and_stamp_pairs = tombstones.values_list('record_key', 'timestamp')

    return {
        record_key: deleted_at
        for record_key, deleted_at in key_and_stamp_pairs
    }

get_records

Source code in django_spire/contrib/sync/django/storage/reader.py
def get_records(
    self,
    model_label: str,
    keys: set[str],
) -> dict[str, SyncRecord]:
    """Fetch the records of one model whose identity is in ``keys``.

    Args:
        model_label: Label of the model to query; resolved through
            ``self._get_model``.
        keys: Identity values to look up. An empty collection
            short-circuits to an empty result without touching the
            model registry or the database.

    Returns:
        Mapping of stringified identity value to the record produced by
        ``self._instance_to_record``. Keys with no matching row are
        simply absent from the result.
    """
    if not keys:
        return {}

    model = self._get_model(model_label)
    many_to_many_names = self._get_many_to_many_names(model)

    matches = model.objects.filter(
        **{f'{self._identity_field}__in': keys},
    )

    if many_to_many_names:
        matches = matches.prefetch_related(*many_to_many_names)

    found: dict[str, SyncRecord] = {}
    for instance in matches:
        record_key = str(getattr(instance, self._identity_field))
        found[record_key] = self._instance_to_record(instance)

    return found

get_syncable_models

Source code in django_spire/contrib/sync/django/storage/reader.py
def get_syncable_models(self) -> list[str]:
    """Return the labels of all registered models in ascending order."""
    labels = list(self._models)
    labels.sort()
    return labels