"""
Pumps (:mod:`fluidlab.objects.pumps`)
=====================================
.. _pumps:
.. currentmodule:: fluidlab.objects.pumps
Provides:
.. autoclass:: MasterFlexPumps
:members:
:private-members:
"""
from __future__ import division, print_function
import os
import numpy as np
import time
import atexit
from fluiddyn.io import _write_warning
from fluiddyn.io import query
from fluiddyn.io import txt
try:
import serial
except ImportError as exc:
_write_warning("Warning: ImportError\n " + str(exc))
here = os.path.abspath(os.path.dirname(__file__))
path_calib = os.path.join(here, "calibration_pumps.txt")
if not os.path.exists(path_calib):
with open(path_calib, "w") as f:
f.write(
"Calibration pumps, maximum flowrate "
"for pumps 1 (line 1) and 2 (line 2):\n"
"1099.6\n1146.1"
)
def modif_calib_file(pump, flow_rate):
with open(path_calib, "r") as f:
lines = f.readlines()
lines[pump] = f"{flow_rate:.0f}" + "\n"
with open(path_calib, "w") as f:
f.write("".join(lines))
print("New calibration flowrate in the file...")
[docs]class MasterFlexPumps:
"""Represent some Masterflex pumps.
We use Masterflex L/S (model number 7551.00).
Parameters
----------
nb_pumps : {2, int}, optional
Number of pumps
verbose : {False, bool}, optional
If True, more verbose.
Attributes
----------
flow_rates_max: `numpy.ndarray`
Maximum flow rates of the pumps.
pumps: list
List of the pump indexes.
serial: serial.Serial
Object representing the serial port connected to the pumps.
Notes
-----
The pumps and this class are often used with the class
:class:`fluidlab.objects.tanks.StratifiedTank`.
"""
# define some useful ASCII characters
stx = chr(2) # start of text
enq = chr(5) # enquiry
ack = chr(6) # acknoledge
nak = chr(21) # negative acknoledge
cr = chr(13) # carriage return
def __init__(self, nb_pumps=2, verbose=False):
self.rot_per_min_max = 600.0
flow_rates_max = txt.quantities_from_txt_file(path_calib)[0]
if nb_pumps == 1:
self.flow_rates_max = flow_rates_max[0] # (ml/min)
if nb_pumps == 2:
self.flow_rates_max = flow_rates_max[:2] # (ml/min)
else:
self.flow_rates_max = 1000 * np.ones(nb_pumps) # (ml/min)
self.pumps = range(1, nb_pumps + 1)
port = "COM3" # Serial port to which the pump(s) are connected (COM3)
# configure the serial connections (the parameters differs on
# the device you are connecting to)
try:
self.serial = serial.Serial(
port=port,
baudrate=4800,
bytesize=7,
parity=serial.PARITY_ODD,
stopbits=serial.STOPBITS_ONE
# ,
# xonxoff=False, # xon=off,
# rtscts=False # rts=hs
)
except NameError as exc:
self.serial = None
except serial.serialutil.SerialException as exc:
self.serial = None
_write_warning("Warning: SerialException\n " + str(exc))
except OSError as exc:
self.serial = None
_write_warning("Warning: OSError\n " + str(message))
else:
self.stop(pumps=99)
if verbose:
print("serial.write: <ENQ>")
self.serial.write(self.enq)
ret = self.serial.readline()
if verbose:
print("answer pump:", ret)
self._command("")
self._command("I")
self.stop(99)
self.set_rot_per_min(0.1)
self.stop(99)
self._command("Z")
atexit.register(MasterFlexPumps.stop, self)
self.pumps = list(range(1, nb_pumps + 1))
[docs] def _command(self, command, pumps=None, verbose=False):
"""Send a command to some pumps.
Parameters
----------
command : str
The command that has to be send to the pumps.
pumps : int or array_like
The index of one pump or an array_like containing the
indexes of pumps.
verbose : bool
More verbose If equal to ``True``.
Returns
-------
results : list
The results of the commands.
Notes
-----
The command can be for example '', 'G0', 'I', 'z'...
"""
pumps = self._give_list_pumps(pumps)
results = []
for pump in pumps:
key = f"P{pump:02d}"
line_to_write = self.stx + key + command + self.cr
if verbose:
print("serial.write:", line_to_write)
self.serial.write(line_to_write)
result = self.serial.readline()
if len(result) == 0:
if verbose:
print("no answer.")
elif result[0] == self.ack:
if verbose:
print("answer pump: <ACK> (happy)")
elif result[0] == self.nak:
print("answer pump: <NAK> (unhappy)")
elif verbose:
print("answer pump:", result)
results.append(result)
return results
[docs] def go(self, pumps=None):
"""Start some pumps.
Send the command 'G0' to the pumps.
Parameters
----------
pumps : {None, int, array_like}, optional
The index of one pump or an array_like containing the
indexes of pumps. If None, the function uses self.pumps.
"""
pumps = self._give_list_pumps(pumps)
return self._command("G0", pumps=pumps)
[docs] def set_rot_per_min(self, rots_per_min=0, pumps=None):
"""Set the number of rotations per min for some pumps.
Parameters
----------
rots_per_min : number or array_like
The rotation rate(s) in rotations per mimute.
pumps : {None, int, array_like}, optional
The index of one pump or an array_like containing the
indexes of pumps. If None, the function uses self.pumps.
"""
pumps = self._give_list_pumps(pumps)
rots = rots_per_min
if hasattr(rots, "__iter__"):
if len(rots) != len(pumps):
raise ValueError(
"if hasattr(rots, '__iter__'), "
+ "len(rots) should be equal to len(pumps)"
)
for ir, rot in enumerate(rots):
if rot > self.rot_per_min_max + 0.01:
rots[ir] = self.rot_per_min_max
print(
"rot_per_min for pump {:d} too large,".format(pumps[ir]),
"new rots_per_min:",
rots,
)
elif rot < 0.1:
rots[ir] = 0.1
for ip, pump in enumerate(pumps):
ret = self._command("S{:+07.1f}".format(rots[ip]), pumps=pump)
return ret
else:
if rots > self.rot_per_min_max:
rots = self.rot_per_min_max
print(
"The rotation per min is too large "
+ "and has been limited to the maximum rotation"
)
elif rots < 0.1:
rots = 0.1
return self._command(f"S{rots:+07.1f}", pumps=pumps)
[docs] def stop(self, pumps=None):
"""Stop some pumps.
Parameters
----------
pumps : {None, int, array_like}, optional
The index of one pump or an array_like containing the
indexes of pumps. If None, the function uses self.pumps.
"""
pumps = self._give_list_pumps(pumps)
return self._command("H", pumps=pumps)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.stop()
[docs] def _give_list_pumps(self, pumps=None):
"""Return a list of pump indexes.
Parameters
----------
pumps : {None, int, array_like}, optional
The index of one pump or an array_like containing the
indexes of pumps. If None, the function uses self.pumps.
"""
if pumps is None:
pumps = self.pumps
else:
if not hasattr(pumps, "__iter__"):
pumps = [pumps]
return pumps
[docs] def calibrate(self, pumps=None, nb_mins=4):
"""Calibrate the pumps.
Parameters
----------
pumps : {None, int, array_like}, optional
The index of one pump or an array_like containing the
indexes of pumps. If None, the function uses self.pumps.
nb_mins : {4, number}
Number of minutes for calibrating one pumps.
"""
pumps = self._give_list_pumps(pumps)
for pump in pumps:
self.calibrate_one_pump(pump, nb_mins=nb_mins)
def calibrate_one_pump(self, pump=1, nb_mins=None, rot_per_min=None):
ip = pump - 1
if rot_per_min is None:
rot_per_min = self.rot_per_min_max / 2
flow_rate_approx = (
self.flow_rates_max[ip] * rot_per_min / self.rot_per_min_max
)
print(
f"Calibrate pump {pump:d}:\n"
+ f"{nb_mins} min "
+ "with an approximate flow rate of "
+ f"{flow_rate_approx} ml/min;\n"
+ "approximate volume equal to {} ml.".format(
nb_mins * flow_rate_approx
)
)
if query.query_yes_no("Do you have a large enough tank? Ready?"):
pumps.set_rot_per_min(rot_per_min, pumps=pump)
pumps.go(pumps=pump)
time.sleep(nb_mins * 60)
pumps.stop()
volume = query.query_number("How many ml have been pumped?")
flow_rate = volume / nb_mins
print(
f"Then, flow_rates_max for pump {pump:d} "
+ "should be equal to {:6.1f}.".format(
flow_rate * self.rot_per_min_max / rot_per_min
)
)
[docs] def set_flow_rate(self, flow_rates=0, pumps=None):
"""Set the flow rates.
Parameters
----------
flow_rates : {number, array_like}
flow rates in ml/min.
pumps : {None, int, array_like}, optional
The index of one pump or an array_like containing the
indexes of pumps. If None, the function uses self.pumps.
"""
pumps = self._give_list_pumps(pumps)
ipumps = np.array(pumps) - 1
flow_rates_max = self.flow_rates_max[ipumps]
# print('flow_rates_max', flow_rates_max)
rots_per_min = flow_rates / flow_rates_max * self.rot_per_min_max
# print('rots_per_min', rots_per_min)
self.set_rot_per_min(rots_per_min, pumps=pumps)
[docs] def test_one_pump(self, pump=1, vol_to_pump=2000.0, flow_rate_test=None):
"""Test one pump and print actual maximum flowrate.
Pump with the pump with the index *pump* an approximate volume
*vol_to_pump*. Ask for a measure of the volume actually
pumped. Print the "actual" maximum flowrate for the tested pump
(which can be written in the function __init__).
The "actual" maximum flowrate is the one that which has give
a more accurate result, computed from the volume actually
pumped.
Parameters
----------
pump : {1, int}, optional
A pump index.
vol_to_pump : {2000., number}
The volume to pump.
flow_rate_test : {None, number}, optional
The flow rate used for testing. If None, use 2/3 of the
maximum flow rate for the tested pump.
"""
if flow_rate_test is None:
flow_rate_test = self.flow_rates_max[pump - 1] * 2 / 3
nb_mins_to_pump = vol_to_pump / flow_rate_test
print(
f"\nTest pump {pump:d}:\n"
+ "flow_rates_max = {}\n".format(self.flow_rates_max[pump - 1])
+ f"approximate volume equal to {vol_to_pump} ml,\n"
+ "with an approximate flow rate of "
+ f"{flow_rate_test} ml/min,\n"
+ f"Duration of the test: {nb_mins_to_pump:6.2f} min "
)
pumps.set_flow_rate(flow_rate_test, pumps=pump)
pumps.go(pumps=pump)
time.sleep(nb_mins_to_pump * 60)
pumps.stop(pumps=pump)
vol_measured = query.query_number("How many ml have been pumped?")
flow_rate_measured = vol_measured / nb_mins_to_pump
flow_rate_new = (
self.flow_rates_max[pump - 1] * flow_rate_measured / flow_rate_test
)
print(
f"Then flow_rates_max for pump {pump:d} "
+ f"should be equal to {flow_rate_new:5.0f}."
)
ok = query.query_yes_no(
(
"Do you want to update the calibration file and \n"
"the Pumps object with this value?"
),
default="no",
)
if ok:
modif_calib_file(pump, flow_rate_new)
if __name__ == "__main__":
pumps = MasterFlexPumps(nb_pumps=2)
# pumps.set_flow_rate(1000, pumps=[1])
# pumps.go()
# time.sleep(20)
# pumps.stop()
# pumps.calibrate()
pumps.test_one_pump(2, vol_to_pump=400 * 4, flow_rate_test=400)