Skip to content

tracker

django_spire.contrib.progress.tracker

log = logging.getLogger(__name__)  # module attribute

ProgressTracker

Source code in django_spire/contrib/progress/tracker.py
def __init__(self, key: str, timeout: int = 300) -> None:
    """Set up a tracker bound to one cache entry.

    Args:
        key: Caller-chosen identifier; it is appended to the
            ``progress_tracker_`` prefix to form the cache key.
        timeout: Value handed to ``cache.set`` as ``timeout`` when the
            state is first stored by ``start`` (presumably seconds, per
            the cache backend -- confirm).
    """
    self._timeout = timeout
    # Tasks are collected here by ``parallel`` / ``sequential``.
    self._tasks: list[Task] = []
    # Held by the state-mutating methods (``complete``, ``error``, ``update``).
    self._lock = threading.Lock()
    self._key = f'progress_tracker_{key}'

clear

Source code in django_spire/contrib/progress/tracker.py
def clear(self) -> None:
    """Delete this tracker's entry from the cache."""
    cache_key = self._key
    cache.delete(cache_key)

complete

Source code in django_spire/contrib/progress/tracker.py
def complete(self, message: str = 'Complete!') -> None:
    """Record a finished run in the stored state.

    While holding the instance lock, loads the state through
    ``_get_state``, sets its message, forces progress to 100 and the
    status to ``ProgressStatus.COMPLETE``, then persists it through
    ``_save_state``.

    Args:
        message: Text stored on the state alongside the final status.
    """
    with self._lock:
        snapshot = self._get_state()
        snapshot.message = message
        snapshot.progress = 100
        snapshot.status = ProgressStatus.COMPLETE
        self._save_state(snapshot)

error

Source code in django_spire/contrib/progress/tracker.py
def error(self, message: str) -> None:
    """Record a failed run in the stored state.

    While holding the instance lock, loads the state through
    ``_get_state``, sets its message, resets progress to 0 and the
    status to ``ProgressStatus.ERROR``, then persists it through
    ``_save_state``.

    Args:
        message: Text stored on the state alongside the error status.
    """
    with self._lock:
        current = self._get_state()
        current.message = message
        current.progress = 0
        current.status = ProgressStatus.ERROR
        self._save_state(current)

execute

Source code in django_spire/contrib/progress/tracker.py
def execute(self) -> dict[str, Any] | None:
    """Run the registered tasks and report progress through the cache.

    Creates the initial state, runs the parallel tasks and then (if those
    reported no error) the sequential tasks. On success the state is marked
    complete; on failure a generic error message is stored. The cache entry
    is always removed before returning or propagating.

    Ordinary exceptions raised while running tasks are logged and turned
    into the error path. ``KeyboardInterrupt``, ``SystemExit`` and other
    non-``Exception`` interrupts are deliberately NOT swallowed: they
    propagate to the caller once the cache entry has been cleared.

    Returns:
        A mapping of task name to that task's ``result.value`` on success,
        or ``None`` when any step failed.
    """
    self._create_state()

    failed = False
    results: dict[str, TaskResult] = {}

    try:
        try:
            failed = self._execute_parallel_tasks(results)

            if not failed:
                failed = self._execute_sequential_tasks(results)

            if not failed:
                self.complete()
                # Presumably leaves the final state visible to pollers for a
                # moment before it is cleared -- confirm against the client.
                time.sleep(1)
        except Exception:
            # Only ordinary errors are converted into the error state;
            # interpreter-level interrupts must reach the caller.
            log.exception('Progress tracker failed')
            failed = True

        if failed:
            self.error('An unexpected error occurred')
            # Same assumed purpose as the pause after ``complete``.
            time.sleep(2)
    finally:
        # Runs on every exit path so the cache entry never outlives the run.
        self.clear()

    if failed:
        return None

    return {name: result.value for name, result in results.items()}

get

Source code in django_spire/contrib/progress/tracker.py
def get(self) -> dict[str, Any] | None:
    """Return whatever the cache currently holds under this tracker's key.

    Returns:
        The cached value for ``self._key`` (annotated as a dict, or
        ``None`` -- presumably when no entry exists; confirm against the
        cache backend).
    """
    cached = cache.get(self._key)
    return cached

parallel

Source code in django_spire/contrib/progress/tracker.py
def parallel(
    self,
    name: str,
    func: Callable[[], Any],
    label: str | None = None
) -> ProgressTracker:
    """Register a task flagged ``parallel=True`` and return the tracker.

    Args:
        name: Task name stored on the ``Task``.
        func: Zero-argument callable stored on the ``Task``.
        label: Display label; when falsy, one is derived from ``name``
            via ``_generate_label``.

    Returns:
        This tracker, so registrations can be chained.
    """
    if not label:
        label = self._generate_label(name)

    self._tasks.append(
        Task(
            func=func,
            label=label,
            name=name,
            parallel=True,
        )
    )

    return self

sequential

Source code in django_spire/contrib/progress/tracker.py
def sequential(
    self,
    name: str,
    func: Callable[[dict[str, Any]], Any],
    label: str | None = None
) -> ProgressTracker:
    """Register a task flagged ``parallel=False`` and return the tracker.

    Args:
        name: Task name stored on the ``Task``.
        func: Callable taking one dict argument (presumably the results
            gathered so far -- confirm against the executor).
        label: Display label; when falsy, one is derived from ``name``
            via ``_generate_label``.

    Returns:
        This tracker, so registrations can be chained.
    """
    if not label:
        label = self._generate_label(name)

    self._tasks.append(
        Task(
            func=func,
            label=label,
            name=name,
            parallel=False,
        )
    )

    return self

start

Source code in django_spire/contrib/progress/tracker.py
def start(self) -> None:
    """Store a fresh ``TrackerState`` in the cache under this tracker's key.

    The state is serialised with ``to_dict`` and written with the timeout
    given at construction.
    """
    initial = TrackerState()
    cache.set(
        self._key,
        initial.to_dict(),
        timeout=self._timeout,
    )

update

Source code in django_spire/contrib/progress/tracker.py
def update(
    self,
    status: ProgressStatus,
    message: str,
    progress: int,
    **kwargs: Any
) -> None:
    """Overwrite the stored state's message, progress and status.

    While holding the instance lock, loads the state through
    ``_get_state``, assigns the three fields and persists the result
    through ``_save_state``.

    Args:
        status: New status value stored on the state.
        message: New message stored on the state.
        progress: New progress value stored on the state.
        **kwargs: Forwarded unchanged to ``_save_state``.
    """
    with self._lock:
        snapshot = self._get_state()
        snapshot.message = message
        snapshot.progress = progress
        snapshot.status = status
        self._save_state(snapshot, **kwargs)