def exchange(
self, manifest: SyncManifest,
) -> SyncManifest:
payload = manifest.to_dict()
response_data = retry(
lambda: self._post(payload),
attempts=self._retries,
delay=self._retry_delay,
exceptions=_TRANSIENT_EXCEPTIONS,
)
if not isinstance(response_data, dict):
message = 'Server returned an invalid response format'
raise InvalidResponseError(message)
if 'node_id' not in response_data:
error = response_data.get(
'error',
'Missing required manifest fields',
)
message = (
f'Server returned an invalid sync '
f'response: {error}'
)
raise InvalidResponseError(message)
if 'checkpoint' not in response_data:
error = response_data.get(
'error',
'Missing required manifest fields',
)
message = (
f'Server returned an invalid sync '
f'response: {error}'
)
raise InvalidResponseError(message)
logger.info(
'Exchanged manifest with %s '
'(%d payloads sent, %d received)',
self._url,
len(manifest.payloads),
len(response_data.get('payloads', [])),
)
return SyncManifest.from_dict(response_data)