file-manager/
├── core/
│ ├── __init__.py
│ ├── file_operations.py # 文件操作核心
│ ├── tree_generator.py # 目录树生成
│ ├── batch_processor.py # 批量处理
│ └── metadata_manager.py # 元数据管理
├── utils/
│ ├── formatters.py # 格式转换
│ ├── validators.py # 验证器
│ └── exceptions.py # 自定义异常
├── cli/
│ └── interface.py # 命令行接口
├── tests/
└── main.py
class FileManager:
    """Facade tying together file operations, tree rendering and batch processing."""

    def __init__(self, root_path):
        # Resolve once so every later operation works on a stable absolute path.
        self.root = Path(root_path).resolve()
        self.file_ops = FileOperations()
        self.tree_gen = TreeGenerator()
        self.batch_proc = BatchProcessor()

    def analyze_structure(self, max_depth=None):
        """Return directory statistics for the root (was an empty stub).

        :param max_depth: optional override for the tree generator's depth limit
        :return: statistics dict from TreeGenerator.generate_statistics
        """
        if max_depth is not None:
            self.tree_gen.max_depth = max_depth
        return self.tree_gen.generate_statistics(self.root)

    def export_structure(self, output_format='txt'):
        """Export the structure (was an empty stub).

        'txt' returns the rendered tree as one string; 'json' returns the
        statistics serialized as a JSON string.

        :raises ValueError: for any other format
        """
        if output_format == 'txt':
            return '\n'.join(self.tree_gen.generate(self.root))
        if output_format == 'json':
            return json.dumps(self.analyze_structure(), ensure_ascii=False,
                              indent=2, default=str)
        raise ValueError(f"不支持的导出格式: {output_format}")
import os
from pathlib import Path
from typing import List, Dict, Optional, Generator
import json
import yaml
from datetime import datetime
class TreeGenerator:
    """Renders a text directory tree and collects directory statistics."""

    def __init__(self, show_hidden=False, max_depth=10):
        self.show_hidden = show_hidden  # include dot-files when True
        self.max_depth = max_depth      # recursion limit for generate()
        # Entry names excluded from both rendering and statistics.
        self.exclude_patterns = ['.git', '__pycache__', '.DS_Store']

    def generate(
        self,
        path: Path,
        prefix: str = "",
        depth: int = 0,
        is_last: bool = True
    ) -> Generator[str, None, None]:
        """Yield the tree rendering of *path*, one line per entry.

        Directories sort before files, both case-insensitively; hidden and
        excluded entries are skipped, as is anything below max_depth.
        """
        if depth > self.max_depth:
            return
        connector = "└── " if is_last else "├── "
        yield prefix + connector + path.name
        if path.is_dir():
            try:
                children = sorted(
                    (p for p in path.iterdir() if self._should_include(p)),
                    key=lambda c: (not c.is_dir(), c.name.lower())
                )
            except PermissionError:
                yield prefix + "    [权限拒绝]"
                return
            child_prefix = prefix + ("    " if is_last else "│   ")
            for i, child in enumerate(children):
                yield from self.generate(
                    child, child_prefix, depth + 1, i == len(children) - 1
                )

    def generate_statistics(self, path: Path) -> Dict:
        """Walk *path* and aggregate file/dir counts, sizes and type breakdowns.

        Fix: excluded directories (e.g. __pycache__) are pruned from the walk
        so neither they nor their contents are counted; the original counted
        them in total_dirs and descended into them.
        """
        stats = {
            'total_files': 0,
            'total_dirs': 0,
            'size_bytes': 0,
            'by_extension': {},
            'by_type': {'text': 0, 'binary': 0, 'other': 0},
            'modified_timeline': []  # reserved; not populated by this method
        }
        for root, dirs, files in os.walk(path):
            root_path = Path(root)
            # In-place prune stops os.walk from descending into excluded dirs.
            dirs[:] = [d for d in dirs if self._should_include(root_path / d)]
            stats['total_dirs'] += len(dirs)
            for file in files:
                filepath = root_path / file
                if not self._should_include(filepath):
                    continue
                stats['total_files'] += 1
                try:
                    stats['size_bytes'] += filepath.stat().st_size
                except OSError:
                    continue  # vanished/unreadable: counted but not sized/typed
                ext = filepath.suffix.lower()
                if ext:
                    stats['by_extension'][ext] = \
                        stats['by_extension'].get(ext, 0) + 1
                if self._is_text_file(filepath):
                    stats['by_type']['text'] += 1
                elif self._is_binary_file(filepath):
                    stats['by_type']['binary'] += 1
                else:
                    stats['by_type']['other'] += 1
        return stats

    def _should_include(self, path: Path) -> bool:
        """True when *path* passes the hidden-file and exclusion filters.

        Fix: patterns now match whole path components instead of raw
        substrings, so '.gitignore' is no longer rejected by '.git'.
        """
        if not self.show_hidden and path.name.startswith('.'):
            return False
        if any(part in self.exclude_patterns for part in path.parts):
            return False
        return True

    def _is_text_file(self, path: Path) -> bool:
        """Extension-based guess: is this a known text format?"""
        text_extensions = {'.txt', '.py', '.js', '.html', '.css',
                           '.json', '.xml', '.md', '.csv'}
        return path.suffix.lower() in text_extensions

    def _is_binary_file(self, path: Path) -> bool:
        """Extension-based guess: is this a known binary format?"""
        binary_extensions = {'.exe', '.dll', '.so', '.dylib',
                             '.jpg', '.png', '.pdf', '.zip'}
        return path.suffix.lower() in binary_extensions
import shutil
import hashlib
import filecmp
from typing import Tuple, Set
class FileOperations:
    """Advanced file-operation helpers: smart copy, duplicate finding, organizing."""

    @staticmethod
    def smart_copy(src: Path, dst: Path, overwrite: bool = False) -> bool:
        """Copy *src* to *dst*.

        Returns True when something was copied, False when skipped (identical
        file already at *dst*) or on failure. When *dst* exists with different
        content and overwrite is False, the old file is preserved as a
        ``.bak`` backup before being replaced.

        Fix: a missing/special *src* now returns False explicitly instead of
        falling through and returning None.
        """
        try:
            if src.is_file():
                if dst.exists() and not overwrite:
                    if FileOperations._files_equal(src, dst):
                        return False  # identical content — nothing to do
                    # Keep the previous version before replacing it.
                    backup = dst.with_suffix(dst.suffix + '.bak')
                    shutil.copy2(dst, backup)
                shutil.copy2(src, dst)
                return True
            if src.is_dir():
                shutil.copytree(src, dst, dirs_exist_ok=True)
                return True
            # src does not exist (or is a special file): report failure.
            return False
        except Exception as e:
            print(f"复制失败 {src} -> {dst}: {e}")
            return False

    @staticmethod
    def find_duplicates(directory: Path) -> Dict[str, List[Path]]:
        """Group files under *directory* by SHA-256; return only groups > 1."""
        hashes = {}
        for filepath in directory.rglob('*'):
            if filepath.is_file():
                try:
                    file_hash = FileOperations._calculate_hash(filepath)
                except OSError:
                    continue  # unreadable file: skip, best effort
                hashes.setdefault(file_hash, []).append(filepath)
        return {h: paths for h, paths in hashes.items() if len(paths) > 1}

    @staticmethod
    def organize_by_type(directory: Path,
                         category_map: Dict[str, List[str]] = None) -> None:
        """Move files in *directory* into per-category subfolders by extension.

        Fix: a file whose destination already exists is left in place instead
        of letting shutil.move fail or overwrite.
        """
        if category_map is None:
            category_map = {
                'Documents': ['.pdf', '.doc', '.docx', '.txt', '.md'],
                'Images': ['.jpg', '.jpeg', '.png', '.gif', '.bmp'],
                'Videos': ['.mp4', '.avi', '.mov', '.mkv'],
                'Code': ['.py', '.js', '.java', '.cpp', '.html', '.css'],
                'Archives': ['.zip', '.rar', '.tar', '.gz'],
            }
        for filepath in directory.iterdir():
            if not filepath.is_file():
                continue
            ext = filepath.suffix.lower()
            for category, extensions in category_map.items():
                if ext in extensions:
                    target_dir = directory / category
                    target_dir.mkdir(exist_ok=True)
                    destination = target_dir / filepath.name
                    if not destination.exists():
                        shutil.move(str(filepath), str(destination))
                    break

    @staticmethod
    def _calculate_hash(filepath: Path, block_size: int = 65536) -> str:
        """SHA-256 of the file contents, read in *block_size* chunks."""
        hasher = hashlib.sha256()
        with open(filepath, 'rb') as f:
            for block in iter(lambda: f.read(block_size), b''):
                hasher.update(block)
        return hasher.hexdigest()

    @staticmethod
    def _files_equal(file1: Path, file2: Path) -> bool:
        """Byte-for-byte comparison (shallow=False forces content compare)."""
        return filecmp.cmp(file1, file2, shallow=False)
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
class BatchProcessor:
    """Batch rename / convert files; conversions run on a thread pool."""

    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers  # thread-pool size for batch_convert

    def batch_rename(
        self,
        directory: Path,
        pattern: str,
        replacement: str,
        regex: bool = False,
        dry_run: bool = False
    ) -> List[Tuple[Path, Path]]:
        """Rename files in *directory* by substring (or regex) substitution.

        Returns the (old, new) pairs that were (or, with dry_run, would be)
        renamed. Fix: a rename whose target already exists is skipped instead
        of silently clobbering the existing file (POSIX rename overwrites).
        """
        renamed = []
        for filepath in directory.iterdir():
            if not filepath.is_file():
                continue
            old_name = filepath.name
            if regex:
                new_name = re.sub(pattern, replacement, old_name)
            else:
                new_name = old_name.replace(pattern, replacement)
            if new_name == old_name:
                continue
            new_path = filepath.parent / new_name
            if new_path.exists():
                continue  # would overwrite an existing file
            renamed.append((filepath, new_path))
            if not dry_run:
                try:
                    filepath.rename(new_path)
                except OSError as e:
                    print(f"重命名失败 {filepath}: {e}")
        return renamed

    def batch_convert(
        self,
        input_dir: Path,
        output_dir: Path,
        conversion_func,
        input_ext: str,
        output_ext: str
    ) -> Dict[str, int]:
        """Convert every *input_ext* file under *input_dir* in parallel.

        conversion_func(input_path, output_path) -> bool does the real work.
        Returns {'success', 'failed', 'skipped'} counts.
        Fix: mkdir uses parents=True so a nested output path doesn't crash.
        """
        results = {'success': 0, 'failed': 0, 'skipped': 0}
        output_dir.mkdir(parents=True, exist_ok=True)
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {}
            for input_file in input_dir.rglob(f'*{input_ext}'):
                if input_file.is_file():
                    output_file = output_dir / \
                        input_file.with_suffix(output_ext).name
                    future = executor.submit(
                        self._safe_convert,
                        conversion_func,
                        input_file,
                        output_file
                    )
                    futures[future] = input_file
            for future in as_completed(futures):
                input_file = futures[future]
                try:
                    if future.result():
                        results['success'] += 1
                    else:
                        results['failed'] += 1
                except Exception as e:
                    print(f"转换失败 {input_file}: {e}")
                    results['failed'] += 1
        return results

    def _safe_convert(self, func, input_file: Path, output_file: Path) -> bool:
        """Run *func*, turning any exception into a logged failure (False)."""
        try:
            return func(input_file, output_file)
        except Exception as e:
            print(f"转换错误 {input_file}: {e}")
            return False
import exifread
from PIL import Image
import mimetypes
from dataclasses import dataclass
from typing import Optional
@dataclass
class FileMetadata:
    """File metadata record, produced by MetadataManager.extract_metadata."""
    path: Path                          # path this record describes
    size: int                           # size in bytes (st_size)
    created: datetime                   # from st_ctime — on Unix this is metadata-change time, not creation; TODO confirm intent
    modified: datetime                  # last modification time (st_mtime)
    mime_type: Optional[str] = None     # guessed via mimetypes; None when unknown
    image_info: Optional[Dict] = None   # PIL attributes; only set for image/* MIME types
    exif_data: Optional[Dict] = None    # stringified EXIF tags; only set for image/* MIME types
class MetadataManager:
    """Extracts filesystem, MIME and (for images) EXIF metadata."""

    @staticmethod
    def extract_metadata(filepath: Path) -> FileMetadata:
        """Build a FileMetadata record for *filepath*.

        Image files (by guessed MIME type) additionally get PIL info and
        EXIF tags attached.
        """
        info = filepath.stat()
        guessed_mime = mimetypes.guess_type(str(filepath))[0]
        record = FileMetadata(
            path=filepath,
            size=info.st_size,
            created=datetime.fromtimestamp(info.st_ctime),
            modified=datetime.fromtimestamp(info.st_mtime),
            mime_type=guessed_mime
        )
        if guessed_mime and guessed_mime.startswith('image/'):
            record.image_info = MetadataManager._extract_image_info(filepath)
            record.exif_data = MetadataManager._extract_exif(filepath)
        return record

    @staticmethod
    def _extract_image_info(filepath: Path) -> Optional[Dict]:
        """Return basic PIL image attributes, or None when unreadable."""
        try:
            with Image.open(filepath) as img:
                return {key: getattr(img, key)
                        for key in ('format', 'size', 'mode', 'info')}
        except Exception:
            return None

    @staticmethod
    def _extract_exif(filepath: Path) -> Optional[Dict]:
        """Return stringified EXIF tags (thumbnail tags dropped), or None on error."""
        try:
            with open(filepath, 'rb') as stream:
                tags = exifread.process_file(stream)
                exif = {}
                for tag, value in tags.items():
                    if tag.startswith('Thumbnail'):
                        continue
                    exif[str(tag)] = str(value)
                return exif
        except Exception:
            return None
class SmartSearcher:
    """Filename / content / keyword search over an in-memory index."""

    def __init__(self, index_dir: Optional[Path] = None):
        # Maps str(path) -> {'metadata', 'preview', 'keywords'}.
        self.index = {}
        if index_dir:
            self.build_index(index_dir)

    def build_index(self, directory: Path) -> None:
        """Index every file under *directory* (best effort; failures logged)."""
        for filepath in directory.rglob('*'):
            if not filepath.is_file():
                continue
            try:
                metadata = MetadataManager.extract_metadata(filepath)
                content_preview = self._extract_preview(filepath)
                self.index[str(filepath)] = {
                    'metadata': metadata,
                    'preview': content_preview,
                    'keywords': self._extract_keywords(filepath, content_preview)
                }
            except Exception as e:
                print(f"索引失败 {filepath}: {e}")

    def search(self, query: str,
               search_content: bool = False,
               file_type: Optional[str] = None) -> List[Path]:
        """Return indexed paths matching *query* by name, preview or keyword.

        :param search_content: also match against the stored content preview
        :param file_type: extension filter without the leading dot (e.g. 'txt')
        """
        results = []
        query_lower = query.lower()
        for filepath_str, data in self.index.items():
            filepath = Path(filepath_str)
            if query_lower in filepath.name.lower():
                results.append(filepath)
                continue
            if search_content and query_lower in data['preview'].lower():
                results.append(filepath)
                continue
            if any(query_lower in keyword.lower()
                   for keyword in data['keywords']):
                results.append(filepath)
        if file_type:
            wanted = f'.{file_type.lower()}'
            results = [r for r in results if r.suffix.lower() == wanted]
        return results

    def _extract_preview(self, filepath: Path,
                         max_lines: int = 10) -> str:
        """First *max_lines* lines, stripped and space-joined; '' on failure.

        Note: the file is opened with errors='ignore', so UnicodeDecodeError
        cannot occur while reading — the original's '[二进制文件]' branch was
        unreachable dead code and has been removed.
        """
        try:
            with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                lines = []
                for i, line in enumerate(f):
                    if i >= max_lines:
                        break
                    lines.append(line.strip())
            return ' '.join(lines)
        except Exception:
            return ""

    def _extract_keywords(self, filepath: Path,
                          content: str) -> List[str]:
        """Keywords from the file stem plus the first words of the content.

        Fix: the stem is split on '_' and '-' together; the original split it
        twice independently, producing half-split tokens like 'a_b' and 'b-c'.
        Empty tokens (leading/trailing separators) are filtered out.
        """
        keywords = [token for token in re.split(r'[_\-]+', filepath.stem)
                    if token]
        # Simple content keywords: first 20 words of 4+ characters.
        keywords.extend(re.findall(r'\b\w{4,}\b', content)[:20])
        return list(set(keywords))
import argparse
import sys
from rich.console import Console
from rich.tree import Tree
from rich import print as rprint
class CLIInterface:
    """argparse + rich command-line front-end."""

    def __init__(self):
        self.console = Console()
        self.manager = None

    def run(self):
        """Parse argv and dispatch to the matching command handler."""
        parser = argparse.ArgumentParser(
            description='高级文件结构管理工具',
            formatter_class=argparse.RawDescriptionHelpFormatter
        )
        subparsers = parser.add_subparsers(dest='command', help='命令')
        # tree: render a directory tree
        tree_parser = subparsers.add_parser('tree', help='显示目录树')
        tree_parser.add_argument('path', help='目录路径')
        tree_parser.add_argument('--depth', type=int, default=3,
                                 help='最大深度')
        tree_parser.add_argument('--export', help='导出格式')
        # search: find files by name/content
        search_parser = subparsers.add_parser('search', help='搜索文件')
        search_parser.add_argument('query', help='搜索词')
        search_parser.add_argument('--path', default='.',
                                   help='搜索路径')
        search_parser.add_argument('--type', help='文件类型')
        # organize: sort files into category folders
        organize_parser = subparsers.add_parser('organize',
                                                help='整理文件')
        organize_parser.add_argument('path', help='目录路径')
        args = parser.parse_args()
        if not args.command:
            parser.print_help()
            return
        self._handle_command(args)

    def _handle_command(self, args):
        """Route parsed args to the handler methods."""
        if args.command == 'tree':
            self._show_tree(Path(args.path), args.depth)
        elif args.command == 'search':
            self._search_files(args.query, Path(args.path), args.type)
        elif args.command == 'organize':
            self._organize_files(Path(args.path))

    def _search_files(self, query: str, path: Path, file_type):
        """Handle 'search'. Fix: this method was called by _handle_command but
        never defined, so the command always raised AttributeError."""
        if not path.exists():
            self.console.print(f"[red]路径不存在: {path}[/red]")
            return
        searcher = SmartSearcher(index_dir=path)
        matches = searcher.search(query, search_content=True,
                                  file_type=file_type)
        self.console.print(f"\n[bold cyan]搜索结果 ({len(matches)}):[/bold cyan]")
        for match in matches:
            self.console.print(str(match))

    def _organize_files(self, path: Path):
        """Handle 'organize'. Fix: this method was called by _handle_command
        but never defined, so the command always raised AttributeError."""
        if not path.exists():
            self.console.print(f"[red]路径不存在: {path}[/red]")
            return
        FileOperations.organize_by_type(path)
        self.console.print(f"[green]整理完成: {path}[/green]")

    def _show_tree(self, path: Path, depth: int):
        """Render the tree with rich, then print summary statistics."""
        tree_gen = TreeGenerator(max_depth=depth)
        if not path.exists():
            self.console.print(f"[red]路径不存在: {path}[/red]")
            return
        self.console.print(f"\n[bold cyan]目录树: {path}[/bold cyan]\n")
        rich_tree = Tree(f"[bold]{path.name}[/bold]")
        self._build_rich_tree(path, rich_tree, depth, tree_gen)
        self.console.print(rich_tree)
        stats = tree_gen.generate_statistics(path)
        self.console.print(f"\n[bold yellow]统计信息:[/bold yellow]")
        self.console.print(f"文件数: {stats['total_files']}")
        self.console.print(f"目录数: {stats['total_dirs']}")
        self.console.print(f"总大小: {self._format_size(stats['size_bytes'])}")

    def _build_rich_tree(self, path: Path, tree_node,
                         depth: int, tree_gen: "TreeGenerator"):
        """Recursively mirror the directory into a rich Tree node."""
        if depth <= 0:
            return
        try:
            children = sorted([
                p for p in path.iterdir()
                if tree_gen._should_include(p)
            ], key=lambda x: (not x.is_dir(), x.name.lower()))
        except PermissionError:
            tree_node.add("[grey]权限拒绝[/grey]")
            return
        for child in children:
            if child.is_dir():
                child_node = tree_node.add(f"[blue]{child.name}/[/blue]")
                self._build_rich_tree(
                    child, child_node, depth - 1, tree_gen
                )
            else:
                tree_node.add(f"[green]{child.name}[/green]")

    def _format_size(self, size_bytes: int) -> str:
        """Human-readable size, e.g. 1536 -> '1.50 KB'."""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if size_bytes < 1024.0:
                return f"{size_bytes:.2f} {unit}"
            size_bytes /= 1024.0
        return f"{size_bytes:.2f} TB"
class FileSynchronizer:
    """One- or two-way directory synchroniser (copies/updates only; no deletes)."""

    @staticmethod
    def sync_directories(source: Path, target: Path,
                         bidirectional: bool = False) -> Dict:
        """Mirror *source* into *target*; optionally also target -> source.

        :return: {'copied': [...], 'deleted': [...], 'updated': [...]} —
                 'deleted' is reserved and currently always empty, since
                 nothing is ever removed from either side.
        """
        actions = {'copied': [], 'deleted': [], 'updated': []}
        FileSynchronizer._sync_one_way(source, target, actions)
        if bidirectional:
            FileSynchronizer._sync_one_way(target, source, actions)
        return actions

    @staticmethod
    def _sync_one_way(source: Path, target: Path, actions: Dict) -> Dict:
        """Copy new/updated entries from *source* into *target*, recursing
        into subdirectories. Mutates and returns *actions*.

        Fix: the original assigned this helper's (None) return value to a
        variable during recursion; the helper now actually returns *actions*.
        """
        target.mkdir(parents=True, exist_ok=True)
        for item in source.iterdir():
            target_item = target / item.name
            if item.is_dir():
                if target_item.exists():
                    # Existing directory: descend and sync its contents.
                    FileSynchronizer._sync_one_way(item, target_item, actions)
                else:
                    shutil.copytree(item, target_item)
                    actions['copied'].append(str(item))
            elif not target_item.exists():
                shutil.copy2(item, target_item)
                actions['copied'].append(str(item))
            elif item.stat().st_mtime > target_item.stat().st_mtime:
                # Source is newer: refresh the target copy.
                shutil.copy2(item, target_item)
                actions['updated'].append(str(item))
        return actions
import configparser
import tomllib
import yaml
class ConfigManager:
    """Loads JSON / YAML / TOML / INI configuration files into dicts."""

    @staticmethod
    def load_config(filepath: Path) -> Dict:
        """Parse *filepath* according to its extension.

        :raises ValueError: for an unrecognised extension (parser errors and
                            FileNotFoundError propagate unchanged)

        Fix: text formats are read as UTF-8 explicitly instead of relying on
        the platform default encoding.
        """
        suffix = filepath.suffix.lower()
        if suffix == '.json':
            with open(filepath, 'r', encoding='utf-8') as f:
                return json.load(f)
        elif suffix in ['.yaml', '.yml']:
            with open(filepath, 'r', encoding='utf-8') as f:
                return yaml.safe_load(f)
        elif suffix == '.toml':
            # tomllib requires binary mode.
            with open(filepath, 'rb') as f:
                return tomllib.load(f)
        elif suffix in ['.ini', '.cfg']:
            config = configparser.ConfigParser()
            config.read(filepath, encoding='utf-8')
            return {
                section: dict(config.items(section))
                for section in config.sections()
            }
        else:
            raise ValueError(f"不支持的配置文件格式: {suffix}")
def main():
    """Entry point: CLI mode when arguments are given, otherwise a demo run."""
    import sys
    if len(sys.argv) > 1:
        # Command-line mode: delegate everything to the argparse front-end.
        CLIInterface().run()
        return
    # Interactive/demo mode on the current directory.
    manager = FileManager('.')
    print("目录结构:")
    for line in manager.tree_gen.generate(Path('.')):
        print(line)
    stats = manager.tree_gen.generate_statistics(Path('.'))
    print(f"\n文件统计: {stats['total_files']} 个文件")
    duplicates = manager.file_ops.find_duplicates(Path('.'))
    if duplicates:
        print(f"\n找到 {len(duplicates)} 组重复文件")
    downloads = Path('./downloads')
    # Fix: only organize when the folder exists — the original crashed with
    # FileNotFoundError whenever ./downloads was absent.
    if downloads.is_dir():
        manager.file_ops.organize_by_type(downloads)


if __name__ == "__main__":
    main()
这个工具涵盖了文件管理的核心功能,并提供了良好的扩展性,可以根据具体需求继续添加更多功能(如云端同步、权限管理或图形界面)。