Skip to content

storage

django_spire.contrib.sync.django.storage

__all__ = ['DeleteStrategy', 'DjangoCheckpointStore', 'DjangoRecordReader', 'DjangoRecordWriter', 'DjangoSyncStorage', 'HardDeleteStrategy', 'ManyToManyApplier', 'SoftDeleteStrategy', 'StalenessGuardedUpsertStrategy', 'UpsertStrategy'] module-attribute

DjangoCheckpointStore

get_after_keys

Source code in django_spire/contrib/sync/django/storage/checkpoint.py
def get_after_keys(self, peer_node_id: str) -> dict[str, Any]:
    """Return the ``after_keys`` mapping stored for ``peer_node_id``.

    The first ``SyncCheckpoint`` row matching the peer is used. An empty
    dict is returned when no row exists or when the stored value is falsy.
    """
    row = SyncCheckpoint.objects.filter(peer_node_id=peer_node_id).first()

    if row is not None:
        return row.after_keys or {}

    return {}

get_checkpoint

Source code in django_spire/contrib/sync/django/storage/checkpoint.py
def get_checkpoint(self, peer_node_id: str) -> CheckpointPosition:
    """Return the saved sync position for ``peer_node_id``.

    The first ``SyncCheckpoint`` row matching the peer supplies both
    sequence values. When no row exists, both values are zero.
    """
    row = SyncCheckpoint.objects.filter(peer_node_id=peer_node_id).first()

    if row is not None:
        return CheckpointPosition(
            peer_sequence=row.peer_sequence,
            local_sequence_pushed=row.local_sequence_pushed,
        )

    return CheckpointPosition(peer_sequence=0, local_sequence_pushed=0)

save_checkpoint

Source code in django_spire/contrib/sync/django/storage/checkpoint.py
def save_checkpoint(
    self,
    peer_node_id: str,
    peer_sequence: int,
    local_sequence_pushed: int,
    after_keys: dict[str, Any] | None = None,
) -> None:
    """Create or update the ``SyncCheckpoint`` row for ``peer_node_id``.

    Args:
        peer_node_id: Lookup key for the checkpoint row.
        peer_sequence: Value written to ``peer_sequence``.
        local_sequence_pushed: Value written to ``local_sequence_pushed``.
        after_keys: Value written to ``after_keys``; a falsy value is
            stored as an empty dict.
    """
    column_values = {
        'after_keys': after_keys or {},
        'local_sequence_pushed': local_sequence_pushed,
        'peer_sequence': peer_sequence,
    }

    SyncCheckpoint.objects.update_or_create(
        peer_node_id=peer_node_id,
        defaults=column_values,
    )

DjangoSyncStorage

Source code in django_spire/contrib/sync/django/storage/facade.py
def __init__(
    self,
    models: list[type[SyncableMixin]],
    identity_field: str = 'id',
    batch_size_max: int = _BATCH_SIZE_MAX,
    checkpoint_store: CheckpointStore | None = None,
    deferred_foreign_keys: list[DeferredForeignKey] | None = None,
    record_reader: RecordReader | None = None,
    record_writer: RecordWriter | None = None,
    sequence_allocator: SequenceAllocator | None = None,
) -> None:
    """Wire the storage facade to its four collaborators.

    Each collaborator argument that is falsy (including ``None``) is
    replaced by the Django-backed default. The default reader and writer
    are built from ``models`` and ``identity_field``; the writer also
    receives ``batch_size_max`` and ``deferred_foreign_keys``.
    """
    if not checkpoint_store:
        checkpoint_store = DjangoCheckpointStore()

    self._checkpoint_store = checkpoint_store

    if not record_reader:
        record_reader = DjangoRecordReader(
            models=models,
            identity_field=identity_field,
        )

    self._record_reader = record_reader

    if not record_writer:
        record_writer = DjangoRecordWriter(
            models=models,
            identity_field=identity_field,
            batch_size_max=batch_size_max,
            deferred_foreign_keys=deferred_foreign_keys,
        )

    self._record_writer = record_writer

    if not sequence_allocator:
        sequence_allocator = SyncSequenceAllocator()

    self._sequence_allocator = sequence_allocator

clear_tombstones

Source code in django_spire/contrib/sync/django/storage/facade.py
def clear_tombstones(self, model_label: str, keys: set[str]) -> None:
    """Forward a tombstone-clearing request to the record writer.

    Both arguments are passed through positionally and unchanged.
    """
    writer = self._record_writer
    writer.clear_tombstones(model_label, keys)

delete_many

Source code in django_spire/contrib/sync/django/storage/facade.py
def delete_many(
    self,
    model_label: str,
    deletes: dict[str, int],
    origin_node: str,
) -> None:
    """Forward a batch of deletes to the record writer.

    All three arguments are passed through positionally and unchanged.
    """
    writer = self._record_writer
    writer.delete_many(model_label, deletes, origin_node)

flush_deferred_backfill

Source code in django_spire/contrib/sync/django/storage/facade.py
def flush_deferred_backfill(self) -> None:
    """Ask the record writer to flush its deferred backfill."""
    writer = self._record_writer
    writer.flush_deferred_backfill()

get_after_keys

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_after_keys(self, peer_node_id: str) -> dict[str, Any]:
    """Return the checkpoint store's ``after_keys`` result for the peer."""
    store = self._checkpoint_store
    return store.get_after_keys(peer_node_id)

get_changed_since

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_changed_since(
    self,
    model_label: str,
    sequence: int,
    peer_node_id: str,
    sequence_max: int | None = None,
    limit: int | None = None,
    after_key: str | None = None,
) -> dict[str, SyncRecord]:
    """Return the record reader's ``get_changed_since`` result.

    ``model_label``, ``sequence`` and ``peer_node_id`` are forwarded
    positionally; ``sequence_max``, ``limit`` and ``after_key`` are
    forwarded as keyword arguments.
    """
    options = {
        'sequence_max': sequence_max,
        'limit': limit,
        'after_key': after_key,
    }

    reader = self._record_reader

    return reader.get_changed_since(
        model_label,
        sequence,
        peer_node_id,
        **options,
    )

get_deletes_since

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_deletes_since(
    self,
    model_label: str,
    sequence: int,
    peer_node_id: str,
    sequence_max: int | None = None,
) -> dict[str, int]:
    return self._record_reader.get_deletes_since(
        model_label,
        sequence,
        peer_node_id,
        sequence_max=sequence_max,
    )

get_checkpoint

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_checkpoint(self, peer_node_id: str) -> CheckpointPosition:
    """Return the checkpoint store's position for ``peer_node_id``."""
    store = self._checkpoint_store
    return store.get_checkpoint(peer_node_id)

get_records

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_records(self, model_label: str, keys: set[str]) -> dict[str, SyncRecord]:
    """Return the record reader's ``get_records`` result for ``keys``."""
    reader = self._record_reader
    return reader.get_records(model_label, keys)

get_sequence_allocator

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_sequence_allocator(self) -> SequenceAllocator:
    """Return the sequence allocator this storage was configured with."""
    allocator = self._sequence_allocator
    return allocator

get_syncable_models

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_syncable_models(self) -> list[str]:
    """Return the record reader's list of syncable model labels."""
    reader = self._record_reader
    return reader.get_syncable_models()

get_tombstones

Source code in django_spire/contrib/sync/django/storage/facade.py
def get_tombstones(self, model_label: str, keys: set[str]) -> dict[str, int]:
    """Return the record reader's ``get_tombstones`` result for ``keys``."""
    reader = self._record_reader
    return reader.get_tombstones(model_label, keys)

save_checkpoint

Source code in django_spire/contrib/sync/django/storage/facade.py
def save_checkpoint(
    self,
    peer_node_id: str,
    peer_sequence: int,
    local_sequence_pushed: int,
    after_keys: dict[str, Any] | None = None,
) -> None:
    self._checkpoint_store.save_checkpoint(
        peer_node_id,
        peer_sequence,
        local_sequence_pushed,
        after_keys=after_keys,
    )

stamp_unstamped_records

Source code in django_spire/contrib/sync/django/storage/facade.py
def stamp_unstamped_records(
    self,
    clock: HybridLogicalClock,
    model_order: list[str] | None = None,
) -> None:
    """Have the record writer stamp unstamped records and log the total.

    ``clock`` and ``model_order`` are forwarded as keyword arguments. An
    info line is logged only when the writer reports a truthy count.
    """
    stamped = self._record_writer.stamp_unstamped_records(
        clock=clock,
        model_order=model_order,
    )

    if not stamped:
        return

    logger.info('Pre-sync stamping complete: %d records', stamped)

upsert_many

Source code in django_spire/contrib/sync/django/storage/facade.py
def upsert_many(
    self,
    model_label: str,
    records: dict[str, SyncRecord],
    origin_node: str,
) -> UpsertResult:
    """Return the record writer's ``upsert_many`` result.

    All three arguments are passed through positionally and unchanged.
    """
    writer = self._record_writer
    return writer.upsert_many(model_label, records, origin_node)

ManyToManyApplier

Source code in django_spire/contrib/sync/django/storage/many_to_many.py
def __init__(self, identity_field: str = 'id') -> None:
    """Remember the name of the model field used to look up instances by key."""
    self._identity_field = identity_field

apply

Source code in django_spire/contrib/sync/django/storage/many_to_many.py
def apply(
    self,
    model: type[SyncableMixin],
    pending: dict[str, dict[str, list[Any]]],
) -> UpsertResult:
    """Set pending many-to-many values on existing instances of ``model``.

    Args:
        model: Model whose instances are looked up by the identity field.
        pending: Maps a record key to ``{field_name: values}``.

    Returns:
        A result whose ``skipped`` holds keys with no matching instance
        and whose ``errors`` holds one entry per ``IntegrityError`` raised
        while setting a field. Neither case stops processing.
    """
    outcome = UpsertResult()

    if not pending:
        return outcome

    identity = self._identity_field
    matches = model.objects.filter(
        **{f'{identity}__in': list(pending.keys())},
    )

    # The lookup runs before sync_bypass() is entered.
    found = {}

    for candidate in matches:
        found[str(getattr(candidate, identity))] = candidate

    with sync_bypass():
        # Keys and field names are visited in sorted order.
        for key in sorted(pending.keys()):
            relations = pending[key]
            target = found.get(key)

            if target is None:
                logger.warning(
                    'Skipping M2M relations for %s key=%s: '
                    'instance not found after upsert',
                    model._meta.label,
                    key,
                )

                outcome.skipped.add(key)

                continue

            for field_name, values in sorted(relations.items()):
                try:
                    getattr(target, field_name).set(values)
                except IntegrityError as exception:
                    logger.exception(
                        'M2M set failed for %s:%s field=%s values=%s',
                        model._meta.label,
                        key,
                        field_name,
                        values,
                    )

                    failure = Error(
                        key=key,
                        message=(
                            f'M2M set failed for '
                            f'{model._meta.label}:{key} '
                            f'field={field_name}: {exception}'
                        ),
                        exception=exception,
                    )

                    outcome.errors.append(failure)

    return outcome

DjangoRecordReader

Source code in django_spire/contrib/sync/django/storage/reader.py
def __init__(
    self,
    models: list[type[SyncableMixin]],
    identity_field: str = 'id',
) -> None:
    """Index ``models`` and their serializers by model label.

    Args:
        models: Models this reader serves; each is keyed by
            ``model._meta.label``.
        identity_field: Name of the field used as the record key.
    """
    self._identity_field = identity_field

    model_index: dict[str, type[SyncableMixin]] = {}

    for model in models:
        model_index[model._meta.label] = model

    self._models = model_index

    serializer_index: dict[str, SyncFieldSerializer] = {}

    for model in models:
        label = model._meta.label
        serializer_index[label] = SyncFieldSerializer(model)

    self._serializers = serializer_index

get_changed_since

Source code in django_spire/contrib/sync/django/storage/reader.py
def get_changed_since(
    self,
    model_label: str,
    sequence: int,
    peer_node_id: str,
    sequence_max: int | None = None,
    limit: int | None = None,
    after_key: str | None = None,
) -> dict[str, SyncRecord]:
    """Return records of ``model_label`` changed after ``sequence``.

    Rows are selected where ``sync_field_sequence`` is greater than
    ``sequence``. When ``after_key`` is truthy, rows whose sequence equals
    ``sequence`` and whose identity field is greater than ``after_key``
    are selected too. Results are ordered by sequence, then identity.

    Args:
        model_label: Label passed to ``_get_model``.
        sequence: Exclusive lower bound on ``sync_field_sequence``.
        peer_node_id: When truthy, rows whose ``sync_field_origin_node``
            equals it are excluded.
        sequence_max: Inclusive upper bound on the sequence, if given.
        limit: Maximum number of rows, applied after ordering.
        after_key: Identity-field tie-breaker described above.

    Returns:
        Stringified identity value mapped to the record built by
        ``_instance_to_record``.
    """
    model = self._get_model(model_label)
    prefetch_names = self._get_many_to_many_names(model)
    identity = self._identity_field

    condition = Q(sync_field_sequence__gt=sequence)

    if after_key:
        condition = condition | Q(
            sync_field_sequence=sequence,
            **{f'{identity}__gt': after_key},
        )

    rows = model.objects.filter(condition)

    if sequence_max is not None:
        rows = rows.filter(sync_field_sequence__lte=sequence_max)

    if peer_node_id:
        rows = rows.exclude(sync_field_origin_node=peer_node_id)

    rows = rows.order_by('sync_field_sequence', identity)

    if limit is not None:
        rows = rows[:limit]

    if prefetch_names:
        rows = rows.prefetch_related(*prefetch_names)

    changed: dict[str, SyncRecord] = {}

    for row in rows:
        key = str(getattr(row, identity))
        changed[key] = self._instance_to_record(row)

    return changed

get_deletes_since

Source code in django_spire/contrib/sync/django/storage/reader.py
def get_deletes_since(
    self,
    model_label: str,
    sequence: int,
    peer_node_id: str,
    sequence_max: int | None = None,
) -> dict[str, int]:
    """Return tombstones of ``model_label`` with a sequence above ``sequence``.

    Args:
        model_label: Tombstone ``model_label`` to match.
        sequence: Exclusive lower bound on the tombstone ``sequence``.
        peer_node_id: When truthy, tombstones whose ``origin_node`` equals
            it are excluded.
        sequence_max: Inclusive upper bound on the sequence, if given.

    Returns:
        Tombstone ``record_key`` mapped to its ``timestamp``.
    """
    from django_spire.contrib.sync.django.models.tombstone import SyncTombstone  # noqa: PLC0415

    tombstones = SyncTombstone.objects.filter(
        model_label=model_label,
        sequence__gt=sequence,
    )

    if sequence_max is not None:
        tombstones = tombstones.filter(sequence__lte=sequence_max)

    if peer_node_id:
        tombstones = tombstones.exclude(origin_node=peer_node_id)

    return {
        record_key: timestamp
        for record_key, timestamp
        in tombstones.values_list('record_key', 'timestamp')
    }

get_records

Source code in django_spire/contrib/sync/django/storage/reader.py
def get_records(
    self,
    model_label: str,
    keys: set[str],
) -> dict[str, SyncRecord]:
    """Return the records of ``model_label`` whose identity is in ``keys``.

    An empty ``keys`` returns an empty dict without resolving the model
    or querying. Otherwise the result maps the stringified identity value
    to the record built by ``_instance_to_record``.
    """
    if not keys:
        return {}

    model = self._get_model(model_label)
    prefetch_names = self._get_many_to_many_names(model)
    identity = self._identity_field

    rows = model.objects.filter(**{f'{identity}__in': keys})

    if prefetch_names:
        rows = rows.prefetch_related(*prefetch_names)

    found: dict[str, SyncRecord] = {}

    for row in rows:
        key = str(getattr(row, identity))
        found[key] = self._instance_to_record(row)

    return found

get_syncable_models

Source code in django_spire/contrib/sync/django/storage/reader.py
def get_syncable_models(self) -> list[str]:
    """Return the labels of the registered models in ascending order."""
    labels = list(self._models.keys())
    labels.sort()

    return labels

get_tombstones

Source code in django_spire/contrib/sync/django/storage/reader.py
def get_tombstones(
    self,
    model_label: str,
    keys: set[str],
) -> dict[str, int]:
    """Return tombstones of ``model_label`` whose ``record_key`` is in ``keys``.

    An empty ``keys`` returns an empty dict without importing the
    tombstone model or querying. Otherwise the result maps each matching
    ``record_key`` to its ``timestamp``.
    """
    if not keys:
        return {}

    from django_spire.contrib.sync.django.models.tombstone import SyncTombstone  # noqa: PLC0415

    matching = SyncTombstone.objects.filter(
        model_label=model_label,
        record_key__in=keys,
    )

    return {
        record_key: timestamp
        for record_key, timestamp
        in matching.values_list('record_key', 'timestamp')
    }

DeleteStrategy

Bases: Protocol

delete

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
# Protocol stub: a delete strategy receives the model class, a mapping of
# record key -> int (the concrete strategies treat it as a tombstone
# timestamp), and the id of the node the deletes originated from.
# It returns nothing.
def delete(
    self,
    model: type[SyncableMixin],
    deletes: dict[str, int],
    origin_node: str,
) -> None: ...

HardDeleteStrategy

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def __init__(self, identity_field: str = 'id') -> None:
    """Remember the name of the model field that delete keys are matched against."""
    self._identity_field = identity_field

delete

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def delete(
    self,
    model: type[SyncableMixin],
    deletes: dict[str, int],
    origin_node: str,
) -> None:
    """Delete rows of ``model`` and record a tombstone per key.

    Keys are processed in sorted order, each consuming one number from a
    freshly allocated sequence block. A row is deleted only when its
    ``sync_field_last_modified`` is at or below the key's tombstone
    timestamp. ``_record_tombstone`` is called for every key, whether or
    not a row was deleted.

    Args:
        model: Model whose rows are deleted.
        deletes: Record key mapped to its tombstone timestamp.
        origin_node: Node id passed on to ``_record_tombstone``.
    """
    if not deletes:
        return

    from django_spire.contrib.sync.django.sequence import (  # noqa: PLC0415
        SyncSequenceAllocator,
    )

    ordered_keys = sorted(deletes)
    allocation = SyncSequenceAllocator().allocate(len(ordered_keys))
    sequence_first = allocation.value_first
    label = model._meta.label

    with sync_bypass():
        for offset, key in enumerate(ordered_keys):
            deleted_at = deletes[key]

            # Guard against removing a row modified after the tombstone.
            not_newer = {
                self._identity_field: key,
                'sync_field_last_modified__lte': deleted_at,
            }

            model.objects.filter(**not_newer).delete()

            _record_tombstone(
                label,
                key,
                deleted_at,
                sequence_first + offset,
                origin_node,
            )

SoftDeleteStrategy

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def __init__(self, identity_field: str = 'id') -> None:
    """Remember the name of the model field that delete keys are matched against."""
    self._identity_field = identity_field

delete

Source code in django_spire/contrib/sync/django/storage/strategy/delete.py
def delete(
    self,
    model: type[SyncableMixin],
    deletes: dict[str, int],
    origin_node: str,
) -> None:
    """Flag rows of ``model`` as deleted and record a tombstone for each.

    Instances matching the keys are loaded and narrowed by
    ``_collect_pending``; nothing else happens when that result is empty.
    Each pending instance consumes one number from a freshly allocated
    sequence block. The update only touches a row whose
    ``sync_field_last_modified`` is at or below the tombstone timestamp,
    and writes back the loaded instance's own ``sync_field_last_modified``
    and ``sync_field_timestamps``.

    Args:
        model: Model whose rows are flagged.
        deletes: Record key mapped to its tombstone timestamp.
        origin_node: Node id written to the row and the tombstone.
    """
    if not deletes:
        return

    from django_spire.contrib.sync.django.sequence import (  # noqa: PLC0415
        SyncSequenceAllocator,
    )

    identity = self._identity_field
    candidates = list(
        model.objects.filter(**{f'{identity}__in': list(deletes.keys())}),
    )

    pending = self._collect_pending(candidates, deletes)

    if not pending:
        return

    allocation = SyncSequenceAllocator().allocate(len(pending))
    sequence_first = allocation.value_first
    label = model._meta.label

    with sync_bypass():
        for offset, instance in enumerate(pending):
            key = str(getattr(instance, identity))
            deleted_at = deletes[key]
            sequence = sequence_first + offset

            not_newer = {
                identity: key,
                'sync_field_last_modified__lte': deleted_at,
            }

            changes = {
                'is_deleted': True,
                'sync_field_last_modified': instance.sync_field_last_modified,
                'sync_field_origin_node': origin_node,
                'sync_field_sequence': sequence,
                'sync_field_timestamps': instance.sync_field_timestamps,
            }

            model.objects.filter(**not_newer).update(**changes)

            _record_tombstone(
                label,
                key,
                deleted_at,
                sequence,
                origin_node,
            )

StalenessGuardedUpsertStrategy

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def __init__(self, identity_field: str = 'id') -> None:
    """Remember the name of the model field used as the record identity."""
    self._identity_field = identity_field

apply_many

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def apply_many(
    self,
    model: type[SyncableMixin],
    records: dict[str, SyncRecord],
    deserialized: dict[str, dict[str, Any]],
    origin_node: str,
) -> UpsertResult:
    """Apply ``records`` one at a time in sorted key order.

    One sequence number is allocated per key up front, including keys
    that end up rejected. A record whose ``sync_field_last_modified`` is
    0 is logged and reported as an error without being applied. Every
    other record goes through ``_apply_one``; a falsy return adds the key
    to ``skipped``.

    Args:
        model: Model the records belong to.
        records: Record key mapped to its sync record.
        deserialized: Record key mapped to its field data; must contain
            every key present in ``records``.
        origin_node: Node id placed on each record context.

    Returns:
        Result carrying the skipped keys and the collected errors.
    """
    from django_spire.contrib.sync.django.sequence import (  # noqa: PLC0415
        SyncSequenceAllocator,
    )

    ordered_keys = sorted(records.keys())

    if not ordered_keys:
        return UpsertResult()

    skipped: set[str] = set()
    errors: list[Error] = []

    allocation = SyncSequenceAllocator().allocate(len(ordered_keys))
    sequence_first = allocation.value_first

    for offset, key in enumerate(ordered_keys):
        sync_record = records[key]
        field_data = deserialized[key]

        if sync_record.sync_field_last_modified == 0:
            message = (
                f'Ghost record for {model._meta.label} key={key}: '
                f'sync_field_last_modified=0 indicates record was '
                f'never properly stamped '
                f'(timestamps={sync_record.timestamps!r}, '
                f'received_at={sync_record.received_at}). '
                f'Run stamp_unstamped_records on the source node '
                f'or check that AppConfig.ready() runs.'
            )

            logger.error(message)
            errors.append(Error(key=key, message=message))
        else:
            context = RecordContext(
                model=model,
                key=key,
                sync_record=sync_record,
                field_data=field_data,
                sequence=sequence_first + offset,
                origin_node=origin_node,
            )

            applied = self._apply_one(context)

            if not applied:
                skipped.add(key)

    return UpsertResult(skipped=skipped, errors=errors)

UpsertStrategy

Bases: Protocol

apply_many

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
# Protocol stub: an upsert strategy receives the model class, the incoming
# sync records keyed by record key, the deserialized field data keyed the
# same way, and the id of the node the records originated from.
# It returns an UpsertResult.
def apply_many(
    self,
    model: type[SyncableMixin],
    records: dict[str, SyncRecord],
    deserialized: dict[str, dict[str, Any]],
    origin_node: str,
) -> UpsertResult: ...

DjangoRecordWriter

Source code in django_spire/contrib/sync/django/storage/writer.py
def __init__(
    self,
    models: list[type[SyncableMixin]],
    identity_field: str = 'id',
    batch_size_max: int = _BATCH_SIZE_MAX,
    deferred_foreign_keys: list[DeferredForeignKey] | None = None,
    delete_strategies: dict[str, DeleteStrategy] | None = None,
    many_to_many_applier: ManyToManyApplier | None = None,
    upsert_strategy: UpsertStrategy | None = None,
) -> None:
    """Validate the configuration and set up the writer's collaborators.

    Args:
        models: Models this writer serves, keyed internally by
            ``model._meta.label``.
        identity_field: Name of the field used as the record key.
        batch_size_max: Largest number of keys handled per chunk.
        deferred_foreign_keys: Each entry's attribute name is filed under
            its source label: as deferred when the target label is one of
            ``models``, otherwise as external-nullable.
        delete_strategies: Per-label strategies; a falsy value is replaced
            by ``_build_delete_strategies(models)``.
        many_to_many_applier: Falsy value is replaced by a default
            ``ManyToManyApplier``.
        upsert_strategy: Falsy value is replaced by a default
            ``BulkUpsertStrategy``.

    Raises:
        InvalidParameterError: If ``models`` or ``identity_field`` is
            empty, or ``batch_size_max`` is below 1.
    """
    if not models:
        message = 'models must not be empty'
        raise InvalidParameterError(message)

    if not identity_field:
        message = 'identity_field must be a non-empty string'
        raise InvalidParameterError(message)

    if batch_size_max < 1:
        message = (
            f'batch_size_max must be >= 1, '
            f'got {batch_size_max}'
        )

        raise InvalidParameterError(message)

    self._batch_size_max = batch_size_max
    self._identity_field = identity_field

    model_index: dict[str, type[SyncableMixin]] = {}

    for model in models:
        model_index[model._meta.label] = model

    self._models = model_index

    serializer_index: dict[str, SyncFieldSerializer] = {}

    for model in models:
        label = model._meta.label
        serializer_index[label] = SyncFieldSerializer(model)

    self._serializers = serializer_index

    if not upsert_strategy:
        upsert_strategy = BulkUpsertStrategy(identity_field=identity_field)

    self._upsert_strategy = upsert_strategy

    if not many_to_many_applier:
        many_to_many_applier = ManyToManyApplier(identity_field=identity_field)

    self._many_to_many_applier = many_to_many_applier

    if not delete_strategies:
        delete_strategies = self._build_delete_strategies(models)

    self._delete_strategies = delete_strategies

    deferred_names: dict[str, set[str]] = defaultdict(set)
    external_nullable_names: dict[str, set[str]] = defaultdict(set)

    self._deferred_attribute_names = deferred_names
    self._external_nullable_attribute_names = external_nullable_names

    syncable_labels = set(self._models.keys())

    for foreign_key in (deferred_foreign_keys or []):
        # Targets outside the synced set go to the external-nullable bucket.
        if foreign_key.target_label in syncable_labels:
            bucket = deferred_names
        else:
            bucket = external_nullable_names

        bucket[foreign_key.source_label].add(foreign_key.attribute_name)

clear_tombstones

Source code in django_spire/contrib/sync/django/storage/writer.py
def clear_tombstones(
    self,
    model_label: str,
    keys: set[str],
) -> None:
    """Delete the tombstones of ``model_label`` whose ``record_key`` is in ``keys``.

    An empty ``keys`` is a no-op: the tombstone model is not imported and
    no query is issued.
    """
    if keys:
        from django_spire.contrib.sync.django.models.tombstone import SyncTombstone  # noqa: PLC0415

        matching = SyncTombstone.objects.filter(
            model_label=model_label,
            record_key__in=keys,
        )

        matching.delete()

delete_many

Source code in django_spire/contrib/sync/django/storage/writer.py
def delete_many(
    self,
    model_label: str,
    deletes: dict[str, int],
    origin_node: str,
) -> None:
    """Apply ``deletes`` in chunks of at most ``_batch_size_max`` keys.

    Chunks follow the iteration order of ``deletes``. Each chunk is handed
    to the delete strategy registered for ``model_label`` inside its own
    ``transaction.atomic()`` block. An empty ``deletes`` is a no-op.
    """
    if not deletes:
        return

    model = self._get_model(model_label)
    strategy = self._delete_strategies[model_label]

    entries = list(deletes.items())
    size = self._batch_size_max

    for offset in range(0, len(entries), size):
        chunk = dict(entries[offset:offset + size])

        with transaction.atomic():
            strategy.delete(model, chunk, origin_node)

flush_deferred_backfill

Source code in django_spire/contrib/sync/django/storage/writer.py
def flush_deferred_backfill(self) -> None:
    """Retry every stored deferred backfill row and drop the resolved ones.

    Rows are grouped by ``model_label``. Groups for labels this writer
    does not know are logged and left untouched. For each known label the
    rows are folded into ``{record_key: {attname: fk_value}}`` and passed
    to ``_run_backfill``. Rows whose ``attname`` is absent from that
    call's result for their key are deleted. The number still pending is
    logged at debug level.
    """
    from django_spire.contrib.sync.django.models.deferred_backfill import (  # noqa: PLC0415
        SyncDeferredBackfill,
    )

    rows_by_label: dict[str, list[SyncDeferredBackfill]] = {}

    for row in SyncDeferredBackfill.objects.all():
        rows_by_label.setdefault(row.model_label, []).append(row)

    for model_label, label_rows in rows_by_label.items():
        if model_label not in self._models:
            logger.warning(
                'Skipping %d deferred backfill row(s) for '
                'unknown model %s',
                len(label_rows),
                model_label,
            )

            continue

        model = self._get_model(model_label)

        columns_by_key: dict[str, dict[str, Any]] = {}

        for row in label_rows:
            columns_by_key.setdefault(row.record_key, {})[row.attname] = row.fk_value

        unresolved = self._run_backfill(model, columns_by_key)

        resolved_ids: list[int] = [
            row.id
            for row in label_rows
            if row.attname not in unresolved.get(row.record_key, {})
        ]

        if resolved_ids:
            SyncDeferredBackfill.objects.filter(
                id__in=resolved_ids,
            ).delete()

        remaining = sum(
            len(columns) for columns in unresolved.values()
        )

        if remaining:
            logger.debug(
                '%d deferred FK backfill(s) still pending '
                'for %s (targets not yet present)',
                remaining,
                model_label,
            )

stamp_unstamped_records

Source code in django_spire/contrib/sync/django/storage/writer.py
def stamp_unstamped_records(
    self,
    clock: HybridLogicalClock,
    model_order: list[str] | None = None,
) -> int:
    """Backfill sync metadata on rows that have not been stamped yet.

    Labels are visited in ``model_order`` when given, otherwise in sorted
    order of the registered labels. For each model two counts are read
    with SQL from ``_build_zero_field_count_sql``: one for
    ``sync_field_sequence`` and one for ``sync_field_last_modified``.
    Models where both counts are zero are skipped. Otherwise sequence
    assignment and timestamp stamping run inside a single transaction.

    Args:
        clock: Its ``now()`` value is the stamp timestamp for a model.
        model_order: Optional explicit label order; labels that are not
            registered on this writer are skipped.

    Returns:
        The sum, over all stamped models, of
        ``max(sequence_count, timestamp_count)``.
    """
    from django_spire.contrib.sync.django.queryset import sync_bypass  # noqa: PLC0415
    from django_spire.contrib.sync.django.sequence import (  # noqa: PLC0415
        SyncSequenceAllocator,
    )

    total = 0
    labels = model_order or sorted(self._models.keys())

    # Both helpers receive the full label list, before unknown labels are
    # filtered out in the loop below.
    self._ensure_sync_columns(labels)
    self._reconcile_counter(labels)

    with sync_bypass():
        for model_label in labels:
            if model_label not in self._models:
                continue

            model = self._models[model_label]

            # Falls back to the 'default' alias when the router returns
            # a falsy value.
            connection = connections[
                router.db_for_write(model) or
                'default'
            ]

            quote = connection.ops.quote_name
            table = quote(model._meta.db_table)

            id_field = model._meta.get_field(self._identity_field)

            id_column = quote(id_field.column)

            # Presumably counts rows whose column is still zero - confirm
            # in _build_zero_field_count_sql.
            # NOTE(review): the counts are read before the transaction
            # below opens - confirm nothing can change them in between.
            with connection.cursor() as cursor:
                cursor.execute(
                    self._build_zero_field_count_sql(
                        table,
                        'sync_field_sequence',
                    ),
                )

                sequence_count = cursor.fetchone()[0]

                cursor.execute(
                    self._build_zero_field_count_sql(
                        table,
                        'sync_field_last_modified',
                    ),
                )

                timestamp_count = cursor.fetchone()[0]

            if sequence_count == 0 and timestamp_count == 0:
                continue

            stamp_timestamp = clock.now()

            stamp_field_names = (
                list(model.get_syncable_field_names()) +
                list(model.get_syncable_many_to_many_names())
            )

            # Every syncable field and M2M name gets the same timestamp.
            stamp_timestamps_json = json.dumps(
                dict.fromkeys(
                    stamp_field_names,
                    stamp_timestamp
                )
            )

            # Stays 0 (and is logged as 0) when no sequences are assigned.
            sequence_first = 0

            with transaction.atomic():
                if sequence_count > 0:
                    sequence_first = SyncSequenceAllocator().allocate(sequence_count).value_first

                    with connection.cursor() as cursor:
                        cursor.execute(
                            self._build_assign_sequence_sql(
                                table,
                                id_column,
                            ),
                            [sequence_first],
                        )

                if timestamp_count > 0:
                    with connection.cursor() as cursor:
                        cursor.execute(
                            self._build_stamp_modified_sql(table),
                            [stamp_timestamp, stamp_timestamps_json],
                        )

            # The larger of the two counts is what gets reported.
            stamped = max(sequence_count, timestamp_count)

            logger.info(
                'Stamped %s in %s '
                '(%d sequences from %d, %d timestamps, ts=%d)',
                stamped,
                model_label,
                sequence_count,
                sequence_first,
                timestamp_count,
                stamp_timestamp,
            )

            total += stamped

    return total

upsert_many

Source code in django_spire/contrib/sync/django/storage/writer.py
def upsert_many(
    self,
    model_label: str,
    records: dict[str, SyncRecord],
    origin_node: str,
) -> UpsertResult:
    """Upsert ``records`` in sorted-key chunks of at most ``_batch_size_max``.

    Each chunk of keys is passed to ``_upsert_chunk`` together with the
    full ``records`` mapping. The skipped keys and errors of all chunks
    are merged into the returned result. An empty ``records`` returns an
    empty result without resolving the model.
    """
    if not records:
        return UpsertResult()

    model = self._get_model(model_label)
    many_to_many_names = self._get_many_to_many_names(model)
    serializer = self._serializers[model_label]

    ordered_keys = sorted(records.keys())
    size = self._batch_size_max

    skipped: set[str] = set()
    errors: list[Error] = []

    for offset in range(0, len(ordered_keys), size):
        outcome = self._upsert_chunk(
            model,
            records,
            ordered_keys[offset:offset + size],
            many_to_many_names,
            serializer,
            origin_node,
        )

        skipped |= outcome.skipped
        errors += outcome.errors

    return UpsertResult(skipped=skipped, errors=errors)