Skip to content

mixins

django_spire.contrib.sync.file.mixins

logger = logging.getLogger(__name__) module-attribute

FileSyncServiceMixin

sync_config instance-attribute

sync_reader = None class-attribute instance-attribute

sync_writer = None class-attribute instance-attribute

build_storage

Source code in django_spire/contrib/sync/file/mixins.py
def build_storage(
    self,
    scope: Any,
    baseline_path: Path | None = None,
) -> DjangoModelStorage:
    return DjangoModelStorage(
        model_label=self.sync_config.model_label,
        identity_field=self.sync_config.identity_field,
        sync_fields=self.sync_config.field_keys,
        scope_field=self.sync_config.scope_field,
        scope=scope,
        baseline_path=baseline_path,
        timestamp_field=self.sync_config.timestamp_field,
    )

build_unidirectional_engine

Source code in django_spire/contrib/sync/file/mixins.py
def build_unidirectional_engine(
    self, storage: DjangoModelStorage,
) -> Engine:
    return Engine(
        storage=storage,
        identity_field=self.sync_config.identity_field,
        deactivation_threshold=self.sync_config.deactivation_threshold,
        transaction=transaction.atomic,
    )

build_bidirectional_engine

Source code in django_spire/contrib/sync/file/mixins.py
def build_bidirectional_engine(
    self, storage: DjangoModelStorage,
) -> BidirectionalEngine:
    return BidirectionalEngine(
        storage=storage,
        identity_field=self.sync_config.identity_field,
        conflict_strategy=self.sync_config.conflict_strategy,
        deactivation_threshold=self.sync_config.deactivation_threshold,
        transaction=transaction.atomic,
    )

sync_bidirectional

Source code in django_spire/contrib/sync/file/mixins.py
def sync_bidirectional(
    self, scope: Any, directory: Path,
) -> BidirectionalResult:
    self._validate_io(require_writer=True)

    path = self._resolve_path(directory)

    logger.info('Starting bidirectional sync from %s', path)

    storage = self.build_storage(
        scope=scope,
        baseline_path=directory / 'baseline_hashes.json',
    )

    engine = self.build_bidirectional_engine(storage)

    return engine.sync(
        path,
        reader=self.sync_reader,
        writer=self.sync_writer,
    )

sync_unidirectional

Source code in django_spire/contrib/sync/file/mixins.py
def sync_unidirectional(
    self, scope: Any, directory: Path,
) -> Result:
    self._validate_io()

    path = self._resolve_path(directory)

    logger.info('Starting unidirectional sync from %s', path)

    storage = self.build_storage(scope=scope)
    engine = self.build_unidirectional_engine(storage)

    return engine.sync(path, reader=self.sync_reader)