def apply_many(
    self,
    model: type[SyncableMixin],
    records: dict[str, SyncRecord],
    deserialized: dict[str, dict[str, Any]],
) -> set[str]:
    """Bulk-write *records* for *model* and return the keys NOT applied.

    Records that fail the ghost filter are skipped outright; the rest are
    materialized as model instances (in sorted-key order, for deterministic
    SQL parameter ordering) and written in batches via a vendor-specific
    upsert statement.  Keys whose identity value does not come back from
    the executed statement are added to the skipped set.

    Raises:
        NotImplementedError: if the model's connection vendor is not one
            of the supported backends (postgresql / sqlite).
    """
    # Nothing to do — report zero skipped keys.
    if not records:
        return set()

    connection = self._connection(model)
    if connection.vendor not in _SUPPORTED_VENDORS:
        message = (
            f'BulkUpsertStrategy requires postgresql or sqlite, '
            f'got {connection.vendor!r}'
        )
        raise NotImplementedError(message)

    # Ghost records are never written; they start out as the skipped set.
    writable, skipped = self._filter_ghosts(records)
    if not writable:
        return skipped

    fields = self._writable_fields(model)

    # Deterministic ordering: build instances by sorted key.  Iterating the
    # dict directly yields keys, so sorted(writable) == sorted(writable.keys()).
    instances = [
        self._build_instance(model, key, writable[key], deserialized[key])
        for key in sorted(writable)
    ]

    identity = model._meta.get_field(self._identity_field)
    batch_size = self._rows_per_batch(len(fields))
    applied: set[str] = set()

    with sync_bypass(), connection.cursor() as cursor:
        start = 0
        while start < len(instances):
            chunk = instances[start:start + batch_size]
            start += batch_size

            # Flatten per-row parameters for this chunk into one list.
            parameters: list[Any] = []
            for item in chunk:
                parameters.extend(
                    self._build_row_params(item, fields, connection),
                )

            statement = self._build_sql(model, fields, connection, len(chunk))
            cursor.execute(statement, parameters)

            # First column of each returned row is the identity value of a
            # row the statement actually touched.
            for row in cursor.fetchall():
                applied.add(str(identity.to_python(row[0])))

    # Anything writable that the database did not confirm counts as skipped.
    skipped |= writable.keys() - applied
    return skipped