Skip to content

writer

django_spire.contrib.sync.django.storage.writer

logger = logging.getLogger(__name__) module-attribute

DjangoRecordWriter

Source code in django_spire/contrib/sync/django/storage/writer.py
def __init__(
    self,
    models: list[type[SyncableMixin]],
    identity_field: str = 'id',
    batch_size_max: int = _BATCH_SIZE_MAX,
    deferred_foreign_keys: list[DeferredForeignKey] | None = None,
    delete_strategies: dict[str, DeleteStrategy] | None = None,
    many_to_many_applier: ManyToManyApplier | None = None,
    upsert_strategy: UpsertStrategy | None = None,
) -> None:
    """Configure a writer for a fixed set of syncable models.

    Args:
        models: Syncable model classes this writer may write to. They are
            indexed by ``model._meta.label``.
        identity_field: Name of the field used to identify records. It is
            passed to the default upsert strategy and many-to-many applier.
        batch_size_max: Upper bound on how many records are handled per
            chunk by the batched write methods.
        deferred_foreign_keys: Foreign keys whose assignment is deferred.
            Entries whose ``target_label`` is one of ``models`` are recorded
            as deferred attributes of ``source_label``. All other entries are
            recorded as "external nullable" attributes of ``source_label``.
        delete_strategies: Per-label delete strategies. When falsy, they are
            built by ``_build_delete_strategies(models)``.
        many_to_many_applier: Applier for many-to-many values. When falsy, a
            ``ManyToManyApplier(identity_field=...)`` is created.
        upsert_strategy: Upsert strategy. When falsy, a
            ``BulkUpsertStrategy(identity_field=...)`` is created.

    Raises:
        InvalidParameterError: If ``models`` is empty, ``identity_field`` is
            empty, or ``batch_size_max`` is below 1.
    """
    # --- argument validation (same order and messages as before) ---
    if not models:
        raise InvalidParameterError('models must not be empty')

    if not identity_field:
        raise InvalidParameterError(
            'identity_field must be a non-empty string',
        )

    if batch_size_max < 1:
        raise InvalidParameterError(
            f'batch_size_max must be >= 1, got {batch_size_max}',
        )

    self._batch_size_max = batch_size_max
    self._identity_field = identity_field

    # Label -> model class, and label -> field serializer for that model.
    self._models: dict[str, type[SyncableMixin]] = {
        syncable._meta.label: syncable for syncable in models
    }
    self._serializers: dict[str, SyncFieldSerializer] = {
        syncable._meta.label: SyncFieldSerializer(syncable)
        for syncable in models
    }

    # Collaborators fall back to defaults when the caller supplies a falsy
    # value. Construction order is kept because _build_delete_strategies
    # may rely on attributes assigned above.
    if not upsert_strategy:
        upsert_strategy = BulkUpsertStrategy(identity_field=identity_field)
    self._upsert_strategy = upsert_strategy

    if not many_to_many_applier:
        many_to_many_applier = ManyToManyApplier(identity_field=identity_field)
    self._many_to_many_applier = many_to_many_applier

    if not delete_strategies:
        delete_strategies = self._build_delete_strategies(models)
    self._delete_strategies = delete_strategies

    # source_label -> attribute names whose FK target is one of our models.
    self._deferred_attribute_names: dict[str, set[str]] = defaultdict(set)
    # source_label -> attribute names whose FK target is outside our models.
    self._external_nullable_attribute_names: dict[str, set[str]] = (
        defaultdict(set)
    )

    for deferred in deferred_foreign_keys or ():
        bucket = (
            self._deferred_attribute_names
            if deferred.target_label in self._models
            else self._external_nullable_attribute_names
        )
        bucket[deferred.source_label].add(deferred.attribute_name)

clear_tombstones

Source code in django_spire/contrib/sync/django/storage/writer.py
def clear_tombstones(
    self,
    model_label: str,
    keys: set[str],
) -> None:
    """Delete the tombstone rows for ``keys`` under ``model_label``.

    Keys are deleted in chunks of at most ``self._batch_size_max``, the same
    bound ``delete_many`` and ``upsert_many`` use. This keeps a large key
    set from producing one ``IN (...)`` clause with more bound parameters
    than the database accepts.

    All chunks run inside a single transaction on the tombstone model's
    write database, so the call stays all-or-nothing, as the original
    single DELETE was.

    Args:
        model_label: Label the tombstones were recorded under.
        keys: Record keys whose tombstones should be removed. An empty
            collection is a no-op and touches no database.
    """
    if not keys:
        return

    from django_spire.contrib.sync.django.models.tombstone import SyncTombstone  # noqa: PLC0415

    # Sorted so chunk membership (and row-lock order) is deterministic.
    ordered_keys = sorted(keys)
    step = self._batch_size_max

    with transaction.atomic(using=router.db_for_write(SyncTombstone)):
        for start in range(0, len(ordered_keys), step):
            SyncTombstone.objects.filter(
                model_label=model_label,
                record_key__in=ordered_keys[start:start + step],
            ).delete()

delete_many

Source code in django_spire/contrib/sync/django/storage/writer.py
def delete_many(
    self,
    model_label: str,
    deletes: dict[str, int],
    origin_node: str,
) -> None:
    """Apply deletes for one model in chunks of ``self._batch_size_max``.

    Each chunk is a ``{key: value}`` slice of ``deletes``, taken in the
    dict's iteration order. It is handed to the model's configured delete
    strategy inside its own ``transaction.atomic()`` block.

    Args:
        model_label: Label of the model to delete from.
        deletes: Mapping of record key to an integer passed through to the
            strategy. An empty mapping is a no-op.
        origin_node: Node identifier passed through to the strategy.
    """
    if not deletes:
        return

    model = self._get_model(model_label)
    strategy = self._delete_strategies[model_label]
    ordered_keys = [*deletes]

    offset = 0
    while offset < len(ordered_keys):
        window = ordered_keys[offset:offset + self._batch_size_max]
        batch = {key: deletes[key] for key in window}

        # One transaction per chunk.
        with transaction.atomic():
            strategy.delete(model, batch, origin_node)

        offset += self._batch_size_max

flush_deferred_backfill

Source code in django_spire/contrib/sync/django/storage/writer.py
def flush_deferred_backfill(self) -> None:
    """Retry every queued deferred foreign-key backfill.

    All ``SyncDeferredBackfill`` rows are loaded and grouped by
    ``model_label``.

    - Groups for labels this writer does not know are logged at WARNING and
      left in place.
    - For known labels, the rows are folded into
      ``{record_key: {attname: fk_value}}`` and passed to
      ``self._run_backfill``. Rows whose attname is not reported back as
      still pending for their key are deleted.
    - Anything still pending is counted and logged at DEBUG.

    NOTE(review): ``_run_backfill`` is assumed to return a mapping of
    record_key -> collection of attnames that could not be resolved yet —
    confirm against its definition.
    """
    from django_spire.contrib.sync.django.models.deferred_backfill import (  # noqa: PLC0415
        SyncDeferredBackfill,
    )

    queued = list(SyncDeferredBackfill.objects.all())

    if not queued:
        return

    grouped: dict[str, list[SyncDeferredBackfill]] = defaultdict(list)

    for entry in queued:
        grouped[entry.model_label].append(entry)

    for label, entries in grouped.items():
        if label not in self._models:
            logger.warning(
                'Skipping %d deferred backfill row(s) for '
                'unknown model %s',
                len(entries),
                label,
            )

            continue

        model = self._get_model(label)

        # record_key -> {attname: fk_value}; a later row for the same
        # (key, attname) pair overwrites an earlier one.
        columns_by_key: dict[str, dict[str, Any]] = {}

        for entry in entries:
            per_key = columns_by_key.setdefault(entry.record_key, {})
            per_key[entry.attname] = entry.fk_value

        unresolved = self._run_backfill(model, columns_by_key)

        resolved_ids: list[int] = [
            entry.id
            for entry in entries
            if entry.attname not in unresolved.get(entry.record_key, {})
        ]

        if resolved_ids:
            SyncDeferredBackfill.objects.filter(
                id__in=resolved_ids,
            ).delete()

        unresolved_total = sum(map(len, unresolved.values()))

        if unresolved_total:
            logger.debug(
                '%d deferred FK backfill(s) still pending '
                'for %s (targets not yet present)',
                unresolved_total,
                label,
            )

stamp_unstamped_records

Source code in django_spire/contrib/sync/django/storage/writer.py
def stamp_unstamped_records(
    self,
    clock: HybridLogicalClock,
    model_order: list[str] | None = None,
) -> int:
    """Write sync sequence numbers and timestamps onto unstamped rows.

    Labels are taken from ``model_order``, or from all registered models
    sorted by label when ``model_order`` is falsy. Before stamping, the
    labels are passed to ``_ensure_sync_columns`` and ``_reconcile_counter``.
    Labels not registered with this writer are skipped.

    For each model, on its router write database:

    - Rows are counted with ``_build_zero_field_count_sql`` for
      ``sync_field_sequence`` and ``sync_field_last_modified``. Presumably
      these count rows whose column is still zero — confirm against the
      helper.
    - If both counts are zero, the model is skipped.
    - Otherwise a block of ``sequence_count`` numbers is allocated from
      ``SyncSequenceAllocator`` and assigned via
      ``_build_assign_sequence_sql``.
    - Every syncable field and many-to-many name is stamped with a single
      ``clock.now()`` value via ``_build_stamp_modified_sql``.

    Everything runs inside ``sync_bypass()``.

    Args:
        clock: Clock whose ``now()`` supplies the per-model stamp timestamp.
        model_order: Optional explicit processing order of model labels.

    Returns:
        The sum over processed models of ``max(sequence_count,
        timestamp_count)``.
    """
    from django_spire.contrib.sync.django.queryset import sync_bypass  # noqa: PLC0415
    from django_spire.contrib.sync.django.sequence import (  # noqa: PLC0415
        SyncSequenceAllocator,
    )

    total = 0
    labels = model_order or sorted(self._models.keys())

    self._ensure_sync_columns(labels)
    self._reconcile_counter(labels)

    with sync_bypass():
        for model_label in labels:
            if model_label not in self._models:
                continue

            model = self._models[model_label]

            connection = connections[
                router.db_for_write(model) or
                'default'
            ]

            quote = connection.ops.quote_name
            table = quote(model._meta.db_table)

            id_field = model._meta.get_field(self._identity_field)

            id_column = quote(id_field.column)

            with connection.cursor() as cursor:
                cursor.execute(
                    self._build_zero_field_count_sql(
                        table,
                        'sync_field_sequence',
                    ),
                )

                sequence_count = cursor.fetchone()[0]

                cursor.execute(
                    self._build_zero_field_count_sql(
                        table,
                        'sync_field_last_modified',
                    ),
                )

                timestamp_count = cursor.fetchone()[0]

            if sequence_count == 0 and timestamp_count == 0:
                continue

            stamp_timestamp = clock.now()

            stamp_field_names = (
                list(model.get_syncable_field_names()) +
                list(model.get_syncable_many_to_many_names())
            )

            # Every stamped field gets the same timestamp.
            stamp_timestamps_json = json.dumps(
                dict.fromkeys(
                    stamp_field_names,
                    stamp_timestamp
                )
            )

            sequence_first = 0

            # The outer atomic() keeps the original default-alias transaction
            # (covering the sequence allocation). The inner block wraps the
            # stamp statements, which run on `connection`, the router's write
            # alias for this model. Without it they would autocommit one by
            # one when that alias is not 'default'. savepoint=False avoids a
            # redundant savepoint when both aliases are the same database.
            with transaction.atomic(), transaction.atomic(
                using=connection.alias,
                savepoint=False,
            ):
                if sequence_count > 0:
                    sequence_first = SyncSequenceAllocator().allocate(sequence_count).value_first

                    with connection.cursor() as cursor:
                        cursor.execute(
                            self._build_assign_sequence_sql(
                                table,
                                id_column,
                            ),
                            [sequence_first],
                        )

                if timestamp_count > 0:
                    with connection.cursor() as cursor:
                        cursor.execute(
                            self._build_stamp_modified_sql(table),
                            [stamp_timestamp, stamp_timestamps_json],
                        )

            stamped = max(sequence_count, timestamp_count)

            logger.info(
                'Stamped %s in %s '
                '(%d sequences from %d, %d timestamps, ts=%d)',
                stamped,
                model_label,
                sequence_count,
                sequence_first,
                timestamp_count,
                stamp_timestamp,
            )

            total += stamped

    return total

upsert_many

Source code in django_spire/contrib/sync/django/storage/writer.py
def upsert_many(
    self,
    model_label: str,
    records: dict[str, SyncRecord],
    origin_node: str,
) -> UpsertResult:
    """Upsert records for one model in key-sorted chunks.

    Keys of ``records`` are sorted and sliced into chunks of at most
    ``self._batch_size_max``. Each chunk is delegated to
    ``self._upsert_chunk`` together with the model, its many-to-many names,
    its serializer and ``origin_node``. The per-chunk results are merged.

    Args:
        model_label: Label of the target model.
        records: Mapping of record key to incoming record.
        origin_node: Node identifier passed through to each chunk.

    Returns:
        An ``UpsertResult`` holding the union of skipped keys and the
        concatenated errors from all chunks. Empty input yields a bare
        ``UpsertResult()``.
    """
    if not records:
        return UpsertResult()

    model = self._get_model(model_label)
    many_to_many_names = self._get_many_to_many_names(model)
    serializer = self._serializers[model_label]

    ordered_keys = sorted(records)
    step = self._batch_size_max

    all_skipped: set[str] = set()
    all_errors: list[Error] = []

    for offset in range(0, len(ordered_keys), step):
        outcome = self._upsert_chunk(
            model,
            records,
            ordered_keys[offset:offset + step],
            many_to_many_names,
            serializer,
            origin_node,
        )

        all_skipped |= outcome.skipped
        all_errors += outcome.errors

    return UpsertResult(skipped=all_skipped, errors=all_errors)