Skip to content

lock

django_spire.contrib.sync.django.lock

logger = logging.getLogger(__name__) module-attribute

DjangoSyncLock

Source code in django_spire/contrib/sync/django/lock.py
def __init__(self, timeout_stale: int = _STALE_TIMEOUT_DEFAULT) -> None:
    """Remember the stale timeout configured for this lock.

    Args:
        timeout_stale: Value kept on the instance as ``_timeout_stale``.
            Its unit is not visible in this block - presumably
            seconds; TODO confirm against where it is read.
    """
    self._timeout_stale = timeout_stale

acquire

Source code in django_spire/contrib/sync/django/lock.py
def acquire(self, node_id: str) -> str:
    """Take the sync lock for ``node_id`` and open a new session.

    The lock row is ensured first. Then, inside a single
    ``transaction.atomic()`` block, the row is fetched with
    ``select_for_update()``, stale sessions are abandoned, the
    active-session check runs, and a new session is created.

    Args:
        node_id: Identifier of the node whose lock row is selected.

    Returns:
        The session id returned by ``_create_session``.

    Raises:
        LockContentionError: If the lock row cannot be found even
            though ``_ensure_lock_row`` has just run. The helper
            ``_check_active_session`` may raise as well; its
            behavior is not visible here.
    """
    self._ensure_lock_row(node_id)

    with transaction.atomic():
        # Fetch the row through select_for_update() before touching any
        # session state for this node.
        lock_rows = SyncNodeLock.objects.select_for_update()
        lock_row = lock_rows.filter(node_id=node_id).first()

        if lock_row is None:
            raise LockContentionError(
                f'Sync lock row for {node_id!r} is missing after creation'
            )

        self._abandon_stale_sessions(node_id)
        self._check_active_session(node_id)
        new_session_id = self._create_session(node_id)

    logger.info(
        'Acquired sync lock for node %s (session %s)',
        node_id,
        new_session_id,
    )

    return new_session_id

hold

Source code in django_spire/contrib/sync/django/lock.py
def hold(self, node_id: str) -> None:
    """Ensure the lock row for ``node_id`` exists and select it for update.

    The fetched row is discarded; only the ``select_for_update()`` query
    itself matters here.

    NOTE(review): no ``transaction.atomic()`` block is opened in this
    method, so presumably the caller is expected to already be inside
    one - confirm against the call sites.

    Args:
        node_id: Identifier of the node whose lock row is selected.
    """
    self._ensure_lock_row(node_id)

    lock_rows = SyncNodeLock.objects.select_for_update()
    lock_rows.filter(node_id=node_id).first()

release

Source code in django_spire/contrib/sync/django/lock.py
def release(
    self,
    session_id: str,
    status: SyncStatus,
    result: DatabaseResult | None = None,
) -> None:
    """Close out a sync session and persist its final state.

    Looks the session up by id. When it does not exist, a warning is
    logged and nothing else happens. Otherwise the completion time,
    duration, phase and status are written, counters are copied from
    ``result`` when one is given, and the session is saved.

    Args:
        session_id: Id of the ``SyncSession`` row to finalize.
        status: Final status stored on the session. The phase becomes
            ``SyncPhase.COMPLETE`` when it equals ``SyncStatus.SUCCESS``
            and ``SyncPhase.FAILED`` otherwise.
        result: Optional result whose ``pushed``, ``applied``,
            ``created``, ``deleted`` and ``conflicts`` mappings and
            ``errors`` collection are reduced to counts. When ``None``
            the session's counters are left untouched.
    """

    def _count(groups) -> int:
        # Total number of entries across every value of the mapping.
        return sum(len(keys) for keys in groups.values())

    session = SyncSession.objects.filter(id=session_id).first()

    if session is None:
        logger.warning(
            'Sync session %s not found during release',
            session_id,
        )
        return

    finished_at = timezone.now()
    elapsed = finished_at - session.started_at

    session.completed_at = finished_at
    # Stored as whole milliseconds; int() drops the fractional part.
    session.duration_ms = int(elapsed.total_seconds() * 1000)

    if status == SyncStatus.SUCCESS:
        session.phase = SyncPhase.COMPLETE
    else:
        session.phase = SyncPhase.FAILED

    session.status = status

    if result is not None:
        session.records_pushed = _count(result.pushed)
        session.records_applied = _count(result.applied)
        session.records_created = _count(result.created)
        session.records_deleted = _count(result.deleted)
        session.conflicts = _count(result.conflicts)
        session.errors = len(result.errors)

    session.save()

    logger.info(
        'Released sync lock for session %s (status=%s)',
        session_id,
        status,
    )

update_phase

Source code in django_spire/contrib/sync/django/lock.py
def update_phase(self, session_id: str, phase: SyncPhase) -> None:
    """Set the phase of the session with the given id.

    Issues a queryset ``update()`` filtered on ``id``; no model instance
    is loaded, and the number of rows matched is not checked.

    Args:
        session_id: Id of the ``SyncSession`` row to update.
        phase: New value for the ``phase`` field.
    """
    matching_sessions = SyncSession.objects.filter(id=session_id)
    matching_sessions.update(phase=phase)