Skip to content

engine

django_spire.contrib.sync.file.engine

logger = logging.getLogger(__name__) (module attribute)

Engine

Source code in django_spire/contrib/sync/file/engine.py
def __init__(
    self,
    storage: Storage,
    identity_field: str,
    compare_fields: list[str] | None = None,
    deactivation_threshold: float | None = 0.5,
    transaction: Callable[[], AbstractContextManager[Any]] | None = None,
    on_created: Callable[[str, dict[str, Any]], None] | None = None,
    on_deactivated: Callable[[str], None] | None = None,
    on_updated: Callable[[str, dict[str, Any], dict[str, Any]], None] | None = None,
    on_complete: Callable[[Result], None] | None = None,
    progress: Callable[[SyncStage, int, int], None] | None = None,
) -> None:
    """Validate the configuration and store it on the engine.

    Args:
        storage: Stored as-is on ``self._storage``.
        identity_field: Name of the field that identifies a record; must
            be truthy.
        compare_fields: Passed to ``RecordHasher`` together with
            ``identity_field``.
        deactivation_threshold: ``None`` or a non-negative number.
        transaction: Context-manager factory; ``nullcontext`` is used when
            a falsy value is given.
        on_created: Optional callback, stored unchanged.
        on_deactivated: Optional callback, stored unchanged.
        on_updated: Optional callback, stored unchanged.
        on_complete: Optional callback, stored unchanged.
        progress: Optional progress callback, stored unchanged.

    Raises:
        FileSyncParameterError: If ``identity_field`` is falsy or
            ``deactivation_threshold`` is negative.
    """
    if not identity_field:
        reason = 'identity_field must be a non-empty string'
        raise FileSyncParameterError(reason)

    # ``None`` is an accepted value, so only compare real numbers.
    threshold_is_negative = (
        deactivation_threshold is not None
        and deactivation_threshold < 0.0
    )

    if threshold_is_negative:
        reason = (
            f'deactivation_threshold must be non-negative '
            f'or None, got {deactivation_threshold}'
        )

        raise FileSyncParameterError(reason)

    # Core configuration.
    self._storage = storage
    self._identity_field = identity_field
    self._deactivation_threshold = deactivation_threshold
    self._transaction = transaction if transaction else nullcontext

    # Optional hooks supplied by the caller.
    self._on_created = on_created
    self._on_updated = on_updated
    self._on_deactivated = on_deactivated
    self._on_complete = on_complete
    self._progress = progress

    self._hasher = RecordHasher(identity_field, compare_fields)

sync

Source code in django_spire/contrib/sync/file/engine.py
def sync(
    self,
    file_path: str | Path,
    reader: Reader,
    dry_run: bool = False,
) -> Result:
    """Read records from ``file_path`` with ``reader`` and sync them.

    Args:
        file_path: Location handed to ``reader.read``.
        reader: Object whose ``read`` method returns the records.
        dry_run: Forwarded unchanged to ``sync_records``.

    Returns:
        Whatever ``sync_records`` returns for the records that were read.
    """
    return self.sync_records(reader.read(file_path), dry_run=dry_run)

sync_records

Source code in django_spire/contrib/sync/file/engine.py
def sync_records(
    self,
    records: list[dict[str, Any]],
    dry_run: bool = False,
) -> Result:
    """Validate and classify ``records``, then apply them unless ``dry_run``.

    Validation errors are collected on the returned result. The key lists
    from classification are always filled in. With ``dry_run`` set, the
    method returns right after classification, before ``_mutate``,
    ``_build_changes``, ``_fire_callbacks`` and ``_finalize`` are called.

    Args:
        records: Raw records to sync.
        dry_run: When true, skip every step after classification.

    Returns:
        The populated result object.
    """
    outcome = Result()

    valid_by_key = validate_records(
        records=records,
        identity_field=self._identity_field,
        errors=outcome.errors,
        progress=self._progress,
    )

    plan = self._classify(valid_by_key)

    outcome.created = plan.new_keys
    outcome.updated = plan.changed_keys
    outcome.unchanged = plan.unchanged_keys
    outcome.deactivated = sorted(plan.stale_keys)

    if not dry_run:
        previous = self._mutate(valid_by_key, plan)

        outcome.changes = self._build_changes(
            valid_by_key,
            previous,
            plan.changed_keys,
        )

        self._fire_callbacks(valid_by_key, previous, outcome)
        self._finalize(outcome)

    return outcome

check_deactivation_threshold

Source code in django_spire/contrib/sync/file/engine.py
def check_deactivation_threshold(
    threshold: float | None,
    active_count: int,
    deactivation_count: int,
) -> None:
    if threshold is None:
        return

    if not active_count or not deactivation_count:
        return

    ratio = deactivation_count / active_count

    if ratio > threshold:
        message = (
            f'Deactivation ratio {ratio:.1%} exceeds threshold '
            f'{threshold:.1%} '
            f'({deactivation_count} of {active_count} records). '
            f'Set deactivation_threshold=None to disable this check.'
        )
        raise FileSyncAbortedError(message)

validate_records

Source code in django_spire/contrib/sync/file/engine.py
def validate_records(
    records: list[dict[str, Any]],
    identity_field: str,
    errors: list[Error],
    progress: Callable[[SyncStage, int, int], None] | None = None,
) -> dict[str, dict[str, Any]]:
    """Index ``records`` by their identity value, collecting rejects.

    The identity value is converted with ``str`` and stripped of
    surrounding whitespace to form the key. A record is rejected, and an
    ``Error`` appended to ``errors``, when the identity field is missing
    or ``None``, when the key is empty after stripping, or when the key
    was already seen. The first record seen for a key is the one kept.

    Args:
        records: Raw records to validate.
        identity_field: Name of the field holding the identity value.
        errors: List that receives one ``Error`` per rejected record; it
            is mutated in place.
        progress: Optional callback invoked once per input record as
            ``progress(SyncStage.VALIDATE, position, total)``, with a
            1-based ``position``.

    Returns:
        Accepted records keyed by identity, in input order.
    """
    validated: dict[str, dict[str, Any]] = {}
    total = len(records)

    for position, record in enumerate(records, start=1):
        raw = record.get(identity_field)
        key = '' if raw is None else str(raw).strip()

        if raw is None:
            errors.append(Error(
                key='',
                message=f'Record missing identity field: {identity_field}',
            ))
        elif not key:
            errors.append(Error(
                key='',
                message=f'Record has empty identity field: {identity_field}',
            ))
        elif key in validated:
            errors.append(Error(
                key=key,
                message='Duplicate identity value',
            ))
        else:
            validated[key] = record

        # Report every record, rejected or not, so the stage always
        # reaches ``total`` and positions are reported without gaps.
        if progress:
            progress(SyncStage.VALIDATE, position, total)

    return validated