Skip to content

tasks

django_spire.contrib.progress.tasks

ParallelTask

Source code in django_spire/contrib/progress/tasks.py
def __init__(self, session: ProgressSession, task_id: str, future: Any) -> None:
    self._future = future
    self._session = session
    self._task_id = task_id

    self._session.start(task_id)

    thread = threading.Thread(
        target=self._wait_for_completion,
        daemon=True,
    )
    thread.start()

result property

SequentialTask

Source code in django_spire/contrib/progress/tasks.py
def __init__(
    self,
    session: ProgressSession,
    task_id: str,
    func: Callable,
    *args: Any,
    **kwargs: Any,
) -> None:
    self._exception: Exception | None = None
    self._result: Any = None
    self._session = session
    self._task_id = task_id

    self._session.start(task_id)

    try:
        self._result = func(*args, **kwargs)
        self._session.complete(task_id)
    except Exception as e:
        self._exception = e
        self._session.error(task_id)

result property