def __init__(
    self,
    storage: DatabaseSyncStorage,
    graph: DependencyGraph,
    clock: HybridLogicalClock,
    node_id: str,
    *,
    batch_bytes: int | None = BATCH_BYTES_DEFAULT,
    batch_size: int | None = None,
    clock_drift_max: int | None = CLOCK_DRIFT_MAX_DEFAULT,
    foreign_key_columns: dict[str, list[tuple[str, str]]] | None = None,
    identity_field: str = 'id',
    lock: SyncLock | None = None,
    on_complete: Callable[[DatabaseResult], None] | None = None,
    on_phase: Callable[[SyncPhase], None] | None = None,
    payload_bytes_max: int | None = None,
    payload_records_max: int | None = None,
    peer_node_id: str | None = None,
    progress: Callable[[SyncStage, int, int], None] | None = None,
    reconciler: PayloadReconciler | None = None,
    transaction: Callable[[], AbstractContextManager[Any]] = nullcontext,
    transport: Transport | None = None,
) -> None:
    """Validate the arguments and store them on the new instance.

    Checks run in a fixed order and the first failing one raises; no
    attribute is assigned until every check has passed.

    Args:
        storage: Stored as ``self._storage``; not validated here.
        graph: Stored as ``self._graph``; not validated here.
        clock: Stored as ``self._clock``; not validated here.
        node_id: Must be non-empty.
        batch_bytes: ``None`` or a value >= 1.
        batch_size: ``None`` or a value >= 1.
        clock_drift_max: ``None`` or a non-negative value.
        foreign_key_columns: Stored as given; a falsy value (``None``
            or an empty dict) is replaced by a new empty dict.
        identity_field: Must be non-empty.
        lock: Stored unchanged; not validated here.
        on_complete: Stored unchanged; not validated here.
        on_phase: Stored unchanged; not validated here.
        payload_bytes_max: ``None`` or a value >= 1.
        payload_records_max: ``None`` or a value >= 1.
        peer_node_id: Must be non-empty when ``transport`` is given,
            and must differ from ``node_id`` whenever it is not
            ``None``. A falsy value is stored as ``''``.
        progress: Stored unchanged; not validated here.
        reconciler: Stored as given; a falsy value is replaced by a
            default-constructed ``PayloadReconciler``.
        transaction: Stored unchanged; defaults to ``nullcontext``.
        transport: Stored unchanged. Passing one is what the error
            message below refers to as "client mode".

    Raises:
        InvalidParameterError: If any of the constraints above is
            violated.
    """
    # Required strings, checked in declaration order.
    required_text = (
        ('node_id', node_id),
        ('identity_field', identity_field),
    )
    for label, text in required_text:
        if not text:
            message = f'{label} must be a non-empty string'
            raise InvalidParameterError(message)

    # Optional numeric limits: (name, value, lowest accepted value,
    # wording used in the error message). Order matters because the
    # first violation is the one reported.
    optional_limits = (
        ('batch_bytes', batch_bytes, 1, '>= 1'),
        ('batch_size', batch_size, 1, '>= 1'),
        ('clock_drift_max', clock_drift_max, 0, 'non-negative'),
        ('payload_bytes_max', payload_bytes_max, 1, '>= 1'),
        ('payload_records_max', payload_records_max, 1, '>= 1'),
    )
    for label, limit, floor, wording in optional_limits:
        if limit is not None and limit < floor:
            message = f'{label} must be {wording} or None, got {limit}'
            raise InvalidParameterError(message)

    # A transport cannot be used without knowing who the peer is.
    has_transport = transport is not None
    if has_transport and not peer_node_id:
        message = (
            'peer_node_id is required when transport is set (client mode)'
        )
        raise InvalidParameterError(message)

    # The peer must be a different node from this one.
    if peer_node_id is not None and peer_node_id == node_id:
        message = (
            f'peer_node_id must differ from node_id (both are {node_id!r})'
        )
        raise InvalidParameterError(message)

    # All checks passed: record the configuration.
    self._batch_bytes = batch_bytes
    self._batch_size = batch_size
    self._clock = clock
    self._clock_drift_max = clock_drift_max
    # Starts empty; missing keys yield a fresh empty set on first access.
    self._errored_keys: dict[str, set[str]] = defaultdict(set)
    self._foreign_key_columns = (
        foreign_key_columns if foreign_key_columns else {}
    )
    self._graph = graph
    self._identity_field = identity_field
    self._lock = lock
    self._node_id = node_id
    self._on_complete = on_complete
    self._on_phase = on_phase
    self._payload_bytes_max = payload_bytes_max
    self._payload_records_max = payload_records_max
    # Normalised to a string so the attribute is never None.
    self._peer_node_id = peer_node_id if peer_node_id else ''
    self._progress = progress
    self._reconciler = reconciler if reconciler else PayloadReconciler()
    self._storage = storage
    self._transaction = transaction
    self._transport = transport