Source code for fluidlab.interfaces.serial_inter

"""Interfaces with serial (:mod:`fluidlab.interfaces.serial_inter`)
===================================================================

Provides:

.. autoclass:: SerialInterface
   :members:
   :private-members:

"""

import serial
import io

from time import sleep

from fluidlab.interfaces import QueryInterface


class SerialInterface(QueryInterface):
    r"""Query interface backed by a serial port opened with ``serial.Serial``.

    The port is configured in :meth:`__init__` but only opened in
    :meth:`_open`.  The ``_open`` / ``_write`` / ``_read`` methods and the
    ``_lowlevel`` / ``_close`` attributes are presumably the hooks consumed
    by ``QueryInterface`` (not visible in this module).

    Three I/O modes exist, selected by ``eol`` and ``use_readlines``:

    * ``eol is None``: raw ``bytes`` are exchanged with the port directly.
    * ``eol`` set and ``use_readlines`` true: the port is wrapped in an
      ``io.TextIOWrapper`` (``self.ser_io``) using ``eol`` as newline, so
      text is exchanged and lines end on ``eol``.
    * ``eol`` set and ``use_readlines`` false: every written item gets
      ``eol`` appended, and :meth:`_read` returns whatever bytes are
      currently waiting, decoded as ASCII.
    """

    def __init__(
        self,
        port,
        baudrate=9600,
        bytesize=8,
        parity="N",
        stopbits=1,
        timeout=1,
        xonxoff=False,
        rtscts=False,
        dsrdtr=False,
        eol=None,
        multilines=False,
        autoremove_eol=False,
        use_readlines=True,
        **kwargs,
    ):
        r"""Store the serial settings; the port itself is opened by ``_open``.

        Parameters
        ----------
        port, baudrate, bytesize, parity, stopbits, timeout, xonxoff, \
        rtscts, dsrdtr
            Forwarded unchanged to ``serial.Serial`` when the port is opened.
        eol : str, optional
            Line terminator used by the device.  If not None (and
            ``use_readlines`` is true), the serial port is wrapped into a
            ``TextIOWrapper`` so that ``readline`` waits for an eol which is
            not ``'\n'``.  Example: with ``eol='\r\n'``, ``'\n'`` in written
            text is translated to ``'\r\n'`` before being sent to the device,
            and lines read back end on ``'\r\n'``.
        multilines : bool
            If true, ``_read`` gathers all available lines with
            ``readlines`` instead of a single ``readline``.
        autoremove_eol : bool
            If true, written items are stripped and terminated with ``'\n'``,
            and the trailing ``eol`` is removed from lines read through the
            text wrapper.
        use_readlines : bool
            If false, ``_read`` does not read line by line: it returns the
            bytes currently waiting in the input buffer.
        **kwargs
            Accepted for call compatibility; not used by this class.
        """
        super().__init__()
        self.port = port
        self.baudrate = baudrate
        self.bytesize = bytesize
        self.parity = parity
        self.stopbits = stopbits
        self.timeout = timeout
        self.xonxoff = xonxoff
        self.rtscts = rtscts
        self.dsrdtr = dsrdtr
        self.eol = eol
        self.multilines = multilines
        self.autoremove_eol = autoremove_eol
        self.use_readlines = use_readlines

    def __str__(self):
        return f'SerialInterface("{self.port}")'

    def __repr__(self):
        return str(self)

    def _uses_text_wrapper(self):
        """Return True when I/O goes through ``self.ser_io``.

        Mirrors the condition under which ``_open`` creates the wrapper, so
        no method touches ``ser_io`` in a mode where it does not exist.
        """
        return self.eol is not None and self.use_readlines

    def _strip_eol(self, line):
        """Return ``line`` without its trailing ``eol``, if it has one.

        A line that does not end with ``eol`` (for example a partial or
        empty line returned after a timeout) is returned unchanged rather
        than being truncated.
        """
        if self.eol and line.endswith(self.eol):
            return line[: -len(self.eol)]
        return line

    def _open(self):
        """Open the serial port and, if needed, its text wrapper."""
        sp = serial.Serial(
            port=self.port,
            baudrate=self.baudrate,
            bytesize=self.bytesize,
            parity=self.parity,
            stopbits=self.stopbits,
            timeout=self.timeout,
            xonxoff=self.xonxoff,
            rtscts=self.rtscts,
            dsrdtr=self.dsrdtr,
        )
        self._lowlevel = self.serial_port = sp
        self._close = sp.close
        if self.eol is not None and self.use_readlines:
            self.ser_io = io.TextIOWrapper(
                io.BufferedRWPair(sp, sp, 1),
                newline=self.eol,
                line_buffering=True,
            )

    def _write(self, *args):
        r"""Send ``args`` to the device; each item may be ``str`` or ``bytes``.

        ``str`` and ``bytes`` are converted with the ASCII codec to whatever
        the active mode needs (text for the wrapper, bytes for the raw port).

        Returns the value of the underlying ``write`` call, or None in the
        mode where ``eol`` is appended manually.
        """
        if self.autoremove_eol:
            args = [
                a.strip() + (b"\n" if isinstance(a, (bytes, bytearray)) else "\n")
                for a in args
            ]

        if self._uses_text_wrapper():
            # The text wrapper only accepts str.
            args = [
                a.decode("ascii") if isinstance(a, (bytes, bytearray)) else a
                for a in args
            ]
            return self.ser_io.write(*args)

        if self.eol is not None:
            # NOTE(review): with autoremove_eol set, each item already ends
            # with "\n" here, so the device receives "\n" followed by eol --
            # confirm this is intended.
            terminator = self.eol.encode("ascii")
            for a in args:
                payload = a.encode("ascii") if isinstance(a, str) else a
                self.serial_port.write(payload + terminator)
            return None

        # Ensure no unicode strings are sent to serial_port.write.
        args = [a.encode("ascii") if isinstance(a, str) else a for a in args]
        return self.serial_port.write(*args)

    def readline(self, *args):
        """Read one line.

        Through the text wrapper the result is ``str`` (with ``eol`` removed
        when ``autoremove_eol`` is set); otherwise the line comes straight
        from the serial port.
        """
        if not self._uses_text_wrapper():
            return self.serial_port.readline(*args)
        line = self.ser_io.readline(*args)
        return self._strip_eol(line) if self.autoremove_eol else line

    def readlines(self, *args):
        """Read several lines; same conventions as :meth:`readline`."""
        if not self._uses_text_wrapper():
            return self.serial_port.readlines(*args)
        lines = self.ser_io.readlines(*args)
        if self.autoremove_eol:
            return [self._strip_eol(line) for line in lines]
        return lines

    def _read(self):
        r"""Read the device answer according to the configured mode.

        * ``use_readlines`` false: the bytes currently waiting, decoded as
          ASCII.
        * ``multilines`` true: all lines joined with a ``"\n"`` separator of
          the same type (``str`` or ``bytes``) as the lines.
        * otherwise: a single line; a ``bytes`` line is re-joined on
          ``b"\n"`` after ``splitlines()``, which drops its terminator.
        """
        if not self.use_readlines:
            # NOTE(review): pyserial >= 3.0 documents the ``in_waiting``
            # property as the replacement for ``inWaiting()``.
            pending = self.serial_port.inWaiting()
            return self.serial_port.read(pending).decode("ascii")

        if self.multilines:
            lines = self.readlines()
            # The raw port yields bytes, the text wrapper yields str.
            is_binary = bool(lines) and isinstance(lines[0], (bytes, bytearray))
            separator = b"\n" if is_binary else "\n"
            return separator.join(lines)

        line = self.readline()
        if isinstance(line, str):
            return line
        return b"\n".join(line.splitlines())
if __name__ == "__main__":
    # Manual smoke test against an instrument on /dev/ttyUSB0: each entry is
    # a raw command plus the keyword arguments passed to ``query`` with it.
    demo_queries = (
        (b"*IDN?\r\n", {"time_delay": 1}),
        (b"*IDN?\r\n", {}),
        (b"ISET1?\r\n", {}),
        (b"HELP?\r\n", {"time_delay": 0}),
    )

    with SerialInterface("/dev/ttyUSB0") as interface:
        for command, options in demo_queries:
            print(interface.query(command, **options))