Files
dell-fans-controller-docker/controller/ipmi.py
lkddi 4f2042cbb4 修复温度和风扇转速读取问题
- 修复温度读取功能,使用正确的正则表达式从IPMI传感器输出中提取温度值
- 通过实验校准确定20%设置对应4800 RPM,实现准确的RPM到百分比转换
- 修改sensor()方法使用ipmitool sdr命令获取更准确的传感器数据
- 添加重试机制和错误处理
- 优化风扇控制逻辑,增加模式切换和状态跟踪
2025-12-02 17:47:51 +08:00

192 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import subprocess
import time
import re
from controller.logger import logger
class IpmiTool:
def __init__(self, host: str, username: str, password: str):
if not host or not username or not password:
raise ValueError("host, username and password must be provided")
self.host = host
self.username = username
self.password = password
def run_cmd(self, cmd: str) -> str:
basecmd = f'ipmitool -H {self.host} -I lanplus -U {self.username} -P {self.password}'
command = f'{basecmd} {cmd}'
retry_count = 3 # 设置重试次数
for attempt in range(retry_count):
try:
# print(f"Executing command: {command}") # 添加调试信息
result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
raise RuntimeError(
f'IPMI 命令执行失败: {cmd}\n错误详情: {result.stderr}' # 更清晰的错误提示
)
# 添加网络和认证排查提示
print("请检查以下内容:")
print("1. 确保 BMC 地址可访问ping 测试或网络配置)。")
print("2. 验证用户名、密码是否正确。")
print("3. 检查目标设备的 IPMI 功能是否启用。")
return result.stdout
except subprocess.TimeoutExpired:
if attempt < retry_count - 1:
logger.warning(f'命令超时,正在重试... (尝试次数 {attempt + 1}/{retry_count})')
time.sleep(5) # 每次重试前等待 5 秒
else:
raise RuntimeError('IPMI 命令超时。请检查网络连接或服务器状态。') # 更明确的错误提示
def mc_info(self) -> str:
"""
execute ipmitool command mc info
:return:
"""
return self.run_cmd(cmd='mc info')
def sensor(self) -> str:
"""
execute ipmitool command sdr to get sensor data
:return:
"""
return self.run_cmd(cmd='sdr')
def temperature(self) -> list:
"""
get current temperature
:return:
"""
data = self.sensor()
temperatures = []
import re
for line in data.splitlines():
if 'Temp' in line and 'degrees C' in line:
# 提取温度值,例如从 " 25 degrees C" 中提取 25
temp_part = line.split('|')[1] # 获取中间列的内容
# 使用正则表达式提取数字
match = re.search(r'(\d+(\.\d+)?)\s+degrees C', temp_part)
if match:
temp_value = float(match.group(1))
temperatures.append(temp_value)
return temperatures
def fan_speeds(self) -> list:
"""
get current fan speeds
:return: list of fan speeds in percentage
"""
data = self.sensor()
fan_speeds = []
for line in data.splitlines():
if 'Fan' in line and 'RPM' in line:
# Extract numeric value from line - format is typically "Fan1 | 1234 | RPM |"
parts = line.split('|')
if len(parts) >= 2:
try:
# Extract the value and convert RPM to percentage if possible
# For Dell servers, we may need to get duty cycle instead
value_str = parts[1].strip()
if value_str.isdigit():
rpm = int(value_str)
# Placeholder: we might need to use raw commands to get duty cycle
# For now, return the raw value
fan_speeds.append(rpm)
except ValueError:
continue
return fan_speeds
def get_fan_duty_cycle(self) -> int:
"""
get current fan duty cycle/percentage
:return: current fan duty cycle in percentage
"""
try:
# Raw command to get current fan duty cycle
result = self.run_cmd('raw 0x30 0x31 0x01')
# Parse the hex result to get duty cycle
result_parts = result.strip().split()
if result_parts and len(result_parts) >= 1:
# The command should return a hex value representing the duty cycle
duty_cycle_hex = result_parts[-1]
duty_cycle = int(duty_cycle_hex, 16)
# Ensure the value is in valid range (0-100)
if 0 <= duty_cycle <= 100 and duty_cycle != 0:
# If we get a reasonable value (not 0), return it
return duty_cycle
elif duty_cycle == 0:
# Value of 0 might indicate auto mode or that raw command doesn't return duty cycle on this system
logger.info('原始命令返回0尝试从RPM估算风扇百分比')
except Exception as e:
logger.warning(f'获取风扇占空比的原始命令失败: {e}')
# If raw command fails or returns 0, get fan speeds from sensor data and convert to approximate percentage
try:
data = self.sensor()
fan_rpm_values = []
import re
for line in data.splitlines():
if 'Fan' in line and 'RPM' in line and 'degrees C' not in line:
# Extract numeric value from "FanX RPM | XXXX RPM | ok" format
parts = line.split('|')
if len(parts) >= 2:
rpm_part = parts[1].strip()
# Use regex to extract RPM value
rpm_match = re.search(r'(\d+)\s+RPM', rpm_part)
if rpm_match:
rpm_value = int(rpm_match.group(1))
fan_rpm_values.append(rpm_value)
if fan_rpm_values:
# Calculate average RPM
avg_rpm = sum(fan_rpm_values) / len(fan_rpm_values)
# Based on calibration: 20% setting results in 4800 RPM
# Therefore, 100% would theoretically be 24000 RPM (4800 * 5)
# This seems high for typical server fans, but we'll use the calibrated ratio
# When 20% = 4800 RPM, the percentage = (current_rpm / 4800) * 20
calibrated_rpm_at_20_percent = 4800
calibrated_percentage = 20 # This is the known setting
# Calculate the theoretical max RPM based on the calibration
theoretical_max_rpm = calibrated_rpm_at_20_percent * (100 // calibrated_percentage) # 100/20 = 5
# Calculate the current percentage
estimated_percentage = min(100, int((avg_rpm / theoretical_max_rpm) * 100))
# Round to nearest 5 to match typical percentage steps
estimated_percentage = round(estimated_percentage / 5) * 5
return min(100, estimated_percentage)
except Exception as e:
logger.warning(f'解析传感器数据获取风扇RPM失败: {e}')
return -1 # Return -1 if unable to determine
def switch_fan_mode(self, auto: bool):
"""
switch the fan mode
:param auto:
:return:
"""
manual_cmd = 'raw 0x30 0x30 0x01 0x00'
auto_cmd = 'raw 0x30 0x30 0x01 0x01'
return self.run_cmd(cmd=auto_cmd) if auto else self.run_cmd(cmd=manual_cmd)
def set_fan_speed(self, speed: int):
"""
set fan speed
:param speed:
:return:
"""
if speed < 10 or speed > 100:
raise ValueError(
'speed must be between 10 and 100'
)
self.switch_fan_mode(auto=False)
base_cmd = 'raw 0x30 0x30 0x02 0xff'
return self.run_cmd(cmd=f'{base_cmd} {hex(speed)}')