Skip to content

database

django_spire.contrib.sync.database

__all__ = ['ConflictEntry', 'ConflictResolver', 'ConflictType', 'DatabaseEngine', 'DatabaseResult', 'DatabaseSyncStorage', 'DependencyGraph', 'FieldConflict', 'FieldOwnershipWins', 'FieldTimestampWins', 'FieldUpdateTracker', 'LocalWins', 'ModelPayload', 'PayloadReconciler', 'ReconciliationResult', 'RecordConflict', 'RecordResolution', 'RemoteWins', 'ResolutionSource', 'SyncLock', 'SyncManifest', 'SyncRecord'] module-attribute

ConflictResolver

Bases: Protocol

resolve

Source code in django_spire/contrib/sync/database/conflict.py
def resolve(
    self, conflict: RecordConflict,
) -> RecordResolution: ...

ConflictType

Bases: StrEnum

BOTH_MODIFIED = 'both_modified' class-attribute instance-attribute

COMPATIBLE = 'compatible' class-attribute instance-attribute

DELETE_VS_MODIFY = 'delete_vs_modify' class-attribute instance-attribute

MODIFY_VS_DELETE = 'modify_vs_delete' class-attribute instance-attribute

FieldConflict dataclass

field_name instance-attribute

local_value instance-attribute

remote_value instance-attribute

local_timestamp instance-attribute

remote_timestamp instance-attribute

FieldOwnershipWins

Source code in django_spire/contrib/sync/database/conflict.py
def __init__(
    self,
    local_fields: set[str],
    remote_fields: set[str],
    exclude_fields: set[str] | None = None,
    prefer_remote_on_tie: bool = False,
) -> None:
    overlap = local_fields & remote_fields

    if overlap:
        message = (
            f'local_fields and remote_fields must not '
            f'overlap: {overlap}'
        )

        raise InvalidParameterError(message)

    self._exclude = frozenset(exclude_fields or set()) | META_FIELDS
    self._local_fields = frozenset(local_fields)
    self._prefer_remote_on_tie = prefer_remote_on_tie
    self._remote_fields = frozenset(remote_fields)

resolve

Source code in django_spire/contrib/sync/database/conflict.py
def resolve(
    self,
    conflict: RecordConflict,
) -> RecordResolution:
    if conflict.conflict_type == ConflictType.DELETE_VS_MODIFY:
        return RecordResolution(
            record=_require_local(conflict),
            source=ResolutionSource.LOCAL,
        )

    if conflict.conflict_type == ConflictType.MODIFY_VS_DELETE:
        return RecordResolution(
            record=_require_remote(conflict),
            source=ResolutionSource.REMOTE,
        )

    local, remote = _require_both(conflict)

    return _merge_fields(
        local,
        remote,
        conflict,
        self._exclude,
        self._prefer_remote_on_tie,
        local_fields=self._local_fields,
        remote_fields=self._remote_fields,
    )

FieldTimestampWins

Source code in django_spire/contrib/sync/database/conflict.py
def __init__(
    self,
    exclude_fields: set[str] | None = None,
    prefer_remote_on_tie: bool = False,
) -> None:
    self._exclude = frozenset(exclude_fields or set()) | META_FIELDS
    self._prefer_remote_on_tie = prefer_remote_on_tie

resolve

Source code in django_spire/contrib/sync/database/conflict.py
def resolve(
    self,
    conflict: RecordConflict,
) -> RecordResolution:
    if conflict.conflict_type == ConflictType.DELETE_VS_MODIFY:
        return RecordResolution(
            record=_require_local(conflict),
            source=ResolutionSource.LOCAL,
        )

    if conflict.conflict_type == ConflictType.MODIFY_VS_DELETE:
        return RecordResolution(
            record=_require_remote(conflict),
            source=ResolutionSource.REMOTE,
        )

    local, remote = _require_both(conflict)

    return _merge_fields(
        local,
        remote,
        conflict,
        self._exclude,
        self._prefer_remote_on_tie,
    )

LocalWins

resolve

Source code in django_spire/contrib/sync/database/conflict.py
def resolve(
    self,
    conflict: RecordConflict,
) -> RecordResolution:
    if conflict.conflict_type == ConflictType.MODIFY_VS_DELETE:
        return RecordResolution(
            record=None,
            source=ResolutionSource.LOCAL,
            delete=True,
        )

    return RecordResolution(
        record=_require_local(conflict),
        source=ResolutionSource.LOCAL,
        field_conflicts=conflict.field_conflicts,
    )

RecordConflict dataclass

key instance-attribute

model_label instance-attribute

conflict_type instance-attribute

field_conflicts = field(default_factory=list) class-attribute instance-attribute

local = None class-attribute instance-attribute

remote = None class-attribute instance-attribute

RecordResolution dataclass

record instance-attribute

source instance-attribute

delete = False class-attribute instance-attribute

field_conflicts = field(default_factory=list) class-attribute instance-attribute

RemoteWins

resolve

Source code in django_spire/contrib/sync/database/conflict.py
def resolve(
    self,
    conflict: RecordConflict,
) -> RecordResolution:
    if conflict.conflict_type == ConflictType.DELETE_VS_MODIFY:
        return RecordResolution(
            record=None,
            source=ResolutionSource.REMOTE,
            delete=True,
        )

    return RecordResolution(
        record=_require_remote(conflict),
        source=ResolutionSource.REMOTE,
        field_conflicts=conflict.field_conflicts,
    )

ResolutionSource

Bases: StrEnum

LOCAL = 'local' class-attribute instance-attribute

MERGED = 'merged' class-attribute instance-attribute

REMOTE = 'remote' class-attribute instance-attribute

DatabaseEngine

Source code in django_spire/contrib/sync/database/engine.py
def __init__(
    self,
    storage: DatabaseSyncStorage,
    graph: DependencyGraph,
    clock: HybridLogicalClock,
    node_id: str,
    *,
    batch_bytes: int | None = BATCH_BYTES_DEFAULT,
    batch_size: int | None = None,
    clock_drift_max: int | None = CLOCK_DRIFT_MAX_DEFAULT,
    identity_field: str = 'id',
    lock: SyncLock | None = None,
    on_complete: Callable[[DatabaseResult], None] | None = None,
    on_phase: Callable[[SyncPhase], None] | None = None,
    payload_bytes_max: int | None = None,
    payload_records_max: int | None = None,
    progress: Callable[[SyncStage, int, int], None] | None = None,
    reconciler: PayloadReconciler | None = None,
    transaction: Callable[[], AbstractContextManager[Any]] = nullcontext,
    transport: Transport | None = None,
) -> None:
    if not node_id:
        message = 'node_id must be a non-empty string'
        raise InvalidParameterError(message)

    if not identity_field:
        message = 'identity_field must be a non-empty string'
        raise InvalidParameterError(message)

    if batch_bytes is not None and batch_bytes < 1:
        message = (
            f'batch_bytes must be >= 1 '
            f'or None, got {batch_bytes}'
        )

        raise InvalidParameterError(message)

    if batch_size is not None and batch_size < 1:
        message = (
            f'batch_size must be >= 1 '
            f'or None, got {batch_size}'
        )

        raise InvalidParameterError(message)

    if clock_drift_max is not None and clock_drift_max < 0:
        message = (
            f'clock_drift_max must be non-negative '
            f'or None, got {clock_drift_max}'
        )

        raise InvalidParameterError(message)

    if payload_bytes_max is not None and payload_bytes_max < 1:
        message = (
            f'payload_bytes_max must be >= 1 '
            f'or None, got {payload_bytes_max}'
        )

        raise InvalidParameterError(message)

    if payload_records_max is not None and payload_records_max < 1:
        message = (
            f'payload_records_max must be >= 1 '
            f'or None, got {payload_records_max}'
        )

        raise InvalidParameterError(message)

    self._batch_bytes = batch_bytes
    self._batch_size = batch_size
    self._clock = clock
    self._clock_drift_max = clock_drift_max
    self._graph = graph
    self._identity_field = identity_field
    self._lock = lock
    self._node_id = node_id
    self._on_complete = on_complete
    self._on_phase = on_phase
    self._payload_bytes_max = payload_bytes_max
    self._payload_records_max = payload_records_max
    self._progress = progress
    self._reconciler = reconciler or PayloadReconciler()
    self._storage = storage
    self._transaction = transaction
    self._transport = transport

process

Source code in django_spire/contrib/sync/database/engine.py
def process(
    self,
    incoming: SyncManifest,
) -> tuple[SyncManifest, DatabaseResult]:
    result = DatabaseResult()

    self._validate_manifest(incoming)
    self._validate_clock(incoming)

    valid_payloads = self._validate_incoming_models(
        incoming, result,
    )

    with self._transaction():
        if self._lock:
            self._lock.hold(self._node_id)

        now = self._clock.now()

        response_payloads, has_more, after_keys = self._apply_incoming(
            valid_payloads,
            incoming.checkpoint,
            result,
            received_at=now,
            records_max=self._batch_size,
            bytes_max=self._batch_bytes,
            after_keys=incoming.after_keys,
        )

    checkpoint_value = self._max_response_checkpoint(
        incoming.checkpoint,
        response_payloads,
    )

    has_activity = (
        any(p.records or p.deletes for p in valid_payloads)
        or any(p.records or p.deletes for p in response_payloads)
    )

    if not has_more and not has_activity:
        checkpoint_value = max(checkpoint_value, now)

    response = SyncManifest(
        node_id=self._node_id,
        checkpoint=checkpoint_value,
        after_keys=after_keys,
        node_time=int(time.time()),
        payloads=response_payloads,
        has_more=has_more,
    )

    response.checksum = response.compute_checksum()

    self._finalize(result)

    return response, result

sync

Source code in django_spire/contrib/sync/database/engine.py
def sync(self, dry_run: bool = False) -> DatabaseResult:
    if self._transport is None:
        message = (
            'Transport is required for sync(). '
            'Use process() for server-side.'
        )

        raise TransportRequiredError(message)

    result = DatabaseResult()

    persisted = self._storage.get_after_keys(self._node_id)

    server_cursors: dict[str, Any] = {
        k.removeprefix('server:'): v
        for k, v in persisted.items()
        if k.startswith('server:')
    }

    collect_cursors: dict[str, Any] = {
        k.removeprefix('collect:'): v
        for k, v in persisted.items()
        if k.startswith('collect:')
    }

    with self._managed_session(result) as session_id:
        while True:
            self._enter_phase(
                SyncPhase.COLLECTING,
                session_id,
                SyncStage.VALIDATE,
            )

            checkpoint = self._storage.get_checkpoint(
                self._node_id,
            )

            manifest = self._collect(
                checkpoint,
                limit=self._batch_size,
                bytes_limit=self._batch_bytes,
                after_keys=collect_cursors,
            )

            manifest.after_keys = server_cursors
            manifest.checksum = manifest.compute_checksum()

            sent_snapshot = self._extract_record_snapshot(
                manifest,
            )

            self._record_pushed(manifest, result)

            self._enter_phase(
                SyncPhase.EXCHANGING, session_id,
                SyncStage.CLASSIFY,
            )

            response = self._exchange_and_validate(manifest)

            received_snapshot = self._extract_record_snapshot(
                response,
            )

            self._enter_phase(
                SyncPhase.RECONCILING, session_id,
                SyncStage.MUTATE,
            )

            if manifest.has_more:
                collect_cursors = {}

                for payload in manifest.payloads:
                    cursor = _last_cursor(payload.records)

                    if cursor:
                        collect_cursors[payload.model_label] = cursor
            else:
                collect_cursors = {}

            if response.has_more:
                server_cursors = response.after_keys
            else:
                server_cursors = {}

            if not dry_run:
                self._enter_phase(
                    SyncPhase.COMMITTING,
                    session_id,
                )

                self._commit(
                    checkpoint, response,
                    sent_snapshot, received_snapshot,
                    result,
                    server_cursors=server_cursors,
                    collect_cursors=collect_cursors,
                )

            converged = (
                not manifest.has_more
                and not response.has_more
            )

            if dry_run:
                break

            if converged:
                exchanged = (
                    any(p.records or p.deletes for p in manifest.payloads)
                    or any(p.records or p.deletes for p in response.payloads)
                )

                if not exchanged:
                    break

        self._enter_phase(SyncPhase.COMPLETE, session_id)
        self._finalize(result)

    self._log_sync_summary(result)

    return result

DependencyGraph

Source code in django_spire/contrib/sync/core/graph.py
def __init__(self, edges: dict[str, set[str]]) -> None:
    for label in edges:
        if not label:
            message = 'edges must not contain empty labels'
            raise InvalidParameterError(message)

    self._edges = {
        label: set(dependencies)
        for label, dependencies in edges.items()
    }

    all_labels = set(self._edges)

    for label, dependencies in self._edges.items():
        unknown = dependencies - all_labels

        if unknown:
            message = (
                f'Model {label!r} declares dependencies on '
                f'unknown models: {unknown}'
            )

            raise UnknownDependencyError(message)

    self._dependents: dict[str, set[str]] = {
        label: set() for label in self._edges
    }

    for label, dependencies in self._edges.items():
        for dep in dependencies:
            self._dependents[dep].add(label)

    self._order = self._compute_order()

dependencies

Source code in django_spire/contrib/sync/core/graph.py
def dependencies(self, label: str) -> set[str]:
    return set(self._edges.get(label, set()))

known_models

Source code in django_spire/contrib/sync/core/graph.py
def known_models(self) -> frozenset[str]:
    return frozenset(self._edges)

sync_order

Source code in django_spire/contrib/sync/core/graph.py
def sync_order(self) -> list[str]:
    return list(self._order)

SyncLock

Bases: Protocol

acquire

Source code in django_spire/contrib/sync/database/lock.py
def acquire(self, node_id: str) -> str: ...

hold

Source code in django_spire/contrib/sync/database/lock.py
def hold(self, node_id: str) -> None: ...

release

Source code in django_spire/contrib/sync/database/lock.py
def release(self, session_id: str, status: SyncStatus, result: DatabaseResult | None = None) -> None: ...

update_phase

Source code in django_spire/contrib/sync/database/lock.py
def update_phase(self, session_id: str, phase: SyncPhase) -> None: ...

ConflictEntry dataclass

conflict instance-attribute

resolution_source instance-attribute

DatabaseResult dataclass

applied = field(default_factory=(lambda: defaultdict(list))) class-attribute instance-attribute

compatible = field(default_factory=(lambda: defaultdict(list))) class-attribute instance-attribute

conflict_log = field(default_factory=list) class-attribute instance-attribute

conflicts = field(default_factory=(lambda: defaultdict(list))) class-attribute instance-attribute

created = field(default_factory=(lambda: defaultdict(list))) class-attribute instance-attribute

deleted = field(default_factory=(lambda: defaultdict(list))) class-attribute instance-attribute

errors = field(default_factory=list) class-attribute instance-attribute

pushed = field(default_factory=(lambda: defaultdict(list))) class-attribute instance-attribute

skipped = field(default_factory=(lambda: defaultdict(list))) class-attribute instance-attribute

ok property

ModelPayload dataclass

model_label instance-attribute

records = field(default_factory=dict) class-attribute instance-attribute

deletes = field(default_factory=dict) class-attribute instance-attribute

from_dict classmethod

Source code in django_spire/contrib/sync/database/manifest.py
@classmethod
def from_dict(
    cls, data: dict[str, Any],
) -> ModelPayload:
    model_label = data.get('model_label')

    if model_label is None:
        message = "ModelPayload requires 'model_label'"
        raise ManifestFieldError(message)

    if not isinstance(model_label, str):
        message = "'model_label' must be a string"
        raise ManifestFieldError(message)

    if not model_label:
        message = "'model_label' must be a non-empty string"
        raise ManifestFieldError(message)

    raw_records = data.get('records', {})

    if not isinstance(raw_records, dict):
        message = "'records' must be a dict"
        raise ManifestFieldError(message)

    raw_deletes = data.get('deletes', {})

    if not isinstance(raw_deletes, dict):
        message = "'deletes' must be a dict"
        raise ManifestFieldError(message)

    deletes: dict[str, int] = {}

    for key, tombstone in raw_deletes.items():
        if not isinstance(key, str):
            message = f'delete key {key!r} must be a string'
            raise ManifestFieldError(message)

        if (
            not isinstance(tombstone, int)
            or isinstance(tombstone, bool)
        ):
            message = (
                f"delete tombstone for {key!r} must be "
                f"an integer, got "
                f"{type(tombstone).__name__}"
            )

            raise ManifestFieldError(message)

        if tombstone < 0:
            message = (
                f"delete tombstone for {key!r} must be "
                f"non-negative, got {tombstone}"
            )

            raise ManifestFieldError(message)

        if key in raw_records:
            message = (
                f"key {key!r} present in both "
                f"'records' and 'deletes'"
            )

            raise ManifestFieldError(message)

        deletes[key] = tombstone

    return cls(
        model_label=model_label,
        records={
            key: SyncRecord.from_dict(key, value)
            for key, value in raw_records.items()
        },
        deletes=deletes,
    )

to_dict

Source code in django_spire/contrib/sync/database/manifest.py
def to_dict(self) -> dict[str, Any]:
    return {
        'deletes': dict(sorted(self.deletes.items())),
        'model_label': self.model_label,
        'records': {
            key: record.to_dict()
            for key, record in self.records.items()
        },
    }

SyncManifest dataclass

node_id instance-attribute

checkpoint instance-attribute

after_keys = field(default_factory=dict) class-attribute instance-attribute

checksum = '' class-attribute instance-attribute

has_more = False class-attribute instance-attribute

node_time = 0 class-attribute instance-attribute

payloads = field(default_factory=list) class-attribute instance-attribute

compute_checksum

Source code in django_spire/contrib/sync/database/manifest.py
def compute_checksum(self) -> str:
    body = json.dumps(
        self._serializable(),
        sort_keys=True,
        ensure_ascii=True,
    ).encode('utf-8')

    return hashlib.sha256(body).hexdigest()

verify

Source code in django_spire/contrib/sync/database/manifest.py
def verify(self) -> bool:
    if not self.checksum:
        return False

    return self.checksum == self.compute_checksum()

from_dict classmethod

Source code in django_spire/contrib/sync/database/manifest.py
@classmethod
def from_dict(
    cls, data: dict[str, Any],
) -> SyncManifest:
    node_id = data.get('node_id')
    checkpoint = data.get('checkpoint')

    if node_id is None:
        message = "SyncManifest requires 'node_id'"
        raise ManifestFieldError(message)

    if not isinstance(node_id, str):
        message = "'node_id' must be a string"
        raise ManifestFieldError(message)

    if not node_id:
        message = "'node_id' must be a non-empty string"
        raise ManifestFieldError(message)

    if checkpoint is None:
        message = "SyncManifest requires 'checkpoint'"
        raise ManifestFieldError(message)

    if (
        not isinstance(checkpoint, int)
        or isinstance(checkpoint, bool)
    ):
        message = "'checkpoint' must be an integer"
        raise ManifestFieldError(message)

    if checkpoint < 0:
        message = (
            f"'checkpoint' must be non-negative, "
            f"got {checkpoint}"
        )

        raise ManifestFieldError(message)

    node_time = data.get('node_time', 0)

    if (
        not isinstance(node_time, int)
        or isinstance(node_time, bool)
    ):
        message = "'node_time' must be an integer"
        raise ManifestFieldError(message)

    if node_time < 0:
        message = (
            f"'node_time' must be non-negative, "
            f"got {node_time}"
        )

        raise ManifestFieldError(message)

    after_keys = data.get('after_keys', {})
    has_more = data.get('has_more', False)

    raw_payloads = data.get('payloads', [])

    if not isinstance(raw_payloads, list):
        message = "'payloads' must be a list"
        raise ManifestFieldError(message)

    if len(raw_payloads) > _PAYLOADS_MAX:
        message = (
            f"'payloads' exceeds maximum of "
            f"{_PAYLOADS_MAX}"
        )

        raise ManifestFieldError(message)

    seen_labels: set[str] = set()
    payloads: list[ModelPayload] = []

    for raw_payload in raw_payloads:
        payload = ModelPayload.from_dict(raw_payload)

        if payload.model_label in seen_labels:
            message = (
                f"duplicate model_label: "
                f"{payload.model_label!r}"
            )

            raise ManifestFieldError(message)

        seen_labels.add(payload.model_label)
        payloads.append(payload)

    return cls(
        node_id=node_id,
        checkpoint=checkpoint,
        after_keys=after_keys if isinstance(after_keys, dict) else {},
        checksum=data.get('checksum', ''),
        has_more=has_more,
        node_time=node_time,
        payloads=payloads,
    )

to_dict

Source code in django_spire/contrib/sync/database/manifest.py
def to_dict(self) -> dict[str, Any]:
    result = self._serializable()
    result['after_keys'] = self.after_keys
    result['checksum'] = self.checksum or self.compute_checksum()
    result['has_more'] = self.has_more

    return result

PayloadReconciler

Source code in django_spire/contrib/sync/database/reconciler.py
def __init__(
    self,
    resolver: ConflictResolver | None = None,
) -> None:
    self._resolver = resolver or FieldTimestampWins()

reconcile

Source code in django_spire/contrib/sync/database/reconciler.py
def reconcile(
    self,
    payload: ModelPayload,
    local_records: dict[str, SyncRecord],
    checkpoint: int,
) -> ReconciliationResult:
    if checkpoint < 0:
        message = (
            f'checkpoint must be non-negative, '
            f'got {checkpoint}'
        )

        raise InvalidParameterError(message)

    result = ReconciliationResult()

    for key, remote in payload.records.items():
        self._classify_record(
            key,
            remote,
            payload.model_label,
            local_records,
            checkpoint,
            result,
        )

    self._classify_deletes(payload, local_records, result)

    return result

ReconciliationResult dataclass

applied_keys = field(default_factory=set) class-attribute instance-attribute

compatible_keys = field(default_factory=list) class-attribute instance-attribute

conflict_keys = field(default_factory=list) class-attribute instance-attribute

conflict_log = field(default_factory=list) class-attribute instance-attribute

created_keys = field(default_factory=set) class-attribute instance-attribute

errors = field(default_factory=list) class-attribute instance-attribute

response_records = field(default_factory=dict) class-attribute instance-attribute

to_delete = field(default_factory=dict) class-attribute instance-attribute

to_upsert = field(default_factory=dict) class-attribute instance-attribute

SyncRecord dataclass

key instance-attribute

data instance-attribute

timestamps instance-attribute

received_at = field(default=0, compare=False) class-attribute instance-attribute

sync_field_last_modified property

__post_init__

Source code in django_spire/contrib/sync/database/record.py
def __post_init__(self) -> None:
    if self.key == '':
        message = 'SyncRecord key must be a non-empty string'
        raise InvalidParameterError(message)

    if self.received_at < 0:
        message = (
            f'received_at must be non-negative, '
            f'got {self.received_at}'
        )

        raise InvalidParameterError(message)

from_dict classmethod

Source code in django_spire/contrib/sync/database/record.py
@classmethod
def from_dict(cls, key: str, data: dict[str, Any]) -> SyncRecord:
    if key == '':
        message = 'SyncRecord key must be a non-empty string'
        raise RecordFieldError(message)

    record_data = data.get('data', {})
    record_timestamps = data.get('timestamps', {})
    record_received_at = data.get('received_at', 0)

    if not isinstance(record_data, dict):
        message = (
            f"Record {key!r}: 'data' must be a dict, "
            f'got {type(record_data).__name__}'
        )

        raise RecordFieldError(message)

    if not isinstance(record_timestamps, dict):
        message = (
            f"Record {key!r}: 'timestamps' must be a "
            f'dict, got {type(record_timestamps).__name__}'
        )

        raise RecordFieldError(message)

    if (
        not isinstance(record_received_at, int)
        or isinstance(record_received_at, bool)
    ):
        message = (
            f"Record {key!r}: 'received_at' must be an int, "
            f'got {type(record_received_at).__name__}'
        )

        raise RecordFieldError(message)

    if record_received_at < 0:
        message = (
            f"Record {key!r}: 'received_at' must be "
            f'non-negative, got {record_received_at}'
        )

        raise RecordFieldError(message)

    for ts_key, ts_value in record_timestamps.items():
        if not isinstance(ts_key, str):
            message = (
                f'Record {key!r}: timestamp key '
                f'{ts_key!r} must be a string'
            )

            raise RecordFieldError(message)

        if not isinstance(ts_value, int) or isinstance(ts_value, bool):
            message = (
                f'Record {key!r}: timestamp for '
                f'{ts_key!r} must be an int, '
                f'got {type(ts_value).__name__}'
            )

            raise RecordFieldError(message)

        if ts_value < 0:
            message = (
                f'Record {key!r}: timestamp for '
                f'{ts_key!r} must be non-negative, '
                f'got {ts_value}'
            )

            raise RecordFieldError(message)

    return cls(
        key=key,
        data=record_data,
        timestamps=record_timestamps,
        received_at=record_received_at,
    )

to_dict

Source code in django_spire/contrib/sync/database/record.py
def to_dict(self) -> dict[str, Any]:
    return {
        'data': self.data,
        'timestamps': self.timestamps,
    }

DatabaseSyncStorage

FieldUpdateTracker

Source code in django_spire/contrib/sync/database/tracker.py
def __init__(self) -> None:
    self._original: dict[str, Any] = {}

get_dirty

Source code in django_spire/contrib/sync/database/tracker.py
def get_dirty(self, current: dict[str, Any]) -> set[str]:
    dirty: set[str] = set()

    for key, value in current.items():
        if key not in self._original or self._original[key] != value:
            dirty.add(key)

    return dirty

snapshot

Source code in django_spire/contrib/sync/database/tracker.py
def snapshot(self, fields: dict[str, Any]) -> None:
    self._original = {
        key: copy.deepcopy(value) if isinstance(value, _MUTABLE_TYPES) else value
        for key, value in fields.items()
    }