To allow DCImanager 6 to interact with the PDU, override the BasePdu class methods:
status — PDU polling;
port_up — enabling the port;
port_down — disabling the port;
statistic — gathering statistics.
For each method, there are argument types and return values that the platform expects. The equipment management service does not use raw json data when communicating with the PDU, but rather its object representation. For example, to enable a port using the port_up method, an object of the PduPortView class with the following properties is passed as a parameter:
identify — PDU port identifier;
power_status — PDU port status in DCImanager 6.
An object of the same class with the current port state in the power_status property is expected in the reply.
When overriding methods, specify the required format of queries and return values. All auxiliary views are described in the project file /pdu_common/helper.py.
Example of handler code
Example of code
from typing import Optional, Union
from common.logger import get_logger
from common.misc import waiter, InversionDict, Oid
from pdu_common.base_snmp_pdu import BaseSnmpPdu
from pdu_common.connection import PduSnmpData
from pdu_common.helper import PduView, PduPortView, EnumPowerStatus, PduStatisticView, PduPortStatisticView
OID_RPCM_ADMINISTRATIVE_STATE = Oid("1.3.6.1.4.1.46235.2.4.2.1.4")
# Amperage of each output port in mA
OID_RPCM_OUTPUT_MILLIAMPS = Oid("1.3.6.1.4.1.46235.2.4.2.1.11")
# Energy of each output port in kW/h
OID_RPCM_OUTPUT_ENERGY_KWH = Oid("1.3.6.1.4.1.46235.2.4.2.1.15")
PORT_POWER_DICT = InversionDict(
{
EnumPowerStatus.UP: 1,
EnumPowerStatus.DOWN: 0,
}
)
def make_handler(pdu_data: PduSnmpData) -> BaseSnmpPdu:
"""
Returns APC PDU handler object
:param pdu_data: Snmp PDU data object
:return:
"""
return Rpcm(pdu_data)
class Rpcm(BaseSnmpPdu):
"""PDU Rpcm work class"""
def status(self) -> PduView:
"""
GetPduPortsStatus
:return: PduView
"""
pdu_info = PduView()
for port in self.snmp_walk(OID_RPCM_ADMINISTRATIVE_STATE):
pdu_info.ports.append(
PduPortView(
identity=port.oid_index,
power_status=PORT_POWER_DICT.get_by_value(port.value_int, EnumPowerStatus.UNKNOWN),
)
)
return pdu_info
def __get_port_status(
self, pdu_port_data: PduPortView, wait_for_status: Optional[EnumPowerStatus] = None
) -> PduPortView:
"""
Get current PDU port status
:param pdu_port_data: PduPortView
:return: Modified PduPortView
"""
logging.info(f"Get status for PDU port '{pdu_port_data.identity}'")
port_status_oid = OID_RPCM_ADMINISTRATIVE_STATE + pdu_port_data.identity
if wait_for_status is not None:
matcher = lambda response: response == wait_for_status
else:
matcher = lambda response: not isinstance(response, Exception)
@waiter
def get_status() -> Union[EnumPowerStatus, Exception]:
"""Waiter for PDU status"""
try:
res = self.snmp_get(oid=port_status_oid).value_int
return PORT_POWER_DICT.get_by_value(res, EnumPowerStatus.UNKNOWN)
except Exception as error:
return error
logging.info(f"Get status '{pdu_port_data.power_status}' for PDU port '{pdu_port_data.identity}'")
# waiting for status change to expected
pdu_port_data.power_status = get_status(matcher=matcher, timeout=10)
return pdu_port_data
def __port_change_status(self, pdu_port_data: PduPortView, power_status: EnumPowerStatus) -> PduPortView:
"""Changing PDU port power status.
Args:
pdu_port_data (PduPortView): port
power_status (EnumPowerStatus): new power status
Returns:
PduPortView: final port status
"""
self.snmp_set(
oid=OID_RPCM_ADMINISTRATIVE_STATE + pdu_port_data.identity,
value=PORT_POWER_DICT[power_status],
)
return self.__get_port_status(pdu_port_data, power_status)
def port_up(self, pdu_port_data: PduPortView) -> PduPortView:
"""Port up"""
return self.__port_change_status(pdu_port_data, EnumPowerStatus.UP)
def port_down(self, pdu_port_data: PduPortView) -> PduPortView:
"""Port down"""
return self.__port_change_status(pdu_port_data, EnumPowerStatus.DOWN)
def statistic(self) -> PduStatisticView:
"""
Get PDU statistic.
:return: PduStatisticView
"""
pdu_statistic = PduStatisticView()
logging.info("Get PDU load in Amps...")
pdu_statistic.load = sum(port.value_float for port in self.snmp_walk(OID_RPCM_OUTPUT_MILLIAMPS)) / 1000
ports_consumption = self.snmp_walk(OID_RPCM_OUTPUT_ENERGY_KWH)
logging.info("Get PDU total energy in kWh...")
pdu_statistic.total_consumption = float(sum(port.value_float for port in ports_consumption))
logging.info("Get PDU energy in kWh for every port...")
for port in ports_consumption:
pdu_statistic.ports.append(
PduPortStatisticView(
port_identity=port.oid_index,
port_consumption=port.value_float,
)
)
return pdu_statistic
logging = get_logger("rpcm")
Loading the handler into the platform
To load the handler into the platform:
Create a directory with the following structure:
handler_dir/
├── __init__.py
└── my_handler.py
Comments
handler_dir — directory name. There should not be a handler directory with the same name in the platform
__init__.py — the initialization file. If there is no such file, create an empty file with that name
my_handler.py — the handler file
Create a tar.gz archive with the following directory:
tar -czvf custom_handler.tar.gz handler_dir
Comments to the command
custom_handler.tar.gz — archive name
handler_dir –- the name of the created directory with the handler
Log in to DCImanager 6 with administrator permissions:
domain.com — domain name of the server with DCImanager 6
<handler id> — id of the created handler
<token> — authorization token
custom_handler.tar.gz — name of the archive with the handler
<import_path> - relative path for import. For example, if the handler file my_handler.py is in the handler_dir directory, the relative path would be handler_dir.my_handler
Note
You can also use this command to upload new handler versions to the platform.