Skip to content

file

django_spire.contrib.sync.file

__all__ = ['BidirectionalEngine', 'BidirectionalStorage', 'Conflict', 'ConflictStrategy', 'Engine', 'FileSyncAbortedError', 'FileSyncArchiveError', 'FileSyncConfig', 'FileSyncConfigError', 'FileSyncConflictError', 'FileSyncError', 'FileSyncParameterError', 'FileSyncParseError', 'FileSyncSourceNotFoundError', 'LastWriteWins', 'Resolution', 'SourceWins', 'Storage', 'TargetWins'] module-attribute

BidirectionalEngine

Source code in django_spire/contrib/sync/file/bidirectional.py
def __init__(
    self,
    storage: BidirectionalStorage,
    identity_field: str,
    compare_fields: list[str] | None = None,
    conflict_strategy: ConflictStrategy | None = None,
    deactivation_threshold: float | None = None,
    transaction: Callable[[], AbstractContextManager[Any]] | None = None,
    on_complete: Callable[[BidirectionalResult], None] | None = None,
    progress: Callable[[SyncStage, int, int], None] | None = None,
) -> None:
    if not identity_field:
        message = 'identity_field must be a non-empty string'
        raise FileSyncParameterError(message)

    if deactivation_threshold is not None and deactivation_threshold < 0.0:
        message = (
            f'deactivation_threshold must be non-negative '
            f'or None, got {deactivation_threshold}'
        )

        raise FileSyncParameterError(message)

    self._identity_field = identity_field
    self._conflict_strategy = conflict_strategy or SourceWins()
    self._deactivation_threshold = deactivation_threshold
    self._on_complete = on_complete
    self._progress = progress
    self._storage = storage
    self._transaction = transaction or nullcontext
    self._hasher = RecordHasher(identity_field, compare_fields)

sync

Source code in django_spire/contrib/sync/file/bidirectional.py
def sync(
    self,
    file_path: str | Path,
    reader: Reader,
    writer: Writer,
    dry_run: bool = False,
) -> BidirectionalResult:
    file_path = Path(file_path)
    result = BidirectionalResult()

    snapshot = self._collect(file_path, reader, result)

    classified = self._classify(
        snapshot.source,
        snapshot.target,
        snapshot.baseline_hashes,
        snapshot.source_hashes,
        snapshot.target_hashes,
    )

    check_deactivation_threshold(
        self._deactivation_threshold,
        len(snapshot.target),
        len(classified.target_deactivations),
    )

    source_timestamp = self._get_file_timestamp(file_path)

    target_timestamps = (
        self._storage.get_timestamps(set(classified.conflict_keys))
        if classified.conflict_keys
        else {}
    )

    resolved = self._resolve_conflicts(
        classified.conflict_keys,
        snapshot.source, snapshot.target,
        source_timestamp, target_timestamps, result,
    )

    self._apply_resolutions(
        resolved, classified, snapshot.source, snapshot.target,
    )

    self._populate_result(result, classified, resolved)

    if dry_run:
        return result

    self._commit(
        snapshot, classified, resolved,
        file_path, writer, result,
    )

    self._finalize(result)

    return result

FileSyncConfig dataclass

model_label instance-attribute

identity_field instance-attribute

scope_field instance-attribute

filename instance-attribute

fields instance-attribute

conflict_strategy = field(default_factory=SourceWins) class-attribute instance-attribute

deactivation_threshold = None class-attribute instance-attribute

timestamp_field = 'modified_datetime' class-attribute instance-attribute

field_keys property

__post_init__

Source code in django_spire/contrib/sync/file/config.py
def __post_init__(self) -> None:
    if not self.model_label:
        message = 'model_label must not be empty'
        raise FileSyncParameterError(message)

    if not self.identity_field:
        message = 'identity_field must not be empty'
        raise FileSyncParameterError(message)

    if not self.scope_field:
        message = 'scope_field must not be empty'
        raise FileSyncParameterError(message)

    if not self.filename:
        message = 'filename must not be empty'
        raise FileSyncParameterError(message)

    if not self.fields:
        message = 'fields must not be empty'
        raise FileSyncParameterError(message)

    if self.deactivation_threshold is not None and self.deactivation_threshold < 0.0:
        message = (
            f'deactivation_threshold must be non-negative '
            f'or None, got {self.deactivation_threshold}'
        )

        raise FileSyncParameterError(message)

Conflict dataclass

key instance-attribute

source_record instance-attribute

target_record instance-attribute

baseline_record = None class-attribute instance-attribute

source_timestamp = None class-attribute instance-attribute

target_timestamp = None class-attribute instance-attribute

ConflictStrategy

Bases: Protocol

resolve

Source code in django_spire/contrib/sync/file/conflict.py
def resolve(self, conflict: Conflict) -> Resolution: ...

LastWriteWins

resolve

Source code in django_spire/contrib/sync/file/conflict.py
def resolve(self, conflict: Conflict) -> Resolution:
    if conflict.source_timestamp is None and conflict.target_timestamp is None:
        message = (
            f'Cannot resolve conflict for key {conflict.key!r}: '
            f'no timestamps available on either side'
        )

        raise FileSyncConflictError(message)

    if conflict.source_timestamp is None:
        return Resolution(
            action=ResolutionAction.USE_TARGET,
            record=conflict.target_record,
        )

    if conflict.target_timestamp is None:
        return Resolution(
            action=ResolutionAction.USE_SOURCE,
            record=conflict.source_record,
        )

    if conflict.source_timestamp >= conflict.target_timestamp:
        return Resolution(
            action=ResolutionAction.USE_SOURCE,
            record=conflict.source_record,
        )

    return Resolution(
        action=ResolutionAction.USE_TARGET,
        record=conflict.target_record,
    )

Resolution dataclass

action instance-attribute

record instance-attribute

SourceWins

resolve

Source code in django_spire/contrib/sync/file/conflict.py
def resolve(self, conflict: Conflict) -> Resolution:
    return Resolution(
        action=ResolutionAction.USE_SOURCE,
        record=conflict.source_record,
    )

TargetWins

resolve

Source code in django_spire/contrib/sync/file/conflict.py
def resolve(self, conflict: Conflict) -> Resolution:
    return Resolution(
        action=ResolutionAction.USE_TARGET,
        record=conflict.target_record,
    )

Engine

Source code in django_spire/contrib/sync/file/engine.py
def __init__(
    self,
    storage: Storage,
    identity_field: str,
    compare_fields: list[str] | None = None,
    deactivation_threshold: float | None = 0.5,
    transaction: Callable[[], AbstractContextManager[Any]] | None = None,
    on_created: Callable[[str, dict[str, Any]], None] | None = None,
    on_deactivated: Callable[[str], None] | None = None,
    on_updated: Callable[[str, dict[str, Any], dict[str, Any]], None] | None = None,
    on_complete: Callable[[Result], None] | None = None,
    progress: Callable[[SyncStage, int, int], None] | None = None,
) -> None:
    if not identity_field:
        message = 'identity_field must be a non-empty string'
        raise FileSyncParameterError(message)

    if deactivation_threshold is not None and deactivation_threshold < 0.0:
        message = (
            f'deactivation_threshold must be non-negative '
            f'or None, got {deactivation_threshold}'
        )

        raise FileSyncParameterError(message)

    self._identity_field = identity_field
    self._deactivation_threshold = deactivation_threshold
    self._on_complete = on_complete
    self._on_created = on_created
    self._on_deactivated = on_deactivated
    self._on_updated = on_updated
    self._progress = progress
    self._storage = storage
    self._transaction = transaction or nullcontext
    self._hasher = RecordHasher(identity_field, compare_fields)

sync

Source code in django_spire/contrib/sync/file/engine.py
def sync(
    self,
    file_path: str | Path,
    reader: Reader,
    dry_run: bool = False,
) -> Result:
    records = reader.read(file_path)
    return self.sync_records(records, dry_run=dry_run)

sync_records

Source code in django_spire/contrib/sync/file/engine.py
def sync_records(
    self,
    records: list[dict[str, Any]],
    dry_run: bool = False,
) -> Result:
    result = Result()

    validated = validate_records(
        records,
        self._identity_field,
        result.errors,
        self._progress,
    )

    classified = self._classify(validated)

    result.created = classified.new_keys
    result.updated = classified.changed_keys
    result.unchanged = classified.unchanged_keys
    result.deactivated = sorted(classified.stale_keys)

    if dry_run:
        return result

    old_records = self._mutate(validated, classified)

    result.changes = self._build_changes(
        validated,
        old_records,
        classified.changed_keys,
    )

    self._fire_callbacks(validated, old_records, result)
    self._finalize(result)

    return result

FileSyncAbortedError

Bases: FileSyncError

Sync aborted due to a safety threshold violation.

FileSyncArchiveError

Bases: FileSyncError

Archive extraction or path validation failed.

FileSyncConfigError

Bases: FileSyncError

Sync service mixin is missing required configuration.

FileSyncConflictError

Bases: FileSyncError

Conflict could not be resolved with available data.

FileSyncError

Bases: Exception

Base exception for all file sync operations.

FileSyncParameterError

Bases: FileSyncError

Constructor or function received an invalid argument.

FileSyncParseError

Bases: FileSyncError

Source file record failed validation or type casting.

FileSyncSourceNotFoundError

Bases: FileSyncError

Source file required for sync does not exist.

BidirectionalStorage

Bases: Storage, Protocol

get_baseline_hashes

Source code in django_spire/contrib/sync/file/storage.py
def get_baseline_hashes(self) -> dict[str, str]: ...

get_timestamps

Source code in django_spire/contrib/sync/file/storage.py
def get_timestamps(self, keys: set[str]) -> dict[str, datetime]: ...

save_baseline_hashes

Source code in django_spire/contrib/sync/file/storage.py
def save_baseline_hashes(self, hashes: dict[str, str]) -> None: ...

Storage

Bases: Protocol

create_many

Source code in django_spire/contrib/sync/file/storage.py
def create_many(self, records: list[dict[str, Any]], hashes: dict[str, str]) -> None: ...

deactivate_many

Source code in django_spire/contrib/sync/file/storage.py
def deactivate_many(self, keys: set[str]) -> None: ...

get_active_keys

Source code in django_spire/contrib/sync/file/storage.py
def get_active_keys(self) -> set[str]: ...

get_hashes

Source code in django_spire/contrib/sync/file/storage.py
def get_hashes(self, keys: set[str]) -> dict[str, str]: ...

get_many

Source code in django_spire/contrib/sync/file/storage.py
def get_many(self, keys: set[str]) -> dict[str, dict[str, Any]]: ...

update_many

Source code in django_spire/contrib/sync/file/storage.py
def update_many(self, records: list[dict[str, Any]], hashes: dict[str, str]) -> None: ...