# Source code for kataglyphis_webdavclient.webdavclient

"""WebDAV client implementation for listing and downloading remote files."""

import urllib.parse
from pathlib import Path, PurePosixPath

import requests
from defusedxml import ElementTree
from loguru import logger
from requests.auth import HTTPBasicAuth


# Value passed as the ``timeout`` argument to every ``requests`` call in this
# module (the PROPFIND listings and the file downloads).
REQUEST_TIMEOUT_SECONDS = 30


def _join_remote_url(*parts: str) -> str:
    cleaned_parts = [part.strip("/") for part in parts if part]
    return "/".join(cleaned_parts)


class WebDavClient:
    """A simple WebDAV client for downloading files and folders.

    It supports listing folders, listing files, and iterative downloads from a
    remote host (for example a cloud provider).

    Attributes:
        hostname (str): Full address of the WebDAV host.
        username (str): User name used for the connection.
        password (str): Password, usually a token generated for authentication.
        auth (HTTPBasicAuth): Basic-auth object built from username/password.

    Methods:
        download_all_files_iterative(a, b): Downloads all files from ``a`` and
            stores them under ``b`` locally.
    """

    # The loguru file sink is process-wide. Registering it once per instance
    # would duplicate every log line for each additional client.
    _log_sink_added = False

    def __init__(self, hostname: str, username: str, password: str) -> None:
        """Initialize a client with credentials and the logging output folder.

        Creates the ``logs`` folder relative to the current working directory
        and registers the rotating log file ``logs/downloadMd_s.log`` (only on
        the first instantiation in this process).

        Args:
            hostname (str): Full address of the WebDAV host.
            username (str): User name used for the connection.
            password (str): Password or token used for the connection.
        """
        self.hostname: str = hostname
        self.username: str = username
        self.password: str = password
        self.auth: HTTPBasicAuth = HTTPBasicAuth(username, password)
        self.ensure_folder_exists("logs")
        if not WebDavClient._log_sink_added:
            logger.add("logs/downloadMd_s.log", rotation="500 MB")
            WebDavClient._log_sink_added = True

    def _propfind(self, url: str) -> "requests.Response":
        """Send a ``Depth: 1`` PROPFIND request to ``url`` and return the response."""
        headers: dict[str, str] = {"Content-Type": "application/xml", "Depth": "1"}
        return requests.request(
            "PROPFIND",
            url,
            auth=self.auth,
            headers=headers,
            timeout=REQUEST_TIMEOUT_SECONDS,
        )

    @staticmethod
    def _hrefs_from_multistatus(content: bytes) -> list[str]:
        """Return the text of every ``{DAV:}href`` found in a PROPFIND body.

        Response entries without an ``href`` element, or with an empty one, are
        skipped instead of raising ``AttributeError``.
        """
        tree = ElementTree.fromstring(content)
        hrefs: list[str] = []
        for entry in tree.findall("{DAV:}response"):
            href_element = entry.find("{DAV:}href")
            if href_element is None or not href_element.text:
                continue
            hrefs.append(href_element.text)
        return hrefs

    @staticmethod
    def _child_folder_name(
        href: str, request_url: str, remote_base_path: str
    ) -> "str | None":
        """Return the folder name if ``href`` is a visible sub-folder, else None.

        The name is returned exactly as it appears in ``href`` (still
        percent-encoded) so it can be appended to a remote path unchanged.
        Comparisons are done on percent-decoded values.

        Args:
            href (str): ``href`` value of one PROPFIND response entry.
            request_url (str): URL the PROPFIND request was sent to.
            remote_base_path (str): Folder that was listed.
        """
        if not href.endswith("/"):
            return None  # entries without trailing slash are treated as files
        folder = PurePosixPath(href.rstrip("/")).name
        if not folder or urllib.parse.unquote(folder).startswith("."):
            return None  # nameless (root) or hidden folder

        href_path = urllib.parse.unquote(urllib.parse.urlsplit(href).path).rstrip("/")
        request_path = urllib.parse.unquote(
            urllib.parse.urlsplit(request_url).path
        ).rstrip("/")
        if href_path == request_path:
            return None  # the listed collection itself

        parent_path = href_path.rsplit("/", 1)[0]
        if parent_path != request_path:
            # The href does not line up with the request path (e.g. the server
            # reports a different prefix). Fall back to the name comparison to
            # recognise the collection's own entry.
            base_name = urllib.parse.unquote(
                remote_base_path.rstrip("/").rsplit("/", 1)[-1]
            )
            if urllib.parse.unquote(folder) == base_name:
                return None
        return folder

    @staticmethod
    def _encoded_remainder_after(path: str, remote_base_path: str) -> "str | None":
        """Return what follows ``remote_base_path`` in ``path``, comparing decoded.

        Segments are compared after percent-decoding, but the returned
        remainder is taken from ``path`` unchanged (still encoded). Returns
        None when the base path is empty or not followed by a ``/`` in ``path``.
        """
        base_segments = [
            urllib.parse.unquote(segment)
            for segment in remote_base_path.strip("/").split("/")
        ]
        if base_segments == [""]:
            return None
        path_segments = path.split("/")
        decoded_segments = [urllib.parse.unquote(segment) for segment in path_segments]
        width = len(base_segments)
        # Start at 1 so a "/" precedes the match; stop early so a "/" follows it.
        for start in range(1, len(path_segments) - width):
            if decoded_segments[start : start + width] == base_segments:
                return "/".join(path_segments[start + width :])
        return None

    @staticmethod
    def _relative_sub_path(full_path: str, initial_part: str) -> str:
        """Return the decoded part of ``full_path`` after ``initial_part``.

        Both arguments are percent-decoded and ``initial_part`` must match
        whole path segments. The result never starts with ``/``.

        Raises:
            ValueError: If ``full_path`` does not contain ``initial_part``.
        """
        decoded_full_path = urllib.parse.unquote(full_path)
        decoded_initial_part = urllib.parse.unquote(initial_part).strip("/")
        if not decoded_initial_part:
            return decoded_full_path.lstrip("/")
        if decoded_full_path.strip("/") == decoded_initial_part:
            return ""  # the full path is exactly the initial part
        anchored_path = "/" + decoded_full_path.lstrip("/")
        marker = "/" + decoded_initial_part + "/"
        marker_idx = anchored_path.find(marker)
        if marker_idx == -1:
            error_details = "The full path does not contain the initial part."
            raise ValueError(error_details)
        return anchored_path[marker_idx + len(marker) :]

    def list_files(self, url: str) -> list[str]:
        """List all files directly below a given WebDAV URL.

        Args:
            url (str): Full URL of the remote folder.

        Returns:
            list[str]: ``href`` values of all entries that do not end with
            ``/`` (no recursion).

        Raises:
            OSError: If the server does not answer with status 207.

        Examples:
            >>> hostname = "https://yourhost.de/webdav"
            >>> client = WebDavClient(hostname, "Schlawiner23", "YOUR_PERSONAL_TOKEN")
            >>> files = client.list_files(hostname + "/MyProjectFolder")
        """
        response = self._propfind(url)
        if response.status_code != 207:
            error_message: str = "Failed to list directory contents via requests: "
            logger.error(
                "Error message {} with code {}", error_message, response.status_code
            )
            error_details = f"{error_message}{response.status_code}"
            raise OSError(error_details)

        files: list[str] = []
        for href in self._hrefs_from_multistatus(response.content):
            if not href.endswith("/"):
                logger.debug("Found file: {} for the following url: {}", href, url)
                files.append(href)
        return files

    def list_folders(self, remote_base_path: str) -> list[str]:
        """List all folders directly below a remote base path.

        Only folders that sit EXACTLY under ``remote_base_path`` are returned;
        sub-folders of those are not considered. The listed folder itself and
        folders whose name starts with ``.`` are excluded.

        Args:
            remote_base_path (str): Folder on the host to list.

        Returns:
            list[str]: Folder names as reported by the server (percent-encoded
            if the server encodes them).

        Raises:
            OSError: If the server does not answer with status 207.

        Examples:
            >>> client = WebDavClient(
            ...     "https://yourhost.de/webdav", "Schlawiner23", "YOUR_PERSONAL_TOKEN"
            ... )
            >>> client.list_folders("MyProjectFolder")
        """
        url: str = _join_remote_url(self.hostname, remote_base_path)
        response = self._propfind(url)
        if response.status_code != 207:
            error_message = "Failed to list directory contents: "
            logger.error("{} {}", error_message, response.status_code)
            error_details = f"{error_message}{response.status_code}"
            raise OSError(error_details)

        folders: list[str] = []
        for href in self._hrefs_from_multistatus(response.content):
            folder = self._child_folder_name(href, url, remote_base_path)
            if folder is None:
                continue
            logger.debug(
                "Found folder: {} in the parent folder: {}",
                folder,
                remote_base_path,
            )
            folders.append(folder)
        return folders

    def filter_after_global_base_path(self, path: str, remote_base_path: str) -> str:
        """Remove everything up to and including the base path from a remote path.

        The literal text ``/<remote_base_path>/`` is searched first. If it is
        not found, the path segments are compared percent-decoded, so
        ``My%20Folder`` in ``path`` matches ``My Folder`` in
        ``remote_base_path``. The returned remainder is always taken from
        ``path`` unchanged.

        Args:
            path (str): Remote path or URL, e.g. ``https://host.org/data/example1``.
            remote_base_path (str): Folder on the remote host, e.g. ``data``.

        Returns:
            str: The part after the base path (``example1`` in the example
            above), or ``path`` unchanged if the base path is not found.
        """
        search_str = "/" + remote_base_path + "/"
        if search_str in path:
            logger.debug(
                "Found folder {} for path {} and remote base path: {}",
                search_str,
                path,
                remote_base_path,
            )
            remainder = path.split(search_str, 1)[1]
        else:
            remainder = self._encoded_remainder_after(path, remote_base_path)

        if remainder is None:
            logger.error(
                "Could not find search string: {} in path: {}", search_str, path
            )
            return path
        logger.info(
            "Folder structure everything after the remote_base_path is: {}",
            remainder,
        )
        return remainder

    def ensure_folder_exists(self, path: str) -> None:
        """Ensure that the given folder exists, creating parents as needed.

        Args:
            path (str): Path to the folder.
        """
        folder_path = Path(path)
        if not folder_path.exists():
            folder_path.mkdir(parents=True, exist_ok=True)
            logger.debug("Folder created: {}", path)
        else:
            logger.debug("Folder already exists: {}", path)

    def get_sub_path(self, full_path: str, initial_part: str) -> str:
        """Return the sub-path after the initial part of the path.

        Both arguments are percent-decoded before matching, ``initial_part``
        must match whole path segments, and the result never starts with ``/``.
        If ``full_path`` is exactly ``initial_part`` an empty string is returned.

        Args:
            full_path (str): The full path string, without the host URL.
            initial_part (str): The initial part of the path to be removed.

        Returns:
            str: The decoded sub-path after the initial part.

        Example:
            full_path = /data/subfolder1/text.txt
            initial_part = data
            returns ==> subfolder1/text.txt

        Raises:
            ValueError: If ``full_path`` does not contain ``initial_part``.
        """
        logger.debug("We are in the 'get_sub_path' method.")
        logger.debug(
            "The decoded full file path is: {}", urllib.parse.unquote(full_path)
        )
        logger.debug(
            "The decoded initial file path is: {}", urllib.parse.unquote(initial_part)
        )
        try:
            sub_path = self._relative_sub_path(full_path, initial_part)
        except ValueError:
            logger.error(
                "The {} string is not in the full_path={}", initial_part, full_path
            )
            raise
        logger.debug("The get_sub_path() method returns {}", sub_path)
        return sub_path

    def download_files(
        self,
        global_remote_base_path: str,
        remote_base_path: str,
        local_base_path: str,
    ) -> None:
        """Download all files directly below a remote base path.

        Only files that sit EXACTLY under ``remote_base_path`` are downloaded;
        sub-folders are not considered. A file whose download does not return
        status 200 is logged and skipped. A file whose target would resolve
        outside ``local_base_path`` is logged and skipped as well.

        Args:
            global_remote_base_path (str): Root folder that anchors the
                relative paths used below ``local_base_path``.
            remote_base_path (str): Folder on the host to download from.
            local_base_path (str): Local folder that receives the files (with
                preserved folder structure).

        Raises:
            OSError: If listing ``remote_base_path`` fails.
            ValueError: If a file path does not contain
                ``global_remote_base_path``.

        Examples:
            >>> client = WebDavClient(
            ...     "https://yourhost.de/webdav", "Schlawiner23", "YOUR_PERSONAL_TOKEN"
            ... )
            >>> client.download_files("MyProjectFolder", "MyProjectFolder", "assets")
        """
        local_root = Path(local_base_path)
        if not local_root.exists():
            logger.info("Dir {} will be created", local_base_path)
            local_root.mkdir(parents=True, exist_ok=True)
        resolved_root = local_root.resolve()

        url = _join_remote_url(self.hostname, remote_base_path)
        files_on_host = self.list_files(url)
        if not files_on_host:
            logger.info("Found no files on remote_base_path: {}", remote_base_path)

        for file_path in files_on_host:
            logger.info("Found the file: {} on current remote_base_path", file_path)
            file_name = self.filter_after_global_base_path(file_path, remote_base_path)
            logger.info("The pure of filename of this file is: {}", file_name)
            # The local file name is the decoded form of the (encoded) href part.
            decoded_filename = urllib.parse.unquote(file_name)
            logger.info("The decoded filename version is: {}", decoded_filename)
            remote_file_url = _join_remote_url(
                self.hostname, remote_base_path, file_name
            )
            logger.info("The remote file url is: {}", remote_file_url)

            # Keep only the folder part of the path below the global base.
            sub_path = self.get_sub_path(file_path, global_remote_base_path)
            sub_path = sub_path.removesuffix(decoded_filename)
            if sub_path == decoded_filename:
                sub_path = ""
            logger.debug("The current sub path is: {}", sub_path)

            local_file_path = local_root / sub_path / decoded_filename
            logger.debug(
                "The current file that is stored has the full path: {}", local_file_path
            )
            # The path is derived from server-supplied hrefs: never write
            # outside of the requested local folder.
            if not local_file_path.resolve().is_relative_to(resolved_root):
                logger.error(
                    "Refusing to write outside of {}: {}",
                    local_base_path,
                    local_file_path,
                )
                continue

            # Context manager releases the streamed connection in every case.
            with requests.get(
                remote_file_url,
                auth=self.auth,
                stream=True,
                timeout=REQUEST_TIMEOUT_SECONDS,
            ) as response:
                if response.status_code != 200:
                    logger.error(
                        "Failed to download {}: {}",
                        remote_file_url,
                        response.status_code,
                    )
                    continue
                self.ensure_folder_exists(str(local_file_path.parent))
                with local_file_path.open("wb") as f:
                    f.writelines(response.iter_content(chunk_size=8192))

    def download_all_files_iterative(
        self,
        remote_base_path: str,
        local_base_path: str,
    ) -> None:
        """Download all files recursively below a remote base path.

        All files below ``remote_base_path`` are downloaded, including those in
        sub-folders; the folder structure is preserved locally. Folders are
        walked with an explicit stack instead of recursion.

        Args:
            remote_base_path (str): Folder on the host to download from.
            local_base_path (str): Local folder that receives the files (with
                preserved folder structure).

        Raises:
            OSError: If listing a remote folder fails.

        Examples:
            >>> client = WebDavClient(
            ...     "https://yourhost.de/webdav", "Schlawiner23", "YOUR_PERSONAL_TOKEN"
            ... )
            >>> client.download_all_files_iterative("MyProjectFolder", "assets")
        """
        global_remote_base_path: str = remote_base_path
        stack: list[str] = [remote_base_path]
        visited: set[str] = set()

        while stack:
            current_remote_path: str = stack.pop()
            if current_remote_path in visited:
                # Never process the same folder twice, whatever the server lists.
                continue
            visited.add(current_remote_path)
            logger.debug("Current remote path is: {}", current_remote_path)

            self.download_files(
                global_remote_base_path,
                current_remote_path,
                local_base_path,
            )

            folders: list[str] = self.list_folders(current_remote_path)
            if not folders:
                logger.info(
                    "Found no subfolders for current folder: {}", current_remote_path
                )
            for folder in folders:
                logger.info(
                    "Found subfolder {} for current folder: {}.",
                    folder,
                    current_remote_path,
                )
                stack.append(str(PurePosixPath(current_remote_path) / folder))