def process_sync_request(
    request: HttpRequest,
    engine: DatabaseEngine,
    request_bytes_max: int = _REQUEST_BYTES_MAX,
    validate_node_id: Callable[[HttpRequest, str], bool] | None = None,
) -> JsonResponse:
    """Validate an incoming sync request and hand it to ``engine``.

    The request passes through these gates in order; the first one that
    fails determines the response:

    1. Method must be ``POST``; otherwise 405 (with an ``Allow`` header).
    2. ``request.content_type`` must equal ``application/json``;
       otherwise 415.
    3. ``_reject_if_oversized_header`` may return a rejection response,
       which is returned unchanged.
    4. ``_read_body`` and ``_parse_manifest`` each return a
       ``(value, error)`` pair; a non-``None`` error is returned
       unchanged.
    5. If ``validate_node_id`` is supplied and returns a falsy value for
       the manifest's ``node_id``; 403.

    A manifest that clears every gate is passed to ``engine.process``.

    Args:
        request: The incoming HTTP request.
        engine: Object whose ``process(manifest)`` returns a
            ``(response, result)`` pair.
        request_bytes_max: Size limit forwarded to the header check and
            the body reader. Must be at least 1.
        validate_node_id: Optional callback invoked as
            ``validate_node_id(request, node_id)``; a falsy return value
            rejects the request.

    Returns:
        On success, a JSON response built from ``response.to_dict()``
        plus ``ok`` and ``errors`` entries taken from the processing
        result, sent with the default status. On failure, one of the
        error responses described above, or 409 when the engine raises
        ``SyncAbortedError``.

    Raises:
        InvalidParameterError: If ``request_bytes_max`` is less than 1.
    """
    # A bad limit is a caller error, so raise instead of building an
    # HTTP response for it.
    if request_bytes_max < 1:
        message = (
            'request_bytes_max must be >= 1, '
            f'got {request_bytes_max}'
        )
        raise InvalidParameterError(message)

    if request.method != 'POST':
        not_allowed = JsonResponse(
            {'ok': False, 'error': 'Method not allowed'},
            status=405,
        )
        # RFC 9110 requires a 405 response to advertise the supported
        # methods through the Allow header.
        not_allowed['Allow'] = 'POST'
        return not_allowed

    content_type = request.content_type or ''
    if content_type != 'application/json':
        return JsonResponse(
            {
                'ok': False,
                'error': 'Content-Type must be application/json',
            },
            status=415,
        )

    # Run the header-based size check before the body is read.
    rejection = _reject_if_oversized_header(
        request, request_bytes_max,
    )
    if rejection is not None:
        return rejection

    body, error = _read_body(request, request_bytes_max)
    if error is not None:
        return error

    incoming, error = _parse_manifest(body)
    if error is not None:
        return error

    if validate_node_id is not None and not validate_node_id(
        request, incoming.node_id,
    ):
        return JsonResponse(
            {
                'ok': False,
                'error': 'Node ID not authorized for this user',
            },
            status=403,
        )

    logger.info(
        'Received sync from node %s with %d payloads',
        incoming.node_id,
        len(incoming.payloads),
    )

    try:
        response, result = engine.process(incoming)
    except SyncAbortedError:
        logger.exception(
            'Sync aborted for node %s',
            incoming.node_id,
        )
        return JsonResponse(
            {'ok': False, 'error': 'Sync aborted'},
            status=409,
        )

    # 'ok' and 'errors' are listed after the unpacked dict, so they take
    # precedence over same-named keys coming from response.to_dict().
    return JsonResponse({
        **response.to_dict(),
        'ok': result.ok,
        'errors': [
            {'key': error.key, 'message': error.message}
            for error in result.errors
        ],
    })