文件处理系统

ZIP文件处理和文件夹监控解决方案

SQL文件导入

上传ZIP包中的SQL文件,合并后可一键导入数据库。

文件夹监控

自动监控指定文件夹,检测新文件并根据设置进行处理。

正在获取监控状态...
系统状态
系统信息
  • 版本: 1.0.0
  • 状态: 运行中
  • API状态: 检查中...
快速链接
API接口文档

以下是可用的API接口及其调用方法:

获取系统健康状态信息

请求示例:
fetch('/api/health')
  .then(response => response.json())
  .then(data => console.log(data));
响应示例:
{
  "status": "ok",
  "time": "2023-06-01 12:34:56",
  "version": "1.0.0"
}

获取文件监控的当前状态

请求示例:
fetch('/api/monitor/status')
  .then(response => response.json())
  .then(data => console.log(data));
响应示例:
{
  "success": true,
  "status": {
    "running": true,
    "source": "C:\\path\\to\\source",
    "destination": "C:\\path\\to\\destination"
  }
}

启动文件夹监控功能

参数:
  • source - 监控源文件夹路径
  • destination - 目标文件夹路径
请求示例:
fetch('/api/monitor/start', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json'
  },
  body: JSON.stringify({
    source: 'C:\\监控源文件夹',
    destination: 'C:\\目标文件夹'
  })
})
.then(response => response.json())
.then(data => console.log(data));
响应示例:
{
  "success": true,
  "message": "文件监控已启动",
  "status": {
    "running": true,
    "source": "C:\\监控源文件夹",
    "destination": "C:\\目标文件夹"
  }
}

停止文件夹监控功能

请求示例:
fetch('/api/monitor/stop', {
  method: 'POST'
})
.then(response => response.json())
.then(data => console.log(data));
响应示例:
{
  "success": true,
  "message": "监控已停止",
  "status": {
    "running": false,
    "source": "",
    "destination": ""
  }
}

上传ZIP压缩包,自动解压包内SQL文件并导入到目标数据库

参数:
  • zipfile - ZIP文件 (必需)
  • db_host - 数据库主机 (可选)
  • db_user - 数据库用户名 (可选)
  • db_password - 数据库密码 (可选)
  • db_name - 数据库名称 (可选)
请求示例:
const formData = new FormData();
                                    import os
                                    import time
                                    import json
                                    import requests
                                    
                                    class ZipMonitor:
                                        def __init__(self, target_folder, api_url, db_config=None, interval=60, record_file="processed_records.json"):
                                            """
                                            初始化ZIP文件监听器
                                            
                                            参数:
                                                target_folder: 要监听的文件夹路径
                                                api_url: 处理ZIP文件的API地址
                                                db_config: 数据库配置参数(字典)
                                                interval: 扫描间隔(秒)
                                                record_file: 记录已处理文件的JSON文件
                                            """
                                            self.target_folder = target_folder
                                            self.api_url = api_url
                                            self.interval = interval
                                            self.record_file = record_file
                                            
                                            # 设置默认数据库配置
                                            self.db_config = {
                                                'db_host': '',
                                                'db_user': '',
                                                'db_password': '',
                                                'db_name': ''
                                            }
                                            
                                            # 如果提供了数据库配置,则更新默认配置
                                            if db_config:
                                                self.db_config.update(db_config)
                                            
                                            # 运行状态标志
                                            self.is_running = False
                                            
                                            # 加载已处理的文件记录
                                            self.processed_files = self.load_processed_records()
                                        
                                        def load_processed_records(self):
                                            """加载已处理的文件记录"""
                                            try:
                                                if os.path.exists(self.record_file):
                                                    with open(self.record_file, 'r', encoding='utf-8') as f:
                                                        records = json.load(f)
                                                        return {record['file_path']: record for record in records}
                                                else:
                                                    return {}
                                            except Exception as e:
                                                print(f"加载记录文件时出错: {str(e)}")
                                                return {}
                                        
                                        def save_processed_records(self):
                                            """保存已处理的文件记录"""
                                            try:
                                                records = list(self.processed_files.values())
                                                with open(self.record_file, 'w', encoding='utf-8') as f:
                                                    json.dump(records, f, ensure_ascii=False, indent=2)
                                            except Exception as e:
                                                print(f"保存记录文件时出错: {str(e)}")
                                        
                                        def get_zip_package_name(self, file_path):
                                            """从ZIP文件路径中提取包名"""
                                            file_name = os.path.basename(file_path)
                                            # 移除.zip扩展名
                                            package_name = os.path.splitext(file_name)[0]
                                            return package_name
                                        
                                        def process_zip_file(self, file_path):
                                            """处理单个ZIP文件"""
                                            # 检查文件是否是ZIP文件
                                            if not file_path.lower().endswith('.zip'):
                                                return False
                                                
                                            # 避免重复处理同一个文件
                                            if file_path in self.processed_files:
                                                return False
                                                
                                            package_name = self.get_zip_package_name(file_path)
                                            print(f"检测到新的ZIP文件: {file_path}")
                                            print(f"包名: {package_name}")
                                            
                                            # 确保文件存在且可访问
                                            if not os.path.exists(file_path) or not os.access(file_path, os.R_OK):
                                                print(f"文件 {file_path} 不存在或无法访问")
                                                return False
                                                
                                            try:
                                                # 上传文件到API
                                                with open(file_path, 'rb') as zip_file:
                                                    files = {'zipfile': zip_file}
                                                    response = requests.post(self.api_url, files=files, data=self.db_config)
                                                    
                                                # 检查响应
                                                if response.status_code == 200:
                                                    print(f"成功处理ZIP文件: {file_path}")
                                                    print(f"API响应: {response.json()}")
                                                    
                                                    # 记录已处理的文件
                                                    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
                                                    self.processed_files[file_path] = {
                                                        "file_path": file_path,
                                                        "package_name": package_name,
                                                        "processed_time": timestamp,
                                                        "size": os.path.getsize(file_path),
                                                        "status": "success"
                                                    }
                                                    
                                                    # 保存记录
                                                    self.save_processed_records()
                                                    return True
                                                else:
                                                    print(f"API请求失败,状态码: {response.status_code}")
                                                    print(f"响应内容: {response.text}")
                                                    
                                                    # 记录失败的处理
                                                    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
                                                    self.processed_files[file_path] = {
                                                        "file_path": file_path,
                                                        "package_name": package_name,
                                                        "processed_time": timestamp,
                                                        "size": os.path.getsize(file_path),
                                                        "status": "failed",
                                                        "error": f"HTTP {response.status_code}: {response.text}"
                                                    }
                                                    
                                                    # 保存记录
                                                    self.save_processed_records()
                                                    return False
                                                    
                                            except Exception as e:
                                                print(f"处理文件 {file_path} 时出错: {str(e)}")
                                                
                                                # 记录错误
                                                timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
                                                self.processed_files[file_path] = {
                                                    "file_path": file_path,
                                                    "package_name": package_name,
                                                    "processed_time": timestamp,
                                                    "size": os.path.getsize(file_path) if os.path.exists(file_path) else 0,
                                                    "status": "error",
                                                    "error": str(e)
                                                }
                                                
                                                # 保存记录
                                                self.save_processed_records()
                                                return False
                                        
                                        def scan_folder(self):
                                            """扫描文件夹中的所有ZIP文件"""
                                            processed_count = 0
                                            
                                            for filename in os.listdir(self.target_folder):
                                                if filename.lower().endswith('.zip'):
                                                    file_path = os.path.join(self.target_folder, filename)
                                                    if self.process_zip_file(file_path):
                                                        processed_count += 1
                                            
                                            return processed_count
                                        
                                        def start(self, run_once=False):
                                            """
                                            启动监听进程
                                            
                                            参数:
                                                run_once: 如果为True,则只扫描一次;否则持续扫描
                                            """
                                            print(f"开始监听文件夹: {self.target_folder}")
                                            print(f"每{self.interval}秒检查一次ZIP文件并上传到: {self.api_url}")
                                            print(f"处理记录将保存到: {self.record_file}")
                                            
                                            self.is_running = True
                                            
                                            try:
                                                if run_once:
                                                    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 扫描文件夹...")
                                                    count = self.scan_folder()
                                                    print(f"扫描完成,处理了{count}个新文件")
                                                    return
                                                
                                                while self.is_running:
                                                    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 扫描文件夹...")
                                                    count = self.scan_folder()
                                                    print(f"扫描完成,处理了{count}个新文件,等待{self.interval}秒后再次扫描...")
                                                    
                                                    # 等待指定的间隔时间
                                                    time.sleep(self.interval)
                                                    
                                            except KeyboardInterrupt:
                                                self.stop()
                                        
                                        def stop(self):
                                            """停止监听进程"""
                                            self.is_running = False
                                            print("监听已停止") 
                                            import os
                                            import json
                                            from zip_monitor_module import ZipMonitor
                                            
                                            def load_config(config_file="config.json"):
                                                """
                                                从配置文件加载配置
                                                """
                                                try:
                                                    with open(config_file, 'r', encoding='utf-8') as f:
                                                        config = json.load(f)
                                                    print(f"已成功加载配置文件: {config_file}")
                                                    return config
                                                except Exception as e:
                                                    print(f"加载配置文件出错: {str(e)}")
                                                    return None
                                            
                                            def main():
                                                # 加载配置文件
                                                config = load_config()
                                                if not config:
                                                    print("无法加载配置文件,程序退出")
                                                    return
                                            
                                                # 从配置中获取参数
                                                target_folder = config['monitor']['target_folder']
                                                interval = config['monitor']['interval']
                                                record_file = config['monitor'].get('record_file', "processed_records.json")
                                                api_url = config['api']['url']
                                                
                                                # 构建数据库配置
                                                db_config = {
                                                    'db_host': config['database']['host'],
                                                    'db_user': config['database']['user'],
                                                    'db_password': config['database']['password'],
                                                    'db_name': config['database']['name'],
                                                    'db_port': config['database']['port']
                                                }
                                            
                                                # 显示配置信息
                                                print("已加载以下配置:")
                                                print(f"监听文件夹: {target_folder}")
                                                print(f"扫描间隔: {interval}秒")
                                                print(f"API地址: {api_url}")
                                                print(f"数据库配置: {db_config}")
                                                
                                                # 确保目标文件夹存在
                                                if not os.path.exists(target_folder):
                                                    print(f"错误: 目标文件夹 '{target_folder}' 不存在")
                                                    return
                                                
                                                # 创建监听器实例
                                                monitor = ZipMonitor(
                                                    target_folder=target_folder,
                                                    api_url=api_url,
                                                    db_config=db_config,
                                                    interval=interval,
                                                    record_file=record_file
                                                )
                                                
                                                # 启动监听
                                                try:
                                                    monitor.start()
                                                except KeyboardInterrupt:
                                                    print("程序已通过键盘中断退出")
                                            
                                            if __name__ == "__main__":
                                                main() 

                                        {
    "monitor": {
        "target_folder": "D:\\我的资料库\\Documents\\Downloads",
        "interval": 60,
        "record_file": "processed_records.json"
    },
    "api": {
        "url": ""
    },
    "database": {
        "host": "",
        "user": "",
        "password": "",
        "name": "",
        "port": ""
    }
} 
                                        
                                        
                                        
                                        
响应示例:
{
                                    API响应: {'download_url': 'http://123.56.188.163:5000/download/20250601130644/combined.sql', 'file_count': 3, 'filename': 'ruoyi_12_20250601130644.zip', 'sql_execution': {'message': '成功执行SQL文件,执行时间: 0.06秒,执行语句数: 22', 'success': True}, 'success': True}
}
下游服务器
服务器信息
  • 地址: 123.56.188.163:8077
  • 状态: 未检查
  • 用途: 提供扩展API和资源文件
可用操作
访问服务器