ZIP文件处理和文件夹监控解决方案
上传ZIP包中的SQL文件,合并后可一键导入数据库。
以下是可用的API接口及其调用方法:
获取系统健康状态信息
fetch('/api/health')
.then(response => response.json())
.then(data => console.log(data));
{
"status": "ok",
"time": "2023-06-01 12:34:56",
"version": "1.0.0"
}
获取文件监控的当前状态
fetch('/api/monitor/status')
.then(response => response.json())
.then(data => console.log(data));
{
"success": true,
"status": {
"running": true,
"source": "C:\\path\\to\\source",
"destination": "C:\\path\\to\\destination"
}
}
启动文件夹监控功能
source
- 监控源文件夹路径destination
- 目标文件夹路径fetch('/api/monitor/start', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
source: 'C:\\监控源文件夹',
destination: 'C:\\目标文件夹'
})
})
.then(response => response.json())
.then(data => console.log(data));
{
"success": true,
"message": "文件监控已启动",
"status": {
"running": true,
"source": "C:\\监控源文件夹",
"destination": "C:\\目标文件夹"
}
}
停止文件夹监控功能
fetch('/api/monitor/stop', {
method: 'POST'
})
.then(response => response.json())
.then(data => console.log(data));
{
"success": true,
"message": "监控已停止",
"status": {
"running": false,
"source": "",
"destination": ""
}
}
上传ZIP压缩包,自动解压包内SQL文件并导入到目标数据库
zipfile
- ZIP文件 (必需)db_host
- 数据库主机 (可选)db_user
- 数据库用户名 (可选)db_password
- 数据库密码 (可选)db_name
- 数据库名称 (可选)const formData = new FormData();
import os
import time
import json
import requests
class ZipMonitor:
def __init__(self, target_folder, api_url, db_config=None, interval=60, record_file="processed_records.json"):
"""
初始化ZIP文件监听器
参数:
target_folder: 要监听的文件夹路径
api_url: 处理ZIP文件的API地址
db_config: 数据库配置参数(字典)
interval: 扫描间隔(秒)
record_file: 记录已处理文件的JSON文件
"""
self.target_folder = target_folder
self.api_url = api_url
self.interval = interval
self.record_file = record_file
# 设置默认数据库配置
self.db_config = {
'db_host': '',
'db_user': '',
'db_password': '',
'db_name': ''
}
# 如果提供了数据库配置,则更新默认配置
if db_config:
self.db_config.update(db_config)
# 运行状态标志
self.is_running = False
# 加载已处理的文件记录
self.processed_files = self.load_processed_records()
def load_processed_records(self):
"""加载已处理的文件记录"""
try:
if os.path.exists(self.record_file):
with open(self.record_file, 'r', encoding='utf-8') as f:
records = json.load(f)
return {record['file_path']: record for record in records}
else:
return {}
except Exception as e:
print(f"加载记录文件时出错: {str(e)}")
return {}
def save_processed_records(self):
"""保存已处理的文件记录"""
try:
records = list(self.processed_files.values())
with open(self.record_file, 'w', encoding='utf-8') as f:
json.dump(records, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"保存记录文件时出错: {str(e)}")
def get_zip_package_name(self, file_path):
"""从ZIP文件路径中提取包名"""
file_name = os.path.basename(file_path)
# 移除.zip扩展名
package_name = os.path.splitext(file_name)[0]
return package_name
def process_zip_file(self, file_path):
"""处理单个ZIP文件"""
# 检查文件是否是ZIP文件
if not file_path.lower().endswith('.zip'):
return False
# 避免重复处理同一个文件
if file_path in self.processed_files:
return False
package_name = self.get_zip_package_name(file_path)
print(f"检测到新的ZIP文件: {file_path}")
print(f"包名: {package_name}")
# 确保文件存在且可访问
if not os.path.exists(file_path) or not os.access(file_path, os.R_OK):
print(f"文件 {file_path} 不存在或无法访问")
return False
try:
# 上传文件到API
with open(file_path, 'rb') as zip_file:
files = {'zipfile': zip_file}
response = requests.post(self.api_url, files=files, data=self.db_config)
# 检查响应
if response.status_code == 200:
print(f"成功处理ZIP文件: {file_path}")
print(f"API响应: {response.json()}")
# 记录已处理的文件
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
self.processed_files[file_path] = {
"file_path": file_path,
"package_name": package_name,
"processed_time": timestamp,
"size": os.path.getsize(file_path),
"status": "success"
}
# 保存记录
self.save_processed_records()
return True
else:
print(f"API请求失败,状态码: {response.status_code}")
print(f"响应内容: {response.text}")
# 记录失败的处理
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
self.processed_files[file_path] = {
"file_path": file_path,
"package_name": package_name,
"processed_time": timestamp,
"size": os.path.getsize(file_path),
"status": "failed",
"error": f"HTTP {response.status_code}: {response.text}"
}
# 保存记录
self.save_processed_records()
return False
except Exception as e:
print(f"处理文件 {file_path} 时出错: {str(e)}")
# 记录错误
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
self.processed_files[file_path] = {
"file_path": file_path,
"package_name": package_name,
"processed_time": timestamp,
"size": os.path.getsize(file_path) if os.path.exists(file_path) else 0,
"status": "error",
"error": str(e)
}
# 保存记录
self.save_processed_records()
return False
def scan_folder(self):
"""扫描文件夹中的所有ZIP文件"""
processed_count = 0
for filename in os.listdir(self.target_folder):
if filename.lower().endswith('.zip'):
file_path = os.path.join(self.target_folder, filename)
if self.process_zip_file(file_path):
processed_count += 1
return processed_count
def start(self, run_once=False):
"""
启动监听进程
参数:
run_once: 如果为True,则只扫描一次;否则持续扫描
"""
print(f"开始监听文件夹: {self.target_folder}")
print(f"每{self.interval}秒检查一次ZIP文件并上传到: {self.api_url}")
print(f"处理记录将保存到: {self.record_file}")
self.is_running = True
try:
if run_once:
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 扫描文件夹...")
count = self.scan_folder()
print(f"扫描完成,处理了{count}个新文件")
return
while self.is_running:
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 扫描文件夹...")
count = self.scan_folder()
print(f"扫描完成,处理了{count}个新文件,等待{self.interval}秒后再次扫描...")
# 等待指定的间隔时间
time.sleep(self.interval)
except KeyboardInterrupt:
self.stop()
def stop(self):
"""停止监听进程"""
self.is_running = False
print("监听已停止")
import os
import json
from zip_monitor_module import ZipMonitor
def load_config(config_file="config.json"):
"""
从配置文件加载配置
"""
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
print(f"已成功加载配置文件: {config_file}")
return config
except Exception as e:
print(f"加载配置文件出错: {str(e)}")
return None
def main():
# 加载配置文件
config = load_config()
if not config:
print("无法加载配置文件,程序退出")
return
# 从配置中获取参数
target_folder = config['monitor']['target_folder']
interval = config['monitor']['interval']
record_file = config['monitor'].get('record_file', "processed_records.json")
api_url = config['api']['url']
# 构建数据库配置
db_config = {
'db_host': config['database']['host'],
'db_user': config['database']['user'],
'db_password': config['database']['password'],
'db_name': config['database']['name'],
'db_port': config['database']['port']
}
# 显示配置信息
print("已加载以下配置:")
print(f"监听文件夹: {target_folder}")
print(f"扫描间隔: {interval}秒")
print(f"API地址: {api_url}")
print(f"数据库配置: {db_config}")
# 确保目标文件夹存在
if not os.path.exists(target_folder):
print(f"错误: 目标文件夹 '{target_folder}' 不存在")
return
# 创建监听器实例
monitor = ZipMonitor(
target_folder=target_folder,
api_url=api_url,
db_config=db_config,
interval=interval,
record_file=record_file
)
# 启动监听
try:
monitor.start()
except KeyboardInterrupt:
print("程序已通过键盘中断退出")
if __name__ == "__main__":
main()
{
"monitor": {
"target_folder": "D:\\我的资料库\\Documents\\Downloads",
"interval": 60,
"record_file": "processed_records.json"
},
"api": {
"url": ""
},
"database": {
"host": "",
"user": "",
"password": "",
"name": "",
"port": ""
}
}
{
API响应: {'download_url': 'http://123.56.188.163:5000/download/20250601130644/combined.sql', 'file_count': 3, 'filename': 'ruoyi_12_20250601130644.zip', 'sql_execution': {'message': '成功执行SQL文件,执行时间: 0.06秒,执行语句数: 22', 'success': True}, 'success': True}
}