Skip to content

upsert

django_spire.contrib.sync.django.storage.strategy.upsert

`logger = logging.getLogger(__name__)` (module attribute)

UpsertStrategy

Bases: Protocol

apply_many

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def apply_many(
    self,
    model: type[SyncableMixin],
    records: dict[str, SyncRecord],
    deserialized: dict[str, dict[str, Any]],
    origin_node: str,
) -> UpsertResult:
    """Persist a batch of incoming sync records for ``model``.

    Structural (``Protocol``) interface method; concrete strategies
    provide the behavior.

    Args:
        model: Model class the incoming records belong to.
        records: Incoming sync records, keyed by record key.
        deserialized: Deserialized field data, keyed by the same record
            keys as ``records``.
        origin_node: Identifier of the node the records came from.

    Returns:
        An ``UpsertResult`` describing which records were skipped and
        which produced errors.
    """
    ...

BulkUpsertStrategy

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def __init__(self, identity_field: str = 'id') -> None:
    """Configure the bulk upsert strategy.

    Args:
        identity_field: Name of the model field that ``apply_many`` looks
            up via ``model._meta.get_field``. Its ``to_python`` converts the
            first column of each row returned by the upsert SQL back into a
            record key. Defaults to ``'id'``.
    """
    self._identity_field = identity_field

apply_many

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def apply_many(
    self,
    model: type[SyncableMixin],
    records: dict[str, SyncRecord],
    deserialized: dict[str, dict[str, Any]],
    origin_node: str,
) -> UpsertResult:
    """Upsert ``records`` into ``model``'s table using batched raw SQL.

    Records are validated, then built into instances in sorted-key order.
    Each instance gets the sequence number ``value_first + position``.
    The instances are written in batches of ``_rows_per_batch(len(fields))``
    rows inside ``sync_bypass()``.

    Args:
        model: Model class whose table receives the rows.
        records: Incoming sync records, keyed by record key.
        deserialized: Field data for each record, keyed by the same keys.
        origin_node: Identifier of the node the records came from; stored
            on each ``RecordContext``.

    Returns:
        An ``UpsertResult``. ``errors`` holds what ``_validate_records``
        reported. ``skipped`` holds every validated key whose identity was
        not among the rows returned by the upsert SQL. Presumably those are
        rows the SQL declined to overwrite (confirm in ``_build_upsert_sql``).
        An empty ``UpsertResult()`` is returned when ``records`` is empty.

    Raises:
        NotImplementedError: If the connection's vendor is not in
            ``_SUPPORTED_VENDORS``.
    """
    if not records:
        return UpsertResult()

    connection = self._connection(model)

    if connection.vendor not in _SUPPORTED_VENDORS:
        raise NotImplementedError(
            f'BulkUpsertStrategy requires postgresql or sqlite, '
            f'got {connection.vendor!r}'
        )

    writable, errors = self._validate_records(model._meta.label, records)

    if not writable:
        return UpsertResult(skipped=set(), errors=errors)

    # Function-scope import (flagged noqa PLC0415 in the original); presumably
    # avoids an import cycle at module load — confirm.
    from django_spire.contrib.sync.django.sequence import (  # noqa: PLC0415
        SyncSequenceAllocator,
    )

    fields = self._writable_fields(model)
    ordered_keys = sorted(writable)

    # One allocation covers the whole batch; each record takes
    # value_first + its position in sorted-key order.
    allocation = SyncSequenceAllocator().allocate(len(ordered_keys))
    first_sequence = allocation.value_first

    instances = []

    for position, key in enumerate(ordered_keys):
        context = RecordContext(
            model=model,
            key=key,
            sync_record=writable[key],
            field_data=deserialized[key],
            sequence=first_sequence + position,
            origin_node=origin_node,
        )
        instances.append(self._build_instance(context))

    identity = model._meta.get_field(self._identity_field)
    batch_size = self._rows_per_batch(len(fields))
    applied: set[str] = set()

    with sync_bypass(), connection.cursor() as cursor:
        for start in range(0, len(instances), batch_size):
            chunk = instances[start:start + batch_size]

            params: list[Any] = [
                value
                for instance in chunk
                for value in self._build_row_params(instance, fields, connection)
            ]
            sql = self._build_upsert_sql(model, fields, connection, len(chunk))

            cursor.execute(sql, params)

            # The first column of each returned row is an identity value.
            # Normalize it to the str form used for record keys so it can be
            # matched against ``writable``.
            for row in cursor.fetchall():
                applied.add(str(identity.to_python(row[0])))

    skipped: set[str] = {key for key in writable if key not in applied}

    return UpsertResult(skipped=skipped, errors=errors)

StalenessGuardedUpsertStrategy

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def __init__(self, identity_field: str = 'id') -> None:
    """Configure the staleness-guarded upsert strategy.

    Args:
        identity_field: Name of the model field stored as
            ``self._identity_field``. Defaults to ``'id'``.
            NOTE(review): presumably used by ``_apply_one`` to locate the
            existing row; confirm.
    """
    self._identity_field = identity_field

apply_many

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def apply_many(
    self,
    model: type[SyncableMixin],
    records: dict[str, SyncRecord],
    deserialized: dict[str, dict[str, Any]],
    origin_node: str,
) -> UpsertResult:
    """Apply ``records`` to ``model`` one at a time via ``_apply_one``.

    Records are processed in sorted-key order. Every key, ghost records
    included, consumes the sequence number ``value_first + position`` from a
    single allocation sized to ``len(records)``.

    A record whose ``sync_field_last_modified`` is ``0`` is treated as a
    ghost record. It is logged at error level, reported in ``errors``, and
    never passed to ``_apply_one``.

    Args:
        model: Model class the records belong to.
        records: Incoming sync records, keyed by record key.
        deserialized: Field data for each record, keyed by the same keys.
            It is indexed for every key, including ghost records.
        origin_node: Identifier of the node the records came from.

    Returns:
        An ``UpsertResult`` whose ``skipped`` holds keys for which
        ``_apply_one`` returned a falsy value and whose ``errors`` holds the
        ghost-record errors. An empty ``UpsertResult()`` is returned when
        ``records`` is empty.
    """
    # Function-scope import (flagged noqa PLC0415); presumably avoids an
    # import cycle at module load — confirm.
    from django_spire.contrib.sync.django.sequence import (  # noqa: PLC0415
        SyncSequenceAllocator,
    )

    ordered_keys = sorted(records)

    if not ordered_keys:
        return UpsertResult()

    skipped: set[str] = set()
    errors: list[Error] = []

    # NOTE(review): ghost records still consume a sequence number here.
    first_sequence = SyncSequenceAllocator().allocate(len(ordered_keys)).value_first

    for position, key in enumerate(ordered_keys):
        record = records[key]
        payload = deserialized[key]

        if record.sync_field_last_modified == 0:
            # A zero timestamp means the source never stamped this record;
            # report it rather than writing it.
            message = (
                f'Ghost record for {model._meta.label} key={key}: '
                f'sync_field_last_modified=0 indicates record was '
                f'never properly stamped '
                f'(timestamps={record.timestamps!r}, '
                f'received_at={record.received_at}). '
                f'Run stamp_unstamped_records on the source node '
                f'or check that AppConfig.ready() runs.'
            )

            logger.error(message)
            errors.append(Error(key=key, message=message))
        elif not self._apply_one(
            RecordContext(
                model=model,
                key=key,
                sync_record=record,
                field_data=payload,
                sequence=first_sequence + position,
                origin_node=origin_node,
            ),
        ):
            skipped.add(key)

    return UpsertResult(skipped=skipped, errors=errors)