Skip to content

django

django_spire.contrib.sync.django

__all__ = ['DjangoSyncLock', 'DjangoSyncStorage', 'HttpTransport', 'SyncClient', 'SyncServer', 'SyncableMixin', 'SyncableModelService', 'SyncableQuerySet', 'build_graph', 'process_sync_request', 'register_m2m_signals', 'seed_clock', 'sync_bypass'] module-attribute

HttpTransport

Bases: Transport

Source code in django_spire/contrib/sync/database/transport/http.py
def __init__(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    retries: int = 3,
    retry_delay: float = 1.0,
    timeout: float = 30.0,
    response_bytes_max: int = _RESPONSE_BYTES_MAX,
) -> None:
    _validate_url(url)

    if retries < 1:
        message = f'retries must be >= 1, got {retries}'
        raise InvalidParameterError(message)

    if retry_delay < 0.0:
        message = (
            f'retry_delay must be non-negative, '
            f'got {retry_delay}'
        )

        raise InvalidParameterError(message)

    if timeout <= 0.0:
        message = f'timeout must be positive, got {timeout}'
        raise InvalidParameterError(message)

    if response_bytes_max < 1:
        message = (
            f'response_bytes_max must be >= 1, '
            f'got {response_bytes_max}'
        )

        raise InvalidParameterError(message)

    self._headers = headers or {}
    self._response_bytes_max = response_bytes_max
    self._retries = retries
    self._retry_delay = retry_delay
    self._timeout = timeout
    self._url = url

    if not self._headers:
        logger.warning(
            'HttpTransport created without '
            'auth headers for %s',
            self._url,
        )

exchange

Source code in django_spire/contrib/sync/database/transport/http.py
def exchange(
    self, manifest: SyncManifest,
) -> SyncManifest:
    payload = manifest.to_dict()

    response_data = retry(
        lambda: self._post(payload),
        attempts=self._retries,
        delay=self._retry_delay,
        exceptions=_TRANSIENT_EXCEPTIONS,
    )

    if not isinstance(response_data, dict):
        message = 'Server returned an invalid response format'
        raise InvalidResponseError(message)

    if 'node_id' not in response_data:
        error = response_data.get(
            'error',
            'Missing required manifest fields',
        )

        message = (
            f'Server returned an invalid sync '
            f'response: {error}'
        )

        raise InvalidResponseError(message)

    if 'checkpoint' not in response_data:
        error = response_data.get(
            'error',
            'Missing required manifest fields',
        )

        message = (
            f'Server returned an invalid sync '
            f'response: {error}'
        )

        raise InvalidResponseError(message)

    logger.info(
        'Exchanged manifest with %s '
        '(%d payloads sent, %d received)',
        self._url,
        len(manifest.payloads),
        len(response_data.get('payloads', [])),
    )

    return SyncManifest.from_dict(response_data)

SyncClient

Source code in django_spire/contrib/sync/django/client.py
def __init__(
    self,
    models: list[type[SyncableMixin]],
    node_id: str,
    transport: Transport,
    *,
    batch_bytes: int | None = BATCH_BYTES_DEFAULT,
    batch_size: int | None = None,
    clock: HybridLogicalClock | None = None,
    clock_drift_max: int | None = 300,
    graph: DependencyGraph | None = None,
    lock: SyncLock | None = None,
    on_complete: Callable[[DatabaseResult], None] | None = None,
    on_phase: Callable[[SyncPhase], None] | None = None,
    payload_bytes_max: int | None = None,
    payload_records_max: int | None = None,
    progress: Callable[[SyncStage, int, int], None] | None = None,
    resolver: ConflictResolver | None = None,
    storage: DatabaseSyncStorage | None = None,
    transaction_fn: Callable[[], AbstractContextManager[Any]] = transaction.atomic,
) -> None:
    self._engine = DatabaseEngine(
        batch_bytes=batch_bytes,
        batch_size=batch_size,
        clock=clock or SyncableMixin.get_clock(),
        clock_drift_max=clock_drift_max,
        graph=graph or build_graph(models),
        node_id=node_id,
        on_complete=on_complete,
        on_phase=on_phase,
        payload_bytes_max=payload_bytes_max,
        payload_records_max=payload_records_max,
        progress=progress,
        reconciler=PayloadReconciler(
            resolver=resolver or
            FieldTimestampWins(),
        ),
        storage=storage or DjangoSyncStorage(models=models),
        transaction=transaction_fn,
        transport=transport,
    )

sync

Source code in django_spire/contrib/sync/django/client.py
def sync(self, dry_run: bool = False) -> DatabaseResult:
    return self._engine.sync(dry_run=dry_run)

DjangoSyncLock

Source code in django_spire/contrib/sync/django/lock.py
def __init__(
    self,
    timeout_stale: int = _STALE_TIMEOUT_DEFAULT,
) -> None:
    self._timeout_stale = timeout_stale

acquire

Source code in django_spire/contrib/sync/django/lock.py
def acquire(self, node_id: str) -> str:
    self._ensure_lock_row(node_id)

    with transaction.atomic():
        locked = (
            SyncNodeLock.objects
            .select_for_update()
            .filter(node_id=node_id)
            .first()
        )

        if locked is None:
            message = f'Sync lock row for {node_id!r} is missing after creation'
            raise LockContentionError(message)

        self._abandon_stale_sessions(node_id)
        self._check_active_session(node_id)
        session_id = self._create_session(node_id)

    logger.info(
        'Acquired sync lock for node %s (session %s)',
        node_id,
        session_id,
    )

    return session_id

hold

Source code in django_spire/contrib/sync/django/lock.py
def hold(self, node_id: str) -> None:
    self._ensure_lock_row(node_id)
    SyncNodeLock.objects.select_for_update().filter(node_id=node_id).first()

release

Source code in django_spire/contrib/sync/django/lock.py
def release(
    self,
    session_id: str,
    status: SyncStatus,
    result: DatabaseResult | None = None,
) -> None:
    session = (
        SyncSession.objects
        .filter(id=session_id)
        .first()
    )

    if session is None:
        logger.warning(
            'Sync session %s not found during release',
            session_id,
        )

        return

    now = timezone.now()
    elapsed_ms = (now - session.started_at).total_seconds() * 1000

    session.completed_at = now
    session.duration_ms = int(elapsed_ms)

    session.phase = (
        SyncPhase.COMPLETE
        if status == SyncStatus.SUCCESS
        else SyncPhase.FAILED
    )

    session.status = status

    if result is not None:
        session.records_pushed = sum(
            len(keys)
            for keys in result.pushed.values()
        )

        session.records_applied = sum(
            len(keys)
            for keys in result.applied.values()
        )

        session.records_created = sum(
            len(keys)
            for keys in result.created.values()
        )

        session.records_deleted = sum(
            len(keys)
            for keys in result.deleted.values()
        )

        session.conflicts = sum(
            len(keys)
            for keys in result.conflicts.values()
        )

        session.errors = len(result.errors)

    session.save()

    logger.info(
        'Released sync lock for session %s (status=%s)',
        session_id,
        status,
    )

update_phase

Source code in django_spire/contrib/sync/django/lock.py
def update_phase(self, session_id: str, phase: SyncPhase) -> None:
    SyncSession.objects.filter(id=session_id).update(phase=phase)

SyncableMixin

Bases: Model

Source code in django_spire/contrib/sync/django/mixin.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    super().__init__(*args, **kwargs)

    self._tracker = FieldUpdateTracker()
    self._tracker.snapshot(self._get_field_values())

id = models.UUIDField(primary_key=True, default=(uuid.uuid4), editable=False) class-attribute instance-attribute

sync_field_timestamps = models.JSONField(default=dict, editable=False) class-attribute instance-attribute

sync_field_last_modified = models.BigIntegerField(default=0, editable=False, db_index=True) class-attribute instance-attribute

objects = SyncableQuerySet.as_manager() class-attribute instance-attribute

Meta

abstract = True class-attribute instance-attribute

save

Source code in django_spire/contrib/sync/django/mixin.py
def save(self, *args: Any, **kwargs: Any) -> None:
    if not _is_bypassed():
        dirty = self.get_dirty_fields()

        if dirty:
            now = self.get_clock().now()
            timestamps = dict(self.sync_field_timestamps)

            for field_name in dirty:
                timestamps[field_name] = now

            self.sync_field_timestamps = timestamps
            self.sync_field_last_modified = now

    super().save(*args, **kwargs)
    self._tracker.snapshot(self._get_field_values())

get_dirty_fields

Source code in django_spire/contrib/sync/django/mixin.py
def get_dirty_fields(self) -> set[str]:
    if self._state.adding:
        return set(self.get_syncable_field_names())

    return self._tracker.get_dirty(self._get_field_values())

refresh_from_db

Source code in django_spire/contrib/sync/django/mixin.py
def refresh_from_db(
    self, *args: Any, **kwargs: Any,
) -> None:
    super().refresh_from_db(*args, **kwargs)
    self._tracker.snapshot(self._get_field_values())

configure classmethod

Source code in django_spire/contrib/sync/django/mixin.py
@classmethod
def configure(cls, clock: HybridLogicalClock) -> None:
    cls._clock = clock

get_clock classmethod

Source code in django_spire/contrib/sync/django/mixin.py
@classmethod
def get_clock(cls) -> HybridLogicalClock:
    if cls._clock is None:
        message = (
            'SyncableMixin clock not configured. '
            'Call SyncableMixin.configure(clock) '
            'in AppConfig.ready().'
        )

        raise ClockNotConfiguredError(message)

    return cls._clock

get_syncable_field_names classmethod

Source code in django_spire/contrib/sync/django/mixin.py
@classmethod
def get_syncable_field_names(cls) -> list[str]:
    return sorted(
        field.name
        for field in cls._meta.concrete_fields
        if field.name not in cls._sync_exclude_fields
    )

get_syncable_m2m_names classmethod

Source code in django_spire/contrib/sync/django/mixin.py
@classmethod
def get_syncable_m2m_names(cls) -> list[str]:
    return sorted(
        field.name
        for field in cls._meta.many_to_many
    )

SyncableQuerySet

Bases: QuerySet

bulk_create

Source code in django_spire/contrib/sync/django/queryset.py
def bulk_create(
    self,
    objs: list[Any],
    **kwargs: Any,
) -> list[Any]:
    if _is_bypassed():
        return super().bulk_create(objs, **kwargs)

    if not objs:
        return super().bulk_create(objs, **kwargs)

    clock = self.model.get_clock()

    for instance in objs:
        if not hasattr(instance, 'get_syncable_field_names'):
            continue

        now = clock.now()
        timestamps = dict(instance.sync_field_timestamps)

        for name in instance.get_syncable_field_names():
            if name not in timestamps:
                timestamps[name] = now

        instance.sync_field_timestamps = timestamps

        if not instance.sync_field_last_modified:
            instance.sync_field_last_modified = now

    return super().bulk_create(objs, **kwargs)

bulk_update

Source code in django_spire/contrib/sync/django/queryset.py
def bulk_update(
    self,
    objs: list[Any],
    fields: list[str] | tuple[str, ...],
    **kwargs: Any,
) -> int:
    if _is_bypassed():
        return super().bulk_update(objs, fields, **kwargs)

    if not objs:
        return super().bulk_update(objs, fields, **kwargs)

    clock = self.model.get_clock()
    needs_extra = False

    for instance in objs:
        if not hasattr(instance, '_sync_exclude_fields'):
            continue

        syncable = set(instance.get_syncable_field_names())
        dirty = syncable & set(fields)

        if not dirty:
            continue

        now = clock.now()
        timestamps = dict(instance.sync_field_timestamps)

        for name in dirty:
            timestamps[name] = now

        instance.sync_field_timestamps = timestamps
        instance.sync_field_last_modified = now
        needs_extra = True

    if needs_extra:
        sync_fields = {
            'sync_field_timestamps',
            'sync_field_last_modified',
        }

        fields = list(set(fields) | sync_fields)

    return super().bulk_update(objs, fields, **kwargs)

SyncServer

Source code in django_spire/contrib/sync/django/server.py
def __init__(
    self,
    models: list[type[SyncableMixin]],
    node_id: str,
    *,
    batch_bytes: int | None = BATCH_BYTES_DEFAULT,
    batch_size: int | None = None,
    clock: HybridLogicalClock | None = None,
    clock_drift_max: int | None = 300,
    graph: DependencyGraph | None = None,
    lock: SyncLock | None = None,
    on_complete: Callable[[DatabaseResult], None] | None = None,
    on_phase: Callable[[SyncPhase], None] | None = None,
    payload_bytes_max: int | None = None,
    payload_records_max: int | None = None,
    progress: Callable[[SyncStage, int, int], None] | None = None,
    resolver: ConflictResolver | None = None,
    storage: DatabaseSyncStorage | None = None,
    transaction_fn: Callable[[], AbstractContextManager[Any]] = transaction.atomic,
) -> None:
    self._engine = DatabaseEngine(
        batch_bytes=batch_bytes,
        batch_size=batch_size,
        clock=clock or SyncableMixin.get_clock(),
        clock_drift_max=clock_drift_max,
        graph=graph or build_graph(models),
        lock=lock or DjangoSyncLock(),
        node_id=node_id,
        on_complete=on_complete,
        on_phase=on_phase,
        payload_bytes_max=payload_bytes_max,
        payload_records_max=payload_records_max,
        progress=progress,
        reconciler=PayloadReconciler(
            resolver=resolver or FieldTimestampWins(),
        ),
        storage=storage or DjangoSyncStorage(models=models),
        transaction=transaction_fn,
    )

handle

Source code in django_spire/contrib/sync/django/server.py
def handle(
    self, incoming: SyncManifest,
) -> tuple[SyncManifest, DatabaseResult]:
    return self._engine.process(incoming)

serve

Source code in django_spire/contrib/sync/django/server.py
def serve(
    self,
    request: HttpRequest,
    validate_node_id: Callable[[HttpRequest, str], bool] | None = None,
) -> JsonResponse:
    return process_sync_request(
        request,
        self._engine,
        validate_node_id=validate_node_id,
    )

SyncableModelService

set_m2m staticmethod

Source code in django_spire/contrib/sync/django/service.py
@staticmethod
def set_m2m(
    instance: SyncableMixin,
    field_name: str,
    values: list[Any],
) -> None:
    if instance._state.adding:
        message = (
            f'Cannot set M2M field {field_name!r} before the '
            f'instance is saved. Call save() first.'
        )

        raise InvalidParameterError(message)

    getattr(instance, field_name).set(values)

DjangoSyncStorage

Source code in django_spire/contrib/sync/django/storage/facade.py
def __init__(
    self,
    models: list[type[SyncableMixin]],
    identity_field: str = 'id',
    batch_size_max: int = _BATCH_SIZE_MAX,
    checkpoint_store: CheckpointStore | None = None,
    record_reader: RecordReader | None = None,
    record_writer: RecordWriter | None = None,
) -> None:
    self._checkpoint_store = checkpoint_store or DjangoCheckpointStore()

    self._record_reader = record_reader or DjangoRecordReader(
        models=models,
        identity_field=identity_field,
    )

    self._record_writer = record_writer or DjangoRecordWriter(
        models=models,
        identity_field=identity_field,
        batch_size_max=batch_size_max,
    )

delete_many

Source code in django_spire/contrib/sync/django/storage/facade.py
def delete_many(
    self,
    model_label: str,
    deletes: dict[str, int],
) -> None:
    self._record_writer.delete_many(model_label, deletes)

get_after_keys

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_after_keys(self, node_id: str) -> dict[str, Any]:
    return self._checkpoint_store.get_after_keys(node_id)

get_changed_since

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_changed_since(
    self,
    model_label: str,
    timestamp: int,
    limit: int | None = None,
    after_key: str | None = None,
) -> dict[str, SyncRecord]:
    return self._record_reader.get_changed_since(
        model_label,
        timestamp,
        limit=limit,
        after_key=after_key,
    )

get_deletes_since

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_deletes_since(
    self,
    model_label: str,
    timestamp: int,
) -> dict[str, int]:
    return self._record_reader.get_deletes_since(
        model_label,
        timestamp,
    )

get_checkpoint

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_checkpoint(self, node_id: str) -> int:
    return self._checkpoint_store.get_checkpoint(node_id)

get_records

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_records(
    self,
    model_label: str,
    keys: set[str],
) -> dict[str, SyncRecord]:
    return self._record_reader.get_records(model_label, keys)

get_syncable_models

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_syncable_models(self) -> list[str]:
    return self._record_reader.get_syncable_models()

save_checkpoint

Source code in django_spire/contrib/sync/django/storage/facade.py
def save_checkpoint(
    self,
    node_id: str,
    timestamp: int,
    after_keys: dict[str, Any] | None = None,
) -> None:
    self._checkpoint_store.save_checkpoint(
        node_id,
        timestamp,
        after_keys=after_keys,
    )

upsert_many

Source code in django_spire/contrib/sync/django/storage/facade.py
def upsert_many(
    self,
    model_label: str,
    records: dict[str, SyncRecord],
) -> set[str]:
    return self._record_writer.upsert_many(model_label, records)

build_graph

Source code in django_spire/contrib/sync/django/graph.py
def build_graph(models: list[type[SyncableMixin]]) -> DependencyGraph:
    labels = {model._meta.label for model in models}

    edges: dict[str, set[str]] = {}

    for model in models:
        dependencies: set[str] = set()

        foreign_keys = chain(
            model._meta.concrete_fields,
            model._meta.many_to_many,
        )

        for field in foreign_keys:
            if not field.is_relation:
                continue

            related_label = field.related_model._meta.label

            if related_label in labels and related_label != model._meta.label:
                if not getattr(field, 'null', True):
                    dependencies.add(related_label)

        edges[model._meta.label] = dependencies

    return DependencyGraph(edges)

sync_bypass

Source code in django_spire/contrib/sync/django/queryset.py
@contextmanager
def sync_bypass() -> Iterator[None]:
    previous = getattr(_bypass, 'active', False)
    _bypass.active = True

    try:
        yield
    finally:
        _bypass.active = previous

seed_clock

Source code in django_spire/contrib/sync/django/seed.py
def seed_clock(
    clock: HybridLogicalClock,
    models: list[type[SyncableMixin]],
) -> None:
    high_water = 0

    for model in models:
        result = model.objects.aggregate(max_ts=Max('sync_field_last_modified'))
        timestamp = result['max_ts'] or 0

        high_water = max(high_water, timestamp)

    if high_water:
        clock.receive(high_water)
        logger.info('Seeded HLC from database: %d', high_water)

register_m2m_signals

Source code in django_spire/contrib/sync/django/signals.py
def register_m2m_signals(
    models: list[type[SyncableMixin]],
) -> None:
    from django_spire.contrib.sync.django.mixin import SyncableMixin  # noqa: PLC0415

    for model in models:
        if not issubclass(model, SyncableMixin):
            message = (
                f'Cannot register M2M signals for {model!r}: '
                f'must be a SyncableMixin subclass'
            )

            raise InvalidParameterError(message)

        for field in model._meta.many_to_many:
            through = field.remote_field.through
            dispatch_uid = f'syncable_m2m:{model._meta.label}:{field.name}'

            m2m_changed.connect(
                _on_m2m_changed,
                sender=through,
                dispatch_uid=dispatch_uid,
            )

process_sync_request

Source code in django_spire/contrib/sync/django/views.py
def process_sync_request(
    request: HttpRequest,
    engine: DatabaseEngine,
    request_bytes_max: int = _REQUEST_BYTES_MAX,
    validate_node_id: Callable[[HttpRequest, str], bool] | None = None,
) -> JsonResponse:
    if request_bytes_max < 1:
        message = (
            f'request_bytes_max must be >= 1, '
            f'got {request_bytes_max}'
        )

        raise InvalidParameterError(message)

    if request.method != 'POST':
        return JsonResponse(
            {'ok': False, 'error': 'Method not allowed'},
            status=405,
        )

    content_type = request.content_type or ''

    if content_type != 'application/json':
        return JsonResponse(
            {
                'ok': False,
                'error': 'Content-Type must be application/json',
            },
            status=415,
        )

    rejection = _reject_if_oversized_header(
        request, request_bytes_max,
    )

    if rejection is not None:
        return rejection

    body, error = _read_body(request, request_bytes_max)

    if error is not None:
        return error

    incoming, error = _parse_manifest(body)

    if error is not None:
        return error

    if validate_node_id is not None:
        if not validate_node_id(request, incoming.node_id):
            return JsonResponse(
                {
                    'ok': False,
                    'error': 'Node ID not authorized for this user',
                },
                status=403,
            )

    logger.info(
        'Received sync from node %s with %d payloads',
        incoming.node_id,
        len(incoming.payloads),
    )

    try:
        response, result = engine.process(incoming)
    except SyncAbortedError:
        logger.exception(
            'Sync aborted for node %s',
            incoming.node_id,
        )

        return JsonResponse(
            {'ok': False, 'error': 'Sync aborted'},
            status=409,
        )

    return JsonResponse({
        **response.to_dict(),
        'ok': result.ok,
        'errors': [
            {'key': error.key, 'message': error.message}
            for error in result.errors
        ],
    })