Skip to content

bidirectional

django_spire.contrib.sync.file.bidirectional

logger = logging.getLogger(__name__) module-attribute

BidirectionalEngine

Source code in django_spire/contrib/sync/file/bidirectional.py
def __init__(
    self,
    storage: BidirectionalStorage,
    identity_field: str,
    compare_fields: list[str] | None = None,
    conflict_strategy: ConflictStrategy | None = None,
    deactivation_threshold: float | None = None,
    transaction: Callable[[], AbstractContextManager[Any]] | None = None,
    on_complete: Callable[[BidirectionalResult], None] | None = None,
    progress: Callable[[SyncStage, int, int], None] | None = None,
) -> None:
    if not identity_field:
        message = 'identity_field must be a non-empty string'
        raise FileSyncParameterError(message)

    if deactivation_threshold is not None and deactivation_threshold < 0.0:
        message = (
            f'deactivation_threshold must be non-negative '
            f'or None, got {deactivation_threshold}'
        )

        raise FileSyncParameterError(message)

    self._identity_field = identity_field
    self._conflict_strategy = conflict_strategy or SourceWins()
    self._deactivation_threshold = deactivation_threshold
    self._on_complete = on_complete
    self._progress = progress
    self._storage = storage
    self._transaction = transaction or nullcontext
    self._hasher = RecordHasher(identity_field, compare_fields)

sync

Source code in django_spire/contrib/sync/file/bidirectional.py
def sync(
    self,
    file_path: str | Path,
    reader: Reader,
    writer: Writer,
    dry_run: bool = False,
) -> BidirectionalResult:
    file_path = Path(file_path)
    result = BidirectionalResult()

    snapshot = self._collect(file_path, reader, result)

    classified = self._classify(
        snapshot.source,
        snapshot.target,
        snapshot.baseline_hashes,
        snapshot.source_hashes,
        snapshot.target_hashes,
    )

    check_deactivation_threshold(
        self._deactivation_threshold,
        len(snapshot.target),
        len(classified.target_deactivations),
    )

    source_timestamp = self._get_file_timestamp(file_path)

    target_timestamps = (
        self._storage.get_timestamps(set(classified.conflict_keys))
        if classified.conflict_keys
        else {}
    )

    resolved = self._resolve_conflicts(
        classified.conflict_keys,
        snapshot.source, snapshot.target,
        source_timestamp, target_timestamps, result,
    )

    self._apply_resolutions(
        resolved, classified, snapshot.source, snapshot.target,
    )

    self._populate_result(result, classified, resolved)

    if dry_run:
        return result

    self._commit(
        snapshot, classified, resolved,
        file_path, writer, result,
    )

    self._finalize(result)

    return result