diff --git a/environment.yml b/environment.yml index 7c2d3b56..ae353bb0 100644 --- a/environment.yml +++ b/environment.yml @@ -82,7 +82,8 @@ dependencies: # --- Bluesky framework packages - adl2pydm - - apstools >=1.6.16 + # apstools is installed from github by pip until DG645 and SRS570 settling are released + # - apstools >=1.6.16 - area-detector-handlers - bluesky-queueserver - bluesky-queueserver-api @@ -92,6 +93,7 @@ dependencies: # - happi # - hklpy >=1.0.3 # --- linux-64 - ophyd >=1.6.3 + # pcdsdevices is installed from github by pip until multiderived_settling fix hits pyPI - pcdsdevices # For extra signal types - pydm >=1.18.0 - typhos @@ -145,6 +147,7 @@ dependencies: - ophyd-registry >= 0.7 - xraydb >=4.5.0 - pytest-timeout # Get rid of this if tests are not hanging + - git+https://github.com/pcdshub/pcdsdevices # - https://github.com/BCDA-APS/adl2pydm/archive/main.zip # --- optional Bluesky framework packages for evaluation # - bluesky-webclient is NOT Python software, don't install it this way diff --git a/src/conftest.py b/src/conftest.py index 4d803a44..142c2611 100644 --- a/src/conftest.py +++ b/src/conftest.py @@ -7,6 +7,7 @@ # from pydm.data_plugins import plugin_modules, add_plugin import pytest +from apstools.devices.srs570_preamplifier import GainSignal from ophyd import DynamicDeviceComponent as DDC from ophyd import Kind from ophyd.sim import ( @@ -58,7 +59,10 @@ def __init__(self, prefix, **kwargs): super().__init__(f"{prefix}I", write_pv=f"{prefix}O", **kwargs) +# Ophyd uses a cache of signals and their corresponding fakes +# We need to add ours in so they get simulated properly. fake_device_cache[EpicsSignalWithIO] = FakeEpicsSignalWithIO +fake_device_cache[GainSignal] = FakeEpicsSignal @pytest.fixture() @@ -103,6 +107,14 @@ def sim_ion_chamber(sim_registry): prefix="scaler_ioc", name="I00", labels={"ion_chambers"}, ch_num=2 ) sim_registry.register(ion_chamber) + # Set metadata + preamp = ion_chamber.preamp + preamp.sensitivity_value._enum_strs = tuple(preamp.values) + preamp.sensitivity_unit._enum_strs = tuple(preamp.units) + preamp.offset_value._enum_strs = tuple(preamp.values) + preamp.offset_unit._enum_strs = tuple(preamp.offset_units) + preamp.gain_mode._enum_strs = ("LOW NOISE", "HIGH BW", "LOW DRIFT") + preamp.gain_mode.set("LOW NOISE").wait(timeout=3) return ion_chamber diff --git a/src/firefly/application.py b/src/firefly/application.py index f49e8fd2..fe8cea35 100644 --- a/src/firefly/application.py +++ b/src/firefly/application.py @@ -616,7 +616,7 @@ def show_count_plan_window(self): @QtCore.Slot() def show_voltmeters_window(self): return self.show_window( - FireflyMainWindow, ui_dir / "voltmeters.py", name="voltmeters" + PlanMainWindow, ui_dir / "voltmeters.py", name="voltmeters" ) @QtCore.Slot() diff --git a/src/haven/__init__.py b/src/haven/__init__.py index 806f3414..1868f26d 100644 --- a/src/haven/__init__.py +++ b/src/haven/__init__.py @@ -20,7 +20,7 @@ ) from .instrument.device import RegexComponent # noqa: F401 from .instrument.dxp import load_dxp # noqa: F401 -from .instrument.load_instrument import load_instrument # noqa: F401 +from .instrument.load_instrument import aload_instrument, load_instrument # noqa: F401 from .instrument.motor import HavenMotor # noqa: F401 from .instrument.xspress import load_xspress # noqa: F401 from .motor_position import ( # noqa: F401 diff --git a/src/haven/instrument/ion_chamber.py b/src/haven/instrument/ion_chamber.py index 97290a26..10bed3d1 100644 --- a/src/haven/instrument/ion_chamber.py +++ b/src/haven/instrument/ion_chamber.py @@ -6,12 +6,16 @@ import time import warnings from collections import OrderedDict -from typing import Dict, Generator +from typing import Dict, Generator, Optional import numpy as np import pint from aioca import caget from apstools.devices import SRS570_PreAmplifier +from apstools.devices.srs570_preamplifier import ( + SRS570_PreAmplifier, + calculate_settle_time, +) from ophyd import Component as Cpt from ophyd import Device, EpicsSignal, EpicsSignalRO from ophyd import FormattedComponent as FCpt @@ -80,6 +84,32 @@ class Voltmeter(AnalogInput): volts = Cpt(EpicsSignal, ".VAL", kind="normal") +class GainDerivedSignal(MultiDerivedSignal): + """A gain level signal that incorporates dynamic settling time.""" + + def set( + self, + value: OphydDataType, + *, + timeout: Optional[float] = None, + settle_time: Optional[float] = "auto", + ): + # Calculate an auto settling time + if settle_time == "auto": + # Determine the new values that will be set + to_write = self.calculate_on_put(mds=self, value=value) or {} + # Calculate the correct settling time + settle_time_ = calculate_settle_time( + gain_value=to_write[self.parent.sensitivity_value], + gain_unit=to_write[self.parent.sensitivity_unit], + gain_mode=self.parent.gain_mode.get(), + ) + else: + settle_time_ = settle_time + # Call the actual set method to move the gain + return super().set(value, timeout=timeout, settle_time=settle_time_) + + class IonChamberPreAmplifier(SRS570_PreAmplifier): """An SRS-570 pre-amplifier driven by an ion chamber. @@ -96,6 +126,7 @@ class IonChamberPreAmplifier(SRS570_PreAmplifier): values = ["1", "2", "5", "10", "20", "50", "100", "200", "500"] units = ["pA/V", "nA/V", "uA/V", "mA/V"] + offset_units = [s.split("/")[0] for s in units] offset_difference = -3 # How many levels higher should the offset be def __init__(self, *args, **kwargs): @@ -118,20 +149,24 @@ def computed_gain(self): """ Amplifier gain (V/A), as floating-point number. """ - val = float(self.values[self.sensitivity_value.get()]) + # Convert the sensitivity to a proper number + val_idx = int(self.sensitivity_value.get(as_string=False)) + val = float(self.values[val_idx]) + # Determine multiplier based on the gain unit amps = [ 1e-12, # pA 1e-9, # nA - 1e-6, # µA + 1e-6, # μA 1e-3, # mA ] - multiplier = amps[self.sensitivity_unit.get()] + unit_idx = int(self.sensitivity_unit.get(as_string=False)) + multiplier = amps[unit_idx] inverse_gain = val * multiplier return 1 / inverse_gain def update_sensitivity_text(self, *args, obj: OphydObject, **kwargs): - val = self.values[self.sensitivity_value.get()] - unit = self.units[self.sensitivity_unit.get()] + val = self.values[int(self.sensitivity_value.get(as_string=False))] + unit = self.units[int(self.sensitivity_unit.get(as_string=False))] text = f"{val} {unit}" self.sensitivity_text.put(text, internal=True) @@ -139,12 +174,12 @@ def _level_to_value(self, level): return level % len(self.values) def _level_to_unit(self, level): - return int(level / len(self.values)) + return self.units[int(level / len(self.values))] def _get_gain_level(self, mds: MultiDerivedSignal, items: SignalToValue) -> int: - "Given a sensitivity value and unit, transform to the desired gain level." - value = items[self.sensitivity_value] - unit = items[self.sensitivity_unit] + "Given a sensitivity value and unit , transform to the desired level." + value = self.values.index(items[self.sensitivity_value]) + unit = self.units.index(items[self.sensitivity_unit]) # Determine sensitivity level new_level = value + unit * len(self.values) # Convert to gain by inverting @@ -173,14 +208,13 @@ def _put_gain_level( raise exceptions.GainOverflow(msg) # Return calculated gain and offset offset_value = self.values[self._level_to_value(new_offset)] - offset_unit = self.units[self._level_to_unit(new_offset)].split("/")[0] + offset_unit = self._level_to_unit(new_offset).split("/")[0] result = OrderedDict() result.update({self.sensitivity_unit: self._level_to_unit(new_level)}) result.update({self.sensitivity_value: self._level_to_value(new_level)}) result.update({self.offset_value: offset_value}) result.update({self.offset_unit: offset_unit}) - # # set_all=1, - # } + # result[self.set_all] = 1 return result def _get_offset_current( @@ -200,13 +234,15 @@ def _get_offset_current( return 0 return current - # It's easier to calculate gains by enum index, so override the apstools signals - sensitivity_value = Cpt(EpicsSignal, "sens_num", kind="config", string=False) - sensitivity_unit = Cpt(EpicsSignal, "sens_unit", kind="config", string=False) - gain_level = Cpt( - MultiDerivedSignal, - attrs=["sensitivity_value", "sensitivity_unit", "offset_value", "offset_unit"], + GainDerivedSignal, + attrs=[ + "sensitivity_value", + "sensitivity_unit", + "offset_value", + "offset_unit", + "set_all", + ], calculate_on_get=_get_gain_level, calculate_on_put=_put_gain_level, kind=Kind.omitted, @@ -586,7 +622,6 @@ async def load_ion_chamber( ic_idx = ch_num - 2 # 5 pre-amps per labjack lj_num = int(ic_idx / 5) - # Only use even labjack channels since it's a differential signal lj_chan = ic_idx % 5 # Only use this ion chamber if it has a name try: diff --git a/src/haven/instrument/load_instrument.py b/src/haven/instrument/load_instrument.py index 54f43078..357bff51 100644 --- a/src/haven/instrument/load_instrument.py +++ b/src/haven/instrument/load_instrument.py @@ -31,7 +31,9 @@ async def aload_instrument( - registry: InstrumentRegistry = default_registry, config: Mapping = None + registry: InstrumentRegistry = default_registry, + config: Mapping = None, + return_devices: bool = False, ): """Asynchronously load the beamline instrumentation into an instrument registry. @@ -48,9 +50,19 @@ async def aload_instrument( The registry into which the ophyd devices will be placed. config: The beamline configuration read in from TOML files. Mostly - useful for testing. + useful for testing. + return_devices + If true, return the newly loaded devices when complete. """ + # Clear out any existing registry entries + registry.clear() + # Make sure we have the most up-to-date configuration + # load_config.cache_clear() + # Load the configuration + if config is None: + config = load_config() + # Load devices concurrently coros = ( *load_camera_coros(config=config), *load_shutter_coros(config=config), @@ -77,7 +89,12 @@ async def aload_instrument( # motors in the registry extra_motors = await asyncio.gather(*load_all_motor_coros(config=config)) devices.extend(extra_motors) - return devices + # Also import some simulated devices for testing + devices += load_simulated_devices(config=config) + # Filter out devices that couldn't be reached + devices = [d for d in devices if d is not None] + if return_devices: + return devices def load_instrument( @@ -93,6 +110,10 @@ def load_instrument( configuration, it will create Ophyd devices and register them with *registry*. + This function starts the asyncio event loop. If one is already + running (e.g. jupyter notebook), then use ``await + aload_instrument()`` instead. + Parameters ========== registry: @@ -104,23 +125,11 @@ def load_instrument( If true, return the newly loaded devices when complete. """ - # Clear out any existing registry entries - registry.clear() - # Make sure we have the most up-to-date configuration - # load_config.cache_clear() - # Load the configuration - if config is None: - config = load_config() # Import devices concurrently loop = asyncio.get_event_loop() - devices = loop.run_until_complete( - aload_instrument(registry=registry, config=config) - ) - # Also import some simulated devices for testing - devices += load_simulated_devices(config=config) - # Filter out devices that couldn't be reached + coro = aload_instrument(registry=registry, config=config) + devices = loop.run_until_complete(coro) if return_devices: - devices = [d for d in devices if d is not None] return devices diff --git a/src/haven/plans/auto_gain.py b/src/haven/plans/auto_gain.py index a186afbb..26dfb589 100644 --- a/src/haven/plans/auto_gain.py +++ b/src/haven/plans/auto_gain.py @@ -1,6 +1,7 @@ from queue import Queue import numpy as np +import pandas as pd from bluesky_adaptive.per_event import adaptive_plan, recommender_factory from bluesky_adaptive.recommendations import NoRecommendation @@ -12,70 +13,117 @@ class GainRecommender: """A recommendation engine for finding the best ion chamber gain*. - Responds to ion chamber voltage as the dependent variable but + Responds to ion chamber voltage as the dependent variable by changing the gain. If the voltage is above *volts_max* then the - next gain* level will be one higher, and if the voltage is below - *volts_min* then the next gain* level will be one lower. + next gain* level will be higher, and if the voltage is below + *volts_min* then the next gain* level will be lower. This engine + will find all the values between *volts_min* and *volts_max*, + assuming the output of the pre-amp changes monotonically with + gain. """ volts_min: float volts_max: float + target_volts: float + gain_min: int = 0 gain_max: int = 27 last_point: np.ndarray = None + dfs: list = None + big_step: int = 3 - def __init__(self, volts_min: float = 0.5, volts_max: float = 4.5): + def __init__( + self, volts_min: float = 0.5, volts_max: float = 4.5, target_volts: float = 2.5 + ): self.volts_min = volts_min self.volts_max = volts_max + self.target_volts = target_volts def tell(self, gains, volts): - new_gains = np.copy(gains) - # Adjust new_gains for devices that are out of range, with hysteresis - if self.last_point is None: - is_hysteretical = np.full_like(gains, True, dtype=bool) - else: - is_hysteretical = gains < self.last_point self.last_point = gains - is_low = volts < self.volts_min - new_gains[np.logical_or(is_low, is_hysteretical)] -= 1 - is_high = volts > self.volts_max - new_gains[is_high] += 1 - # Ensure we're within the bounds for gain values - new_gains[new_gains < 0] = 0 - new_gains[new_gains > self.gain_max] = self.gain_max - # Check whether we need to move to a new point of not - if np.logical_or(is_low, is_high).any(): - self.next_point = new_gains - else: - self.next_point = None + # Check if dataframes exist yet + if self.dfs is None: + self.dfs = [pd.DataFrame() for i in gains] + # Add this measurement to the dataframes + self.dfs = [ + pd.concat( + [df, pd.DataFrame(data={"gain": [gain], "volts": [volt]})], + ignore_index=True, + ) + for gain, volt, df in zip(gains, volts, self.dfs) + ] def tell_many(self, xs, ys): for x, y in zip(xs, ys): self.tell(x, y) def ask(self, n, tell_pending=True): + """Figure out the next gain point based on the past ones we've measured.""" if n != 1: raise NotImplementedError - if self.next_point is None: + # Get the gains to try next + next_point = [self.next_gain(df) for df in self.dfs] + if np.array_equal(next_point, self.last_point): + # We've already found the best point, so end the scan raise NoRecommendation - return self.next_point + return next_point + + def next_gain(self, df: pd.DataFrame): + """Determine the next gain for this preamp based on past measurements. + + Parameters + ========== + df + The past measurements for this preamp. Expected to have + columns ["gain", "volts"]. + + """ + # We're too low, so go up in gain + if np.all(df.volts < self.volts_max): + # Determine step size + step = self.big_step if np.all(df.volts < self.volts_min) else 1 + # Determine next gain to use + new_gain = df.gain.max() + step + return np.min([new_gain, self.gain_max]) + # We're too high, so go down in gain + if np.all(df.volts > self.volts_min): + step = self.big_step if np.all(df.volts > self.volts_max) else 1 + new_gain = df.gain.min() - step + return np.max([new_gain, self.gain_min]) + # Fill in any missing values through the correct gain + values_in_range = df[(df.volts < self.volts_max) & (df.volts > self.volts_min)] + needed_gains = np.arange( + df[df.volts < self.volts_min].gain.max() + 1, + df[df.volts > self.volts_max].gain.max(), + ) + missing_gains = [ + gain for gain in needed_gains if gain not in values_in_range.gain + ] + if len(missing_gains) > 0: + return max(missing_gains) + # We have all the data we need, now decide on the best gain to use + if len(values_in_range) > 0: + good_vals = values_in_range + else: + good_vals = df + best = good_vals.iloc[(good_vals.volts - self.target_volts).abs().argmin()] + return best.gain def auto_gain( dets="ion_chambers", volts_min: float = 0.5, volts_max: float = 4.5, + prefer: str = "middle", max_count: int = 28, queue: Queue = None, ): """An adaptive Bluesky plan for optimizing pre-amp gains. - At each step, the plan will measure the pre-amp voltage via the - scaler. If the measured voltage for a pre-amp is outside the range - (*volts_min*, *volts_max*), then the gain for the next step will - be adjusted by one level. Once all detectors are within the - optimal range (or *max_count* iterations have been done), the scan - will end. + For each detector, the plan will search for the range of gains + within which the pre-amp output is between *volts_min* and + *volts_max*, and select the gain that produces a voltage closest + to the mid-point between *volts_min* and *volts_max*. Parameters ========== @@ -86,6 +134,9 @@ def auto_gain( The minimum acceptable range for each ion chamber's voltage. volts_max The maximum acceptable range for each ion chamber's voltage. + prefer + Whether to shoot for the "lower", "middle" (default), or "upper" + portion of the voltage range. max_count The scan will end after *max_count* iterations even if an optimal gain has not been found for all pre-amps. @@ -96,8 +147,21 @@ def auto_gain( """ # Resolve the detector list into real devices dets = registry.findall(dets) - # Prepare the recommendation enginer - recommender = GainRecommender(volts_min=volts_min, volts_max=volts_max) + # Prepare the recommendation engine + targets = { + "lower": volts_min, + "middle": (volts_min + volts_max) / 2, + "upper": volts_max, + } + try: + target = targets[prefer] + except KeyError: + raise ValueError( + f"Invalid value for *prefer* {prefer}. Choices are 'lower', 'middle', or 'upper'." + ) + recommender = GainRecommender( + volts_min=volts_min, volts_max=volts_max, target_volts=target + ) ind_keys = [det.preamp.gain_level.name for det in dets] dep_keys = [det.volts.name for det in dets] rr, queue = recommender_factory( diff --git a/src/haven/tests/test_auto_gain_plan.py b/src/haven/tests/test_auto_gain_plan.py index d3bdb728..91305c23 100644 --- a/src/haven/tests/test_auto_gain_plan.py +++ b/src/haven/tests/test_auto_gain_plan.py @@ -1,10 +1,12 @@ from queue import Queue +from unittest.mock import MagicMock import numpy as np import pytest from bluesky_adaptive.recommendations import NoRecommendation from haven import GainRecommender, auto_gain +from haven.plans import auto_gain as auto_gain_module def test_plan_recommendations(sim_ion_chamber): @@ -26,6 +28,24 @@ def test_plan_recommendations(sim_ion_chamber): assert len(trigger_msgs) == 2 +@pytest.mark.parametrize( + "prefer,target_volts", + [("middle", 2.5), ("lower", 0.5), ("upper", 4.5)], +) +def test_plan_prefer_arg(sim_ion_chamber, monkeypatch, prefer, target_volts): + """Check that the *prefer* argument works properly.""" + sim_ion_chamber.preamp.gain_level.set(18).wait() + queue = Queue() + queue.put({sim_ion_chamber.preamp.gain_level.name: 17}) + queue.put(None) + monkeypatch.setattr(auto_gain_module, "GainRecommender", MagicMock()) + plan = auto_gain(dets=[sim_ion_chamber], queue=queue, prefer=prefer) + msgs = list(plan) + auto_gain_module.GainRecommender.assert_called_with( + volts_min=0.5, volts_max=4.5, target_volts=target_volts + ) + + @pytest.fixture() def recommender(): recc = GainRecommender() @@ -33,31 +53,42 @@ def recommender(): def test_recommender_all_low(recommender): + """Does the engine get the right direction when all pre-amp gains are + too low? + + """ # Gain sensitivity levels gains = np.asarray([[10, 13]]) assert gains.shape == (1, 2) # Corresponding volts volts = np.asarray([[0.1, 0.25]]) assert volts.shape == (1, 2) - # Check recommendations + # Check recommendations are lower by 3 recommender.tell_many(gains, volts) - np.testing.assert_equal(recommender.ask(1), (9, 12)) + np.testing.assert_equal(recommender.ask(1), (13, 16)) def test_recommender_some_low(recommender): + """Does the engine get the direction right when some pre-amp gains are + too low and some are just right? + + """ # Gain sensitivity levels gains = np.asarray([[10, 13]]) assert gains.shape == (1, 2) - recommender.last_point = np.asarray([10, 13]) # Corresponding volts volts = np.asarray([[0.1, 2.5]]) assert volts.shape == (1, 2) # Check recommendations recommender.tell_many(gains, volts) - np.testing.assert_equal(recommender.ask(1), (9, 13)) + np.testing.assert_equal(recommender.ask(1), (13, 14)) def test_recommender_all_high(recommender): + """Does the engine get the direction correct when all pre-amp gains + are too high? + + """ # Gain sensitivity levels gains = np.asarray([[10, 13]]) assert gains.shape == (1, 2) @@ -67,58 +98,142 @@ def test_recommender_all_high(recommender): assert volts.shape == (1, 2) # Check recommendations recommender.tell_many(gains, volts) - np.testing.assert_equal(recommender.ask(1), (11, 14)) + np.testing.assert_equal(recommender.ask(1), (7, 10)) def test_recommender_some_high(recommender): # Gain sensitivity levels gains = np.asarray([[10, 13]]) assert gains.shape == (1, 2) - recommender.last_point = np.asarray([10, 13]) # Corresponding volts volts = np.asarray([[5.7, 2.5]]) assert volts.shape == (1, 2) # Check recommendations recommender.tell_many(gains, volts) - np.testing.assert_equal(recommender.ask(1), (11, 13)) + np.testing.assert_equal(recommender.ask(1), (7, 14)) def test_recommender_high_and_low(recommender): # Gain sensitivity levels gains = np.asarray([[10, 13]]) assert gains.shape == (1, 2) - recommender.last_point = np.asarray([10, 13]) # Corresponding volts volts = np.asarray([[5.7, 0.23]]) assert volts.shape == (1, 2) # Check recommendations recommender.tell_many(gains, volts) - np.testing.assert_equal(recommender.ask(1), (11, 12)) + np.testing.assert_equal(recommender.ask(1), (7, 16)) + + +def test_recommender_fill_missing_gains(recommender): + """If we have both high and low gains already done, will the + recommender fill in the missing pieces.""" + gains = [ + [9], + [12], + [10], + ] + volts = [ + [0.3], + [5.2], + [1.5], + ] + recommender.tell_many(gains, volts) + # Does it recommend the missing gain value? + df = recommender.dfs[0] + assert recommender.next_gain(df) == 11 -def test_recommender_no_change(recommender): - # Gain sensitivity levels - gains = np.asarray([[10, 13]]) - assert gains.shape == (1, 2) - # Corresponding volts - volts = np.asarray([[4.1, 2.23]]) - assert volts.shape == (1, 2) - # Check recommendations +def test_recommender_no_high(recommender): + """If the gain happens to be in range first try, we need to get + more data to be sure we have the best point. + + """ + gains = [[8], [9], [10]] + volts = [[0.3], [1.25], [2.7]] recommender.tell_many(gains, volts) - with pytest.raises(NoRecommendation): - recommender.ask(1) + df = recommender.dfs[0] + assert recommender.next_gain(df) == 11 -def test_recommender_hysteresis(recommender): - """Test that we can avoid getting caught at the bottom of the voltage - range.""" - # Gain sensitivity levels - gains = np.asarray([[10, 13]]) - assert gains.shape == (1, 2) - # Corresponding volts - volts = np.asarray([[0.1, 2.3]]) - recommender.last_point = np.asarray([10, 14]) - assert volts.shape == (1, 2) - # Check recommendations, second gain should still go up +def test_recommender_no_low(recommender): + """If the gain happens to be in range first try, we need to get + more data to be sure we have the best point. + + """ + gains = [[8], [9], [10]] + volts = [[1.25], [2.75], [5.4]] recommender.tell_many(gains, volts) - np.testing.assert_equal(recommender.ask(1), (9, 12)) + df = recommender.dfs[0] + assert recommender.next_gain(df) == 7 + + +def test_recommender_no_solution(recommender): + """If the gain profile goes from too low to too high in one step, what should we report?""" + gains = [[9], [10]] + volts = [[0.3], [5.2]] + recommender.tell_many(gains, volts) + # Does it recommend the missing gain value? + df = recommender.dfs[0] + assert recommender.next_gain(df) == 9 + + +@pytest.mark.parametrize("target_volts,gain", [(0.5, 10), (2.5, 9), (4.5, 8)]) +def test_recommender_correct_solution(target_volts, gain): + """If the gain profile goes from too low to too high in one step, what should we report?""" + recommender = GainRecommender(target_volts=target_volts) + gains = [[7], [8], [9], [10], [11]] + volts = [[5.2], [4.1], [2.7], [1.25], [0.4]] + recommender.tell_many(gains, volts) + # Does it recommend the missing gain value? + df = recommender.dfs[0] + assert recommender.next_gain(df) == gain + + +def test_recommender_gain_range_high(recommender): + """Check that the recommender doesn't go outside the allowed range.""" + gains = [[27]] + volts = [[0.3]] + recommender.tell_many(gains, volts) + # Does it recommend the missing gain value? + df = recommender.dfs[0] + assert recommender.next_gain(df) == 27 + + +def test_recommender_gain_range_low(recommender): + """Check that the recommender doesn't go outside the allowed range.""" + gains = [[0]] + volts = [[5.4]] + recommender.tell_many(gains, volts) + # Does it recommend the missing gain value? + df = recommender.dfs[0] + assert recommender.next_gain(df) == 0 + + +def test_recommender_no_change(recommender): + """If we've already found the best solution, we should stop the scan.""" + # Set a history of gain measurements + gains = np.asarray( + [ + [10, 13], + [11, 11], + [12, 14], + [13, 12], + ] + ) + volts = np.asarray( + [ + [7.10, 2.23], + [4.30, 5.30], + [2.10, 0.10], + [0.20, 4.33], + ] + ) + recommender.tell_many(gains, volts) + # Make sure the engine recommends the best solution + assert recommender.ask(1) == [12, 13] + # Pass in results for this new measurement + recommender.tell_many([[12, 13]], [[2.13, 2.19]]) + # Check recommendations + with pytest.raises(NoRecommendation): + recommender.ask(1) diff --git a/src/haven/tests/test_ion_chamber.py b/src/haven/tests/test_ion_chamber.py index 5089e5be..388d94e2 100644 --- a/src/haven/tests/test_ion_chamber.py +++ b/src/haven/tests/test_ion_chamber.py @@ -1,4 +1,5 @@ import time +from unittest import mock import numpy as np import pytest @@ -6,36 +7,55 @@ from haven.instrument import ion_chamber -def test_gain_level(sim_ion_chamber): +@pytest.fixture() +def preamp(sim_ion_chamber): preamp = sim_ion_chamber.preamp - assert isinstance(preamp.sensitivity_value.get(use_monitor=False), int) - assert isinstance(preamp.sensitivity_unit.get(use_monitor=False), int) + return preamp + + +def test_get_gain_level(preamp): # Change the preamp settings - preamp.sensitivity_value.put(4), # 20 uA/V - preamp.sensitivity_unit.put(2), + preamp.sensitivity_value.put("20") + assert preamp.sensitivity_value.get(as_string=True) == "20" + assert preamp.sensitivity_value.get(as_string=False) == 4 + preamp.sensitivity_unit.put("uA/V"), preamp.offset_value.put(1), # 2 uA/V preamp.offset_unit.put(2), # Check that the gain level moved assert preamp.gain_level.get(use_monitor=False) == 5 + + +def test_put_gain_level(preamp): # Move the gain level preamp.gain_level.set(15).wait(timeout=3) # Check that the preamp sensitivities are moved - assert preamp.sensitivity_value.get(use_monitor=False) == 3 # 10 nA/V - assert preamp.sensitivity_unit.get(use_monitor=False) == 1 + assert preamp.sensitivity_value.get(use_monitor=False) == "10" + assert preamp.sensitivity_unit.get(use_monitor=False) == "nA/V" # Check that the preamp sensitivity offsets are moved - assert preamp.offset_value.get(use_monitor=False) == "1" # 1 nA/V + assert preamp.offset_value.get(use_monitor=False) == "1" assert preamp.offset_unit.get(use_monitor=False) == "nA" -def test_gain_signals(sim_ion_chamber): - preamp = sim_ion_chamber.preamp - assert isinstance(preamp.sensitivity_value.get(use_monitor=False), int) - assert isinstance(preamp.sensitivity_unit.get(use_monitor=False), int) +def test_gain_level_settling(preamp, monkeypatch): + # Make it really low to start + preamp.gain_level.set(0).wait(timeout=3) + preamp.gain_mode.set("LOW NOISE").wait(timeout=3) + # Set up patches to watch the real signals getting set + monkeypatch.setattr(preamp.sensitivity_value, "set", mock.MagicMock()) + monkeypatch.setattr(preamp.sensitivity_unit, "set", mock.MagicMock()) + # Now make the gain high so we can check the settle time + preamp.gain_level.set(27) + # Check that the right settle time was used + preamp.sensitivity_value.set.assert_called_with(0, timeout=None, settle_time=3) + preamp.sensitivity_unit.set.assert_called_with("pA/V", timeout=None, settle_time=3) + + +def test_gain_signals(preamp): # Change the preamp settings - preamp.sensitivity_value.put(4) # 20 uA/V - preamp.sensitivity_unit.put(2) - preamp.offset_value.put(1) # 2 uA/V - preamp.offset_unit.put(2) + preamp.sensitivity_value.put("20") + preamp.sensitivity_unit.put("uA/V") + preamp.offset_value.put("2") + preamp.offset_unit.put("uA") # Check the gain and gain_db signals assert preamp.gain.get(use_monitor=False) == pytest.approx(1 / 20e-6) assert preamp.gain_db.get(use_monitor=False) == pytest.approx(46.9897)