Skip to content

upsert

django_spire.contrib.sync.django.storage.strategy.upsert

logger = logging.getLogger(__name__) module-attribute

UpsertStrategy

Bases: Protocol

apply_many

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def apply_many(
    self,
    model: type[SyncableMixin],
    records: dict[str, SyncRecord],
    deserialized: dict[str, dict[str, Any]],
) -> set[str]:
    """Apply a batch of sync records to ``model`` and report on the outcome.

    Args:
        model: Model class the records are written to.
        records: Sync records indexed by a string key.
        deserialized: Per-record field data; the implementations in this
            module look it up with the same keys as ``records``.

    Returns:
        A set of record keys. The implementations in this module return
        the keys that were skipped, i.e. not applied.
    """
    ...

BulkUpsertStrategy

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def __init__(self, identity_field: str = 'id') -> None:
    """Store the name of the model field that identifies a record.

    Args:
        identity_field: Field name, ``'id'`` unless overridden. The bulk
            ``apply_many`` resolves it through ``model._meta.get_field``
            to convert the identity values returned by the database.
    """
    self._identity_field = identity_field

apply_many

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def apply_many(
    self,
    model: type[SyncableMixin],
    records: dict[str, SyncRecord],
    deserialized: dict[str, dict[str, Any]],
) -> set[str]:
    """Write ``records`` to ``model`` in batched SQL statements.

    Records rejected by ``_filter_ghosts`` are never written. The rest
    are turned into instances (in sorted key order), split into batches
    of ``_rows_per_batch`` rows, and executed through a raw cursor inside
    ``sync_bypass()``.

    Args:
        model: Model class the rows are written to.
        records: Sync records indexed by key.
        deserialized: Field data for each record, indexed by the same key.

    Returns:
        The keys that were not applied: those filtered out up front plus
        every writable key whose identity was not among the rows returned
        by the executed statements. Empty when ``records`` is empty.

    Raises:
        NotImplementedError: If the connection's vendor is not listed in
            ``_SUPPORTED_VENDORS``.
    """
    if not records:
        return set()

    connection = self._connection(model)
    vendor = connection.vendor

    if vendor not in _SUPPORTED_VENDORS:
        message = (
            f'BulkUpsertStrategy requires postgresql or sqlite, '
            f'got {vendor!r}'
        )

        raise NotImplementedError(message)

    writable, skipped = self._filter_ghosts(records)

    if not writable:
        return skipped

    fields = self._writable_fields(model)

    # Sorted keys give every run the same row order.
    instances = [
        self._build_instance(model, key, writable[key], deserialized[key])
        for key in sorted(writable)
    ]

    identity = model._meta.get_field(self._identity_field)
    rows_per_batch = self._rows_per_batch(len(fields))
    total = len(instances)
    applied: set[str] = set()

    with sync_bypass(), connection.cursor() as cursor:
        for start in range(0, total, rows_per_batch):
            batch = instances[start:start + rows_per_batch]

            # One flat parameter list: each instance's values, in batch order.
            params: list[Any] = [
                value
                for instance in batch
                for value in self._build_row_params(
                    instance,
                    fields,
                    connection,
                )
            ]

            # The statement depends on the batch size, and the last batch
            # may be shorter than the others.
            sql = self._build_sql(model, fields, connection, len(batch))

            cursor.execute(sql, params)

            # The first column of each returned row is treated as the
            # identity of a row that was written.
            for row in cursor.fetchall():
                applied.add(str(identity.to_python(row[0])))

    # A writable key the database did not report back counts as skipped.
    skipped.update(key for key in writable if key not in applied)

    return skipped

StalenessGuardedUpsertStrategy

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def __init__(self, identity_field: str = 'id') -> None:
    """Store the name of the model field that identifies a record.

    Args:
        identity_field: Field name, ``'id'`` unless overridden.
    """
    self._identity_field = identity_field

apply_many

Source code in django_spire/contrib/sync/django/storage/strategy/upsert.py
def apply_many(
    self,
    model: type[SyncableMixin],
    records: dict[str, SyncRecord],
    deserialized: dict[str, dict[str, Any]],
) -> set[str]:
    """Apply records one at a time, in sorted key order.

    Each record is handed to ``_apply_one`` together with its field data.

    Args:
        model: Model class the records are written to.
        records: Sync records indexed by key.
        deserialized: Field data for each record, indexed by the same key.

    Returns:
        The keys for which ``_apply_one`` returned a falsy result.
    """
    rejected: set[str] = set()

    for key in sorted(records):
        if self._apply_one(model, key, records[key], deserialized[key]):
            continue

        rejected.add(key)

    return rejected