Skip to content

django

django_spire.contrib.sync.django

__all__ = ['DjangoSyncLock', 'DjangoSyncStorage', 'HttpTransport', 'SyncClient', 'SyncServer', 'SyncableMixin', 'SyncableModelService', 'SyncableQuerySet', 'build_graph', 'process_sync_request', 'register_many_to_many_signals', 'seed_clock', 'sync_bypass'] module-attribute

HttpTransport

Bases: Transport

Source code in django_spire/contrib/sync/database/transport/http.py
def __init__(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    retries: int = 3,
    retry_delay: float = 1.0,
    timeout: float = 30.0,
    response_bytes_max: int = _RESPONSE_BYTES_MAX,
) -> None:
    """Validate the transport settings and keep them on the instance.

    Args:
        url: Endpoint address; passed through ``_validate_url`` first.
        headers: Stored as given; a falsy value is replaced by ``{}``.
        retries: Must be >= 1.
        retry_delay: Must not be negative.
        timeout: Must be strictly positive.
        response_bytes_max: Must be >= 1.

    Raises:
        InvalidParameterError: For the first numeric argument found to be
            out of range (checked in the order listed above).
    """
    _validate_url(url)

    if retries < 1:
        message = f'retries must be >= 1, got {retries}'
        raise InvalidParameterError(message)

    if retry_delay < 0.0:
        message = (
            f'retry_delay must be non-negative, '
            f'got {retry_delay}'
        )
        raise InvalidParameterError(message)

    if timeout <= 0.0:
        message = f'timeout must be positive, got {timeout}'
        raise InvalidParameterError(message)

    if response_bytes_max < 1:
        message = (
            f'response_bytes_max must be >= 1, '
            f'got {response_bytes_max}'
        )
        raise InvalidParameterError(message)

    self._url = url
    self._headers = headers if headers else {}
    self._retries = retries
    self._retry_delay = retry_delay
    self._timeout = timeout
    self._response_bytes_max = response_bytes_max

    if self._headers:
        return

    # An empty header mapping is taken to mean that no credentials were
    # supplied; the transport still works, but the operator is told.
    logger.warning(
        'HttpTransport created without '
        'auth headers for %s',
        self._url,
    )

exchange

Source code in django_spire/contrib/sync/database/transport/http.py
def exchange(
    self,
    manifest: SyncManifest,
) -> SyncManifest:
    """Send ``manifest`` through ``self._post`` and parse the reply.

    The POST is wrapped in ``retry`` using the configured attempt count and
    delay, retrying only on ``_TRANSIENT_EXCEPTIONS``.

    Args:
        manifest: The local manifest; serialised with ``to_dict()``.

    Returns:
        The manifest built from the reply via ``SyncManifest.from_dict``.

    Raises:
        InvalidResponseError: If the reply is not a dict, or lacks any of
            ``_REQUIRED_RESPONSE_FIELDS``.
    """
    outgoing = manifest.to_dict()

    reply = retry(
        lambda: self._post(outgoing),
        attempts=self._retries,
        delay=self._retry_delay,
        exceptions=_TRANSIENT_EXCEPTIONS,
    )

    if not isinstance(reply, dict):
        message = 'Server returned an invalid response format'
        raise InvalidResponseError(message)

    absent = [
        name for name in _REQUIRED_RESPONSE_FIELDS
        if name not in reply
    ]

    if absent:
        # Prefer the server's own 'error' text; otherwise name the first
        # required field that is missing.
        detail = reply.get(
            'error',
            f"Missing required manifest field {absent[0]!r}",
        )

        message = (
            f'Server returned an invalid sync '
            f'response: {detail}'
        )

        raise InvalidResponseError(message)

    logger.info(
        'Exchanged manifest with %s '
        '(%d payloads sent, %d received)',
        self._url,
        len(manifest.payloads),
        len(reply.get('payloads', [])),
    )

    return SyncManifest.from_dict(reply)

SyncClient

Source code in django_spire/contrib/sync/django/client.py
def __init__(
    self,
    models: list[type[SyncableMixin]],
    node_id: str,
    transport: Transport,
    *,
    peer_node_id: str,
    batch_bytes: int | None = BATCH_BYTES_DEFAULT,
    batch_size: int | None = None,
    clock: HybridLogicalClock | None = None,
    clock_drift_max: int | None = 300,
    graph: DependencyGraph | None = None,
    lock: SyncLock | None = None,
    on_complete: Callable[[DatabaseResult], None] | None = None,
    on_phase: Callable[[SyncPhase], None] | None = None,
    payload_bytes_max: int | None = None,
    payload_records_max: int | None = None,
    progress: Callable[[SyncStage, int, int], None] | None = None,
    resolver: ConflictResolver | None = None,
    storage: DatabaseSyncStorage | None = None,
    transaction_fn: Callable[[], AbstractContextManager[Any]] = transaction.atomic,
) -> None:
    """Assemble the ``DatabaseEngine`` that backs this client.

    Collaborators that are not supplied (or are falsy) fall back to:
    ``build_graph(models)`` for ``graph``, ``SyncableMixin.get_clock()`` for
    ``clock``, ``FieldTimestampWins()`` for ``resolver`` and a
    ``DjangoSyncStorage`` over ``models`` for ``storage``. ``lock`` is
    passed through as given, including ``None``. All remaining arguments
    are forwarded to the engine unchanged.
    """
    dependency_graph = graph or build_graph(models)

    deferred_columns = get_deferred_foreign_key_columns(
        models,
        dependency_graph,
    )
    cascade_columns = get_foreign_key_columns_for_cascade(models)

    # Defaults are resolved in the same order the engine receives them.
    engine_clock = clock or SyncableMixin.get_clock()
    reconciler = PayloadReconciler(
        resolver=resolver or FieldTimestampWins(),
    )
    engine_storage = storage or DjangoSyncStorage(
        models=models,
        deferred_foreign_keys=deferred_columns,
    )

    self._engine = DatabaseEngine(
        batch_bytes=batch_bytes,
        batch_size=batch_size,
        clock=engine_clock,
        clock_drift_max=clock_drift_max,
        foreign_key_columns=cascade_columns,
        graph=dependency_graph,
        lock=lock,
        node_id=node_id,
        on_complete=on_complete,
        on_phase=on_phase,
        payload_bytes_max=payload_bytes_max,
        payload_records_max=payload_records_max,
        peer_node_id=peer_node_id,
        progress=progress,
        reconciler=reconciler,
        storage=engine_storage,
        transaction=transaction_fn,
        transport=transport,
    )

sync

Source code in django_spire/contrib/sync/django/client.py
def sync(self, dry_run: bool = False) -> DatabaseResult:
    """Run the engine's ``sync`` and return its result.

    Args:
        dry_run: Forwarded unchanged to the engine as ``dry_run``.
    """
    return self._engine.sync(dry_run=dry_run)

DjangoSyncLock

Source code in django_spire/contrib/sync/django/lock.py
def __init__(
    self,
    timeout_stale: int = _STALE_TIMEOUT_DEFAULT,
) -> None:
    """Store the staleness timeout on the instance.

    Args:
        timeout_stale: Kept as ``_timeout_stale`` without validation.
            Presumably the age after which a session counts as abandoned;
            the unit is not visible here — confirm against the helpers
            that read it.
    """
    self._timeout_stale = timeout_stale

acquire

Source code in django_spire/contrib/sync/django/lock.py
def acquire(self, node_id: str, peer_node_id: str) -> str:
    """Open a sync session for the ``node_id``/``peer_node_id`` pair.

    Returns:
        The id produced by ``_create_session``.

    Raises:
        LockContentionError: If the lock row cannot be read back after
            ``_ensure_lock_row``. ``_check_active_session`` may raise as
            well — its behaviour is not visible here.
    """
    # Presumably creates the row when it does not exist yet — confirm.
    self._ensure_lock_row(node_id, peer_node_id)

    with transaction.atomic():
        # Row lock on the pair's lock row: the stale-check, active-check and
        # session creation below all run while it is held.
        locked = (
            SyncNodeLock.objects
            .select_for_update()
            .filter(node_id=node_id, peer_node_id=peer_node_id)
            .first()
        )

        if locked is None:
            message = (
                f'Sync lock row for {node_id!r}<->{peer_node_id!r} '
                f'is missing after creation'
            )

            raise LockContentionError(message)

        self._abandon_stale_sessions(node_id, peer_node_id)
        self._check_active_session(node_id, peer_node_id)
        session_id = self._create_session(node_id, peer_node_id)

    # Logged only after the atomic block has exited without an exception.
    logger.info(
        'Acquired sync lock for %s<->%s (session %s)',
        node_id,
        peer_node_id,
        session_id,
    )

    return session_id

hold

Source code in django_spire/contrib/sync/django/lock.py
def hold(self, node_id: str, peer_node_id: str) -> None:
    """Select the pair's lock row ``FOR UPDATE`` and discard the result.

    NOTE(review): no transaction is opened here, and Django refuses to
    evaluate ``select_for_update()`` outside an atomic block on backends
    that support it, so this presumably must be called from inside the
    caller's transaction — confirm.
    """
    self._ensure_lock_row(node_id, peer_node_id)

    # Only the side effect (the row lock) matters; the fetched row is unused.
    SyncNodeLock.objects.select_for_update().filter(
        node_id=node_id,
        peer_node_id=peer_node_id,
    ).first()

hold_global

Source code in django_spire/contrib/sync/django/lock.py
def hold_global(self) -> None:
    """Select the global lock row ``FOR UPDATE`` and discard the result.

    The global row uses ``_GLOBAL_LOCK_ID`` for both ``node_id`` and
    ``peer_node_id``.

    NOTE(review): no transaction is opened here; ``select_for_update()``
    presumably relies on the caller already being inside an atomic block —
    confirm.
    """
    self._ensure_lock_row(_GLOBAL_LOCK_ID, _GLOBAL_LOCK_ID)

    # Only the side effect (the row lock) matters; the fetched row is unused.
    SyncNodeLock.objects.select_for_update().filter(
        node_id=_GLOBAL_LOCK_ID,
        peer_node_id=_GLOBAL_LOCK_ID,
    ).first()

release

Source code in django_spire/contrib/sync/django/lock.py
def release(
    self,
    session_id: str,
    status: SyncStatus,
    result: DatabaseResult | None = None,
) -> None:
    """Close a sync session and record how it ended.

    Sets completion time, duration in milliseconds, final phase and status
    on the session; when ``result`` is given, also stores its per-category
    record totals and error count. A missing session is logged and ignored.

    Args:
        session_id: Primary key of the ``SyncSession`` to close.
        status: Final status; ``SyncStatus.SUCCESS`` maps to phase
            ``COMPLETE``, anything else to ``FAILED``.
        result: Optional outcome whose counts are copied onto the session.
    """
    session = SyncSession.objects.filter(id=session_id).first()

    if session is None:
        logger.warning(
            'Sync session %s not found during release',
            session_id,
        )
        return

    finished_at = timezone.now()
    duration = finished_at - session.started_at

    session.completed_at = finished_at
    session.duration_ms = int(duration.total_seconds() * 1000)

    if status == SyncStatus.SUCCESS:
        session.phase = SyncPhase.COMPLETE
    else:
        session.phase = SyncPhase.FAILED

    session.status = status

    if result is not None:
        # Each result attribute maps a label to a collection of keys; the
        # session stores only the grand total per category.
        grouped_by_counter = {
            'records_pushed': result.pushed,
            'records_applied': result.applied,
            'records_created': result.created,
            'records_deleted': result.deleted,
            'conflicts': result.conflicts,
        }

        for counter_name, grouped in grouped_by_counter.items():
            total = sum(len(keys) for keys in grouped.values())
            setattr(session, counter_name, total)

        session.errors = len(result.errors)

    session.save()

    logger.info(
        'Released sync lock for session %s (status=%s)',
        session_id,
        status,
    )

update_phase

Source code in django_spire/contrib/sync/django/lock.py
def update_phase(self, session_id: str, phase: SyncPhase) -> None:
    """Set ``phase`` on the session with id ``session_id``.

    Uses a queryset ``update``, so nothing happens (and nothing is raised)
    when no such session exists.
    """
    SyncSession.objects.filter(id=session_id).update(phase=phase)

prune_old_sessions

Source code in django_spire/contrib/sync/django/lock.py
def prune_old_sessions(
    self,
    days: int = _PRUNE_RETENTION_DAYS_DEFAULT,
) -> int:
    """Delete finished sessions that completed more than ``days`` days ago.

    Only sessions whose status is in ``_TERMINAL_STATUSES`` are considered.

    Returns:
        The total reported by ``QuerySet.delete()``.
    """
    threshold = timezone.now() - timedelta(days=days)

    expired = SyncSession.objects.filter(
        completed_at__lt=threshold,
        status__in=_TERMINAL_STATUSES,
    )

    removed, _per_model = expired.delete()

    if removed:
        logger.info(
            'Pruned %d old sync session(s) older than %d days',
            removed,
            days,
        )

    return removed

SyncableMixin

Bases: SyncableFieldsMixin

Source code in django_spire/contrib/sync/django/mixin.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialise the instance, then snapshot its field values.

    All positional and keyword arguments are forwarded unchanged to the
    parent initialiser.
    """
    super().__init__(*args, **kwargs)

    # A fresh tracker per instance, seeded with the values present right
    # after construction (presumably the baseline for change detection —
    # confirm against FieldUpdateTracker).
    self._tracker = FieldUpdateTracker()
    self._tracker.snapshot(self._get_field_values())

id = models.UUIDField(primary_key=True, default=(uuid.uuid4), editable=False) class-attribute instance-attribute

Meta

abstract = True class-attribute instance-attribute

SyncableQuerySet

Bases: QuerySet

bulk_create

Source code in django_spire/contrib/sync/django/queryset.py
def bulk_create(
    self,
    objs: list[Any],
    **kwargs: Any,
) -> list[Any]:
    """Bulk-insert ``objs``, filling in sync metadata on syncable instances.

    Instances that expose ``get_syncable_field_names`` receive, before the
    insert: a timestamp for every syncable field that has none, plus a
    last-modified stamp, a sequence number and an origin node — each only
    when the instance does not already carry a truthy value. The insert
    itself is delegated to the parent ``bulk_create``.

    Args:
        objs: Instances to insert. Any iterable is accepted; it is
            materialised once so that a generator is not already exhausted
            when it reaches the parent ``bulk_create``.
        **kwargs: Forwarded unchanged to the parent ``bulk_create``.

    Returns:
        Whatever the parent ``bulk_create`` returns.
    """
    if _is_bypassed():
        return super().bulk_create(objs, **kwargs)

    # Django accepts any iterable here. The list comprehension below would
    # consume a generator and leave nothing to insert, so take a list first.
    objs = list(objs)

    if not objs:
        return super().bulk_create(objs, **kwargs)

    syncable = [
        instance for instance in objs
        if hasattr(instance, 'get_syncable_field_names')
    ]

    if not syncable:
        return super().bulk_create(objs, **kwargs)

    from django_spire.contrib.sync.django.sequence import (  # noqa: PLC0415
        SyncSequenceAllocator,
    )

    clock = self.model.get_clock()

    with transaction.atomic(using=self.db):
        # One value is reserved per syncable instance; instances that already
        # carry a sequence simply leave theirs unused.
        sequence_first = SyncSequenceAllocator(using=self.db).allocate(len(syncable)).value_first
        sequence_next = sequence_first

        for instance in syncable:
            now = clock.now()

            # Tolerate a missing (None) timestamp map, as update() does.
            timestamps = dict(instance.sync_field_timestamps or {})

            for name in instance.get_syncable_field_names():
                # Existing per-field stamps are kept; only gaps are filled.
                timestamps.setdefault(name, now)

            instance.sync_field_timestamps = timestamps

            if not instance.sync_field_last_modified:
                instance.sync_field_last_modified = now

            if not instance.sync_field_sequence:
                instance.sync_field_sequence = sequence_next
                sequence_next += 1

            if not instance.sync_field_origin_node:
                # Normalise any falsy origin (e.g. None) to the empty string.
                instance.sync_field_origin_node = ''

        return super().bulk_create(objs, **kwargs)

bulk_update

Source code in django_spire/contrib/sync/django/queryset.py
def bulk_update(
    self,
    objs: list[Any],
    fields: list[str] | tuple[str, ...],
    **kwargs: Any,
) -> int:
    """Bulk-update ``fields`` on ``objs`` and restamp their sync metadata.

    For every instance exposing ``get_syncable_field_names`` the per-field
    timestamp of each updated, non-excluded field is set to a fresh clock
    value, and last-modified, sequence and origin node are overwritten. The
    four sync columns are added to the field list handed to the parent
    ``bulk_update`` so the new metadata is persisted.

    Args:
        objs: Instances to update. Any iterable is accepted; it is
            materialised once so that a generator is not already exhausted
            when it reaches the parent ``bulk_update``.
        fields: Names of the fields to write. An empty value is passed to
            the parent untouched so its own validation applies.
        **kwargs: Forwarded unchanged to the parent ``bulk_update``.

    Returns:
        Whatever the parent ``bulk_update`` returns.
    """
    if _is_bypassed():
        return super().bulk_update(objs, fields, **kwargs)

    # Django accepts any iterable here. The list comprehension below would
    # consume a generator and leave nothing to update, so take a list first.
    objs = list(objs)

    # With no objects or no fields there is nothing to stamp. Delegating
    # keeps the parent's behaviour for these cases instead of masking an
    # empty field list behind the sync columns added further down.
    if not objs or not fields:
        return super().bulk_update(objs, fields, **kwargs)

    syncable = [
        instance for instance in objs
        if hasattr(instance, 'get_syncable_field_names')
    ]

    if not syncable:
        return super().bulk_update(objs, fields, **kwargs)

    from django_spire.contrib.sync.django.sequence import (  # noqa: PLC0415
        SyncSequenceAllocator,
    )

    clock = self.model.get_clock()
    attname_map = _relation_attname_map(self.model)

    field_set = set(fields)

    stamped_fields = list(field_set | {
        'sync_field_last_modified',
        'sync_field_origin_node',
        'sync_field_sequence',
        'sync_field_timestamps',
    })

    with transaction.atomic(using=self.db):
        sequence_first = SyncSequenceAllocator(using=self.db).allocate(len(syncable)).value_first
        sequence_next = sequence_first

        for instance in syncable:
            now = clock.now()

            # Tolerate a missing (None) timestamp map, as update() does.
            timestamps = dict(instance.sync_field_timestamps or {})

            for name in field_set:
                if name in instance._sync_exclude_fields:
                    continue

                # Timestamps are keyed by the mapped name when the relation
                # map has an entry for this field, else by the name itself.
                timestamps[attname_map.get(name, name)] = now

            instance.sync_field_timestamps = timestamps
            instance.sync_field_last_modified = now
            instance.sync_field_sequence = sequence_next
            instance.sync_field_origin_node = ''
            sequence_next += 1

        return super().bulk_update(objs, stamped_fields, **kwargs)

delete

Source code in django_spire/contrib/sync/django/queryset.py
def delete(self) -> tuple[int, dict[str, int]]:
    """Soft-delete the rows of this queryset when the model supports it.

    When sync is bypassed, or the model has no ``is_deleted`` attribute,
    the parent ``delete`` runs instead. Otherwise rows not yet flagged are
    updated to ``is_deleted=True`` through this queryset's own ``update``.

    Returns:
        ``(count, {model_label: count})`` where ``count`` is the number of
        rows reported by the update.
    """
    if _is_bypassed() or not hasattr(self.model, 'is_deleted'):
        return super().delete()

    not_yet_deleted = self.exclude(is_deleted=True)
    flagged = not_yet_deleted.update(is_deleted=True)

    return flagged, {self.model._meta.label: flagged}

update

Source code in django_spire/contrib/sync/django/queryset.py
def update(self, **kwargs: Any) -> int:
    """Update matching rows one by one, restamping sync metadata on each.

    The parent ``update`` is used directly when sync is bypassed, when the
    model has no ``get_syncable_field_names``, or when no fields are given.
    Otherwise the matching rows are locked, a block of sequence values is
    allocated, and every row gets its own update carrying the requested
    values plus fresh timestamps, last-modified, sequence and origin node.

    Returns:
        The sum of the per-row update counts, or ``0`` when nothing matched.
    """
    passthrough = (
        _is_bypassed()
        or not hasattr(self.model, 'get_syncable_field_names')
        or not kwargs
    )

    if passthrough:
        return super().update(**kwargs)

    from django_spire.contrib.sync.django.sequence import (  # noqa: PLC0415
        SyncSequenceAllocator,
    )

    clock = self.model.get_clock()
    excluded = self.model._sync_exclude_fields
    attnames = _relation_attname_map(self.model)

    # Names whose per-field timestamp must be refreshed on every row.
    stamped_names = [
        attnames.get(name, name)
        for name in kwargs
        if name not in excluded
    ]

    with transaction.atomic(using=self.db):
        locked_rows = list(
            self.select_for_update().values_list(
                'pk',
                'sync_field_timestamps',
            )
        )

        if not locked_rows:
            return 0

        allocation = SyncSequenceAllocator(using=self.db).allocate(len(locked_rows))
        sequence = allocation.value_first

        updated = 0

        # The per-row updates below must not be intercepted again.
        with sync_bypass():
            for primary_key, stored_timestamps in locked_rows:
                now = clock.now()

                timestamps = dict(stored_timestamps or {})
                timestamps.update(dict.fromkeys(stamped_names, now))

                # Sync columns are applied last so they win over any
                # same-named key supplied by the caller.
                row_values = dict(kwargs)
                row_values.update({
                    'sync_field_timestamps': timestamps,
                    'sync_field_last_modified': now,
                    'sync_field_sequence': sequence,
                    'sync_field_origin_node': '',
                })

                row = self.model.objects.using(self.db).filter(pk=primary_key)
                updated += row.update(**row_values)

                sequence += 1

        return updated

SyncServer

Source code in django_spire/contrib/sync/django/server.py
def __init__(
    self,
    models: list[type[SyncableMixin]],
    node_id: str,
    *,
    batch_bytes: int | None = BATCH_BYTES_DEFAULT,
    batch_size: int | None = None,
    clock: HybridLogicalClock | None = None,
    clock_drift_max: int | None = 300,
    graph: DependencyGraph | None = None,
    lock: SyncLock | None = None,
    on_complete: Callable[[DatabaseResult], None] | None = None,
    on_phase: Callable[[SyncPhase], None] | None = None,
    payload_bytes_max: int | None = None,
    payload_records_max: int | None = None,
    progress: Callable[[SyncStage, int, int], None] | None = None,
    resolver: ConflictResolver | None = None,
    storage: DatabaseSyncStorage | None = None,
    transaction_fn: Callable[[], AbstractContextManager[Any]] = transaction.atomic,
) -> None:
    """Assemble the ``DatabaseEngine`` that backs this server.

    Collaborators that are not supplied (or are falsy) fall back to:
    ``build_graph(models)`` for ``graph``, ``SyncableMixin.get_clock()`` for
    ``clock``, ``DjangoSyncLock()`` for ``lock``, ``FieldTimestampWins()``
    for ``resolver`` and a ``DjangoSyncStorage`` over ``models`` for
    ``storage``. All remaining arguments are forwarded to the engine
    unchanged; no transport or peer node id is passed.
    """
    dependency_graph = graph or build_graph(models)

    deferred_columns = get_deferred_foreign_key_columns(
        models,
        dependency_graph,
    )
    cascade_columns = get_foreign_key_columns_for_cascade(models)

    # Defaults are resolved in the same order the engine receives them.
    engine_clock = clock or SyncableMixin.get_clock()
    engine_lock = lock or DjangoSyncLock()
    reconciler = PayloadReconciler(
        resolver=resolver or FieldTimestampWins(),
    )
    engine_storage = storage or DjangoSyncStorage(
        models=models,
        deferred_foreign_keys=deferred_columns,
    )

    self._engine = DatabaseEngine(
        batch_bytes=batch_bytes,
        batch_size=batch_size,
        clock=engine_clock,
        clock_drift_max=clock_drift_max,
        foreign_key_columns=cascade_columns,
        graph=dependency_graph,
        lock=engine_lock,
        node_id=node_id,
        on_complete=on_complete,
        on_phase=on_phase,
        payload_bytes_max=payload_bytes_max,
        payload_records_max=payload_records_max,
        progress=progress,
        reconciler=reconciler,
        storage=engine_storage,
        transaction=transaction_fn,
    )

handle

Source code in django_spire/contrib/sync/django/server.py
def handle(
    self,
    incoming: SyncManifest,
) -> tuple[SyncManifest, DatabaseResult]:
    """Pass ``incoming`` to the engine's ``process`` and return its result.

    Returns:
        The ``(response manifest, result)`` pair produced by the engine.
    """
    return self._engine.process(incoming)

serve

Source code in django_spire/contrib/sync/django/server.py
def serve(
    self,
    request: HttpRequest,
    validate_node_id: Callable[[HttpRequest, str], bool] | None = None,
) -> JsonResponse:
    """Handle an HTTP sync request with this server's engine.

    Thin wrapper around ``process_sync_request``; ``request_bytes_max`` is
    left at that function's default.

    Args:
        request: The incoming HTTP request.
        validate_node_id: Optional callback forwarded unchanged.
    """
    return process_sync_request(
        request,
        self._engine,
        validate_node_id=validate_node_id,
    )

SyncableModelService

set_many_to_many staticmethod

Source code in django_spire/contrib/sync/django/service.py
@staticmethod
def set_many_to_many(
    instance: SyncableMixin,
    field_name: str,
    values: list[Any],
) -> None:
    """Replace the contents of a many-to-many field on a saved instance.

    Args:
        instance: Object whose related manager is looked up by name.
        field_name: Attribute name of the many-to-many field.
        values: Passed to the related manager's ``set``.

    Raises:
        InvalidParameterError: If ``instance._state.adding`` is true, i.e.
            the instance has not been saved yet.
    """
    unsaved = instance._state.adding

    if unsaved:
        message = (
            f'Cannot set M2M field {field_name!r} before the '
            f'instance is saved. Call save() first.'
        )

        raise InvalidParameterError(message)

    related_manager = getattr(instance, field_name)
    related_manager.set(values)

DjangoSyncStorage

Source code in django_spire/contrib/sync/django/storage/facade.py
def __init__(
    self,
    models: list[type[SyncableMixin]],
    identity_field: str = 'id',
    batch_size_max: int = _BATCH_SIZE_MAX,
    checkpoint_store: CheckpointStore | None = None,
    deferred_foreign_keys: list[DeferredForeignKey] | None = None,
    record_reader: RecordReader | None = None,
    record_writer: RecordWriter | None = None,
    sequence_allocator: SequenceAllocator | None = None,
) -> None:
    """Wire up the four collaborators this facade delegates to.

    Each collaborator that is not supplied (or is falsy) is replaced by its
    Django-backed default: ``DjangoCheckpointStore``, ``DjangoRecordReader``,
    ``DjangoRecordWriter`` and ``SyncSequenceAllocator``. ``models`` and
    ``identity_field`` go to the default reader and writer;
    ``batch_size_max`` and ``deferred_foreign_keys`` go to the default
    writer only.
    """
    if not checkpoint_store:
        checkpoint_store = DjangoCheckpointStore()

    if not record_reader:
        record_reader = DjangoRecordReader(
            models=models,
            identity_field=identity_field,
        )

    if not record_writer:
        record_writer = DjangoRecordWriter(
            models=models,
            identity_field=identity_field,
            batch_size_max=batch_size_max,
            deferred_foreign_keys=deferred_foreign_keys,
        )

    if not sequence_allocator:
        sequence_allocator = SyncSequenceAllocator()

    self._checkpoint_store = checkpoint_store
    self._record_reader = record_reader
    self._record_writer = record_writer
    self._sequence_allocator = sequence_allocator

clear_tombstones

Source code in django_spire/contrib/sync/django/storage/facade.py
def clear_tombstones(
    self,
    model_label: str,
    keys: set[str],
) -> None:
    """Delegate to the record writer's ``clear_tombstones``."""
    self._record_writer.clear_tombstones(model_label, keys)

delete_many

Source code in django_spire/contrib/sync/django/storage/facade.py
def delete_many(
    self,
    model_label: str,
    deletes: dict[str, int],
    origin_node: str,
) -> None:
    """Delegate to the record writer's ``delete_many``.

    All three arguments are forwarded positionally and unchanged.
    """
    self._record_writer.delete_many(
        model_label,
        deletes,
        origin_node,
    )

flush_deferred_backfill

Source code in django_spire/contrib/sync/django/storage/facade.py
def flush_deferred_backfill(self) -> None:
    """Delegate to the record writer's ``flush_deferred_backfill``."""
    self._record_writer.flush_deferred_backfill()

get_after_keys

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_after_keys(self, peer_node_id: str) -> dict[str, Any]:
    """Return the checkpoint store's ``get_after_keys`` for this peer."""
    return self._checkpoint_store.get_after_keys(peer_node_id)

get_changed_since

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_changed_since(
    self,
    model_label: str,
    sequence: int,
    peer_node_id: str,
    sequence_max: int | None = None,
    limit: int | None = None,
    after_key: str | None = None,
) -> dict[str, SyncRecord]:
    """Delegate to the record reader's ``get_changed_since``.

    The first three arguments are forwarded positionally; ``sequence_max``,
    ``limit`` and ``after_key`` are forwarded by keyword, unchanged.
    """
    return self._record_reader.get_changed_since(
        model_label,
        sequence,
        peer_node_id,
        sequence_max=sequence_max,
        limit=limit,
        after_key=after_key,
    )

get_deletes_since

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_deletes_since(
    self,
    model_label: str,
    sequence: int,
    peer_node_id: str,
    sequence_max: int | None = None,
) -> dict[str, int]:
    """Delegate to the record reader's ``get_deletes_since``.

    ``sequence_max`` is forwarded by keyword; the rest positionally.
    """
    return self._record_reader.get_deletes_since(
        model_label,
        sequence,
        peer_node_id,
        sequence_max=sequence_max,
    )

get_checkpoint

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_checkpoint(self, peer_node_id: str) -> CheckpointPosition:
    """Return the checkpoint store's ``get_checkpoint`` for this peer."""
    return self._checkpoint_store.get_checkpoint(peer_node_id)

get_records

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_records(
    self,
    model_label: str,
    keys: set[str],
) -> dict[str, SyncRecord]:
    """Delegate to the record reader's ``get_records``."""
    return self._record_reader.get_records(model_label, keys)

get_sequence_allocator

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_sequence_allocator(self) -> SequenceAllocator:
    """Return the sequence allocator chosen at construction time."""
    return self._sequence_allocator

get_syncable_models

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_syncable_models(self) -> list[str]:
    """Delegate to the record reader's ``get_syncable_models``."""
    return self._record_reader.get_syncable_models()

get_tombstones

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_tombstones(
    self,
    model_label: str,
    keys: set[str],
) -> dict[str, int]:
    """Delegate to the record reader's ``get_tombstones``."""
    return self._record_reader.get_tombstones(model_label, keys)

save_checkpoint

Source code in django_spire/contrib/sync/django/storage/facade.py
def save_checkpoint(
    self,
    peer_node_id: str,
    peer_sequence: int,
    local_sequence_pushed: int,
    after_keys: dict[str, Any] | None = None,
) -> None:
    """Delegate to the checkpoint store's ``save_checkpoint``.

    ``after_keys`` is forwarded by keyword; the rest positionally.
    """
    self._checkpoint_store.save_checkpoint(
        peer_node_id,
        peer_sequence,
        local_sequence_pushed,
        after_keys=after_keys,
    )

stamp_unstamped_records

Source code in django_spire/contrib/sync/django/storage/facade.py
def stamp_unstamped_records(
    self,
    clock: HybridLogicalClock,
    model_order: list[str] | None = None,
) -> None:
    """Ask the record writer to stamp unstamped records and log the total.

    Args:
        clock: Forwarded to the writer as ``clock``.
        model_order: Forwarded to the writer as ``model_order``.

    The writer's return value is used only for logging; nothing is logged
    when it is falsy.
    """
    stamped = self._record_writer.stamp_unstamped_records(
        clock=clock,
        model_order=model_order,
    )

    if not stamped:
        return

    logger.info(
        'Pre-sync stamping complete: %d records',
        stamped,
    )

upsert_many

Source code in django_spire/contrib/sync/django/storage/facade.py
def upsert_many(
    self,
    model_label: str,
    records: dict[str, SyncRecord],
    origin_node: str,
) -> UpsertResult:
    """Delegate to the record writer's ``upsert_many`` and return its result.

    All three arguments are forwarded positionally and unchanged.
    """
    return self._record_writer.upsert_many(
        model_label,
        records,
        origin_node,
    )

build_graph

Source code in django_spire/contrib/sync/django/graph.py
def build_graph(
    models: list[type[SyncableMixin]],
) -> DependencyGraph:
    """Derive a dependency graph from the relations between ``models``.

    Every relational field (concrete fields plus many-to-many) that points
    at another model in ``models`` contributes an edge; self-references and
    targets outside ``models`` are ignored. Fields whose ``null`` is false
    give *required* edges, the rest *optional* ones. Optional edges are
    added to the ordering one at a time in sorted order; any edge that
    would close a cycle is recorded as *deferred* instead.

    NOTE(review): many-to-many fields go through the same ``null`` test as
    foreign keys, so a many-to-many field with ``null`` left false counts
    as required — confirm that this is intended.

    Raises:
        CircularDependencyError: If the required edges alone form a cycle.
    """
    known_labels = {model._meta.label for model in models}

    required_edges: dict[str, set[str]] = {}
    optional_edges: dict[str, set[str]] = {}

    for model in models:
        own_label = model._meta.label
        hard: set[str] = set()
        soft: set[str] = set()

        for field in chain(model._meta.concrete_fields, model._meta.many_to_many):
            if not field.is_relation:
                continue

            target_label = field.related_model._meta.label

            if target_label == own_label or target_label not in known_labels:
                continue

            # A field without a ``null`` attribute is treated as nullable.
            bucket = soft if getattr(field, 'null', True) else hard
            bucket.add(target_label)

        required_edges[own_label] = hard
        optional_edges[own_label] = soft

    if _has_cycle(required_edges):
        # Lists every model that has a required dependency, not only the
        # members of the cycle itself.
        involved = {
            label
            for label, dependencies in required_edges.items()
            if dependencies
        }

        message = (
            f'Non-nullable foreign keys form a circular dependency '
            f'among: {involved}'
        )

        raise CircularDependencyError(message)

    # Start from a copy of the required edges so they stay untouched.
    ordering_edges = {
        label: set(dependencies)
        for label, dependencies in required_edges.items()
    }

    deferred_edges: dict[str, set[str]] = {
        label: set()
        for label in ordering_edges
    }

    # Sorted iteration makes the choice of deferred edges deterministic.
    for source in sorted(optional_edges):
        for target in sorted(optional_edges[source]):
            ordering_edges[source].add(target)

            if not _has_cycle(ordering_edges):
                continue

            ordering_edges[source].discard(target)
            deferred_edges[source].add(target)

    populated_deferred = {
        label: targets
        for label, targets in deferred_edges.items()
        if targets
    }

    return DependencyGraph(
        ordering_edges,
        deferred_edges=populated_deferred or None,
    )

sync_bypass

Source code in django_spire/contrib/sync/django/queryset.py
@contextmanager
def sync_bypass() -> Iterator[None]:
    """Mark sync bypass as active for the duration of the ``with`` block.

    The previous value of ``_bypass.active`` is remembered on entry and put
    back on exit, also when the body raises.
    """
    # A missing attribute is treated as "not active".
    previous = getattr(_bypass, 'active', False)
    _bypass.active = True

    try:
        yield
    finally:
        # Restore rather than reset to False, so an enclosing bypass block
        # stays in effect after an inner one ends.
        _bypass.active = previous

seed_clock

Source code in django_spire/contrib/sync/django/seed.py
def seed_clock(
    clock: HybridLogicalClock,
    models: list[type[SyncableMixin]],
) -> None:
    """Advance ``clock`` to the newest ``sync_field_last_modified`` on record.

    The maximum is taken per model through its ``objects`` manager and then
    across all models; a model with no value contributes ``0``. The clock
    is left untouched when the overall maximum is falsy.
    """
    latest_per_model = [
        model.objects.aggregate(
            maximum_timestamp=Max('sync_field_last_modified'),
        )['maximum_timestamp'] or 0
        for model in models
    ]

    # The leading 0 keeps the floor at zero and covers an empty model list.
    water_high = max([0, *latest_per_model])

    if not water_high:
        return

    clock.receive(water_high)
    logger.info('Seeded HLC from database: %d', water_high)

register_many_to_many_signals

Source code in django_spire/contrib/sync/django/signals.py
def register_many_to_many_signals(
    parent_models: list[type[SyncableMixin]],
) -> None:
    """Connect ``_on_many_to_many_changed`` for every M2M field given.

    One ``m2m_changed`` receiver is connected per many-to-many field of
    each parent model, with the field's through model as sender. The
    ``dispatch_uid`` is built from the parent's label and the field name.

    Raises:
        InvalidParameterError: If ``parent_models`` is empty.
    """
    if not parent_models:
        message = 'parent_models must be a non-empty list'
        raise InvalidParameterError(message)

    relations = (
        (parent_model, field)
        for parent_model in parent_models
        for field in parent_model._meta.many_to_many
    )

    for parent_model, field in relations:
        m2m_changed.connect(
            _on_many_to_many_changed,
            sender=field.remote_field.through,
            dispatch_uid=(
                f'syncable_m2m:'
                f'{parent_model._meta.label}:{field.name}'
            ),
        )

process_sync_request

Source code in django_spire/contrib/sync/django/views.py
def process_sync_request(
    request: HttpRequest,
    engine: DatabaseEngine,
    request_bytes_max: int = _REQUEST_BYTES_MAX,
    validate_node_id: Callable[[HttpRequest, str], bool] | None = None,
) -> JsonResponse:
    """Validate an HTTP sync request, run it through ``engine`` and reply.

    Checks run in this order, each returning early with a JSON error body:
    method must be POST (405), content type must be ``application/json``
    (415), the size checks and body read performed by the helpers, manifest
    parsing, and the optional node-id authorisation (403). A
    ``SyncAbortedError`` raised by the engine becomes a 409.

    Args:
        request: The incoming HTTP request.
        engine: Engine whose ``process`` handles the parsed manifest.
        request_bytes_max: Size limit handed to the header and body helpers.
        validate_node_id: Optional callback receiving the request and the
            manifest's node id; a falsy return rejects the request.

    Returns:
        On success, the response manifest as a dict with ``ok``, ``errors``
        and ``skipped`` keys set on top of it.

    Raises:
        InvalidParameterError: If ``request_bytes_max`` is below 1.
    """
    if request_bytes_max < 1:
        message = (
            f'request_bytes_max must be >= 1, '
            f'got {request_bytes_max}'
        )

        raise InvalidParameterError(message)

    if request.method != 'POST':
        return JsonResponse(
            {'ok': False, 'error': 'Method not allowed'},
            status=405,
        )

    if (request.content_type or '') != 'application/json':
        return JsonResponse(
            {
                'ok': False,
                'error': 'Content-Type must be application/json',
            },
            status=415,
        )

    oversized = _reject_if_oversized_header(
        request,
        request_bytes_max,
    )

    if oversized is not None:
        return oversized

    body, read_failure = _read_body(request, request_bytes_max)

    if read_failure is not None:
        return read_failure

    incoming, parse_failure = _parse_manifest(body)

    if parse_failure is not None:
        return parse_failure

    # The callback is consulted only when one was supplied.
    if validate_node_id is not None and not validate_node_id(request, incoming.node_id):
        return JsonResponse(
            {
                'ok': False,
                'error': 'Node ID not authorized for this user',
            },
            status=403,
        )

    logger.info(
        'Received sync from node %s with %d payloads',
        incoming.node_id,
        len(incoming.payloads),
    )

    try:
        response, result = engine.process(incoming)
    except SyncAbortedError:
        logger.exception(
            'Sync aborted for node %s',
            incoming.node_id,
        )

        return JsonResponse(
            {'ok': False, 'error': 'Sync aborted'},
            status=409,
        )

    skipped_count = sum(len(keys) for keys in result.skipped.values())

    # Work on a copy so the manifest's own dict is never modified; the keys
    # set here override same-named keys coming from the manifest.
    reply = dict(response.to_dict())
    reply['ok'] = result.ok
    reply['errors'] = [
        {'key': issue.key, 'message': issue.message}
        for issue in result.errors
    ]
    reply['skipped'] = skipped_count

    return JsonResponse(reply)