def sync(self, dry_run: bool = False) -> DatabaseResult:
    """Run collect/exchange/commit rounds through the transport.

    Pagination cursors previously stored for this node are loaded and
    split by their ``server:`` / ``collect:`` key prefix. Each round
    collects a local manifest, exchanges it for a response, derives the
    cursors for the next round and, unless ``dry_run`` is set, commits.
    Rounds repeat until neither side reports ``has_more`` and neither
    side's payloads carried any records or deletes.

    Args:
        dry_run: When true, the commit phase is skipped and the loop
            stops after the first round.

    Returns:
        The ``DatabaseResult`` that was passed through the round helpers.

    Raises:
        TransportRequiredError: If no transport is configured.
    """
    if self._transport is None:
        raise TransportRequiredError(
            'Transport is required for sync(). '
            'Use process() for server-side.'
        )

    def carried_data(batch) -> bool:
        # True when any payload of the batch holds records or deletes.
        return any(item.records or item.deletes for item in batch.payloads)

    outcome = DatabaseResult()

    # Split the stored cursors into the two namespaces by key prefix.
    stored = self._storage.get_after_keys(self._node_id)
    server_cursors: dict[str, Any] = {}
    collect_cursors: dict[str, Any] = {}
    for key, value in stored.items():
        if key.startswith('server:'):
            server_cursors[key.removeprefix('server:')] = value
        elif key.startswith('collect:'):
            collect_cursors[key.removeprefix('collect:')] = value

    with self._managed_session(outcome) as session_id:
        while True:
            self._enter_phase(
                SyncPhase.COLLECTING, session_id, SyncStage.VALIDATE,
            )
            checkpoint = self._storage.get_checkpoint(self._node_id)
            outbound = self._collect(
                checkpoint,
                limit=self._batch_size,
                bytes_limit=self._batch_bytes,
                after_keys=collect_cursors,
            )
            # Attach the server-side cursors first, then stamp the checksum.
            outbound.after_keys = server_cursors
            outbound.checksum = outbound.compute_checksum()
            pushed_view = self._extract_record_snapshot(outbound)
            self._record_pushed(outbound, outcome)

            self._enter_phase(
                SyncPhase.EXCHANGING, session_id, SyncStage.CLASSIFY,
            )
            inbound = self._exchange_and_validate(outbound)
            pulled_view = self._extract_record_snapshot(inbound)

            self._enter_phase(
                SyncPhase.RECONCILING, session_id, SyncStage.MUTATE,
            )
            # Local cursors continue only while the manifest was truncated;
            # payloads without a last cursor contribute no entry.
            collect_cursors = {}
            if outbound.has_more:
                collect_cursors = {
                    batch.model_label: tail
                    for batch in outbound.payloads
                    if (tail := _last_cursor(batch.records))
                }
            # Likewise, server cursors are kept only while the response
            # reports more data.
            server_cursors = inbound.after_keys if inbound.has_more else {}

            if not dry_run:
                self._enter_phase(SyncPhase.COMMITTING, session_id)
                self._commit(
                    checkpoint,
                    inbound,
                    pushed_view,
                    pulled_view,
                    outcome,
                    server_cursors=server_cursors,
                    collect_cursors=collect_cursors,
                )

            more_pending = outbound.has_more or inbound.has_more
            if dry_run:
                # A dry run performs exactly one round.
                break
            if more_pending:
                continue
            # Both sides are drained; stop once a round moved nothing.
            if not (carried_data(outbound) or carried_data(inbound)):
                break

        self._enter_phase(SyncPhase.COMPLETE, session_id)

    # NOTE(review): finalization is assumed to run after the session
    # context has closed — confirm against the intended block layout.
    self._finalize(outcome)
    self._log_sync_summary(outcome)
    return outcome