# Source code for fluidlab.interfaces.gpib_inter

"""Interfaces with GPIB (:mod:`fluidlab.interfaces.gpib_inter`)
===============================================================

Provides:

.. autoclass:: GPIBInterface
   :members:
   :private-members:

"""

import gpib
import time, sys

from fluidlab.interfaces import QueryInterface

# Lookup table from each discrete timeout constant exposed by the ``gpib``
# module to the duration it stands for, expressed in seconds.  It is used by
# ``closest_timeout`` to translate a requested duration into a constant that
# ``gpib.timeout`` accepts.
timeout_values = {
    gpib.T1000s: 1000.0,
    gpib.T100ms: 100e-3,
    gpib.T100s: 100.0,
    gpib.T100us: 100e-6,
    gpib.T10ms: 10e-3,
    gpib.T10s: 10.0,
    gpib.T10us: 10e-6,
    gpib.T1ms: 1e-3,
    gpib.T1s: 1.0,
    gpib.T300ms: 300e-3,
    gpib.T300s: 300.0,
    gpib.T300us: 300e-6,
    gpib.T30ms: 30e-3,
    gpib.T30s: 30.0,
    gpib.T30us: 30e-6,
    gpib.T3ms: 3e-3,
    gpib.T3s: 3.0,
}


def closest_timeout(t):
    """Return the gpib timeout constant that best covers a duration.

    Parameters
    ----------
    t : float
        Requested timeout, in seconds.

    Returns
    -------
    The key of ``timeout_values`` whose duration is the smallest one that is
    still greater than or equal to ``t``.  When ``t`` exceeds every entry of
    the table, ``gpib.T1000s`` (the longest entry) is returned.
    """
    best = gpib.T1000s
    best_duration = timeout_values[best]
    for constant, duration in timeout_values.items():
        # Keep any candidate that is long enough and not longer than the
        # best one found so far.
        if duration >= t and duration <= best_duration:
            best = constant
            best_duration = duration
    return best


class GPIBInterface(QueryInterface):
    """Query interface talking to one instrument through the ``gpib`` module.

    Parameters
    ----------
    board_adress : int
        Board index; passed as first argument to ``gpib.dev`` and used as the
        descriptor given to ``gpib.wait`` in :meth:`wait_for_srq`.
    instrument_adress : int
        Instrument address; passed as second argument to ``gpib.dev``.
    timeout : float
        Desired default timeout in seconds.  It is converted with
        ``closest_timeout`` to the shortest supported value that is at least
        this long.
    **kwargs
        Accepted for call compatibility and ignored.
    """

    def __init__(self, board_adress, instrument_adress, timeout=1.0, **kwargs):
        super().__init__()
        self.board_adress = board_adress
        self.instrument_adress = instrument_adress
        self.default_tmo = closest_timeout(timeout)

    def __str__(self):
        return f"GPIBInterface({self.board_adress:d}, {self.instrument_adress:d})"

    def __repr__(self):
        return str(self)

    def _open(self):
        """Get a device handle from ``gpib.dev`` and apply the default timeout."""
        self.handle = gpib.dev(self.board_adress, self.instrument_adress)
        gpib.timeout(self.handle, self.default_tmo)

    def _close(self):
        """Release the handle obtained in :meth:`_open`."""
        gpib.close(self.handle)

    def _read(self, numbytes=None, verbose=False, tracing=False):
        """Read data from the instrument.

        Parameters
        ----------
        numbytes : int, optional
            If given, a single ``gpib.read`` of that size is issued and its
            result is returned unchanged (raw bytes, not decoded); a
            ``gpib.GpibError`` propagates to the caller.  If None, data are
            read in chunks of 150 bytes until a short chunk is received, each
            chunk is decoded as ASCII and the concatenated ``str`` is
            returned.
        verbose : bool
            In chunked mode, print a running count of the bytes read.
        tracing : bool
            Print the instrument address and the stripped answer on stdout.

        Returns
        -------
        bytes or str
            Raw bytes when ``numbytes`` is given, decoded text otherwise.
        """
        if tracing:
            sys.stdout.write("* <- " + str(self.instrument_adress) + " ")
            sys.stdout.flush()

        if numbytes is not None:
            data = gpib.read(self.handle, numbytes)
        else:
            chunk_size = 150
            pieces = []
            nread = 0
            try:
                while True:
                    chunk = gpib.read(self.handle, chunk_size)
                    text = chunk.decode("ascii")
                    pieces.append(text)
                    nread += len(text)
                    if verbose:
                        sys.stdout.write(
                            "{:d} bytes read \r".format(nread)
                        )
                    # A short chunk means the instrument has nothing more
                    # to send.
                    if len(chunk) < chunk_size:
                        break
            except gpib.GpibError:
                # Best effort: a GPIB error ends the read and whatever was
                # received so far is returned (possibly an empty string).
                pass
            data = "".join(pieces)
            if verbose:
                sys.stdout.write("\n")

        if tracing:
            # With ``numbytes`` given, ``data`` is bytes and cannot be written
            # to the text stream directly; decode a copy for display only.
            if isinstance(data, bytes):
                shown = data.decode("ascii", errors="replace")
            else:
                shown = data
            sys.stdout.write(shown.strip())
            sys.stdout.write("\n")
        return data

    def _write(self, command, tracing=False):
        """Send ``command`` to the instrument.

        ``str`` commands are encoded as ASCII before being handed to
        ``gpib.write``; other objects are passed through unchanged.
        """
        if tracing:
            print("* ->", self.instrument_adress, command)
        if isinstance(command, str):
            command = command.encode("ascii")
        gpib.write(self.handle, command)

    def wait_for_srq(self, timeout=None):
        """Block until a service request (SRQ) is reported or time runs out.

        Parameters
        ----------
        timeout : float, optional
            Maximum waiting time, expressed in milliseconds for compatibility
            with pyvisa.  If None, the current timeout setting is left
            untouched and the method keeps waiting until ``gpib.wait`` reports
            something other than a timeout.

        Notes
        -----
        The outcome is only printed on stdout; nothing is returned.  The
        default timeout of the handle is restored on exit in all cases.
        """
        # Convert to seconds only when a value was given: the default None
        # must not go through the arithmetic (it would raise TypeError).
        timeout_s = None if timeout is None else float(timeout * 1e-3)
        try:
            if timeout_s is not None:
                gpib.timeout(self.handle, closest_timeout(timeout_s))
            tstart = time.monotonic()
            while True:
                sta = gpib.wait(self.board_adress, gpib.TIMO | gpib.SRQI)
                if (sta & gpib.TIMO) == 0:
                    # SRQ asserted
                    print("SRQ was asserted")
                    break
                # gpib.wait timed out: give up only once the requested
                # duration has really elapsed, otherwise wait again.
                if (
                    timeout_s is not None
                    and time.monotonic() - tstart > timeout_s
                ):
                    print("Timeout occured")
                    break
        finally:
            gpib.timeout(self.handle, self.default_tmo)