def apply_many(
self,
model: type[SyncableMixin],
records: dict[str, SyncRecord],
deserialized: dict[str, dict[str, Any]],
origin_node: str,
) -> UpsertResult:
if not records:
return UpsertResult()
connection = self._connection(model)
if connection.vendor not in _SUPPORTED_VENDORS:
message = (
f'BulkUpsertStrategy requires postgresql or sqlite, '
f'got {connection.vendor!r}'
)
raise NotImplementedError(message)
model_label = model._meta.label
writable, errors = self._validate_records(model_label, records)
if not writable:
return UpsertResult(skipped=set(), errors=errors)
from django_spire.contrib.sync.django.sequence import ( # noqa: PLC0415
SyncSequenceAllocator,
)
fields = self._writable_fields(model)
sorted_keys = sorted(writable.keys())
sequence_first = SyncSequenceAllocator().allocate(len(sorted_keys)).value_first
instances = [
self._build_instance(RecordContext(
model=model,
key=key,
sync_record=writable[key],
field_data=deserialized[key],
sequence=sequence_first + index,
origin_node=origin_node,
))
for index, key in enumerate(sorted_keys)
]
identity = model._meta.get_field(self._identity_field)
rows_per_batch = self._rows_per_batch(len(fields))
applied: set[str] = set()
with sync_bypass(), connection.cursor() as cursor:
for offset in range(0, len(instances), rows_per_batch):
batch = instances[offset:offset + rows_per_batch]
params: list[Any] = []
for instance in batch:
params.extend(
self._build_row_params(
instance,
fields,
connection,
),
)
sql = self._build_upsert_sql(
model,
fields,
connection,
len(batch),
)
cursor.execute(sql, params)
applied.update(
str(identity.to_python(row[0]))
for row in cursor.fetchall()
)
skipped: set[str] = set()
for key in writable:
if key not in applied:
skipped.add(key)
return UpsertResult(skipped=skipped, errors=errors)