Loading... ## 代码如下 ```python #!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2025/3/12 09:14 # @Author : huoyu # @email : 2319899766@qq.com # @File : onstove_thread.py # @Project : verifi_number # @Software: PyCharm import configparser import threading import multiprocessing import onstove_pc import os import queue from ColorInfo import ColorLogger # 读取配置文件 config = configparser.ConfigParser() config.read('config.ini') logger = ColorLogger() threads_count = int(config['Settings']['threads']) processes_count = int(config['Settings']['processes']) def load_data(file_path, data_queue): # 从文件中读取数据并将其放入队列 with open(file_path, 'r', encoding="utf-8") as file: for line in file: data_queue.put(line.strip()) # 去除行末的换行符,并将数据放入队列 # 标记数据读取完成 data_queue.put(None) def task(data_queue, save_data_queue): while True: data = data_queue.get() if data is None: # 读取到None标记时退出 data_queue.put(None) # 传递None到队列,让其他线程也能退出 break username = data.split('----')[0] passwd = data.split('----')[1] result = onstove_pc.signin_old(username, passwd) print(f"处理数据: {data}") if result == None: result = "" result = data + "----" + result + "\n" save_data_queue.put(result) print(f"数据处理完成: {data}") def worker_process(data_queue, save_data_queue): print(f"进程 {multiprocessing.current_process().name} 启动") threads = [] for _ in range(threads_count): thread = threading.Thread(target=task, args=(data_queue, save_data_queue)) threads.append(thread) thread.start() for thread in threads: thread.join() print(f"进程 {multiprocessing.current_process().name} 完成任务") def save_data(save_data_queue, process_event): # 确保'record'文件夹存在 os.makedirs("./record", exist_ok=True) count = 1 # 打开文件句柄 with open("./record/user_power_error.txt", "a", encoding="utf-8") as user_error_file, \ open("./record/captcha_error.txt", "a", encoding="utf-8") as captcha_error_file, \ open("./record/out_time.txt", "a", encoding="utf-8") as out_time_file, \ open("./record/Blocked_account.txt", "a", encoding="utf-8") as Blocked_account_file, \ open("./record/login_success.txt", "a", encoding="utf-8") as success_file, \ open("./record/other_error.txt", "a", encoding="utf-8") as other_error_file: # 实时监控数据队列,保存数据 while True: try: data = save_data_queue.get(timeout=1) # 每当有新数据就获取,超时时间设为1秒 if data is None: # 读取到None标记时退出 if save_data_queue.empty(): print("所有数据保存完毕") break else: save_data_queue.put(None) else: logger.info(data) print("已经执行了完第" + str(count) + "条") count += 1 # logger.debug("3", "4") # logger.warning("5") # logger.error("6", "7", "yes") # print(f"保存数据: {data}") # 根据不同错误类型保存到对应文件 if "incorrect" in data: # 账号密码错误 user_error_file.write(data) user_error_file.flush() elif "Blocked account" in data: # 验证码错误 Blocked_account_file.write(data) Blocked_account_file.flush() elif "captcha" in data: # 验证码错误 captcha_error_file.write(data) captcha_error_file.flush() elif "timed out after" in data or "BoringSSL SSL_connect" in data: # 超时 out_time_file.write(data) out_time_file.flush() elif "huoyu" in data: # 成功 success_file.write(data) success_file.flush() elif "CONNECT tunnel failed, response 407" in data: print("代理到期") data_queue.put(None) break else: # 其他错误 other_error_file.write(data) other_error_file.flush() except queue.Empty: # 当数据队列在 1 秒内没有新数据时,检查进程是否都完成 if process_event.is_set() and save_data_queue.empty(): print("所有数据保存完毕") break # 如果所有进程任务都完成,且队列为空,退出保存数据线程 if __name__ == '__main__': manager = multiprocessing.Manager() data_queue = manager.Queue() # 创建一个共享队列 save_data_queue = manager.Queue() # 创建一个共享队列 process_event = multiprocessing.Event() # 创建一个事件标志 processes = [] # 启动数据加载线程 data_loader_thread = threading.Thread(target=load_data, args=('资料.txt', data_queue)) data_loader_thread.start() # 启动数据保存线程 data_saver_thread = threading.Thread(target=save_data, args=(save_data_queue, process_event)) data_saver_thread.start() # 启动工作进程,传递 save_data_queue 参数 for _ in range(processes_count): process = multiprocessing.Process(target=worker_process, args=(data_queue, save_data_queue)) processes.append(process) process.start() # 等待数据加载线程完成 data_loader_thread.join() # 等待所有进程完成 for process in processes: process.join() # 所有进程完成后设置事件标志 process_event.set() # 等待数据保存线程完成 data_saver_thread.join() print("程序结束") ``` 数据传入和传出都用的消息队列,保证数据安全性。 最后修改:2025 年 03 月 12 日 © 允许规范转载 打赏 赞赏作者 支付宝微信 赞 如果觉得我的文章对你有用,请随意赞赏