"""keithley_2700
================

.. autoclass:: Keithley2700
   :members:
   :private-members:

"""
__all__ = ["Keithley2700"]
import numpy as np
import time
from fluiddyn.util.terminal_colors import cprint
from fluidlab.instruments.iec60488 import IEC60488
from fluidlab.instruments.features import SuperValue, BoolValue
class Keithley2700(IEC60488):
    """Driver for the multiplexer Keithley 2700 Series.

    Per-channel settings (manual range and NPLC) are stored on the driver by
    :meth:`set_range` and :meth:`set_nplc`; they are only written to the
    instrument when :meth:`scan` configures a measurement.
    """

    def __init__(self, interface=None):
        super().__init__(interface)
        # channel number -> manual range value; a channel that is absent
        # from this mapping is measured with auto-range
        self.Range = dict()
        # channel number -> integration time in power line cycles; a channel
        # that is absent from this mapping is measured with NPLC = 1.0
        self.NPLC = dict()

    def set_range(self, *, channelNumber=1, manualRange=False, rangeValue=None):
        """Select manual or automatic ranging for one channel.

        The setting is only recorded here; it is sent to the instrument by
        the next call to :meth:`scan` involving this channel.

        :param channelNumber: the channel for which to set the range
        :type channelNumber: int
        :param manualRange: if True, `rangeValue` is stored as the manual
                            range; if False, any stored manual range is
                            discarded and auto-range is used.
        :type manualRange: bool
        :param rangeValue: the manual range (required if `manualRange`)
        :raises ValueError: if `manualRange` is True and `rangeValue` is None
        """
        if manualRange:
            if rangeValue is None:
                # Storing None would later send "RANG None" to the instrument
                raise ValueError(
                    "rangeValue must be given when manualRange is True"
                )
            self.Range[channelNumber] = rangeValue
        else:
            self.Range.pop(channelNumber, None)

    def set_nplc(self, *, channelNumber=1, nplcValue=1.0):
        """This function sets the integration time of the ADC.

        The setting is only recorded here; it is sent to the instrument by
        the next call to :meth:`scan` involving this channel.

        :param channelNumber: the channel for which to set the NPLC value
        :type channelNumber: int
        :param nplcValue: Integration time, expressed in terms of line
                          frequency (PLC). It spans from 0.01 to 10.
                          The default PLC is 1.
        :type nplcValue: float

        .. note::

           Per the documentation, there is a relation between the integration
           time, the short name (fast, med, slow) and the effective number
           of digits

           ==========  ====  =============
           Short name  NPLC  Resolution
           ==========  ====  =============
           >Fast       0.01  3 1/2 digits
           Fast        0.1   5 1/2 digits
           Med         1.0   6 1/2 digits
           Slow        5.0
           ==========  ====  =============

        """
        self.NPLC[channelNumber] = float(nplcValue)

    def scan(
        self, channelList, functionName, samplesPerChan, sampleRate, verbose
    ):
        """Initiates a scan.

        This method is called by the features get method.

        :param channelList: channel number or iterable of channel numbers
        :type channelList: int or list
        :param functionName: measurement function to configure. Some possible
                             values are VOLT:DC, VOLT:AC, FRES, RES, CURR:DC
                             or CURR:AC. Refer to Keithley documentation for
                             other functions.
        :type functionName: str
        :param samplesPerChan: Number of samples to be acquired on each
                               channel. They are stored in the device buffer
                               during acquisition (maximum 450000).
        :type samplesPerChan: int
        :param sampleRate: frequency of the internal clock used to trigger
                           measurements. The instrument resolution is 1 ms.
                           A false value (None or 0) selects 1 kHz (maximum
                           frequency).
        :type sampleRate: float
        :param verbose: prints additional information for debugging purposes
                        (accepted for interface compatibility; this method
                        does not currently consult it)
        :type verbose: bool
        :return: for ``channelList == [1]`` a 1-tuple holding one float
                 reading; if ``samplesPerChan > 1`` a list
                 ``[timestamps_0, values_0, timestamps_1, values_1, ...]``
                 with one pair of arrays per channel; otherwise an array of
                 values.
        :raises ValueError: on an empty channel list, on a multi-channel
                            request while the switch is on Front, if too many
                            points or too high a rate is requested, or if the
                            returned data cannot be parsed.

        .. note::

           If len(channelList) == 1 and samplesPerChan = 1, a one-shot
           measurement is performed, instead of a scan.

        """
        # Accept a bare channel number as well as any iterable of channels
        # (list, tuple, array, ...), and always work on a real list so that
        # the comparison with [1] below behaves the same for all of them.
        try:
            channelList = list(channelList)
        except TypeError:
            channelList = [channelList]
        if not channelList:
            raise ValueError("channelList must contain at least one channel")
        nchan = len(channelList)

        # Check Front/Rear switch
        if nchan > 1 and self.front.get():
            raise ValueError(
                "Cannot get several channels while Front/Rear switch is Front"
            )

        # Check number of points (max memory 450000 data points)
        if samplesPerChan * nchan > 450_000:
            raise ValueError("Cannot request more than 450000 on Keithley 2700")

        # Check sampleRate
        if sampleRate:
            timeInterval = 1 / sampleRate
            if timeInterval < 1e-3:
                raise ValueError("The timer resolution is 1 ms")
        else:
            timeInterval = 1e-3

        if channelList == [1]:
            # Channel 1 alone is read as a one-shot measurement
            # (front-panel reading in the original comments)
            return self._read_front_panel(functionName)

        channel_spec = "(@" + ",".join(str(c) for c in channelList) + ")"
        npoints_requested = samplesPerChan * nchan

        self.clear_status()
        self.interface.write("TRAC:CLE")  # clear buffer
        self.interface.write("INIT:CONT OFF")  # disable continuous init

        # Set up the trigger subsystem
        if samplesPerChan > 1:
            self.interface.write("TRIG:SOUR TIM")
            self.interface.write(f"TRIG:TIM {timeInterval}")
        self.interface.write(f"TRIG:COUN {samplesPerChan}")
        self.interface.write(f"SAMP:COUN {nchan}")

        # Measurement subsystem
        self.interface.write(f'SENS:FUNC "{functionName}", {channel_spec}')

        # Set range on specified channels
        for chan in channelList:
            if chan in self.Range:
                self.interface.write(
                    f"SENS:{functionName}:RANG {self.Range[chan]},(@{chan})"
                )
            else:
                self.interface.write(
                    f"SENS:{functionName}:RANG:AUTO ON,(@{chan})"
                )

        # Set NPLC on specified channels, remembering the slowest one
        # because it drives the one-shot timeout below
        max_nplc = None
        for chan in channelList:
            nplc = self.NPLC.get(chan, 1.0)  # 1.0 is med (default value)
            if max_nplc is None or nplc > max_nplc:
                max_nplc = nplc
            self.interface.write(
                f"SENS:{functionName}:NPLC {nplc},(@{chan})"
            )

        # Starts scan
        self.interface.write(f"ROUT:SCAN {channel_spec}")
        self.interface.write("ROUT:SCAN:TSO IMM")
        self.interface.write("ROUT:SCAN:LSEL INT")

        if npoints_requested > 1:
            self.interface.write("TRAC:CLE")  # clear buffer
            self.interface.write(f"TRAC:POIN {npoints_requested}")
            # notify on nth reading
            self.interface.write(f"TRAC:NOT {npoints_requested - 1}")
            self.interface.write("TRAC:FEED SENS; FEED:CONT NEXT")
            self.interface.write("STAT:PRES")  # Reset measure enable bits
            self.clear_status()  # *CLS
            # Enable buffer bits B6 (buffer notify)
            self.interface.write("STAT:MEAS:ENAB 64")
            self.event_status_enable_register.set(0)  # *ESE 0
            self.status_enable_register.set(1)  # *SRE 1
            self.interface.write("INIT:IMM")
            start_meas = time.monotonic()

            # Timeout for the service request, in milliseconds.
            # timeInterval is used rather than 1 / sampleRate so that the
            # default sampleRate=None does not raise a TypeError here.
            if samplesPerChan > 1:
                tmo = 1000 * samplesPerChan * timeInterval * 1.5
            else:
                # presumably 1/50 s is one power line cycle at 50 Hz
                tmo = 1000 * nchan * max_nplc * (1 / 50) * 1.5
            tmo *= 10  # generous safety margin
            if tmo < 10e3:
                tmo = 10e3

            try:
                self.interface.wait_for_srq(timeout=tmo)
            except Exception:
                # Best effort: the buffer is still queried below. Unlike a
                # bare except, Ctrl-C is allowed to interrupt the wait.
                cprint.red("Error while waiting SRQ")
            else:
                cprint.green(
                    "SRQ received after "
                    f"{time.monotonic() - start_meas:.1f} seconds"
                )

            # Unassert SRQ
            self.clear_status()

            # Query number of points in buffer
            npoints = int(self.interface.query("TRAC:POIN?"))
            print("npoints =", npoints)

            # Fetch data
            self.interface.write("FORM:ELEM READ,TST,CHAN")
            self.interface.write("TRAC:DATA?")
            data = ""
            start_fetch = time.monotonic()
            while True:
                try:
                    data += self.interface.read()
                    # reset if something was returned
                    start_fetch = time.monotonic()
                except Exception:
                    # A failed read is how the end of the transfer is
                    # detected: stop if everything arrived, else ask again.
                    print("Timeout reading on interface")
                    nread = len(data.split(",")) // 3
                    if nread == npoints:
                        cprint.green("All datapoints read")
                        break
                    time.sleep(0.5)
                    self.interface.write("TRAC:DATA?")
                if time.monotonic() - start_fetch > 15:
                    cprint.red("Timeout fetching data")
                    break
        else:
            # Single reading on a single channel: no buffer needed
            self.interface.write("FORM:ELEM READ,TST,CHAN")
            data = self.interface.query("READ?")
            npoints = 1
        self.interface.write(":ROUT:SCAN:LSEL NONE")

        return self._parse_readings(data, npoints, channelList, samplesPerChan)

    def _read_front_panel(self, functionName):
        """Perform a one-shot reading on channel 1 and return ``(value,)``.

        The stored range and NPLC of channel 1 are applied before reading.
        """
        chan = 1
        self.clear_status()
        self.interface.write("TRAC:CLE")  # clear buffer
        self.interface.write("INIT:CONT OFF")  # disable continuous init
        self.interface.write(f'SENS:FUNC "{functionName}"')

        # Set range
        if chan in self.Range:
            self.interface.write(
                f"SENS:{functionName}:RANG {self.Range[chan]}"
            )
        else:
            self.interface.write(f"SENS:{functionName}:RANG:AUTO ON")

        # Set NPLC
        nplc = self.NPLC.get(chan, 1.0)  # 1.0 is med (default value)
        self.interface.write(f"SENS:{functionName}:NPLC {nplc}")

        self.interface.write("FORM:ELEM READ,TST,CHAN")
        # presumably nplc / 50 is the integration time at 50 Hz mains
        data = self.interface.query("READ?", time_delay=nplc / 50.0)
        start = time.monotonic()
        total_timeout = 5.0 + nplc / 50
        while not data.endswith("\n"):
            # time_delay was insufficient: keep reading until the line ends
            time.sleep(0.1)
            data += self.interface.read()
            if time.monotonic() - start > total_timeout:
                print("Timeout!")
                break

        # The response is "reading,timestamp,channel"; keep the reading only
        return (float(data.split(",")[0]),)

    @staticmethod
    def _parse_readings(data, npoints, channelList, samplesPerChan):
        """Split a ``READ,TST,CHAN`` response into per-channel results.

        :param data: comma-separated response of the instrument
        :param npoints: number of (reading, timestamp, channel) triplets
                        expected in `data`
        :param channelList: scanned channels, in scan order
        :param samplesPerChan: number of samples requested per channel
        :raises ValueError: if `data` is not numeric, has the wrong number
                            of points or has unexpected channel numbers
        """
        try:
            data = np.array([float(x) for x in data.split(",")])
        except ValueError:
            print("K2700 returned:", data)
            raise
        if data.size // 3 != npoints:
            raise ValueError("Not all points were fetched")

        # Elements are interleaved as reading, timestamp, channel
        values = data[::3]
        timestamps = data[1::3]
        channels = data[2::3]
        if values.size != timestamps.size:
            raise ValueError("Error while parsing")
        if channels.size != timestamps.size:
            raise ValueError("Error while parsing")

        if samplesPerChan <= 1:
            # returns values only
            return values

        # returns timeStamp, value for each channel of channelList; the
        # readings of the different channels are interleaved in scan order
        nchan = len(channelList)
        retval = list()
        for channum, chan in enumerate(channelList):
            this_chans = channels[channum::nchan]
            if not (this_chans == chan).all():
                raise ValueError("Error while parsing")
            retval.append(timestamps[channum::nchan])
            retval.append(values[channum::nchan])
        return retval
class Keithley2700Value(SuperValue):
    """Custom :class:`Value` class for the Keithley 2700 features.

    :param name: attribute name under which the feature is attached to the
                 driver class
    :param doc: description of the feature
    :param function_name: measurement function passed to the driver's
                          ``scan`` method (e.g. ``"VOLT:DC"``)
    """

    def __init__(self, name, doc="", function_name=None):
        super().__init__(name, doc)
        self.function_name = function_name

    def _build_driver_class(self, Driver):
        """Attach this feature to `Driver` and create its get/set methods.

        The accessors are closures over the feature name and measurement
        function, bound to this feature instance.
        """
        name = self._name
        function_name = self.function_name
        setattr(Driver, name, self)

        def get(
            self, chanList=None, samplesPerChan=1, sampleRate=None, verbose=None
        ):
            if chanList is None:
                # A fresh list per call instead of a shared mutable default
                chanList = [1]
            if verbose is None:
                # default is verbose for acquisitions
                verbose = samplesPerChan > 1
            # NOTE(review): self._driver is not set in this class; presumably
            # the base class / driver machinery assigns it -- confirm.
            result = self._driver.scan(
                chanList, function_name, samplesPerChan, sampleRate, verbose
            )
            if len(result) == 1:
                # Unwrap single results so that callers get a bare value
                result = result[0]
            return result

        # A `"""Get """ + name` expression is not a docstring; set it for real
        get.__doc__ = "Get " + name
        self.get = get.__get__(self, self.__class__)

        def set(self, channel, value, warn=True):
            # Makes sense for voltage on AO channels only
            if name == "vdc":
                # NOTE(review): requires the driver to provide write_vdc;
                # confirm that it is defined on the driver class.
                self._driver.write_vdc(channel, value)
            else:
                raise ValueError("Specified value cannot be written")

        set.__doc__ = "Set " + name
        self.set = set.__get__(self, self.__class__)
# Read-only state of the Front/Rear input switch
features = [
    BoolValue(
        "front",
        doc="True if switch Front/Rear is on Front (read-only)",
        command_get=":SYST:FRSW?",
    )
]

# One measurement feature per (attribute name, description, function name)
features.extend(
    Keithley2700Value(feature_name, doc=description, function_name=function)
    for feature_name, description, function in (
        ("vdc", "DC voltage", "VOLT:DC"),
        ("vrms", "RMS voltage", "VOLT:AC"),
        ("ohm_4w", "4-wire resistance", "FRES"),
        ("ohm", "2-wire resistance", "RES"),
        ("idc", "DC current", "CURR:DC"),
        ("irms", "RMS current", "CURR:AC"),
    )
)

Keithley2700._build_class_with_features(features)
if __name__ == "__main__":
    # Demonstration: needs a Keithley 2700 on GPIB board 0, address 16
    from fluidlab.interfaces.gpib_inter import GPIBInterface

    with Keithley2700(GPIBInterface(0, 16)) as km:
        front = km.front.get()
        print("Front/Read switch:", front)
        if not front:
            # Rear inputs: the multiplexer channels are available
            print("Single channel one-shot measurement")
            print(km.vdc.get(101))

            print("Multiple channel one-shot measurement")
            R1, R2 = km.ohm.get([101, 102])
            print(R1, R2)

            print("Single channel timeseries")
            timestamps, resistance = km.ohm.get(
                101, samplesPerChan=100, sampleRate=10.0
            )
            mean_period = np.mean(np.diff(timestamps))
            print("actual frame rate:", 1 / mean_period, "Hz")

            import matplotlib.pyplot as plt

            plt.plot(timestamps, resistance, "o")
            plt.show()
        else:
            # Front inputs: single reading, auto-range, slowest integration
            km.set_range(manualRange=False)
            km.set_nplc(nplcValue=10.0)
            v = km.vdc.get()
            print("v =", v)