Source code for thinftp.fileman

"""
File handling utilities for thinFTP

This module defines the `FileHandler` class used by the thinFTP server to 
manage filesystem operations securely within a defined root directory. It 
ensures safe access to files and directories and provides support for FTP-style 
commands such as directory navigation, file listing, reading/writing files, 
deletion, and renaming.

Classes:
    FileHandler: Encapsulates all file operations, enforcing confinement to 
    the FTP server's root directory. Raises appropriate exceptions for invalid 
    actions.

Exceptions:
    FileHandlerError: Raised for invalid file-related operations such as 
    attempting to get the size of or delete a non-file.

Dependencies:
    - pathlib.Path: For file path resolution and operations.
    - stat: For interpreting file permission modes.
    - time: For formatting file modification times.
    - .errors.FileHandlerError: Custom exception used in this module.
"""

import stat
import time
from pathlib import Path
from .errors import FileHandlerError

[docs] class FileHandler: """ Custom File Handler class for thinFTP. Handles file operations such as navigating directories, listing contents, reading and writing files, and renaming or deleting files and directories. """
[docs] def __init__(self, root_dir): """ Initialize the FileHandler with a root directory. Parameters: root_dir (str): The root directory for file operations. """ self.root_dir = Path(root_dir).resolve() self.cur_dir = self.root_dir self.ren_old = None
[docs] def resolve_path(self, path): """ Resolve a given path to an absolute path within the root directory. Parameters: path (str): The path to resolve. Returns: Path: An absolute Path object. """ if path.startswith('/') or path.startswith('\\'): return (self.root_dir / path.lstrip('/\\')).resolve() return (self.cur_dir / path).resolve()
[docs] def pwd(self): """ Get the current working directory relative to the root directory. Returns: str: The current directory as a string. """ if self.cur_dir == self.root_dir: return '/' return '/' + self.cur_dir.relative_to(self.root_dir).as_posix()
[docs] def get_abs(self, path): """ Get the absolute path of a given path relative to the root directory. Parameters: path (str): The path to resolve. Returns: str: The absolute path as a string. """ new_dir = self.resolve_path(path) return '/' + new_dir.relative_to(self.root_dir).as_posix()
[docs] def cwd(self, path): """ Change the current working directory. Parameters: path (str): The directory to change to. Raises: FileNotFoundError: If the directory does not exist. NotADirectoryError: If the path is not a directory. PermissionError: If attempting to move outside the root directory. """ new_path = self.resolve_path(path) if new_path.exists(): if new_path.is_dir(): if not new_path.is_relative_to(self.root_dir): raise PermissionError('Attempt to move behind root directory') self.cur_dir = new_path else: raise NotADirectoryError else: raise FileNotFoundError
[docs] def cd_up(self): """ Change to the parent directory. Raises: FileNotFoundError: If the parent directory does not exist. PermissionError: If attempting to move outside the root directory. """ par_dir = self.cur_dir.parent.resolve() if par_dir.exists(): if not par_dir.absolute().is_relative_to(self.root_dir): raise PermissionError('Attempt to move behind root directory') self.cur_dir = par_dir else: raise FileNotFoundError
[docs] def mkdir(self, path): """ Create a new directory. Parameters: path (str): Path of directory to create. """ self.resolve_path(path).mkdir(parents=True)
[docs] def name_ls(self, path): """ List the names of entries in a directory. Parameters: path (str): The directory to list. Returns: list: A list of Path objects representing entries in the directory. Raises: PermissionError: If attempting to move outside the root directory. """ target_dir = self.resolve_path(path) if target_dir.exists(): if not target_dir.is_relative_to(self.root_dir): raise PermissionError('Attempt to move behind root directory') matches = target_dir.iterdir() if target_dir.is_dir() else [target_dir] else: matches = target_dir.glob(path) matches = list(matches) for i in range(len(matches)): try: entry = matches[i].resolve() if not entry.is_relative_to(self.root_dir): matches.pop(i) except FileNotFoundError: continue return matches
[docs] def ls(self, path): """ List the contents of the specified directory. Parameters: path (str): The directory to list. Returns: list: A list of strings representing the contents of the directory, in the format: `<permissions> <nlinks> <owner> <group> <size> <modtime> <name>` If the directory does not exist, an empty list is returned. """ matches = self.name_ls(path) if not matches: return [] lines = [] for entry in sorted(matches): stats = entry.stat() perms = stat.filemode(stats.st_mode) size = stats.st_size mtime = time.strftime("%b %d %H:%M", time.localtime(stats.st_mtime)) lines.append(f"{perms} 1 user group {size:>8} {mtime} {entry.name}") return lines
[docs] def read(self, fname, type): """ Read a file in chunks. Parameters: fname (str): The file to read. type (str): The transfer type ('A' for ASCII, 'I' for binary). Yields: bytes: Chunks of the file content. Raises: FileNotFoundError: If the file does not exist. PermissionError: If attempting to move outside the root directory. """ mode = 'rb' if type == 'I' else 'r' path = self.resolve_path(fname) with open(path, mode) as f: while True: chunk = f.read(8192) if not chunk: break if mode == 'r': yield chunk.replace('\n', '\r\n').encode('ascii') else: yield chunk
[docs] def size(self, fname): """ Get the size of a file. Parameters: fname (str): The file to check. Returns: int: The size of the file in bytes. Raises: PermissionError: If attempting to move outside the root directory. FileHandlerError: If the path is not a file. """ path = self.resolve_path(fname) if not path.is_relative_to(self.root_dir): raise PermissionError('Attempt to move behind root directory') if not path.is_file(): raise FileHandlerError(f'Not a file: {fname!r}') return path.stat().st_size
[docs] def delete(self, fname): """ Delete a file. Parameters: fname (str): The file to delete. Raises: PermissionError: If attempting to move outside the root directory. FileHandlerError: If the path is not a file. """ path = self.resolve_path(fname) if not path.is_relative_to(self.root_dir): raise PermissionError('Attempt to move behind root directory') if not path.is_file(): raise FileHandlerError(f'Not a file: {fname!r}') path.unlink()
[docs] def rmdir(self, path): """ Remove a directory. Parameters: path (str): The directory to remove. Raises: FileNotFoundError: If the directory does not exist. NotADirectoryError: If the path is not a directory. PermissionError: If attempting to move outside the root directory. """ path = self.resolve_path(path) if path.exists(): if path.is_dir(): if not path.is_relative_to(self.root_dir): raise PermissionError('Attempt to move behind root directory') path.rmdir() return raise NotADirectoryError raise FileNotFoundError
[docs] def rename_from(self, old): """ Mark a file or directory for renaming. Parameters: old (str): Existing file/directory name. Raises: FileNotFoundError: If the original path does not exist. PermissionError: If attempting to move outside the root directory. """ self.ren_old = self.resolve_path(old) if not self.ren_old.is_relative_to(self.root_dir): self.ren_old = None raise PermissionError('Attempt to move behind root directory') if not self.ren_old.exists(): self.ren_old = None raise FileNotFoundError
[docs] def rename_to(self, new): """ Rename a previously marked file or directory. Parameters: new (str): New file/directory name. """ new = self.resolve_path(new) self.ren_old.rename(new) self.ren_old = None
[docs] def write(self, fname, data): """ Write data to a file. Parameters: fname (str): The file to write to. data (iterable): An iterable of bytes to write. Raises: PermissionError: If attempting to move outside the root directory. """ path = self.resolve_path(fname) if not path.is_relative_to(self.root_dir): raise PermissionError('Attempt to move behind root directory') with open(path, 'wb') as f: for chunk in data: f.write(chunk)