Skip to content

sftp

django_spire.contrib.sync.file.source.sftp

`logger = logging.getLogger(__name__)` (module attribute)

`DEFAULT_TIMEOUT = 30` (module attribute)

SFTPSource

Bases: Source

Source code in django_spire/contrib/sync/file/source/sftp.py
def __init__(
    self,
    host: str,
    port: int,
    username: str,
    password: str | None = None,
    key_path: str | None = None,
    key_data: str | None = None,
    key_passphrase: str | None = None,
    timeout: float = DEFAULT_TIMEOUT,
    retries: int = 3,
    retry_delay: float = 1.0,
) -> None:
    """
    Record the connection settings without opening a connection.

    Only attributes are assigned here; the transport and SFTP client
    handles start out as ``None``.

    Args:
        host: Remote host name or address.
        port: Remote port.
        username: Account used to authenticate.
        password: Password for the account, if any.
        key_path: Private key location, if any.
        key_data: Private key material, if any.
        key_passphrase: Passphrase for the private key, if any.
        timeout: Timeout value kept for the connection.
        retries: Number of attempts kept for retried transfers.
        retry_delay: Delay kept for retried transfers.
    """
    # Remote endpoint.
    self._host, self._port = host, port

    # Credentials.
    self._username = username
    self._password = password
    self._key_path, self._key_data = key_path, key_data
    self._key_passphrase = key_passphrase

    # Timing and retry policy.
    self._timeout = timeout
    self._retries, self._retry_delay = retries, retry_delay

    # Live handles; both stay None until a connection is made.
    self._sftp: paramiko.SFTPClient | None = None
    self._transport: paramiko.Transport | None = None

connect

Source code in django_spire/contrib/sync/file/source/sftp.py
def connect(self) -> None:
    """
    Open an SSH transport to the configured host and start an SFTP session.

    Key-based authentication is used when ``key_path`` or ``key_data`` was
    given (the key comes from ``self._load_key()``); otherwise the
    username/password pair is used.

    The transport and client are stored on the instance only after every
    setup step has succeeded. If any step fails, the transport is closed
    before the exception propagates, so a failed attempt does not leave an
    open transport behind. This matters when called from ``__enter__``,
    because ``__exit__`` is not invoked if ``__enter__`` raises.

    Raises:
        paramiko.SSHException: If no SFTP client could be created on the
            transport. Exceptions raised by paramiko while connecting or
            authenticating propagate unchanged.
    """
    transport = paramiko.Transport((self._host, self._port))

    try:
        if self._key_path or self._key_data:
            transport.connect(username=self._username, pkey=self._load_key())
        else:
            transport.connect(
                username=self._username,
                password=self._password,
            )

        transport.set_keepalive(int(self._timeout))

        sftp = paramiko.SFTPClient.from_transport(transport)

        if sftp is None:
            # Fail with a clear error instead of an AttributeError on the
            # get_channel() call below.
            message = 'Failed to open an SFTP session on the transport'
            raise paramiko.SSHException(message)

        sftp.get_channel().settimeout(self._timeout)
    except BaseException:
        # Release the transport on any failure, then re-raise untouched.
        transport.close()
        raise

    self._transport = transport
    self._sftp = sftp

    logger.info('Connected to SFTP %s:%d', self._host, self._port)

close

Source code in django_spire/contrib/sync/file/source/sftp.py
def close(self) -> None:
    """
    Close the SFTP client and the transport, then log the disconnect.

    Calling this when nothing is connected is harmless: both handles are
    checked before use.

    The stored handles are cleared before anything is closed, and the
    transport is closed in a ``finally`` clause. An error raised while
    closing the SFTP client therefore still propagates, but it can no
    longer leave the transport open or stale handles on the instance.
    """
    sftp, transport = self._sftp, self._transport

    # Drop the references first so the instance never keeps a handle that
    # is closed or only partly closed.
    self._sftp = None
    self._transport = None

    try:
        if sftp is not None:
            sftp.close()
    finally:
        if transport is not None:
            transport.close()

    logger.info('Disconnected from SFTP')

__enter__

Source code in django_spire/contrib/sync/file/source/sftp.py
def __enter__(self) -> Self:
    """Connect, then hand back this same object as the ``with`` target."""
    self.connect()

    # The context manager value is the source itself, not a new object.
    return self

__exit__

Source code in django_spire/contrib/sync/file/source/sftp.py
def __exit__(self, *exc: object) -> None:
    """
    Close the connection when the ``with`` block ends.

    The exception details passed in are not inspected, and nothing is
    returned, so an exception raised inside the block is not suppressed.
    """
    # Cleanup is the same whether or not the block raised.
    self.close()

download

Source code in django_spire/contrib/sync/file/source/sftp.py
def download(
    self,
    remote_path: str,
    local_path: Path,
    callback: Callable[[int, int], None] | None = None,
) -> None:
    """
    Copy a remote file to ``local_path`` through the ``retry`` helper.

    Missing parent directories of ``local_path`` are created first. The
    transfer is handed to ``retry`` with this instance's attempt count and
    delay, limited to ``IOError`` and ``paramiko.SSHException``.

    Args:
        remote_path: Path of the file on the server.
        local_path: Destination on the local filesystem.
        callback: Optional callable forwarded to ``sftp.get``.
    """
    client = self._require_connection()
    local_path.parent.mkdir(parents=True, exist_ok=True)

    destination = str(local_path)

    def _fetch():
        return client.get(remote_path, destination, callback=callback)

    retry(
        _fetch,
        attempts=self._retries,
        delay=self._retry_delay,
        # IOError is an alias of OSError, so this is the same tuple.
        exceptions=(OSError, paramiko.SSHException),
    )

    logger.info('Downloaded %s -> %s', remote_path, local_path)

list_dir

Source code in django_spire/contrib/sync/file/source/sftp.py
def list_dir(self, remote_path: str) -> list[str]:
    """Return what the SFTP client's ``listdir`` reports for ``remote_path``."""
    # _require_connection() supplies the client to query.
    return self._require_connection().listdir(remote_path)

open

Source code in django_spire/contrib/sync/file/source/sftp.py
def open(self, remote_path: str, mode: str = 'rb') -> paramiko.SFTPFile:
    """
    Open a file on the server and return the resulting file object.

    Args:
        remote_path: Path of the file on the server.
        mode: Mode string passed through to the SFTP client's ``open``.
    """
    client = self._require_connection()
    remote_file = client.open(remote_path, mode)

    return remote_file

upload

Source code in django_spire/contrib/sync/file/source/sftp.py
def upload(
    self,
    local_path: Path,
    remote_path: str,
    callback: Callable[[int, int], None] | None = None,
) -> None:
    """
    Copy a local file to ``remote_path`` through the ``retry`` helper.

    The transfer is handed to ``retry`` with this instance's attempt count
    and delay, limited to ``IOError`` and ``paramiko.SSHException``.

    Args:
        local_path: File on the local filesystem to send.
        remote_path: Destination path on the server.
        callback: Optional callable forwarded to ``sftp.put``.
    """
    client = self._require_connection()
    origin = str(local_path)

    def _send():
        return client.put(origin, remote_path, callback=callback)

    retry(
        _send,
        attempts=self._retries,
        delay=self._retry_delay,
        # IOError is an alias of OSError, so this is the same tuple.
        exceptions=(OSError, paramiko.SSHException),
    )

    logger.info('Uploaded %s -> %s', local_path, remote_path)