AI摘要:VOS智能控制并发工具可自动监控和调整网关账号的并发容量,保持主、附属账号通话比例在设定范围内。主要功能包括自动监控、调整容量、独立设置检查间隔等。使用方法简单,需安装Python及依赖,修改配置文件后运行程序。配置说明详细,包含全局和账号配置参数。注意事项涵盖程序启动、退出及调整规则。日志输出提供运行状态和操作信息。
Powered by 奕蛋壳.
网关容量自动调整工具
功能说明
本工具用于自动监控和调整网关账号的并发容量,确保主账号和附属账号之间的通话比例保持在设定范围内。
主要功能
- 自动监控多个账号对的通话状态
- 根据设定的比例自动调整主账号容量
- 支持为每个账号对设置独立的检查间隔、目标比例和阈值
- 当主账号通话数低于阈值时自动重置容量
- 程序启动和退出时自动重置所有账号容量
使用方法
- 确保已安装Python 3.6或更高版本
安装所需依赖:
pip install requests
- 修改
gateway_config.json
配置文件(详见配置说明) 运行程序:
python VOS智能并发.py
配置说明
配置文件gateway_config.json
包含以下参数:
全局配置
gateway_ip
: 网关服务器地址和端口,格式为"IP:端口"ratio_error
: 允许的比例误差范围,例如:目标比例是20:1,实际比例在19.5:1到20.5:1之间都是可接受的default_capacity
: 默认并发容量,程序启动、退出或重置时,主账号会被设置为此容量值
账号配置
accounts
数组包含多个账号对配置,每个账号对包含:
main_account
: 主账号名称,附属账号会自动命名为"主账号名-k"target_ratio
: 目标比例,表示主账号:附属账号的比例,例如20表示20:1check_interval
: 检查间隔(秒),每隔指定秒数检查一次该账号对的状态min_current_call
: 最小通话数阈值,当主账号当前通话数低于此值时,会重置主账号容量为默认值,并将附属账号容量设为0
配置文件示例
{
"gateway_ip": "192.168.1.1:1810",
"ratio_error": 0.5,
"default_capacity": 5000,
"accounts": [
{
"main_account": "网关1",
"target_ratio": 20,
"check_interval": 30,
"min_current_call": 100
},
{
"main_account": "网关2",
"target_ratio": 15,
"check_interval": 45,
"min_current_call": 150
}
]
}
注意事项
- 程序启动时会自动重置所有账号容量
- 程序退出时会自动重置所有账号容量
- 当主账号通话数低于阈值时,会自动重置容量
- 建议根据实际需求调整检查间隔,避免过于频繁的请求
- 确保配置文件中的账号名称正确无误
- 配置文件必须使用UTF-8编码保存
- JSON格式必须正确,不能包含注释
- 每个账号对可以设置不同的阈值,以适应不同的业务需求
容量调整规则
当主账号通话数低于阈值时:
- 主账号容量重置为默认容量
- 附属账号容量设置为0
当需要调整比例时:
- 主账号容量根据比例计算调整
- 附属账号容量设置为5000
程序退出时:
- 主账号容量重置为默认容量
- 附属账号容量设置为0
日志说明
程序运行时会输出以下信息:
- 配置加载状态
- 每个账号对的当前状态(包括目标比例和阈值)
- 容量调整操作
- 错误信息(如果有)
示例输出
加载配置成功:
网关地址: 192.168.1.1:1810
比例误差范围: ±0.5
默认容量: 5000
监控账号对: 2个
检查账号 网关去1 (检查间隔: 30秒)
账号: 网关1
当前通话数: 150
限制并发数: 5000
------------------------------
账号: 网关1-k
当前通话数: 8
限制并发数: 0
------------------------------
import requests
import time
import json
import os
from datetime import datetime, timedelta
import atexit
from concurrent.futures import ThreadPoolExecutor
import threading
import signal
import sys
# 配置文件路径
CONFIG_FILE = "gateway_config.json"
# 最大重试次数
MAX_RETRIES = 3
# 重试间隔(秒)
RETRY_INTERVAL = 2
# 最大并行线程数
MAX_WORKERS = 5
# 批量处理大小
BATCH_SIZE = 10
# 配置文件检查间隔(秒)
CONFIG_CHECK_INTERVAL = 5
# 全局变量,用于存储配置
global_config = None
config_last_modified = 0
config_lock = threading.Lock()
def signal_handler(signum, frame):
"""处理信号"""
print(f"\n收到信号 {signum},正在关闭程序...")
if global_config:
reset_account_capacities(global_config)
sys.exit(0)
def check_config_changed():
"""检查配置文件是否被修改"""
global config_last_modified
try:
current_mtime = os.path.getmtime(CONFIG_FILE)
if current_mtime > config_last_modified:
config_last_modified = current_mtime
return True
except:
pass
return False
def reload_config():
"""重新加载配置文件"""
global global_config
try:
with config_lock:
new_config = load_config()
if new_config:
print("\n检测到配置文件已更新,正在重新加载...")
print(f"网关地址: {new_config['gateway_ip']}")
print(f"比例误差范围: ±{new_config['ratio_error']}")
print(f"默认容量: {new_config['default_capacity']}")
print(f"监控账号对: {len(new_config['accounts'])}个")
global_config = new_config
return True
except Exception as e:
print(f"重新加载配置文件失败: {e}")
return False
def load_config():
if not os.path.exists(CONFIG_FILE):
raise FileNotFoundError(f"配置文件 {CONFIG_FILE} 不存在,请先创建配置文件")
try:
with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
config = json.load(f)
# 验证必要配置项
required_fields = ['gateway_ip', 'ratio_error', 'default_capacity', 'accounts']
for field in required_fields:
if field not in config:
raise ValueError(f"配置文件中缺少必要字段: {field}")
# 验证账号配置
if not isinstance(config['accounts'], list) or len(config['accounts']) == 0:
raise ValueError("accounts 必须是非空数组")
for account in config['accounts']:
required_account_fields = ['main_account', 'target_ratio', 'check_interval', 'min_current_call']
for field in required_account_fields:
if field not in account:
raise ValueError(f"账号配置中缺少必要字段: {field}")
return config
except json.JSONDecodeError:
raise ValueError("配置文件格式错误,请检查JSON格式")
except Exception as e:
raise ValueError(f"加载配置文件时发生错误: {str(e)}")
def get_gateway_status_batch(names, gateway_ip):
"""批量获取账号状态"""
url = f"http://{gateway_ip}/external/server/GetGatewayMappingOnline"
data = {"names": names}
for retry in range(MAX_RETRIES):
try:
response = requests.post(url, json=data, timeout=10)
response.raise_for_status()
result = response.json()
if "infoGatewayMappingOnlines" in result:
return {status['name']: status for status in result["infoGatewayMappingOnlines"]}
return {}
except requests.RequestException as e:
if retry == MAX_RETRIES - 1:
print(f"获取账号状态失败: {e}")
return {}
time.sleep(RETRY_INTERVAL)
return {}
def modify_gateway_capacity_batch(updates, gateway_ip):
"""批量修改账号容量"""
url = f"http://{gateway_ip}/external/server/ModifyGatewayMapping"
results = {}
for name, capacity in updates.items():
for retry in range(MAX_RETRIES):
try:
data = {"name": name, "capacity": capacity}
response = requests.post(url, json=data, timeout=10)
response.raise_for_status()
results[name] = True
break
except requests.RequestException as e:
if retry == MAX_RETRIES - 1:
print(f"修改账号 {name} 容量失败: {e}")
results[name] = False
time.sleep(RETRY_INTERVAL)
return results
def calculate_new_capacity(main_current, sub_current, target_ratio):
total_current = main_current + sub_current
# 计算新的主账号容量,保持目标比例
ratio_factor = target_ratio / (target_ratio + 1)
new_main_capacity = int(total_current * ratio_factor)
print(f"总话单数: {total_current}")
print(f"按{target_ratio}:1比例分配:")
print(f"主账号应处理: {new_main_capacity}")
print(f"附属账号应处理: {total_current - new_main_capacity}")
return new_main_capacity
def print_status(account_name, status):
print(f"账号: {status['name']}")
print(f"当前通话数: {status['currentCall']}")
print(f"限制并发数: {status['capacity']}")
print("-" * 30)
def process_account_batch(account_batch, config, last_check_times):
"""批量处理一组账号"""
current_time = datetime.now()
names = []
account_info = {}
# 收集需要检查的账号
for account_pair in account_batch:
main_account = account_pair["main_account"]
if current_time - last_check_times[main_account] >= timedelta(seconds=account_pair["check_interval"]):
names.extend([main_account, f"{main_account}-k"])
account_info[main_account] = account_pair
last_check_times[main_account] = current_time
if not names:
return
# 批量获取状态
statuses = get_gateway_status_batch(names, config["gateway_ip"])
if not statuses:
return
# 处理每个账号
updates = {}
for main_account, account_pair in account_info.items():
sub_account = f"{main_account}-k"
if main_account not in statuses or sub_account not in statuses:
continue
main_status = statuses[main_account]
sub_status = statuses[sub_account]
target_ratio = account_pair["target_ratio"]
min_current_call = account_pair["min_current_call"]
print(f"\n{main_account} 状态 (目标比例: {target_ratio}:1, 阈值: {min_current_call}):")
print_status(main_account, main_status)
print_status(sub_account, sub_status)
# 如果主账号currentCall低于阈值,重置为默认容量,并将附属账号容量设为0
if main_status["currentCall"] < min_current_call:
print(f"主账号 {main_account} currentCall {main_status['currentCall']} 低于阈值 {min_current_call},重置容量")
updates[main_account] = config["default_capacity"]
updates[sub_account] = 0
continue
# 检查比例
current_ratio = main_status["currentCall"] / sub_status["currentCall"] if sub_status["currentCall"] > 0 else float('inf')
# 如果比例超出误差范围,就进行调整
if abs(current_ratio - target_ratio) > config["ratio_error"]:
new_capacity = calculate_new_capacity(main_status["currentCall"], sub_status["currentCall"], target_ratio)
print(f"当前比例 {current_ratio:.2f}:1 超出误差范围,调整主账号容量到 {new_capacity}")
updates[main_account] = new_capacity
updates[sub_account] = 5000
# 批量更新容量
if updates:
results = modify_gateway_capacity_batch(updates, config["gateway_ip"])
# 根据更新结果调整等待时间
wait_time = 5 if any(results.values()) else 1
time.sleep(wait_time)
def reset_account_capacities(config):
"""重置所有账号容量"""
print("\n正在重置所有账号容量...")
updates = {}
for account_pair in config["accounts"]:
main_account = account_pair["main_account"]
sub_account = f"{main_account}-k"
updates[main_account] = config["default_capacity"]
updates[sub_account] = 0
# 最多重试3次
for retry in range(3):
try:
results = modify_gateway_capacity_batch(updates, config["gateway_ip"])
if all(results.values()):
print("所有账号容量重置完成")
return True
else:
print(f"部分账号容量重置失败,正在重试 ({retry + 1}/3)")
time.sleep(2)
except Exception as e:
print(f"重置容量时发生错误: {e}")
time.sleep(2)
print("重置容量失败,请手动检查")
return False
def main():
global global_config, config_last_modified
try:
# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, signal_handler) # 终止信号
# 加载配置
config = load_config()
global_config = config # 保存到全局变量
config_last_modified = os.path.getmtime(CONFIG_FILE)
print("加载配置成功:")
print(f"网关地址: {config['gateway_ip']}")
print(f"比例误差范围: ±{config['ratio_error']}")
print(f"默认容量: {config['default_capacity']}")
print(f"监控账号对: {len(config['accounts'])}个")
# 注册程序退出时的处理函数
atexit.register(reset_account_capacities, config)
# 初始化所有账号的容量
reset_account_capacities(config)
# 初始化每个账号的上次检查时间
last_check_times = {account_pair["main_account"]: datetime.now() for account_pair in config["accounts"]}
# 将账号分成多个批次
account_batches = [config["accounts"][i:i + BATCH_SIZE] for i in range(0, len(config["accounts"]), BATCH_SIZE)]
last_config_check = datetime.now()
while True:
# 检查配置文件是否更新
current_time = datetime.now()
if (current_time - last_config_check).total_seconds() >= CONFIG_CHECK_INTERVAL:
if check_config_changed():
if reload_config():
# 更新账号批次
account_batches = [global_config["accounts"][i:i + BATCH_SIZE]
for i in range(0, len(global_config["accounts"]), BATCH_SIZE)]
# 更新检查时间
last_check_times = {account_pair["main_account"]: datetime.now()
for account_pair in global_config["accounts"]}
last_config_check = current_time
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = []
for batch in account_batches:
futures.append(executor.submit(process_account_batch, batch, global_config, last_check_times))
for future in futures:
future.result()
# 等待1秒后继续检查
time.sleep(1)
except KeyboardInterrupt:
print("\n程序被用户中断")
except Exception as e:
print(f"\n程序发生错误: {str(e)}")
finally:
try:
# 程序结束时重置所有账号容量
if global_config:
reset_account_capacities(global_config)
except Exception as e:
print(f"重置容量时发生错误: {e}")
if __name__ == "__main__":
main()