VOS智能控制并发 - 奕蛋壳

VOS智能控制并发

AI摘要:

VOS智能控制并发工具可自动监控和调整网关账号的并发容量,保持主、附属账号通话比例在设定范围内。主要功能包括自动监控、调整容量、独立设置检查间隔等。使用方法简单,需安装Python及依赖,修改配置文件后运行程序。配置说明详细,包含全局和账号配置参数。注意事项涵盖程序启动、退出及调整规则。日志输出提供运行状态和操作信息。

Powered by 奕蛋壳.

网关容量自动调整工具

功能说明

本工具用于自动监控和调整网关账号的并发容量,确保主账号和附属账号之间的通话比例保持在设定范围内。

主要功能

  1. 自动监控多个账号对的通话状态
  2. 根据设定的比例自动调整主账号容量
  3. 支持为每个账号对设置独立的检查间隔、目标比例和阈值
  4. 当主账号通话数低于阈值时自动重置容量
  5. 程序启动和退出时自动重置所有账号容量

使用方法

  1. 确保已安装Python 3.6或更高版本
  2. 安装所需依赖:

    pip install requests
  3. 修改gateway_config.json配置文件(详见配置说明)
  4. 运行程序:

    python VOS智能并发.py

配置说明

配置文件gateway_config.json包含以下参数:

全局配置

  • gateway_ip: 网关服务器地址和端口,格式为"IP:端口"
  • ratio_error: 允许的比例误差范围,例如:目标比例是20:1,实际比例在19.5:1到20.5:1之间都是可接受的
  • default_capacity: 默认并发容量,程序启动、退出或重置时,主账号会被设置为此容量值

账号配置

accounts数组包含多个账号对配置,每个账号对包含:

  • main_account: 主账号名称,附属账号会自动命名为"主账号名-k"
  • target_ratio: 目标比例,表示主账号:附属账号的比例,例如20表示20:1
  • check_interval: 检查间隔(秒),每隔指定秒数检查一次该账号对的状态
  • min_current_call: 最小通话数阈值,当主账号当前通话数低于此值时,会重置主账号容量为默认值,并将附属账号容量设为0

配置文件示例

{
    "gateway_ip": "192.168.1.1:1810",
    "ratio_error": 0.5,
    "default_capacity": 5000,
    "accounts": [
        {
            "main_account": "网关1",
            "target_ratio": 20,
            "check_interval": 30,
            "min_current_call": 100
        },
        {
            "main_account": "网关2",
            "target_ratio": 15,
            "check_interval": 45,
            "min_current_call": 150
        }
    ]
}

注意事项

  1. 程序启动时会自动重置所有账号容量
  2. 程序退出时会自动重置所有账号容量
  3. 当主账号通话数低于阈值时,会自动重置容量
  4. 建议根据实际需求调整检查间隔,避免过于频繁的请求
  5. 确保配置文件中的账号名称正确无误
  6. 配置文件必须使用UTF-8编码保存
  7. JSON格式必须正确,不能包含注释
  8. 每个账号对可以设置不同的阈值,以适应不同的业务需求

容量调整规则

  1. 当主账号通话数低于阈值时:

    • 主账号容量重置为默认容量
    • 附属账号容量设置为0
  2. 当需要调整比例时:

    • 主账号容量根据比例计算调整
    • 附属账号容量设置为5000
  3. 程序退出时:

    • 主账号容量重置为默认容量
    • 附属账号容量设置为0

日志说明

程序运行时会输出以下信息:

  • 配置加载状态
  • 每个账号对的当前状态(包括目标比例和阈值)
  • 容量调整操作
  • 错误信息(如果有)

示例输出

加载配置成功:
网关地址: 192.168.1.1:1810
比例误差范围: ±0.5
默认容量: 5000
监控账号对: 2个

检查账号 网关去1 (检查间隔: 30秒)
账号: 网关1
当前通话数: 150
限制并发数: 5000
------------------------------
账号: 网关1-k
当前通话数: 8
限制并发数: 0
------------------------------
import requests
import time
import json
import os
from datetime import datetime, timedelta
import atexit
from concurrent.futures import ThreadPoolExecutor
import threading
import signal
import sys

# 配置文件路径
CONFIG_FILE = "gateway_config.json"
# 最大重试次数
MAX_RETRIES = 3
# 重试间隔(秒)
RETRY_INTERVAL = 2
# 最大并行线程数
MAX_WORKERS = 5
# 批量处理大小
BATCH_SIZE = 10
# 配置文件检查间隔(秒)
CONFIG_CHECK_INTERVAL = 5

# 全局变量,用于存储配置
global_config = None
config_last_modified = 0
config_lock = threading.Lock()

def signal_handler(signum, frame):
    """处理信号"""
    print(f"\n收到信号 {signum},正在关闭程序...")
    if global_config:
        reset_account_capacities(global_config)
    sys.exit(0)

def check_config_changed():
    """检查配置文件是否被修改"""
    global config_last_modified
    try:
        current_mtime = os.path.getmtime(CONFIG_FILE)
        if current_mtime > config_last_modified:
            config_last_modified = current_mtime
            return True
    except:
        pass
    return False

def reload_config():
    """重新加载配置文件"""
    global global_config
    try:
        with config_lock:
            new_config = load_config()
            if new_config:
                print("\n检测到配置文件已更新,正在重新加载...")
                print(f"网关地址: {new_config['gateway_ip']}")
                print(f"比例误差范围: ±{new_config['ratio_error']}")
                print(f"默认容量: {new_config['default_capacity']}")
                print(f"监控账号对: {len(new_config['accounts'])}个")
                global_config = new_config
                return True
    except Exception as e:
        print(f"重新加载配置文件失败: {e}")
    return False

def load_config():
    if not os.path.exists(CONFIG_FILE):
        raise FileNotFoundError(f"配置文件 {CONFIG_FILE} 不存在,请先创建配置文件")
    
    try:
        with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
            config = json.load(f)
            
        # 验证必要配置项
        required_fields = ['gateway_ip', 'ratio_error', 'default_capacity', 'accounts']
        for field in required_fields:
            if field not in config:
                raise ValueError(f"配置文件中缺少必要字段: {field}")
                
        # 验证账号配置
        if not isinstance(config['accounts'], list) or len(config['accounts']) == 0:
            raise ValueError("accounts 必须是非空数组")
            
        for account in config['accounts']:
            required_account_fields = ['main_account', 'target_ratio', 'check_interval', 'min_current_call']
            for field in required_account_fields:
                if field not in account:
                    raise ValueError(f"账号配置中缺少必要字段: {field}")
                    
        return config
        
    except json.JSONDecodeError:
        raise ValueError("配置文件格式错误,请检查JSON格式")
    except Exception as e:
        raise ValueError(f"加载配置文件时发生错误: {str(e)}")

def get_gateway_status_batch(names, gateway_ip):
    """批量获取账号状态"""
    url = f"http://{gateway_ip}/external/server/GetGatewayMappingOnline"
    data = {"names": names}
    for retry in range(MAX_RETRIES):
        try:
            response = requests.post(url, json=data, timeout=10)
            response.raise_for_status()
            result = response.json()
            if "infoGatewayMappingOnlines" in result:
                return {status['name']: status for status in result["infoGatewayMappingOnlines"]}
            return {}
        except requests.RequestException as e:
            if retry == MAX_RETRIES - 1:
                print(f"获取账号状态失败: {e}")
                return {}
            time.sleep(RETRY_INTERVAL)
    return {}

def modify_gateway_capacity_batch(updates, gateway_ip):
    """批量修改账号容量"""
    url = f"http://{gateway_ip}/external/server/ModifyGatewayMapping"
    results = {}
    for name, capacity in updates.items():
        for retry in range(MAX_RETRIES):
            try:
                data = {"name": name, "capacity": capacity}
                response = requests.post(url, json=data, timeout=10)
                response.raise_for_status()
                results[name] = True
                break
            except requests.RequestException as e:
                if retry == MAX_RETRIES - 1:
                    print(f"修改账号 {name} 容量失败: {e}")
                    results[name] = False
                time.sleep(RETRY_INTERVAL)
    return results

def calculate_new_capacity(main_current, sub_current, target_ratio):
    total_current = main_current + sub_current
    # 计算新的主账号容量,保持目标比例
    ratio_factor = target_ratio / (target_ratio + 1)
    new_main_capacity = int(total_current * ratio_factor)
    print(f"总话单数: {total_current}")
    print(f"按{target_ratio}:1比例分配:")
    print(f"主账号应处理: {new_main_capacity}")
    print(f"附属账号应处理: {total_current - new_main_capacity}")
    return new_main_capacity

def print_status(account_name, status):
    print(f"账号: {status['name']}")
    print(f"当前通话数: {status['currentCall']}")
    print(f"限制并发数: {status['capacity']}")
    print("-" * 30)

def process_account_batch(account_batch, config, last_check_times):
    """批量处理一组账号"""
    current_time = datetime.now()
    names = []
    account_info = {}
    
    # 收集需要检查的账号
    for account_pair in account_batch:
        main_account = account_pair["main_account"]
        if current_time - last_check_times[main_account] >= timedelta(seconds=account_pair["check_interval"]):
            names.extend([main_account, f"{main_account}-k"])
            account_info[main_account] = account_pair
            last_check_times[main_account] = current_time
    
    if not names:
        return
    
    # 批量获取状态
    statuses = get_gateway_status_batch(names, config["gateway_ip"])
    if not statuses:
        return
    
    # 处理每个账号
    updates = {}
    for main_account, account_pair in account_info.items():
        sub_account = f"{main_account}-k"
        if main_account not in statuses or sub_account not in statuses:
            continue
            
        main_status = statuses[main_account]
        sub_status = statuses[sub_account]
        target_ratio = account_pair["target_ratio"]
        min_current_call = account_pair["min_current_call"]
        
        print(f"\n{main_account} 状态 (目标比例: {target_ratio}:1, 阈值: {min_current_call}):")
        print_status(main_account, main_status)
        print_status(sub_account, sub_status)
        
        # 如果主账号currentCall低于阈值,重置为默认容量,并将附属账号容量设为0
        if main_status["currentCall"] < min_current_call:
            print(f"主账号 {main_account} currentCall {main_status['currentCall']} 低于阈值 {min_current_call},重置容量")
            updates[main_account] = config["default_capacity"]
            updates[sub_account] = 0
            continue
        
        # 检查比例
        current_ratio = main_status["currentCall"] / sub_status["currentCall"] if sub_status["currentCall"] > 0 else float('inf')
        
        # 如果比例超出误差范围,就进行调整
        if abs(current_ratio - target_ratio) > config["ratio_error"]:
            new_capacity = calculate_new_capacity(main_status["currentCall"], sub_status["currentCall"], target_ratio)
            print(f"当前比例 {current_ratio:.2f}:1 超出误差范围,调整主账号容量到 {new_capacity}")
            updates[main_account] = new_capacity
            updates[sub_account] = 5000
    
    # 批量更新容量
    if updates:
        results = modify_gateway_capacity_batch(updates, config["gateway_ip"])
        # 根据更新结果调整等待时间
        wait_time = 5 if any(results.values()) else 1
        time.sleep(wait_time)

def reset_account_capacities(config):
    """重置所有账号容量"""
    print("\n正在重置所有账号容量...")
    updates = {}
    for account_pair in config["accounts"]:
        main_account = account_pair["main_account"]
        sub_account = f"{main_account}-k"
        updates[main_account] = config["default_capacity"]
        updates[sub_account] = 0
    
    # 最多重试3次
    for retry in range(3):
        try:
            results = modify_gateway_capacity_batch(updates, config["gateway_ip"])
            if all(results.values()):
                print("所有账号容量重置完成")
                return True
            else:
                print(f"部分账号容量重置失败,正在重试 ({retry + 1}/3)")
                time.sleep(2)
        except Exception as e:
            print(f"重置容量时发生错误: {e}")
            time.sleep(2)
    
    print("重置容量失败,请手动检查")
    return False

def main():
    global global_config, config_last_modified
    
    try:
        # 注册信号处理器
        signal.signal(signal.SIGINT, signal_handler)  # Ctrl+C
        signal.signal(signal.SIGTERM, signal_handler)  # 终止信号
        
        # 加载配置
        config = load_config()
        global_config = config  # 保存到全局变量
        config_last_modified = os.path.getmtime(CONFIG_FILE)
        
        print("加载配置成功:")
        print(f"网关地址: {config['gateway_ip']}")
        print(f"比例误差范围: ±{config['ratio_error']}")
        print(f"默认容量: {config['default_capacity']}")
        print(f"监控账号对: {len(config['accounts'])}个")
        
        # 注册程序退出时的处理函数
        atexit.register(reset_account_capacities, config)
        
        # 初始化所有账号的容量
        reset_account_capacities(config)
        
        # 初始化每个账号的上次检查时间
        last_check_times = {account_pair["main_account"]: datetime.now() for account_pair in config["accounts"]}
        
        # 将账号分成多个批次
        account_batches = [config["accounts"][i:i + BATCH_SIZE] for i in range(0, len(config["accounts"]), BATCH_SIZE)]
        
        last_config_check = datetime.now()
        
        while True:
            # 检查配置文件是否更新
            current_time = datetime.now()
            if (current_time - last_config_check).total_seconds() >= CONFIG_CHECK_INTERVAL:
                if check_config_changed():
                    if reload_config():
                        # 更新账号批次
                        account_batches = [global_config["accounts"][i:i + BATCH_SIZE] 
                                         for i in range(0, len(global_config["accounts"]), BATCH_SIZE)]
                        # 更新检查时间
                        last_check_times = {account_pair["main_account"]: datetime.now() 
                                          for account_pair in global_config["accounts"]}
                last_config_check = current_time
            
            with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
                futures = []
                for batch in account_batches:
                    futures.append(executor.submit(process_account_batch, batch, global_config, last_check_times))
                for future in futures:
                    future.result()
            
            # 等待1秒后继续检查
            time.sleep(1)
            
    except KeyboardInterrupt:
        print("\n程序被用户中断")
    except Exception as e:
        print(f"\n程序发生错误: {str(e)}")
    finally:
        try:
            # 程序结束时重置所有账号容量
            if global_config:
                reset_account_capacities(global_config)
        except Exception as e:
            print(f"重置容量时发生错误: {e}")

if __name__ == "__main__":
    main()
评论区
头像