def acquire(self, node_id: str, peer_node_id: str) -> str:
self._ensure_lock_row(node_id, peer_node_id)
with transaction.atomic():
locked = (
SyncNodeLock.objects
.select_for_update()
.filter(node_id=node_id, peer_node_id=peer_node_id)
.first()
)
if locked is None:
message = (
f'Sync lock row for {node_id!r}<->{peer_node_id!r} '
f'is missing after creation'
)
raise LockContentionError(message)
self._abandon_stale_sessions(node_id, peer_node_id)
self._check_active_session(node_id, peer_node_id)
session_id = self._create_session(node_id, peer_node_id)
logger.info(
'Acquired sync lock for %s<->%s (session %s)',
node_id,
peer_node_id,
session_id,
)
return session_id