Skip to content

writer

django_spire.contrib.sync.django.storage.writer

logger = logging.getLogger(__name__) module-attribute

DjangoRecordWriter

Source code in django_spire/contrib/sync/django/storage/writer.py
def __init__(
    self,
    models: list[type[SyncableMixin]],
    identity_field: str = 'id',
    batch_size_max: int = _BATCH_SIZE_MAX,
    delete_strategies: dict[str, DeleteStrategy] | None = None,
    many_to_many_applier: ManyToManyApplier | None = None,
    upsert_strategy: UpsertStrategy | None = None,
) -> None:
    if not models:
        message = 'models must not be empty'
        raise InvalidParameterError(message)

    if not identity_field:
        message = 'identity_field must be a non-empty string'
        raise InvalidParameterError(message)

    if batch_size_max < 1:
        message = (
            f'batch_size_max must be >= 1, '
            f'got {batch_size_max}'
        )

        raise InvalidParameterError(message)

    self._batch_size_max = batch_size_max
    self._identity_field = identity_field

    self._models: dict[str, type[SyncableMixin]] = {
        model._meta.label: model
        for model in models
    }

    self._serializers: dict[str, SyncFieldSerializer] = {
        model._meta.label: SyncFieldSerializer(model)
        for model in models
    }

    self._upsert_strategy = (
        upsert_strategy
        or BulkUpsertStrategy(
            identity_field=identity_field,
        )
    )

    self._many_to_many_applier = (
        many_to_many_applier
        or ManyToManyApplier(
            identity_field=identity_field,
        )
    )

    self._delete_strategies = (
        delete_strategies
        or self._build_delete_strategies(models)
    )

delete_many

Source code in django_spire/contrib/sync/django/storage/writer.py
def delete_many(
    self,
    model_label: str,
    deletes: dict[str, int],
) -> None:
    if not deletes:
        return

    model = self._get_model(model_label)
    strategy = self._delete_strategies[model_label]
    keys = list(deletes.keys())

    for start in range(0, len(keys), self._batch_size_max):
        chunk_keys = keys[start:start + self._batch_size_max]
        chunk = {key: deletes[key] for key in chunk_keys}
        strategy.delete(model, chunk)

upsert_many

Source code in django_spire/contrib/sync/django/storage/writer.py
def upsert_many(
    self,
    model_label: str,
    records: dict[str, SyncRecord],
) -> set[str]:
    if not records:
        return set()

    model = self._get_model(model_label)
    many_to_many_names = self._get_many_to_many_names(model)
    serializer = self._serializers[model_label]

    skipped: set[str] = set()
    keys = sorted(records.keys())

    for start in range(0, len(keys), self._batch_size_max):
        chunk_keys = keys[start:start + self._batch_size_max]

        chunk_skipped = self._upsert_chunk(
            model,
            records,
            chunk_keys,
            many_to_many_names,
            serializer,
        )

        skipped |= chunk_skipped

    return skipped