Source code for fluidlab.interfaces.modbus_inter

"""
Modbus interfaces (:mod:`fluidlab.interfaces.modbus_inter`)
===========================================================

Provides:

.. autofunction:: get_modbus_interface

.. autoclass:: ModbusInterface
   :members:
   :private-members:

.. autoclass:: MinimalModbusInterface
   :members:
   :private-members:

.. autoclass:: PyModbusInterface
   :members:
   :private-members:

"""

import sys
import collections

from fluidlab.interfaces import Interface


[docs]class ModbusInterface(Interface): def __init__(self, port, method="rtu", slave_address=1, timeout=1, **kwargs): self.port = port self.method = method self.slave_address = slave_address self.timeout = timeout super().__init__() def __str__(self): return ( f"{self.__class__.__name__}({self.port:}, {self.method:}, " f"{self.slave_address:}, {self.timeout})" ) def read_readonlybool(self, addresses): raise NotImplementedError def read_bool(self, addresses): raise NotImplementedError def write_bool(self, addresses, values): raise NotImplementedError def read_readonlyint16(self, addresses): raise NotImplementedError def read_int16(self, addresses): raise NotImplementedError def write_int16(self, addresses, values): raise NotImplementedError def read_readonlyfloat32(self, addresses): raise NotImplementedError def read_float32(self, addresses): raise NotImplementedError def write_float32(self, addresses, values): raise NotImplementedError def __repr__(self): return str(self)
[docs]class MinimalModbusInterface(ModbusInterface): def _open(self): import minimalmodbus self._modbus = minimalmodbus.Instrument( self.port, self.slave_address, self.method ) def read_readonlybool(self, addresses): raise NotImplementedError def read_bool(self, addresses): if isinstance(addresses, collections.Iterable): return self._modbus.read_coils(addresses) elif isinstance(addresses, int): return self._modbus.read_coil(addresses) def write_bool(self, addresses, values): raise NotImplementedError def read_readonlyint16(self, addresses): raise NotImplementedError def read_int16(self, addresses): if isinstance(addresses, collections.Iterable) and len(addresses) == 2: return self._modbus.read_registers(addresses[0], addresses[1]) elif isinstance(addresses, int): return self._modbus.read_register(addresses) else: raise ValueError( "`addresses` must be an int or an iterable of length 2" ) def write_int16(self, address, values, signed=False): if isinstance(values, collections.Iterable): self._modbus.write_registers(address, values, signed=signed) elif isinstance(values, int): self._modbus.write_register(address, values, signed=signed) else: raise ValueError("`values` must be an int or an iterable of ints") def read_readonlyfloat32(self, addresses): raise NotImplementedError def read_float32(self, addresses): raise NotImplementedError def write_float32(self, addresses, values): raise NotImplementedError
[docs]class PyModbusInterface(ModbusInterface): def _open(self): try: if sys.version_info.major == 2: from pymodbus.client.sync import ModbusSerialClient else: from pymodbus3.client.sync import ModbusSerialClient except ImportError: if sys.version_info.major == 2: package_name = "pymodbus" else: package_name = "pymodbus3" raise ImportError( f"Can not import {package_name}. " + "Is it installed? If not, try to install it." ) self._modbus = ModbusSerialClient(method=self.method, port=self.port) def read_readonlybool(self, addresses): raise NotImplementedError def read_bool(self, addresses): raise NotImplementedError def write_bool(self, addresses, values): raise NotImplementedError def read_readonlyint16(self, addresses): raise NotImplementedError def read_int16(self, addresses): raise NotImplementedError def write_int16(self, addresses, values): raise NotImplementedError def read_readonlyfloat32(self, addresses): raise NotImplementedError def read_float32(self, addresses): raise NotImplementedError def write_float32(self, addresses, values): raise NotImplementedError
class FalseModbusInterface(ModbusInterface): def _open(self): pass def read_readonlybool(self, addresses): return False def read_bool(self, addresses): return False def write_bool(self, addresses, values): pass def read_readonlyint16(self, addresses): return 0 def read_int16(self, addresses): return 0 def write_int16(self, address, values, signed=False): pass def read_readonlyfloat32(self, addresses): return 0.0 def read_float32(self, addresses): return 0.0 def write_float32(self, addresses, values): pass classes = { "minimalmodbus": MinimalModbusInterface, "pymodbus": PyModbusInterface, "false": FalseModbusInterface, }
[docs]def get_modbus_interface( port, timeout=1, module="minimalmodbus", signed=False, method="rtu", slave_address=1, **kwargs, ): f"""Get a Modbus interface module can be in {list(classes.keys())} """ cls = classes[module] interface = cls( port, method=method, slave_address=slave_address, timeout=timeout, **kwargs, ) interface.open() return interface