欢迎光临石四片叶子网
详情描述

一、核心架构设计

1.1 项目结构

file-manager/
├── core/
│   ├── __init__.py
│   ├── file_operations.py    # 文件操作核心
│   ├── tree_generator.py    # 目录树生成
│   ├── batch_processor.py   # 批量处理
│   └── metadata_manager.py  # 元数据管理
├── utils/
│   ├── formatters.py        # 格式转换
│   ├── validators.py        # 验证器
│   └── exceptions.py        # 自定义异常
├── cli/
│   └── interface.py         # 命令行接口
├── tests/
└── main.py

1.2 核心类设计

class FileManager:
    """Top-level manager that bundles the helper objects for one root path."""

    def __init__(self, root_path):
        """Store the resolved root path and create the default helpers."""
        resolved_root = Path(root_path).resolve()
        self.root = resolved_root

        # Each collaborator is created with its default configuration.
        self.file_ops = FileOperations()
        self.tree_gen = TreeGenerator()
        self.batch_proc = BatchProcessor()

    def analyze_structure(self, max_depth=None):
        """Analyze the directory structure (placeholder, returns None)."""
        return None

    def export_structure(self, output_format='txt'):
        """Export the directory structure (placeholder, returns None)."""
        return None

二、核心模块实现

2.1 目录树生成器

import fnmatch
import json
import os
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional, Generator

import yaml

class TreeGenerator:
    """Render a directory as a text tree and gather basic statistics.

    Attributes:
        show_hidden: When false, entries whose name starts with "." are
            skipped.
        max_depth: Deepest level ``generate`` emits (the start path is
            level 0); ``None`` removes the limit.
        exclude_patterns: Entry names, or ``fnmatch`` globs, that are always
            skipped.
    """

    # Built once instead of on every classification call.
    _TEXT_EXTENSIONS = frozenset({
        '.txt', '.py', '.js', '.html', '.css',
        '.json', '.xml', '.md', '.csv',
    })
    _BINARY_EXTENSIONS = frozenset({
        '.exe', '.dll', '.so', '.dylib',
        '.jpg', '.png', '.pdf', '.zip',
    })

    def __init__(self, show_hidden=False, max_depth=10):
        self.show_hidden = show_hidden
        self.max_depth = max_depth
        self.exclude_patterns = ['.git', '__pycache__', '.DS_Store']

    def generate(
        self,
        path: Path,
        prefix: str = "",
        depth: int = 0,
        is_last: bool = True
    ) -> Generator[str, None, None]:
        """Yield the lines of a box-drawing tree rooted at *path*.

        Args:
            path: File or directory to render.
            prefix: Indentation inherited from the ancestors of *path*.
            depth: Level of *path* relative to the starting path.
            is_last: Whether *path* is the last entry of its parent, which
                selects the connector glyph.

        Yields:
            One formatted line per visited entry; directories come before
            files and both are sorted case-insensitively by name.
        """
        if self.max_depth is not None and depth > self.max_depth:
            return

        connector = "└── " if is_last else "├── "
        yield prefix + connector + path.name

        if not path.is_dir():
            return

        try:
            children = sorted(
                (p for p in path.iterdir() if self._should_include(p)),
                key=lambda x: (not x.is_dir(), x.name.lower()),
            )
        except PermissionError:
            yield prefix + "    [权限拒绝]"
            return

        # Below the last entry nothing continues, so pad with spaces;
        # otherwise keep the vertical guide line of the parent level.
        new_prefix = prefix + ("    " if is_last else "│   ")

        last_index = len(children) - 1
        for i, child in enumerate(children):
            yield from self.generate(
                child,
                new_prefix,
                depth + 1,
                i == last_index
            )

    def generate_statistics(self, path: Path) -> Dict:
        """Walk *path* and return counters describing its contents.

        Hidden and excluded directories are pruned from the walk, so neither
        they nor anything below them is counted.

        Returns:
            A dict with the keys ``total_files``, ``total_dirs``,
            ``size_bytes``, ``by_extension`` (suffix -> count), ``by_type``
            (text / binary / other counts) and ``modified_timeline`` (kept
            for interface compatibility; this method leaves it empty).
        """
        stats = {
            'total_files': 0,
            'total_dirs': 0,
            'size_bytes': 0,
            'by_extension': {},
            'by_type': {'text': 0, 'binary': 0, 'other': 0},
            'modified_timeline': []
        }

        for root, dirs, files in os.walk(path):
            root_path = Path(root)

            # Assigning to the slice prunes the walk in place: os.walk will
            # not descend into directories removed here.
            dirs[:] = [
                d for d in dirs if self._should_include(root_path / d)
            ]
            stats['total_dirs'] += len(dirs)

            for file in files:
                filepath = root_path / file
                if not self._should_include(filepath):
                    continue

                stats['total_files'] += 1

                try:
                    stats['size_bytes'] += filepath.stat().st_size
                except OSError:
                    # Size is unavailable (e.g. broken symlink); the file is
                    # still classified below because that needs only its name.
                    pass

                ext = filepath.suffix.lower()
                if ext:
                    stats['by_extension'][ext] = \
                        stats['by_extension'].get(ext, 0) + 1

                if self._is_text_file(filepath):
                    stats['by_type']['text'] += 1
                elif self._is_binary_file(filepath):
                    stats['by_type']['binary'] += 1
                else:
                    stats['by_type']['other'] += 1

        return stats

    def _should_include(self, path: Path) -> bool:
        """Return whether *path* passes the hidden-file and exclusion filters.

        Only the final path component is tested. Matching against the whole
        path string would also reject unrelated entries such as
        ``site.github.io/index.html`` or ``.gitignore``.
        """
        name = path.name
        if not self.show_hidden and name.startswith('.'):
            return False
        return not any(
            fnmatch.fnmatch(name, pattern)
            for pattern in self.exclude_patterns
        )

    def _is_text_file(self, path: Path) -> bool:
        """Return whether the suffix of *path* is a known text extension."""
        return path.suffix.lower() in self._TEXT_EXTENSIONS

    def _is_binary_file(self, path: Path) -> bool:
        """Return whether the suffix of *path* is a known binary extension."""
        return path.suffix.lower() in self._BINARY_EXTENSIONS

2.2 文件操作管理器

import shutil
import hashlib
import filecmp
from typing import Tuple, Set

class FileOperations:
    """File-system helpers: copying, duplicate detection and sorting by type."""

    @staticmethod
    def smart_copy(src: Path, dst: Path, overwrite: bool = False) -> bool:
        """Copy a file or a directory tree from *src* to *dst*.

        For files, when *dst* already exists and *overwrite* is false, an
        identical destination is left alone and a differing destination is
        first saved next to itself with an extra ``.bak`` suffix.

        Returns:
            True when something was copied; False when the copy was skipped
            (identical file), the source does not exist, or an error occurred.
        """
        try:
            if src.is_file():
                if dst.exists() and not overwrite:
                    if FileOperations._files_equal(src, dst):
                        return False  # identical content, nothing to do
                    backup = dst.with_suffix(dst.suffix + '.bak')
                    shutil.copy2(dst, backup)

                shutil.copy2(src, dst)
                return True

            if src.is_dir():
                shutil.copytree(src, dst, dirs_exist_ok=True)
                return True

        except (OSError, ValueError) as e:
            # OSError covers file-system failures (shutil.Error derives from
            # it); ValueError comes from with_suffix on a nameless path.
            print(f"复制失败 {src} -> {dst}: {e}")
            return False

        # Neither a file nor a directory: report failure explicitly instead
        # of falling off the end and returning None.
        print(f"复制失败 {src} -> {dst}: 源路径不存在")
        return False

    @staticmethod
    def find_duplicates(directory: Path) -> Dict[str, List[Path]]:
        """Find files below *directory* that have identical content.

        Returns:
            A mapping from SHA-256 hex digest to the list of paths sharing
            that digest; only digests with at least two paths are included.
        """
        # Files of different sizes cannot be identical, so only files that
        # share their size with another file are read and hashed.
        by_size: Dict[int, List[Path]] = {}
        for filepath in directory.rglob('*'):
            try:
                if filepath.is_file():
                    by_size.setdefault(
                        filepath.stat().st_size, []
                    ).append(filepath)
            except OSError:
                continue

        hashes: Dict[str, List[Path]] = {}
        for candidates in by_size.values():
            if len(candidates) < 2:
                continue
            for filepath in candidates:
                try:
                    file_hash = FileOperations._calculate_hash(filepath)
                except OSError:
                    continue
                hashes.setdefault(file_hash, []).append(filepath)

        return {h: paths for h, paths in hashes.items() if len(paths) > 1}

    @staticmethod
    def organize_by_type(directory: Path,
                        category_map: Dict[str, List[str]] = None) -> None:
        """Move the files directly inside *directory* into category folders.

        Args:
            directory: Folder whose immediate files are sorted; subfolders
                are not entered.
            category_map: Mapping of folder name to the extensions (with
                leading dot) that belong in it. Defaults to Documents /
                Images / Videos / Code / Archives. When an extension is
                listed twice, the first category wins.

        A file whose name is already taken in the category folder is stored
        under ``<stem>_<n><suffix>`` instead of replacing the existing file.
        """
        if category_map is None:
            category_map = {
                'Documents': ['.pdf', '.doc', '.docx', '.txt', '.md'],
                'Images': ['.jpg', '.jpeg', '.png', '.gif', '.bmp'],
                'Videos': ['.mp4', '.avi', '.mov', '.mkv'],
                'Code': ['.py', '.js', '.java', '.cpp', '.html', '.css'],
                'Archives': ['.zip', '.rar', '.tar', '.gz'],
            }

        # Invert once to extension -> category; setdefault keeps the first
        # category that lists an extension.
        ext_to_category = {}
        for category, extensions in category_map.items():
            for extension in extensions:
                ext_to_category.setdefault(extension.lower(), category)

        # Snapshot the listing first: the loop creates folders and moves
        # entries inside the very directory being listed.
        for filepath in list(directory.iterdir()):
            if not filepath.is_file():
                continue
            category = ext_to_category.get(filepath.suffix.lower())
            if category is None:
                continue

            target_dir = directory / category
            target_dir.mkdir(exist_ok=True)
            target = FileOperations._unique_target(
                target_dir / filepath.name
            )
            shutil.move(str(filepath), str(target))

    @staticmethod
    def _unique_target(target: Path) -> Path:
        """Return *target*, or a numbered sibling if *target* already exists."""
        if not target.exists():
            return target
        counter = 1
        while True:
            candidate = target.with_name(
                f"{target.stem}_{counter}{target.suffix}"
            )
            if not candidate.exists():
                return candidate
            counter += 1

    @staticmethod
    def _calculate_hash(filepath: Path, block_size: int = 65536) -> str:
        """Return the SHA-256 hex digest of *filepath*, read in blocks."""
        hasher = hashlib.sha256()
        with open(filepath, 'rb') as f:
            for block in iter(lambda: f.read(block_size), b''):
                hasher.update(block)
        return hasher.hexdigest()

    @staticmethod
    def _files_equal(file1: Path, file2: Path) -> bool:
        """Return whether both files have the same content (byte comparison)."""
        return filecmp.cmp(file1, file2, shallow=False)

2.3 批量处理器

import re
from concurrent.futures import ThreadPoolExecutor, as_completed

class BatchProcessor:
    """Bulk rename and bulk conversion of files."""

    def __init__(self, max_workers: int = 4):
        # Size of the thread pool used by batch_convert.
        self.max_workers = max_workers

    def batch_rename(
        self,
        directory: Path,
        pattern: str,
        replacement: str,
        regex: bool = False,
        dry_run: bool = False
    ) -> List[Tuple[Path, Path]]:
        """Rename the files directly inside *directory*.

        Args:
            directory: Folder whose immediate files are considered.
            pattern: Substring to replace, or a regular expression when
                *regex* is true.
            replacement: Replacement text (``re.sub`` syntax when *regex*).
            regex: Treat *pattern* as a regular expression.
            dry_run: Only compute the renames without touching the disk.

        Returns:
            ``(old_path, new_path)`` pairs for every rename that was carried
            out, or that would be carried out in a dry run. Renames that
            would land on an existing entry, that produce an invalid name,
            or that fail are not included.
        """
        renamed = []
        # Targets claimed earlier in this run, so that a dry run predicts
        # the same collisions a real run would hit.
        claimed = set()

        # sorted() snapshots the listing (the loop renames entries inside
        # it) and makes the processing order deterministic.
        for filepath in sorted(directory.iterdir()):
            if not filepath.is_file():
                continue

            old_name = filepath.name
            if regex:
                new_name = re.sub(pattern, replacement, old_name)
            else:
                new_name = old_name.replace(pattern, replacement)

            if new_name == old_name:
                continue

            # An empty result or one containing a path separator is not a
            # file name within the same folder.
            if not new_name or Path(new_name).name != new_name:
                print(f"重命名跳过 {filepath}: 无效的新文件名 {new_name!r}")
                continue

            new_path = filepath.parent / new_name

            # Renaming onto an existing entry would silently replace it on
            # POSIX. samefile() still permits case-only renames on
            # case-insensitive file systems.
            occupied = new_path in claimed or (
                new_path.exists() and not new_path.samefile(filepath)
            )
            if occupied:
                print(f"重命名跳过 {filepath}: 目标已存在 {new_path}")
                continue

            if not dry_run:
                try:
                    filepath.rename(new_path)
                except OSError as e:
                    print(f"重命名失败 {filepath}: {e}")
                    continue

            claimed.add(new_path)
            renamed.append((filepath, new_path))

        return renamed

    def batch_convert(
        self,
        input_dir: Path,
        output_dir: Path,
        conversion_func,
        input_ext: str,
        output_ext: str
    ) -> Dict[str, int]:
        """Convert every ``*input_ext`` file below *input_dir* in parallel.

        Outputs are written flat into *output_dir* under the input's name
        with *output_ext* as suffix. When several inputs map to the same
        output name, only the first one (in sorted path order) is converted
        and the others are counted as skipped, so two workers never write
        the same file.

        Args:
            input_dir: Folder searched recursively for inputs.
            output_dir: Folder receiving the outputs; created if missing.
            conversion_func: Callable ``(input_path, output_path)``; a truthy
                return value counts as success, a falsy one or an exception
                as failure.
            input_ext: Suffix of the input files, e.g. ``".md"``.
            output_ext: Suffix for the output files, e.g. ``".html"``.

        Returns:
            Counters under the keys ``success``, ``failed`` and ``skipped``.
        """
        results = {'success': 0, 'failed': 0, 'skipped': 0}

        output_dir.mkdir(parents=True, exist_ok=True)

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {}
            planned_outputs = set()

            # sorted() materializes the search before any output is written
            # and fixes which input wins a name collision.
            for input_file in sorted(input_dir.rglob(f'*{input_ext}')):
                if not input_file.is_file():
                    continue

                output_file = output_dir / \
                    input_file.with_suffix(output_ext).name

                if output_file in planned_outputs:
                    print(f"转换跳过 {input_file}: 输出文件名冲突 {output_file}")
                    results['skipped'] += 1
                    continue
                planned_outputs.add(output_file)

                future = executor.submit(
                    self._safe_convert,
                    conversion_func,
                    input_file,
                    output_file
                )
                futures[future] = input_file

            for future in as_completed(futures):
                input_file = futures[future]
                try:
                    success = future.result()
                except Exception as e:
                    print(f"转换失败 {input_file}: {e}")
                    results['failed'] += 1
                else:
                    results['success' if success else 'failed'] += 1

        return results

    def _safe_convert(self, func, input_file: Path, output_file: Path) -> bool:
        """Run *func* on one file, reporting any exception as a failure."""
        try:
            return func(input_file, output_file)
        except Exception as e:
            # Worker boundary: one failing file must not abort the batch.
            print(f"转换错误 {input_file}: {e}")
            return False

2.4 元数据管理器

import exifread
from PIL import Image
import mimetypes
from dataclasses import dataclass
from typing import Optional

@dataclass
class FileMetadata:
    """Metadata record describing a single file."""
    path: Path  # location of the file
    size: int  # file size
    created: datetime  # creation timestamp
    modified: datetime  # last-modification timestamp
    mime_type: Optional[str] = None  # MIME type; None when not set
    image_info: Optional[Dict] = None  # image details; None when not set
    exif_data: Optional[Dict] = None  # EXIF tags; None when not set

class MetadataManager:
    """Collect file-system, MIME and image metadata for files."""

    @staticmethod
    def extract_metadata(filepath: Path) -> FileMetadata:
        """Build a ``FileMetadata`` record for *filepath*.

        Size and timestamps come from ``filepath.stat()``, the MIME type is
        guessed from the file name, and for ``image/*`` types the image
        details and EXIF tags are added.

        Raises:
            OSError: If *filepath* cannot be stat'ed.
        """
        stat = filepath.stat()

        # st_ctime is the creation time only on Windows; on Unix it is the
        # time of the last metadata change. Use the real birth time where
        # the platform exposes it and fall back to st_ctime otherwise.
        created_ts = getattr(stat, 'st_birthtime', None)
        if created_ts is None:
            created_ts = stat.st_ctime

        metadata = FileMetadata(
            path=filepath,
            size=stat.st_size,
            created=datetime.fromtimestamp(created_ts),
            modified=datetime.fromtimestamp(stat.st_mtime),
            mime_type=mimetypes.guess_type(str(filepath))[0]
        )

        if metadata.mime_type and metadata.mime_type.startswith('image/'):
            metadata.image_info = MetadataManager._extract_image_info(filepath)
            metadata.exif_data = MetadataManager._extract_exif(filepath)

        return metadata

    @staticmethod
    def _extract_image_info(filepath: Path) -> Optional[Dict]:
        """Return format, size, mode and info of an image, or None on failure."""
        try:
            with Image.open(filepath) as img:
                return {
                    'format': img.format,
                    'size': img.size,
                    'mode': img.mode,
                    'info': img.info
                }
        except Exception:
            # Best effort: an unreadable or unsupported image must not fail
            # the whole metadata extraction.
            return None

    @staticmethod
    def _extract_exif(filepath: Path) -> Optional[Dict]:
        """Return the EXIF tags as strings, or None on failure.

        Tags whose name starts with ``Thumbnail`` are left out.
        """
        try:
            with open(filepath, 'rb') as f:
                tags = exifread.process_file(f)
                return {
                    str(tag): str(value)
                    for tag, value in tags.items()
                    if not tag.startswith('Thumbnail')
                }
        except Exception:
            # Best effort, same policy as _extract_image_info.
            return None

三、高级功能实现

3.1 智能搜索器

class SmartSearcher:
    """In-memory index of files searchable by name, preview and keywords."""

    def __init__(self, index_dir: Optional[Path] = None):
        """Create an empty index and, if *index_dir* is given, fill it."""
        # Maps str(path) -> {'metadata', 'preview', 'keywords'}.
        self.index = {}
        if index_dir:
            self.build_index(index_dir)

    def build_index(self, directory: Path) -> None:
        """Index every file below *directory*; failures are reported and skipped."""
        for filepath in directory.rglob('*'):
            if not filepath.is_file():
                continue
            try:
                metadata = MetadataManager.extract_metadata(filepath)
                content_preview = self._extract_preview(filepath)

                self.index[str(filepath)] = {
                    'metadata': metadata,
                    'preview': content_preview,
                    'keywords': self._extract_keywords(filepath, content_preview)
                }
            except Exception as e:
                # Indexing boundary: one bad file must not stop the scan.
                print(f"索引失败 {filepath}: {e}")

    def search(self, query: str,
               search_content: bool = False,
               file_type: Optional[str] = None) -> List[Path]:
        """Return the indexed paths matching *query* (case-insensitive).

        A path matches when the query occurs in its file name, in one of its
        keywords, or, when *search_content* is true, in its content preview.

        Args:
            query: Text to look for.
            search_content: Also match against the stored preview.
            file_type: Optional extension filter, with or without the
                leading dot (``"py"`` or ``".py"``).
        """
        needle = query.lower()
        wanted_suffix = None
        if file_type:
            wanted_suffix = '.' + file_type.lower().lstrip('.')

        results = []
        for filepath_str, data in self.index.items():
            filepath = Path(filepath_str)

            if wanted_suffix is not None and \
                    filepath.suffix.lower() != wanted_suffix:
                continue

            matched = (
                needle in filepath.name.lower()
                or (search_content and needle in data['preview'].lower())
                or any(needle in keyword.lower()
                       for keyword in data['keywords'])
            )
            if matched:
                results.append(filepath)

        return results

    def _extract_preview(self, filepath: Path,
                        max_lines: int = 10) -> str:
        """Return the first *max_lines* lines of a file joined by spaces.

        Returns the binary-file marker when the beginning of the file
        contains a NUL byte, and an empty string when it cannot be read.
        """
        try:
            # Decoding with errors='ignore' never raises, so binary content
            # has to be detected explicitly before reading the file as text.
            with open(filepath, 'rb') as raw:
                if b'\x00' in raw.read(8192):
                    return "[二进制文件]"

            with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                lines = []
                for i, line in enumerate(f):
                    if i >= max_lines:
                        break
                    lines.append(line.strip())
                return ' '.join(lines)
        except (OSError, ValueError):
            return ""

    def _extract_keywords(self, filepath: Path,
                         content: str) -> List[str]:
        """Return the sorted, de-duplicated keywords for one file.

        Keywords are the file stem split on ``_`` and, separately, on ``-``,
        plus up to 20 words of four or more characters from *content*.
        """
        stem = filepath.stem
        keywords = set(stem.split('_')) | set(stem.split('-'))
        keywords.update(re.findall(r'\b\w{4,}\b', content)[:20])

        # Doubled separators produce empty fragments that carry no meaning.
        keywords.discard('')

        return sorted(keywords)

3.2 命令行界面

import argparse
import sys
from rich.console import Console
from rich.tree import Tree
from rich import print as rprint

class CLIInterface:
    """Command-line front end offering the tree, search and organize commands."""

    def __init__(self):
        self.console = Console()
        self.manager = None

    def run(self):
        """Parse ``sys.argv`` and dispatch to the selected sub-command."""
        parser = argparse.ArgumentParser(
            description='高级文件结构管理工具',
            formatter_class=argparse.RawDescriptionHelpFormatter
        )

        subparsers = parser.add_subparsers(dest='command', help='命令')

        # tree: NOTE --export is parsed but no handler consumes it yet.
        tree_parser = subparsers.add_parser('tree', help='显示目录树')
        tree_parser.add_argument('path', help='目录路径')
        tree_parser.add_argument('--depth', type=int, default=3,
                               help='最大深度')
        tree_parser.add_argument('--export', help='导出格式')

        # search
        search_parser = subparsers.add_parser('search', help='搜索文件')
        search_parser.add_argument('query', help='搜索词')
        search_parser.add_argument('--path', default='.',
                                 help='搜索路径')
        search_parser.add_argument('--type', help='文件类型')

        # organize
        organize_parser = subparsers.add_parser('organize',
                                              help='整理文件')
        organize_parser.add_argument('path', help='目录路径')

        args = parser.parse_args()

        if not args.command:
            parser.print_help()
            return

        self._handle_command(args)

    def _handle_command(self, args):
        """Route the parsed arguments to the matching handler."""
        if args.command == 'tree':
            self._show_tree(Path(args.path), args.depth)
        elif args.command == 'search':
            self._search_files(args.query, Path(args.path), args.type)
        elif args.command == 'organize':
            self._organize_files(Path(args.path))

    def _show_tree(self, path: Path, depth: int):
        """Print a colored tree of *path* followed by summary statistics."""
        # Escape file-system names so that brackets in them are not parsed
        # as Rich markup tags.
        from rich.markup import escape

        tree_gen = TreeGenerator(max_depth=depth)

        if not path.exists():
            self.console.print(f"[red]路径不存在: {escape(str(path))}[/red]")
            return

        self.console.print(
            f"\n[bold cyan]目录树: {escape(str(path))}[/bold cyan]\n"
        )

        # Path('.').name is empty, so resolve first to label the root with
        # the real directory name.
        root_label = path.resolve().name or str(path)
        rich_tree = Tree(f"[bold]{escape(root_label)}[/bold]")
        self._build_rich_tree(path, rich_tree, depth, tree_gen)
        self.console.print(rich_tree)

        stats = tree_gen.generate_statistics(path)
        self.console.print("\n[bold yellow]统计信息:[/bold yellow]")
        self.console.print(f"文件数: {stats['total_files']}")
        self.console.print(f"目录数: {stats['total_dirs']}")
        self.console.print(f"总大小: {self._format_size(stats['size_bytes'])}")

    def _build_rich_tree(self, path: Path, tree_node,
                         depth: int, tree_gen: 'TreeGenerator'):
        """Add the children of *path* to *tree_node*, *depth* levels deep."""
        from rich.markup import escape

        if depth <= 0:
            return

        try:
            children = sorted([
                p for p in path.iterdir()
                if tree_gen._should_include(p)
            ], key=lambda x: (not x.is_dir(), x.name.lower()))
        except PermissionError:
            tree_node.add("[grey]权限拒绝[/grey]")
            return

        for child in children:
            if child.is_dir():
                child_node = tree_node.add(
                    f"[blue]{escape(child.name)}/[/blue]"
                )
                self._build_rich_tree(
                    child, child_node, depth-1, tree_gen
                )
            else:
                tree_node.add(f"[green]{escape(child.name)}[/green]")

    def _search_files(self, query: str, path: Path,
                      file_type: Optional[str] = None):
        """Index *path* and print the files matching *query*."""
        from rich.markup import escape

        if not path.exists():
            self.console.print(f"[red]路径不存在: {escape(str(path))}[/red]")
            return

        searcher = SmartSearcher(path)
        results = searcher.search(query, file_type=file_type)

        if not results:
            self.console.print(
                f"[yellow]未找到匹配的文件: {escape(query)}[/yellow]"
            )
            return

        self.console.print(
            f"\n[bold cyan]找到 {len(results)} 个匹配项:[/bold cyan]"
        )
        for result in results:
            self.console.print(f"[green]{escape(str(result))}[/green]")

    def _organize_files(self, path: Path):
        """Sort the files directly inside *path* into category folders."""
        from rich.markup import escape

        if not path.is_dir():
            self.console.print(f"[red]目录不存在: {escape(str(path))}[/red]")
            return

        FileOperations.organize_by_type(path)
        self.console.print(f"[green]整理完成: {escape(str(path))}[/green]")

    def _format_size(self, size_bytes: int) -> str:
        """Format a byte count with two decimals and a B/KB/MB/GB/TB unit."""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if size_bytes < 1024.0:
                return f"{size_bytes:.2f} {unit}"
            size_bytes /= 1024.0
        return f"{size_bytes:.2f} TB"

四、扩展功能示例

4.1 文件同步器

class FileSynchronizer:
    """Copy-based synchronization between two directory trees."""

    @staticmethod
    def sync_directories(source: Path, target: Path,
                        bidirectional: bool = False) -> Dict:
        """Synchronize *source* into *target*, optionally in both directions.

        Entries missing from the destination are copied, and files whose
        modification time is newer on the sending side replace the
        destination copy. Nothing is ever deleted.

        Returns:
            A dict with the lists ``copied``, ``updated`` and ``deleted``
            (always empty) of sending-side paths as strings. A ``skipped``
            list is added only when a name is a file on one side and a
            directory on the other.
        """
        actions = {'copied': [], 'deleted': [], 'updated': []}

        FileSynchronizer._sync_one_way(source, target, actions)

        if bidirectional:
            FileSynchronizer._sync_one_way(target, source, actions)

        return actions

    @staticmethod
    def _sync_one_way(source: Path, target: Path, actions: Dict):
        """Copy new and newer entries from *source* to *target*, recursively.

        Results are appended to the lists in *actions*.
        """
        target.mkdir(parents=True, exist_ok=True)

        for item in source.iterdir():
            target_item = target / item.name

            if item.is_dir():
                if not target_item.exists():
                    shutil.copytree(item, target_item)
                    actions['copied'].append(str(item))
                elif target_item.is_dir():
                    FileSynchronizer._sync_one_way(
                        item, target_item, actions
                    )
                else:
                    # A file occupies the directory's name; recursing would
                    # fail in mkdir, so leave both sides untouched.
                    actions.setdefault('skipped', []).append(str(item))
            elif not target_item.exists():
                shutil.copy2(item, target_item)
                actions['copied'].append(str(item))
            elif target_item.is_dir():
                # copy2 onto a directory would place the file inside it.
                actions.setdefault('skipped', []).append(str(item))
            elif item.stat().st_mtime > target_item.stat().st_mtime:
                shutil.copy2(item, target_item)
                actions['updated'].append(str(item))

4.2 配置文件管理

import configparser
import tomllib
import yaml

class ConfigManager:
    """Load configuration files in JSON, YAML, TOML or INI format."""

    @staticmethod
    def load_config(filepath: Path) -> Dict:
        """Parse *filepath* according to its suffix.

        Supported suffixes (case-insensitive): ``.json``, ``.yaml``/``.yml``,
        ``.toml`` and ``.ini``/``.cfg``. INI files are returned as a dict of
        sections, each a dict of option -> string value.

        Raises:
            ValueError: If the suffix is not supported.
            OSError: If the file cannot be opened, for every format.
        """
        suffix = filepath.suffix.lower()

        if suffix == '.json':
            # JSON is UTF-8 by specification; do not rely on the locale.
            with open(filepath, 'r', encoding='utf-8') as f:
                return json.load(f)

        if suffix in ('.yaml', '.yml'):
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            # An empty YAML document loads as None.
            return {} if data is None else data

        if suffix == '.toml':
            with open(filepath, 'rb') as f:
                return tomllib.load(f)

        if suffix in ('.ini', '.cfg'):
            config = configparser.ConfigParser()
            # ConfigParser.read() silently ignores a missing file; opening it
            # explicitly makes a missing file an error, as for the other
            # formats. INI has no defined encoding, so the platform default
            # is kept.
            with open(filepath, 'r') as f:
                config.read_file(f)
            return {
                section: dict(config.items(section))
                for section in config.sections()
            }

        raise ValueError(f"不支持的配置文件格式: {suffix}")

五、使用示例

def main():
    """Run the CLI when arguments are given, otherwise a short demo.

    The demo prints the tree and file count of the current directory,
    reports duplicate files and, if a ``./downloads`` folder exists, sorts
    its files into category folders.
    """
    import sys

    if len(sys.argv) > 1:
        # Command-line mode
        cli = CLIInterface()
        cli.run()
        return

    # Interactive mode
    manager = FileManager('.')

    # manager.root is the resolved path, so the tree root shows the real
    # directory name instead of the empty name of Path('.').
    print("目录结构:")
    for line in manager.tree_gen.generate(manager.root):
        print(line)

    stats = manager.tree_gen.generate_statistics(manager.root)
    print(f"\n文件统计: {stats['total_files']} 个文件")

    duplicates = manager.file_ops.find_duplicates(Path('.'))
    if duplicates:
        print(f"\n找到 {len(duplicates)} 组重复文件")

    # Moving files is destructive and the folder is optional: only organize
    # it when it is actually present.
    downloads = Path('./downloads')
    if downloads.is_dir():
        manager.file_ops.organize_by_type(downloads)


if __name__ == "__main__":
    main()

六、最佳实践建议

  • 错误处理: 所有文件操作都要有完善的异常处理
  • 性能优化: 对大目录使用异步处理,避免阻塞
  • 日志记录: 记录所有重要操作,便于审计和调试
  • 测试覆盖: 编写单元测试和集成测试
  • 配置化: 将可配置项提取到配置文件中
  • 国际化: 支持多语言(使用gettext)
  • 插件系统: 设计插件架构,方便扩展功能

这个工具涵盖了文件管理的核心功能,并提供了良好的扩展性。可以根据具体需求添加更多功能,如:

  • 文件版本控制
  • 云存储同步
  • 文件加密/解密
  • 智能分类(基于机器学习)
  • 定时任务调度
  • GUI界面(使用PyQt或Tkinter)