Skip to content

core

django_spire.contrib.sync.core

__all__ = ['BatchLimitError', 'BidirectionalResult', 'Change', 'CircularDependencyError', 'ClockDriftError', 'ClockNotConfiguredError', 'ClockOverflowError', 'ConflictStateError', 'DecompressionLimitError', 'DependencyGraph', 'Error', 'HybridLogicalClock', 'InvalidParameterError', 'InvalidResponseError', 'LockContentionError', 'ManifestChecksumError', 'ManifestFieldError', 'PayloadLimitError', 'RecordFieldError', 'RecordHasher', 'RecordSerializationError', 'ResolutionAction', 'Result', 'RetryExhaustedError', 'SyncAbortedError', 'SyncAction', 'SyncError', 'SyncPhase', 'SyncStage', 'SyncStatus', 'TransportRequiredError', 'UnknownDependencyError', 'UnknownModelError', 'retry'] module-attribute

HybridLogicalClock

Source code in django_spire/contrib/sync/core/clock.py
def __init__(self) -> None:
    self._last = 0
    self._lock = threading.Lock()

now

Source code in django_spire/contrib/sync/core/clock.py
def now(self) -> int:
    for _ in range(_SPINS_MAX):
        with self._lock:
            physical_time = self._physical()
            wall_old = self._last >> _COUNTER_BITS
            counter_old = self._last & _COUNTER_MASK

            if physical_time > wall_old:
                wall, counter = physical_time, 0
            else:
                wall, counter = wall_old, counter_old + 1

            if counter <= _COUNTER_MASK:
                previous = self._last

                self._last = (wall << _COUNTER_BITS) | counter

                if self._last <= previous:
                    message = (
                        f'HLC monotonicity violated: '
                        f'{self._last} <= {previous}'
                    )

                    raise ClockOverflowError(message)

                return self._last

        time.sleep(0.001)

    message = (
        f'HLC counter overflow: unable to advance '
        f'after {_SPINS_MAX} attempts'
    )

    raise ClockOverflowError(message)

receive

Source code in django_spire/contrib/sync/core/clock.py
def receive(self, remote: int) -> int:
    if remote < 0:
        message = (
            f'The remote timestamp must be non-negative, '
            f'got {remote}'
        )

        raise InvalidParameterError(message)

    for _ in range(_SPINS_MAX):
        with self._lock:
            physical_time = self._physical()
            wall_old = self._last >> _COUNTER_BITS
            counter_old = self._last & _COUNTER_MASK
            wall_remote = remote >> _COUNTER_BITS
            counter_remote = remote & _COUNTER_MASK

            wall = max(physical_time, wall_old, wall_remote)

            if wall == wall_old == wall_remote:
                counter = max(counter_old, counter_remote) + 1
            elif wall == wall_old:
                counter = counter_old + 1
            elif wall == wall_remote:
                counter = counter_remote + 1
            else:
                counter = 0

            if counter <= _COUNTER_MASK:
                previous = self._last
                self._last = (wall << _COUNTER_BITS) | counter

                if self._last <= previous:
                    message = (
                        f'HLC monotonicity violated: '
                        f'{self._last} <= {previous}'
                    )

                    raise ClockOverflowError(message)

                return self._last

        time.sleep(0.001)

    message = (
        f'HLC counter overflow: unable to advance '
        f'after {_SPINS_MAX} attempts'
    )
    raise ClockOverflowError(message)

update

Source code in django_spire/contrib/sync/core/clock.py
def update(self, remote: int) -> None:
    self.receive(remote)

ResolutionAction

Bases: StrEnum

USE_SOURCE = 'use_source' class-attribute instance-attribute

USE_TARGET = 'use_target' class-attribute instance-attribute

SKIP = 'skip' class-attribute instance-attribute

SyncAction

Bases: StrEnum

CREATED = 'created' class-attribute instance-attribute

DEACTIVATED = 'deactivated' class-attribute instance-attribute

UNCHANGED = 'unchanged' class-attribute instance-attribute

UPDATED = 'updated' class-attribute instance-attribute

SyncPhase

Bases: StrEnum

COLLECTING = 'collecting' class-attribute instance-attribute

COMMITTING = 'committing' class-attribute instance-attribute

COMPLETE = 'complete' class-attribute instance-attribute

EXCHANGING = 'exchanging' class-attribute instance-attribute

FAILED = 'failed' class-attribute instance-attribute

RECONCILING = 'reconciling' class-attribute instance-attribute

SyncStage

Bases: StrEnum

CLASSIFY = 'classify' class-attribute instance-attribute

CALLBACK = 'callback' class-attribute instance-attribute

MUTATE = 'mutate' class-attribute instance-attribute

VALIDATE = 'validate' class-attribute instance-attribute

SyncStatus

Bases: StrEnum

ABANDONED = 'abandoned' class-attribute instance-attribute

ERROR = 'error' class-attribute instance-attribute

FAILURE = 'failure' class-attribute instance-attribute

IN_PROGRESS = 'in_progress' class-attribute instance-attribute

PENDING = 'pending' class-attribute instance-attribute

SUCCESS = 'success' class-attribute instance-attribute

BatchLimitError

Bases: SyncError

Batch size exceeds the configured maximum.

CircularDependencyError

Bases: SyncError

Dependency graph contains a cycle.

ClockDriftError

Bases: SyncAbortedError

Remote node clock exceeds the allowed drift threshold.

ClockNotConfiguredError

Bases: SyncError

SyncableMixin clock was not configured at startup.

ClockOverflowError

Bases: SyncError

HLC counter overflowed or monotonicity was violated.

ConflictStateError

Bases: SyncError

Conflict resolver received an invalid or incomplete conflict.

DecompressionLimitError

Bases: SyncError

Decompressed data exceeds the allowed size limit.

InvalidParameterError

Bases: SyncError

Constructor or function received an invalid argument.

InvalidResponseError

Bases: SyncAbortedError

Remote node returned a malformed or oversized response.

LockContentionError

Bases: SyncAbortedError

Another sync session is already running for this node.

ManifestChecksumError

Bases: SyncAbortedError

Manifest checksum verification failed.

ManifestFieldError

Bases: SyncError

Manifest contains missing or invalid fields.

PayloadLimitError

Bases: SyncAbortedError

Collected payload exceeds the configured size or record limit.

RecordFieldError

Bases: SyncError

Record contains missing or invalid fields.

RecordSerializationError

Bases: SyncError

Record contains a value that cannot be serialized to JSON.

RetryExhaustedError

Bases: SyncAbortedError

Retry helper exhausted all configured attempts.

SyncAbortedError

Bases: SyncError

Sync aborted due to a recoverable failure.

SyncError

Bases: Exception

Base exception for all sync operations.

TransportRequiredError

Bases: SyncError

Engine requires a transport for client-side sync.

UnknownDependencyError

Bases: SyncError

Model declares a dependency on an unregistered model.

UnknownModelError

Bases: SyncError

Model label does not match any registered syncable model.

DependencyGraph

Source code in django_spire/contrib/sync/core/graph.py
def __init__(self, edges: dict[str, set[str]]) -> None:
    for label in edges:
        if not label:
            message = 'edges must not contain empty labels'
            raise InvalidParameterError(message)

    self._edges = {
        label: set(dependencies)
        for label, dependencies in edges.items()
    }

    all_labels = set(self._edges)

    for label, dependencies in self._edges.items():
        unknown = dependencies - all_labels

        if unknown:
            message = (
                f'Model {label!r} declares dependencies on '
                f'unknown models: {unknown}'
            )

            raise UnknownDependencyError(message)

    self._dependents: dict[str, set[str]] = {
        label: set() for label in self._edges
    }

    for label, dependencies in self._edges.items():
        for dep in dependencies:
            self._dependents[dep].add(label)

    self._order = self._compute_order()

dependencies

Source code in django_spire/contrib/sync/core/graph.py
def dependencies(self, label: str) -> set[str]:
    return set(self._edges.get(label, set()))

known_models

Source code in django_spire/contrib/sync/core/graph.py
def known_models(self) -> frozenset[str]:
    return frozenset(self._edges)

sync_order

Source code in django_spire/contrib/sync/core/graph.py
def sync_order(self) -> list[str]:
    return list(self._order)

RecordHasher

Source code in django_spire/contrib/sync/core/hash.py
def __init__(
    self,
    identity_field: str,
    compare_fields: list[str] | None = None,
) -> None:
    if not identity_field:
        message = 'identity_field must be a non-empty string'
        raise InvalidParameterError(message)

    if compare_fields is not None:
        for field_name in compare_fields:
            if not field_name:
                message = (
                    'compare_fields must not contain '
                    'empty strings'
                )

                raise InvalidParameterError(message)

    self._compare_fields = compare_fields
    self._identity_field = identity_field
    self._schema_tag = self._compute_schema_tag()

hash

Source code in django_spire/contrib/sync/core/hash.py
def hash(self, record: dict[str, Any]) -> str:
    data = self._canonical(record)
    body = hashlib.sha256(data).hexdigest()

    return f'{self._schema_tag}:{body}'

BidirectionalResult dataclass

conflicts = field(default_factory=dict) class-attribute instance-attribute

changes = field(default_factory=dict) class-attribute instance-attribute

errors = field(default_factory=list) class-attribute instance-attribute

source_created = field(default_factory=list) class-attribute instance-attribute

source_deactivated = field(default_factory=list) class-attribute instance-attribute

source_updated = field(default_factory=list) class-attribute instance-attribute

target_created = field(default_factory=list) class-attribute instance-attribute

target_deactivated = field(default_factory=list) class-attribute instance-attribute

target_updated = field(default_factory=list) class-attribute instance-attribute

unchanged = field(default_factory=list) class-attribute instance-attribute

ok property

Change dataclass

old instance-attribute

new instance-attribute

diff property

Error dataclass

key instance-attribute

message instance-attribute

exception = None class-attribute instance-attribute

Result dataclass

created = field(default_factory=list) class-attribute instance-attribute

deactivated = field(default_factory=list) class-attribute instance-attribute

errors = field(default_factory=list) class-attribute instance-attribute

unchanged = field(default_factory=list) class-attribute instance-attribute

updated = field(default_factory=list) class-attribute instance-attribute

changes = field(default_factory=dict) class-attribute instance-attribute

ok property