import os
from loguru import logger
import urllib.parse
from xml.etree import ElementTree
import requests
from requests.auth import HTTPBasicAuth
[docs]
class WebDavClient:
"""
A simple WebDav client for downloading files and complete folder
hierarchies from a remote host (f.e. cloud provider).
Attributes:
hostname (str) : full address to webdav host
username (str) : username of connection
password (str) : most properly a token generated for AUTH
Methods:
download_all_files_iterative(a,b): Downloads all files from a and stroes
them under b locally.
subtract(a, b): Returns the difference between a and b.
"""
def __init__(self, hostname: str, username: str, password: str) -> None:
self.hostname: str = hostname
self.username: str = username
self.password: str = password
self.auth: HTTPBasicAuth = HTTPBasicAuth(
username,
password,
)
self.ensure_folder_exists("logs")
logger.add("logs/downloadMd_s.log", rotation="500 MB")
[docs]
def list_files(self, url: str) -> list[str]:
"""
This method list all files from your WebDav host that stay under the
url
Args:
url (str) : web dev host url.
Returns:
type: list[str]
list of all files who stay under the url (no recursion)
Raises:
OSError
Examples:
Example usage of the method:
>>> hostname = "https://yourhost.de/webdav"
>>> username = "Schlawiner23"
>>> password = "YOUR_PERSONAL_TOKEN"
>>> remote_base_path = "MyProjectFolder"
>>> auth: HTTPBasicAuth = HTTPBasicAuth(username, password)
>>> webdevclient = WebDavClient(hostname, username, password)
>>> files = webdevclient.list_files(os.path.join(hostname, remote_base_path))
"""
headers: dict[str, str] = {"Content-Type": "application/xml", "Depth": "1"}
response: requests.Response = requests.request(
"PROPFIND", url, auth=self.auth, headers=headers
)
if response.status_code != 207:
error_message: str = "Failed to list directory contents via requests: "
logger.error(
"Error message {} with code {}", error_message, response.status_code
)
raise OSError(f"{error_message}{response.status_code}")
tree = ElementTree.fromstring(response.content)
files = []
for response in tree.findall("{DAV:}response"):
href = response.find("{DAV:}href").text
if not href.endswith("/"):
logger.debug("Found file: {} for the following url: {}", href, url)
files.append(href)
return files
[docs]
def list_folders(self, remote_base_path: str) -> list[str]:
"""
This method list all folders from your WebDav host that stay EXACTLY
under the remote_base_path. No subfolders are considered.
Args:
remote_base_path (str) : Folder on host for which the folders should
be listed
Returns:
type: list[str]
list of all folders who stay under the parent folder
Raises:
OSError
Examples:
Example usage of the method:
>>> hostname = "https://yourhost.de/webdav"
>>> username = "Schlawiner23"
>>> password = "YOUR_PERSONAL_TOKEN"
>>> remote_base_path = "MyProjectFolder"
>>> webdevclient = WebDavClient(args.hostname, args.username, args.password)
>>> webdevclient.list_folders(remote_base_path)
"""
headers = {"Content-Type": "application/xml", "Depth": "1"}
url: str = os.path.join(self.hostname, remote_base_path)
# as we communicate we do not want WINDWOS \ as os.sep!
url = url.replace(os.sep, "/")
response = requests.request("PROPFIND", url, auth=self.auth, headers=headers)
if response.status_code != 207:
error_message = "Failed to list directory contents: "
logger.error("{} {}", error_message, response.status_code)
raise OSError(f"{error_message}{response.status_code}")
tree = ElementTree.fromstring(response.content)
folders = []
for response in tree.findall("{DAV:}response"):
href = response.find("{DAV:}href").text
folder = os.path.basename(os.path.normpath(href))
is_folder = href.endswith("/")
is_not_remote_base_path = (
href != url + "/"
) and folder != remote_base_path.split("/")[-1]
is_hidden_folder = folder.startswith(".")
if is_folder and is_not_remote_base_path and not is_hidden_folder:
logger.debug(
"Found folder: {} in the parent folder: {}",
folder,
remote_base_path,
)
folders.append(folder)
return folders
[docs]
def filter_after_global_base_path(self, path: str, remote_base_path: str) -> str:
"""
This method removes the hostname and the remote_base_path from an path
Args:
path (str) : Url to host, e.g. https://host.org
remote_base_path (str): single folder on remote host e.g. data
Returns:
type: str
Raises:
None directly
Example: host-url= https://host.org/
remote_base_path = data
path = https://host.org/data/example1
"example1" ist returned
"""
search_str = "/" + remote_base_path + "/"
if search_str in path:
logger.debug(
"Found folder {} for path {} and remote base path: {}",
search_str,
path,
remote_base_path,
)
url_after_removing_everything_before_and_including_remote_base_name = (
path.split(search_str, 1)[1]
)
logger.info(
"Folder structure everything after the remote_base_path is: {}",
url_after_removing_everything_before_and_including_remote_base_name,
)
return url_after_removing_everything_before_and_including_remote_base_name
logger.error("Could not find search string: {} in path: {}", search_str, path)
return path
[docs]
def ensure_folder_exists(self, path: str) -> None:
"""
This method ensures that the given folder will exist
Args:
path (str) : Path to folder.
Returns:
type: None
Raises:
None directly
"""
if not os.path.exists(path):
os.makedirs(path)
logger.debug("Folder created: {}", path)
else:
logger.debug("Folder already exists: {}", path)
[docs]
def get_sub_path(self, full_path: str, initial_part: str) -> str:
"""
Returns the sub-path after the initial part of the path.
Args:
full_path (str): The full path string. Does NOT have host url within
initial_part (str): The initial part of the path string to be removed.
Returns:
type: str: The sub-path string after the initial part.
Example 1:
full_path = /data/subfolder1/text.txt
initial_part (str) = data
returns ==> subfolder1/text.txt
Raises:
ValueError
"""
# Decode URL-encoded parts of the path
logger.debug("We are in the 'get_sub_path' method.")
decoded_full_path = urllib.parse.unquote(full_path)
logger.debug("The decoded full file path is: {}", decoded_full_path)
decoded_initial_part = urllib.parse.unquote(initial_part)
logger.debug("The decoded initial file path is: {}", decoded_initial_part)
# Ensure the initial part ends with a slash
# removes weird edge cases for later processing
if not decoded_initial_part.endswith("/"):
decoded_initial_part += "/"
# Find the position where the initial part ends in the full path
start_idx = decoded_full_path.find(decoded_initial_part)
if start_idx == -1:
logger.error(
"The {} string is not in the full_path={}", initial_part, full_path
)
raise ValueError("The full path does not contain the initial part.")
# Handle the edge case where the full path is exactly the initial part
decoded_initial_part = "/" + decoded_initial_part
if full_path == decoded_initial_part.rstrip("/"):
logger.debug("The get_sub_path() method returns empty string")
return ""
# Remove the initial part from the full path
if full_path.startswith(initial_part):
logger.debug(
"The get_sub_path() method returns {}", full_path[len(initial_part) :]
)
return full_path[len(initial_part) :]
else:
# Calculate the start index of the sub-path
sub_path_start_idx = start_idx + len(decoded_initial_part) - 1
# Extract the sub-path
sub_path = decoded_full_path[sub_path_start_idx:]
logger.debug("The get_sub_path() method returns {}", sub_path)
return sub_path
[docs]
def download_files(
self,
global_remote_base_path: str,
remote_base_path: str,
local_base_path: str,
) -> None:
"""
This method downloads all files from your WebDav host that stay EXACTLY
under the remote_base_path. No subfolders are considered.
Args:
remote_base_path (str): Folder on host which should be primary source
for downloading files
local_base_path (str) : all files (with preserved folder structures)
are put inside this local path
Returns:
type: None
Raises:
None directly
Examples:
Example usage of the method:
>>> hostname = "https://yourhost.de/webdav"
>>> username = "Schlawiner23"
>>> password = "YOUR_PERSONAL_TOKEN"
>>> remote_base_path = "MyProjectFolder"
>>> local_base_path = "assets"
>>> auth: HTTPBasicAuth = HTTPBasicAuth(username, password)
>>> download_files(hostname, auth, current_remote_path, local_base_path)
"""
if not os.path.exists(local_base_path):
logger.info("Dir {} will be created", local_base_path)
os.makedirs(local_base_path)
url = os.path.join(self.hostname, remote_base_path)
# as we communicate we do not want WINDWOS \ as os.sep!
url = url.replace(os.sep, "/")
files_on_host = self.list_files(url)
if len(files_on_host) == 0:
logger.info("Found no files on remote_base_path: {}", remote_base_path)
for file_path in files_on_host:
logger.info("Found the file: {} on current remote_base_path", file_path)
file_name = self.filter_after_global_base_path(file_path, remote_base_path)
logger.info("The pure of filename of this file is: {}", file_name)
# Decoding the URL-encoded string
decoded_filename = urllib.parse.unquote(file_name)
logger.info("The decoded filename version is: {}", decoded_filename)
remote_file_url = os.path.join(
self.hostname, remote_base_path, file_name # file_path.split("/")[-1]
)
remote_file_url = remote_file_url.replace(os.sep, "/")
logger.info("The remote file url is: {}", remote_file_url)
sub_path = self.get_sub_path(file_path, global_remote_base_path)
if sub_path.endswith(decoded_filename):
sub_path = sub_path[: len(sub_path) - len(decoded_filename)]
if sub_path == decoded_filename:
sub_path = ""
logger.debug("The current sub path is: {}", sub_path)
local_file_path = os.path.join(local_base_path, sub_path, decoded_filename)
local_file_path = local_file_path.replace(os.sep, "/")
logger.debug(
"The current file that is stored has the full path: {}", local_file_path
)
response = requests.get(remote_file_url, auth=self.auth, stream=True)
if response.status_code == 200:
folder_path = os.path.dirname(local_file_path)
self.ensure_folder_exists(folder_path)
with open(local_file_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
else:
logger.debug(
"Failed to download {}: {}", remote_file_url, response.status_code
)
[docs]
def download_all_files_iterative(
self,
remote_base_path: str,
local_base_path: str,
) -> None:
"""
This method downloads all files from your WebDav host that stay
under the remote_base_path. All subfolders will also be downloaded
and folder structure is preserved
Args:
remote_base_path (str): Folder on host which should be primary source
for downloading files
local_base_path (str) : all files (with preserved folder structures)
are put inside this local path
Returns:
type: None
Raises:
None directly
Examples:
Example usage of the method:
>>> hostname = "https://yourhost.de/webdav"
>>> username = "Schlawiner23"
>>> password = "YOUR_PERSONAL_TOKEN"
>>> remote_base_path = "MyProjectFolder"
>>> local_base_path = "assets"
>>> webdevclient = WebDavClient(args.hostname, args.username, args.password)
>>> webdevclient.download_all_files_iterative(
>>> args.remote_base_path, args.local_base_path
>>> )
"""
# Initialize the stack with the root directory
stack: list[str] = [remote_base_path]
global_remote_base_path: str = remote_base_path
while stack:
current_remote_path: str = stack.pop()
logger.debug("Current remote path is: {}", current_remote_path)
# Download files in the current directory
self.download_files(
global_remote_base_path,
current_remote_path,
local_base_path,
)
# List all folders in the current remote path
folders: list[str] = self.list_folders(current_remote_path)
if len(folders) == 0:
logger.info(
"Found no subfolders for current folder: {}", current_remote_path
)
# Add each subfolder to the stack
for folder in folders:
logger.info(
"Found subfolder {} for current folder: {}.",
folder,
current_remote_path,
)
relative_folder_path: str = os.path.join(current_remote_path, folder)
relative_folder_path = relative_folder_path.replace(os.sep, "/")
stack.append(relative_folder_path)