From 6c94d5ece89ffd57a1a0313ace454768177090ce Mon Sep 17 00:00:00 2001 From: Emily Arnold Date: Tue, 11 Nov 2025 14:08:49 +0000 Subject: [PATCH 1/7] Wrap plans and plan stubs, write tests --- src/dodal/plan_stubs/__init__.py | 22 +- src/dodal/plan_stubs/wrapped.py | 26 +- src/dodal/plans/__init__.py | 21 +- src/dodal/plans/wrapped.py | 343 ++++++++++++- system_tests/test_adsim.py | 12 +- tests/plan_stubs/test_wrapped_stubs.py | 10 + tests/plans/conftest.py | 7 + tests/plans/test_wrapped.py | 661 ++++++++++++++++++++++++- 8 files changed, 1081 insertions(+), 21 deletions(-) diff --git a/src/dodal/plan_stubs/__init__.py b/src/dodal/plan_stubs/__init__.py index 2c84ded8654..25f7a5e97c4 100644 --- a/src/dodal/plan_stubs/__init__.py +++ b/src/dodal/plan_stubs/__init__.py @@ -1,3 +1,21 @@ -from .wrapped import move, move_relative, set_absolute, set_relative, sleep, wait +from .wrapped import ( + move, + move_relative, + rd, + set_absolute, + set_relative, + sleep, + stop, + wait, +) -__all__ = ["move", "move_relative", "set_absolute", "set_relative", "sleep", "wait"] +__all__ = [ + "move", + "move_relative", + "rd", + "set_absolute", + "set_relative", + "sleep", + "stop", + "wait", +] diff --git a/src/dodal/plan_stubs/wrapped.py b/src/dodal/plan_stubs/wrapped.py index 43555dbbc16..1d134c2fb7c 100644 --- a/src/dodal/plan_stubs/wrapped.py +++ b/src/dodal/plan_stubs/wrapped.py @@ -3,7 +3,7 @@ from typing import Annotated, TypeVar import bluesky.plan_stubs as bps -from bluesky.protocols import Movable +from bluesky.protocols import Movable, Readable, Stoppable from bluesky.utils import MsgGenerator """ @@ -146,3 +146,27 @@ def wait( """ return (yield from bps.wait(group, timeout=timeout)) + + +def rd(readable: Readable) -> MsgGenerator: + """Reads a single-value non-triggered object, wrapper for `bp.rd`. + + Args: + readable (Readable): The device to be read + + Returns: + Iterator[MsgGenerator]: Bluesky messages + """ + return (yield from bps.rd(readable)) + + +def stop(stoppable: Stoppable) -> MsgGenerator: + """Stop a device, wrapper for `bp.stop`. + + Args: + stoppable (Stoppable): Device to be stopped + + Returns: + Iterator[MsgGenerator]: Bluesky messages + """ + return (yield from bps.stop(stoppable)) diff --git a/src/dodal/plans/__init__.py b/src/dodal/plans/__init__.py index 645cf1709d0..0197102b824 100644 --- a/src/dodal/plans/__init__.py +++ b/src/dodal/plans/__init__.py @@ -1,4 +1,21 @@ from .spec_path import spec_scan -from .wrapped import count +from .wrapped import ( + count, + list_rscan, + list_scan, + num_rscan, + num_scan, + step_rscan, + step_scan, +) -__all__ = ["count", "spec_scan"] +__all__ = [ + "count", + "list_rscan", + "list_scan", + "num_rscan", + "num_scan", + "spec_scan", + "step_rscan", + "step_scan", +] diff --git a/src/dodal/plans/wrapped.py b/src/dodal/plans/wrapped.py index 48875c52353..7a9cbb4a07b 100644 --- a/src/dodal/plans/wrapped.py +++ b/src/dodal/plans/wrapped.py @@ -1,11 +1,15 @@ from collections.abc import Sequence +from decimal import Decimal from typing import Annotated, Any import bluesky.plans as bp -from bluesky.protocols import Readable +import numpy as np +from bluesky.protocols import Movable, Readable +from ophyd_async.core import AsyncReadable from pydantic import Field, NonNegativeFloat, validate_call from dodal.common import MsgGenerator +from dodal.devices.motors import Motor from dodal.plan_stubs.data_session import attach_data_session_metadata_decorator """This module wraps plan(s) from bluesky.plans until required handling for them is @@ -27,7 +31,7 @@ @validate_call(config={"arbitrary_types_allowed": True}) def count( detectors: Annotated[ - set[Readable], + Sequence[Readable | AsyncReadable], Field( description="Set of readable devices, will take a reading at each point", min_length=1, @@ -55,3 +59,338 @@ def count( metadata = metadata or {} metadata["shape"] = (num,) yield from bp.count(tuple(detectors), num, delay=delay, md=metadata) + + +def _make_num_scan_args(params: dict[Movable | Motor, list[float | int]]): + shape = [] + for param in params: + if len(params[param]) == 3: + shape.append(params[param][-1]) + args = [] + for param, movable_num in zip(params, range(len(params)), strict=True): + args.append(param) + if movable_num == 0 and len(shape) == 1: + params[param].pop() + args.extend(params[param]) + return args, shape + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def num_scan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + params: Annotated[ + dict[Movable | Motor, list[float | int]], + Field( + description="Dictionary of 'device: paramater' keys. For concurrent " + "trajectories, provide '{movable1: [start1, stop1, num], movable2: [start2," + "stop2], ... , movableN: [startN, stopN]}'. For independent trajectories," + "provide '{movable1: [start1, stop1, num1], ... , movableN: [startN, stopN," + "numN]}'." + ), + ], + snake_axes: list | bool | None = None, + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan over concurrent or independent multi-motor trajectories. + Wraps bluesky.plans.scan(det, *args, num, md=metadata) and + bluesky.plans.grid_scan(det, *args, md=metadata)""" + args, shape = _make_num_scan_args(params) + metadata = metadata or {} + metadata["shape"] = shape + + if len(shape) == 1: + yield from bp.scan(tuple(detectors), *args, num=shape[0], md=metadata) + elif len(shape) > 1: + yield from bp.grid_scan( + tuple(detectors), *args, snake_axes=snake_axes, md=metadata + ) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def num_rscan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + params: Annotated[ + dict[Movable | Motor, list[float | int]], + Field( + description="Dictionary of 'device: paramater' keys. For concurrent " + "trajectories, provide '{movable1: [start1, stop1, num], movable2: [start2," + "stop2], ... , movableN: [startN, stopN]}'. For independent trajectories," + "provide '{movable1: [start1, stop1, num1], ... , movableN: [startN, stopN," + "numN]}'." + ), + ], + snake_axes: list | bool | None = None, + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan over concurrent or independent trajectories relative to current position. + Wraps bluesky.plans.rel_scan(det, *args, num, md=metadata) and + bluesky.plans.rel_grid_scan(det, *args, md=metadata)""" + args, shape = _make_num_scan_args(params) + metadata = metadata or {} + metadata["shape"] = shape + + if len(shape) == 1: + yield from bp.rel_scan(tuple(detectors), *args, num=shape[0], md=metadata) + elif len(shape) > 1: + yield from bp.rel_grid_scan( + tuple(detectors), *args, snake_axes=snake_axes, md=metadata + ) + + +def _make_list_scan_args(params: dict[Movable | Motor, list[float | int]], grid: bool): + shape = [] + args = [] + for param, num in zip(params, range(len(params)), strict=True): + if num == 0: + shape.append(len(params[param])) + elif num >= 1 and grid: + shape.append(len(params[param])) + args.append(param) + args.append(params[param]) + return args, shape + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def list_scan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + params: Annotated[ + dict[Movable | Motor, list[float | int]], + Field( + description="Dictionary of 'device: paramater' keys. For all trajectories, " + "provide '{movable1: [point1, point2, ... ], movableN: [point1, point2, " + "...]}'." + ), + ], + grid: bool = False, + snake_axes: bool = False, # Currently specifying axes to snake is not supported + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan over concurrent or independent trajectories relative to current position. + To scan over concurrent trajectories, grid = False, and independent trajectories, + grid = True. + Wraps bluesky.plans.list_scan(det, *args, num, md=metadata) and + bluesky.plans.list_grid_scan(det, *args, md=metadata)""" + args, shape = _make_list_scan_args(params=params, grid=grid) + metadata = metadata or {} + metadata["shape"] = shape + + if len(shape) == 1: + yield from bp.list_scan(tuple(detectors), *args, md=metadata) + elif len(shape) > 1: + yield from bp.list_grid_scan( + tuple(detectors), *args, snake_axes=snake_axes, md=metadata + ) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def list_rscan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + params: Annotated[ + dict[Movable | Motor, list[float | int]], + Field( + description="Dictionary of 'device: paramater' keys. For all trajectories, " + "provide '{movable1: [point1, point2, ... ], movableN: [point1, point2, " + "...]}'." + ), + ], + grid: bool = False, + snake_axes: bool = False, # Currently specifying axes to snake is not supported + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan over concurrent or independent trajectories relative to current position. + To scan over concurrent trajectories, grid = False, and independent trajectories, + grid = True. + Wraps bluesky.plans.rel_list_scan(det, *args, num, md=metadata) and + bluesky.plans.rel_list_grid_scan(det, *args, md=metadata)""" + args, shape = _make_list_scan_args(params=params, grid=grid) + metadata = metadata or {} + metadata["shape"] = shape + + if len(shape) == 1: + yield from bp.rel_list_scan(tuple(detectors), *args, md=metadata) + elif len(shape) > 1: + yield from bp.rel_list_grid_scan( + tuple(detectors), *args, snake_axes=snake_axes, md=metadata + ) + + +def _make_stepped_list( + params: list[Any] | Sequence[Any], + num: int | None = None, +): + def round_list_elements(stepped_list, step): + d = Decimal(str(step)) + exponent = d.as_tuple().exponent + decimal_places = -exponent # type: ignore + return np.round(stepped_list, decimals=decimal_places).tolist() + + start = params[0] + if len(params) == 3: + stop = params[1] + step = params[2] + if abs(step) > abs(stop - start): + step = stop - start + step = abs(step) * np.sign(stop - start) + stepped_list = np.arange(start=start, stop=stop, step=step).tolist() + if abs((stepped_list[-1] + step) - stop) <= abs(step * 0.01): + stepped_list.append(stepped_list[-1] + step) + rounded_stepped_list = round_list_elements(stepped_list=stepped_list, step=step) + elif len(params) == 2 and num: + step = params[1] + stepped_list = [start + (n * step) for n in range(num)] + rounded_stepped_list = round_list_elements(stepped_list=stepped_list, step=step) + else: + raise ValueError( + f"You provided {len(params)}, rather than 3, or 2 and number of points." + ) + + return rounded_stepped_list, len(rounded_stepped_list) + + +def _make_step_scan_args(params: dict[Movable | Motor, list[float | int]]): + args = [] + shape = [] + stepped_list_length = None + for param, movable_num in zip(params, range(len(params)), strict=True): + if movable_num == 0: + if len(params[param]) == 3: + stepped_list, stepped_list_length = _make_stepped_list( + params=params[param] + ) + args.append(param) + args.append(stepped_list) + shape.append(stepped_list_length) + else: + raise ValueError( + f"You provided {len(params[param])} parameters, rather than 3." + ) + elif movable_num >= 1: + if len(params[param]) == 2: + stepped_list, stepped_list_length = _make_stepped_list( + params=params[param], num=stepped_list_length + ) + args.append(param) + args.append(stepped_list) + elif len(params[param]) == 3: + stepped_list, stepped_list_length = _make_stepped_list( + params=params[param] + ) + args.append(param) + args.append(stepped_list) + shape.append(stepped_list_length) + else: + raise ValueError( + f"You provided {len(params[param])} parameters, rather than 2 or 3." + ) + if (len(args) / len(shape)) not in [2, len(args)]: + raise ValueError( + "Incorrect number of parameters, unsure if scan is concurrent/independent." + ) + + return args, shape + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def step_scan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + params: Annotated[ + dict[Movable | Motor, list[float | int]], + Field( + description="Dictionary of 'device: paramater' keys. For concurrent " + "trajectories, provide '{movable1: [start1, stop1, step1], movable2: " + "[start2, step2], ... , movableN: [startN, stepN]}'. For independent " + "trajectories, provide '{movable1: [start1, stop1, step1], ... , movableN: " + "[startN, stopN, stepN]}'." + ), + ], + snake_axes: bool = False, # Currently specifying axes to snake is not supported + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan over multi-motor trajectories with specified step size. + Generates lists of points for each trajectory for + bluesky.plans.list_scan(det, *args, num, md=metadata) and + bluesky.plans.list_grid_scan(det, *args, md=metadata).""" + args, shape = _make_step_scan_args(params) + metadata = metadata or {} + metadata["shape"] = shape + + if len(shape) == 1: + yield from bp.list_scan(tuple(detectors), *args, md=metadata) + elif len(shape) > 1: + yield from bp.list_grid_scan( + tuple(detectors), *args, snake_axes=snake_axes, md=metadata + ) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def step_rscan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + params: Annotated[ + dict[Movable | Motor, list[float | int]], + Field( + description="Dictionary of 'device: paramater' keys. For concurrent " + "trajectories, provide '{movable1: [start1, stop1, step1], movable2: " + "[start2, step2], ... , movableN: [startN, stepN]}'. For independent " + "trajectories, provide '{movable1: [start1, stop1, step1], ... , movableN: " + "[startN, stopN, stepN]}'." + ), + ], + snake_axes: bool = False, # Currently specifying axes to snake is not supported + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan over multi-motor trajectories with specified step size. + Generates lists of points for each trajectory for + bluesky.plans.list_scan(det, *args, num, md=metadata) and + bluesky.plans.list_grid_scan(det, *args, md=metadata).""" + args, shape = _make_step_scan_args(params) + metadata = metadata or {} + metadata["shape"] = shape + + if len(shape) == 1: + yield from bp.rel_list_scan(tuple(detectors), *args, md=metadata) + elif len(shape) > 1: + yield from bp.rel_list_grid_scan( + tuple(detectors), *args, snake_axes=snake_axes, md=metadata + ) diff --git a/system_tests/test_adsim.py b/system_tests/test_adsim.py index 1de9e4b999b..5940c2c23e6 100644 --- a/system_tests/test_adsim.py +++ b/system_tests/test_adsim.py @@ -90,7 +90,7 @@ def test_plan_produces_expected_start_document( run_engine_documents: Mapping[str, list[DocumentType]], det: StandardDetector, ): - run_engine(count({det}, num=num)) + run_engine(count([det], num=num)) docs = run_engine_documents.get("start") assert docs and len(docs) == 1 @@ -115,7 +115,7 @@ def test_plan_produces_expected_stop_document( run_engine_documents: Mapping[str, list[DocumentType]], det: StandardDetector, ): - run_engine(count({det}, num=num)) + run_engine(count([det], num=num)) docs = run_engine_documents.get("stop") assert docs and len(docs) == 1 @@ -132,7 +132,7 @@ def test_plan_produces_expected_descriptor( run_engine_documents: Mapping[str, list[DocumentType]], det: StandardDetector, ): - run_engine(count({det}, num=num)) + run_engine(count([det], num=num)) docs = run_engine_documents.get("descriptor") assert docs and len(docs) == 1 @@ -151,7 +151,7 @@ def test_plan_produces_expected_events( run_engine_documents: Mapping[str, list[DocumentType]], det: StandardDetector, ): - run_engine(count({det}, num=num)) + run_engine(count([det], num=num)) docs = run_engine_documents.get("event") assert docs and len(docs) == length @@ -169,7 +169,7 @@ def test_plan_produces_expected_resources( run_engine_documents: Mapping[str, list[DocumentType]], det: StandardDetector, ): - run_engine(count({det}, num=num)) + run_engine(count([det], num=num)) docs = run_engine_documents.get("stream_resource") data_keys = [det.name] assert docs and len(docs) == len(data_keys) @@ -192,7 +192,7 @@ def test_plan_produces_expected_datums( run_engine_documents: Mapping[str, list[DocumentType]], det: StandardDetector, ): - run_engine(count({det}, num=num)) + run_engine(count([det], num=num)) docs = cast(list[StreamDatum], run_engine_documents.get("stream_datum")) data_keys = [det.name] # If we enable e.g. Stats plugin add to this assert ( diff --git a/tests/plan_stubs/test_wrapped_stubs.py b/tests/plan_stubs/test_wrapped_stubs.py index 9af2e5d5dd7..cbc77ac9bd2 100644 --- a/tests/plan_stubs/test_wrapped_stubs.py +++ b/tests/plan_stubs/test_wrapped_stubs.py @@ -10,9 +10,11 @@ from dodal.plan_stubs.wrapped import ( move, move_relative, + rd, set_absolute, set_relative, sleep, + stop, wait, ) @@ -147,3 +149,11 @@ def test_wait_group_and_timeout(): assert list(wait("foo", 5.0)) == [ Msg("wait", group="foo", timeout=5.0, error_on_timeout=True, watch=_EMPTY) ] + + +def test_rd(x_axis: SimMotor): + assert list(rd(x_axis)) == [Msg("locate", obj=x_axis)] + + +def test_stop(x_axis: SimMotor): + assert list(stop(x_axis)) == [Msg("stop", obj=x_axis)] diff --git a/tests/plans/conftest.py b/tests/plans/conftest.py index 90128ece4df..6dfa4882163 100644 --- a/tests/plans/conftest.py +++ b/tests/plans/conftest.py @@ -78,6 +78,13 @@ def y_axis() -> SimMotor: return y_axis +@pytest.fixture +def z_axis() -> SimMotor: + with init_devices(mock=True): + z_axis = SimMotor() + return z_axis + + @pytest.fixture def path_provider(static_path_provider: PathProvider): # Prevents issue with leftover state from beamline tests diff --git a/tests/plans/test_wrapped.py b/tests/plans/test_wrapped.py index b6b76dbc88c..6df6b42914f 100644 --- a/tests/plans/test_wrapped.py +++ b/tests/plans/test_wrapped.py @@ -1,5 +1,5 @@ from collections.abc import Sequence -from typing import cast +from typing import Any, cast import pytest from bluesky.protocols import Readable @@ -13,11 +13,25 @@ StreamResource, ) from ophyd_async.core import ( + AsyncReadable, StandardDetector, ) from pydantic import ValidationError -from dodal.plans.wrapped import count +from dodal.devices.motors import Motor +from dodal.plans.wrapped import ( + _make_list_scan_args, + _make_num_scan_args, + _make_step_scan_args, + _make_stepped_list, + count, + list_rscan, + list_scan, + num_rscan, + num_scan, + step_rscan, + step_scan, +) @pytest.fixture @@ -26,7 +40,7 @@ def documents_from_num( ) -> dict[str, list[Document]]: docs: dict[str, list[Document]] = {} run_engine( - count({det}, num=request.param), + count([det], num=request.param), lambda name, doc: docs.setdefault(name, []).append(doc), ) return docs @@ -50,16 +64,16 @@ def test_count_delay_validation(det: StandardDetector, run_engine: RunEngine): } for delay, reason in args.items(): with pytest.raises((ValidationError, AssertionError), match=reason): - run_engine(count({det}, num=3, delay=delay)) + run_engine(count([det], num=3, delay=delay)) print(delay) def test_count_detectors_validation(run_engine: RunEngine): - args: dict[str, set[Readable]] = { + args: dict[str, Sequence[Readable | AsyncReadable]] = { # No device to read - "Set should have at least 1 item after validation, not 0": set(), + "1 validation error for count": set(), # Not Readable - "Input should be an instance of Readable": set("foo"), # type: ignore + "Input should be an instance of Sequence": set("foo"), # type: ignore } for reason, dets in args.items(): with pytest.raises(ValidationError, match=reason): @@ -74,7 +88,7 @@ def test_count_num_validation(det: StandardDetector, run_engine: RunEngine): } for num, reason in args.items(): with pytest.raises(ValidationError, match=reason): - run_engine(count({det}, num=num)) + run_engine(count([det], num=num)) @pytest.mark.parametrize( @@ -157,3 +171,634 @@ def test_plan_produces_expected_datums( docs = documents_from_num.get("stream_datum") data_keys = [det.name, f"{det.name}-sum"] assert docs and len(docs) == len(data_keys) * length + + +@pytest.mark.parametrize( + "x_list, y_list, final_shape, final_length", + ( + [[0.0, 1.1, 3], [2.2, 3.3], [3], 6], + [[0.0, 1.1, 2], [2.2, 3.3, 3], [2, 3], 8], + ), +) +def test_make_num_scan_args( + x_axis: Motor, + y_axis: Motor, + x_list: list[float | int], + y_list: list[float | int], + final_shape: list[int], + final_length: int, +): + args, shape = _make_num_scan_args({x_axis: x_list, y_axis: y_list}) + assert shape == final_shape + assert len(args) == final_length + assert args[0] == x_axis + + +@pytest.mark.parametrize("x_args", ([0.0, 2.2, 5], [1.1, -1.1, 3])) +def test_num_scan_with_one_axis( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list[float | int], +): + run_engine(num_scan(detectors=[det], params={x_axis: x_args})) + + +@pytest.mark.parametrize( + "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2]], [[0, 1.1, 5], [2.2, 3.3]]) +) +def test_num_scan_with_two_axes_and_concurrent_trajectories( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list[float | int], + y_axis: Motor, + y_args: list[float | int], +): + run_engine( + num_scan( + detectors=[det], + params={x_axis: x_args, y_axis: y_args}, + ) + ) + + +@pytest.mark.parametrize( + "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) +) +def test_num_scan_with_two_axes_and_independent_trajectories( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list[float | int], + y_axis: Motor, + y_args: list[float | int], +): + run_engine( + num_scan( + detectors=[det], + params={x_axis: x_args, y_axis: y_args}, + ) + ) + + +@pytest.mark.parametrize( + "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) +) +def test_num_scan_with_two_axes_and_independent_trajectories_when_snaking( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list[float | int], + y_axis: Motor, + y_args: list[float | int], +): + run_engine( + num_scan( + detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=True + ) + ) + + +def test_num_scan_fails_when_given_wrong_number_of_params( + run_engine: RunEngine, det: StandardDetector, x_axis: Motor, y_axis: Motor +): + with pytest.raises(ValueError): + run_engine( + num_scan(detectors=[det], params={x_axis: [0, 1.1, 2], y_axis: [1.1]}) + ) + + +@pytest.mark.parametrize( + "x_args, y_args,", ([[-1, 1, 0], [2, 0]], [[-1, 1, 3.5], [-1, 1]]) +) +def test_num_scan_fails_when_given_bad_info( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list[float | int], + y_axis: Motor, + y_args: list[float | int], +): + with pytest.raises(ValueError): + run_engine( + num_scan( + detectors=[det], + params={x_axis: x_args, y_axis: y_args}, + ) + ) + + +@pytest.mark.parametrize( + "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) +) +def test_num_scan_fails_when_asked_to_snake_slow_axis( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list[float | int], + y_axis: Motor, + y_args: list[float | int], +): + with pytest.raises(ValueError): + run_engine( + num_scan( + detectors=[det], + params={x_axis: x_args, y_axis: y_args}, + snake_axes=[x_axis], + ) + ) + + +@pytest.mark.parametrize("x_args", ([0.0, 2.2, 5], [1.1, -1.1, 3])) +def test_num_rscan_with_one_axis( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list[float | int], +): + run_engine(num_rscan(detectors=[det], params={x_axis: x_args})) + + +@pytest.mark.parametrize( + "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2]], [[0, 1.1, 5], [2.2, 3.3]]) +) +def test_num_rscan_with_two_axes_and_concurrent_trajectories( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list[float | int], + y_axis: Motor, + y_args: list[float | int], +): + run_engine( + num_rscan( + detectors=[det], + params={x_axis: x_args, y_axis: y_args}, + ) + ) + + +@pytest.mark.parametrize( + "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) +) +def test_num_rscan_with_two_axes_and_independent_trajectories( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list[float | int], + y_axis: Motor, + y_args: list[float | int], +): + run_engine( + num_rscan( + detectors=[det], + params={x_axis: x_args, y_axis: y_args}, + ) + ) + + +@pytest.mark.parametrize( + "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) +) +def test_num_rscan_with_two_axes_and_independent_trajectories_when_snaking( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list[float | int], + y_axis: Motor, + y_args: list[float | int], +): + run_engine( + num_rscan( + detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=True + ) + ) + + +def test_num_rscan_fails_when_given_wrong_number_of_params( + run_engine: RunEngine, det: StandardDetector, x_axis: Motor, y_axis: Motor +): + with pytest.raises(ValueError): + run_engine( + num_rscan(detectors=[det], params={x_axis: [0, 1.1, 2], y_axis: [1.1]}) + ) + + +@pytest.mark.parametrize( + "x_args, y_args,", ([[-1, 1, 0], [2, 0]], [[-1, 1, 3.5], [-1, 1]]) +) +def test_num_rscan_fails_when_given_bad_info( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list[float | int], + y_axis: Motor, + y_args: list[float | int], +): + with pytest.raises(ValueError): + run_engine( + num_rscan( + detectors=[det], + params={x_axis: x_args, y_axis: y_args}, + ) + ) + + +@pytest.mark.parametrize( + "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) +) +def test_num_rscan_fails_when_asked_to_snake_slow_axis( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list[float | int], + y_axis: Motor, + y_args: list[float | int], +): + with pytest.raises(ValueError): + run_engine( + num_rscan( + detectors=[det], + params={x_axis: x_args, y_axis: y_args}, + snake_axes=[x_axis], + ) + ) + + +@pytest.mark.parametrize( + "x_args, y_args, grid, final_shape, final_length", + ([[0, 1, 2], [3, 4, 5], False, [3], 4], [[0, 1, 2], [3, 4, 5, 6], True, [3, 4], 4]), +) +def test_make_list_scan_args( + x_axis: Motor, + x_args: list, + y_axis: Motor, + y_args: list, + grid: bool, + final_shape: list, + final_length: int, +): + args, shape = _make_list_scan_args( + params={x_axis: x_args, y_axis: y_args}, grid=grid + ) + assert len(args) == final_length + assert shape == final_shape + + +@pytest.mark.parametrize("x_list", ([0, 1, 2, 3], [1.1, 2.2, 3.3])) +def test_list_scan( + run_engine: RunEngine, det: StandardDetector, x_axis: Motor, x_list: Any +): + run_engine(list_scan(detectors=[det], params={x_axis: x_list})) + + +@pytest.mark.parametrize( + "x_list, y_list", + ( + [[3, 2, 1], [1, 2, 3]], + [[-1.1, -2.2, -3.3, -4.4, -5.5], [1.1, 2.2, 3.3, 4.4, 5.5]], + ), +) +def test_list_scan_with_two_axes_and_concurrent_trajectories( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_list: list, + y_axis: Motor, + y_list: list, +): + run_engine(list_scan(detectors=[det], params={x_axis: x_list, y_axis: y_list})) + + +def test_list_scan_with_concurrent_trajectories_fails_with_differnt_list_lengths( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, +): + with pytest.raises(ValueError): + run_engine( + list_scan( + detectors=[det], + params={x_axis: [1, 2, 3, 4, 5], y_axis: [1, 2, 3, 4]}, + ) + ) + + +@pytest.mark.parametrize( + "x_list, y_list", + ( + [[3, 2, 1], [1, 2, 3, 4]], + [[-1.1, -2.2, -3.3, -4.4, -5.5], [1.1, 2.2, 3.3, 4.4, 5.5]], + ), +) +def test_list_scan_with_two_axes_and_independent_trajectories( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_list: list, + y_axis: Motor, + y_list: list, +): + run_engine( + list_scan(detectors=[det], params={x_axis: x_list, y_axis: y_list}, grid=True) + ) + + +@pytest.mark.parametrize("x_list", ([0, 1, 2, 3], [1.1, 2.2, 3.3])) +def test_list_rscan( + run_engine: RunEngine, det: StandardDetector, x_axis: Motor, x_list: Any +): + run_engine(list_rscan(detectors=[det], params={x_axis: x_list})) + + +@pytest.mark.parametrize( + "x_list, y_list", + ( + [[3, 2, 1], [1, 2, 3]], + [[-1.1, -2.2, -3.3, -4.4, -5.5], [1.1, 2.2, 3.3, 4.4, 5.5]], + ), +) +def test_list_rscan_with_two_axes_and_concurrent_trajectories( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_list: list, + y_axis: Motor, + y_list: list, +): + run_engine(list_rscan(detectors=[det], params={x_axis: x_list, y_axis: y_list})) + + +def test_list_rscan_with_concurrent_trajectories_fails_with_differnt_list_lengths( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, +): + with pytest.raises(ValueError): + run_engine( + list_rscan( + detectors=[det], + params={x_axis: [1, 2, 3, 4, 5], y_axis: [1, 2, 3, 4]}, + ) + ) + + +@pytest.mark.parametrize( + "x_list, y_list", + ( + [[3, 2, 1], [1, 2, 3, 4]], + [[-1.1, -2.2, -3.3, -4.4, -5.5], [1.1, 2.2, 3.3, 4.4, 5.5]], + ), +) +def test_list_rscan_with_two_axes_and_independent_trajectories( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_list: list, + y_axis: Motor, + y_list: list, +): + run_engine( + list_rscan(detectors=[det], params={x_axis: x_list, y_axis: y_list}, grid=True) + ) + + +@pytest.mark.parametrize( + "params", + ( + [-1, 1, 0.1], + [-2, 2, 0.2], + [1, -1, -0.1], + [2, -2, -0.2], + [1, -1, 0.1], + [2, -2, 0.2], + ), +) +def test_make_stepped_list_when_given_three_params(params: list[Any]): + stepped_list, stepped_list_length = _make_stepped_list(params=params) + assert stepped_list_length == 21 + assert stepped_list[0] / stepped_list[-1] == -1 + assert stepped_list[10] == 0 + + +@pytest.mark.parametrize("params", ([-1, 0.1], [-2, 0.2], [1, -0.1], [2, -0.2])) +def test_make_stepped_list_when_given_two_params(params: list[Any]): + stepped_list, stepped_list_length = _make_stepped_list(params=params, num=21) + assert stepped_list_length == 21 + assert stepped_list[0] / stepped_list[-1] == -1 + assert stepped_list[10] == 0 + + +def test_make_stepped_list_when_given_wrong_number_of_params(): + with pytest.raises(ValueError): + _make_stepped_list(params=[1]) + + +@pytest.mark.parametrize( + "x_args, y_args, final_shape, final_length", + ( + [[0, 1, 0.25], [0, 0.1], [5], 4], + [[0, 1, 0.25], [0, 1, 0.2], [5, 6], 4], + [[0, -1, -0.25], [0, -0.1], [5], 4], + [[0, -1, -0.25], [0, -1, -0.2], [5, 6], 4], + ), +) +def test_make_step_scan_args( + x_axis: Motor, + x_args: list, + y_axis: Motor, + y_args: list, + final_shape: list, + final_length: int, +): + args, shape = _make_step_scan_args(params={x_axis: x_args, y_axis: y_args}) + assert shape == final_shape + assert len(args) == final_length + assert args[0] == x_axis + assert args[2] == y_axis + + +@pytest.mark.parametrize( + "x_args, y_args, z_args", + ( + [[0, 1], [0, 0.2], [0, 1, 0.5]], + [[0, 1, 0.25], [0, 1, 0.2, 1], [0, 1, 0.5]], + [[0, 1, 0.25], [0, 0.2], [0, 1, 0.5]], + [[0, 1, 0.25], [0, 1, 0.2], [0, 0.5]], + ), +) +def test_make_step_scan_args_fails_when_given_incorrect_number_of_parameters( + x_axis: Motor, + x_args: list, + y_axis: Motor, + y_args: list, + z_axis: Motor, + z_args: list, +): + with pytest.raises(ValueError): + _make_step_scan_args(params={x_axis: x_args, y_axis: y_args, z_axis: z_args}) + + +@pytest.mark.parametrize("x_args", ([0, 1, 0.1], [-1, 1, 0.1], [0, 10, 1])) +def test_step_scan( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list[Any], +): + run_engine(step_scan(detectors=[det], params={x_axis: x_args})) + + +@pytest.mark.parametrize( + "x_args, y_args", + ([[0, 1, 0.25], [0, 0.1]], [[-1, 1, 0.25], [-1, 0.1]], [[0, 10, 2.5], [0, 1]]), +) +def test_step_scan_with_multiple_axes_and_concurrent_trajectories( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list, + y_axis: Motor, + y_args: list, +): + run_engine(step_scan(detectors=[det], params={x_axis: x_args, y_axis: y_args})) + + +@pytest.mark.parametrize( + "x_args, y_args", + ( + [[0, 1, 0.25], [0, 2, 0.5]], + [[-1, 1, 0.25], [1, -1, -0.5]], + [[0, 10, 2.5], [0, -10, -2.5]], + ), +) +def test_step_scan_with_multiple_axes_and_independent_trajectories( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list, + y_axis: Motor, + y_args: list, +): + run_engine(step_scan(detectors=[det], params={x_axis: x_args, y_axis: y_args})) + + +@pytest.mark.parametrize( + "x_args, y_args", + ( + [[0, 1, 0.25], [0, 2, 0.5]], + [[-1, 1, 0.25], [1, -1, -0.5]], + ), +) +def test_step_scan_with_multiple_axes_and_independent_trajectories_when_snaking( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list, + y_axis: Motor, + y_args: list, +): + run_engine( + step_scan( + detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=True + ) + ) + + +@pytest.mark.parametrize( + "x_args, y_args", ([[0, 1, 0.1], [0, 1, 0.1, 1]], [[0, 1, 0.1], [0]]) +) +def test_step_scan_fails_when_given_incorrect_number_of_params( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list, + y_axis: Motor, + y_args: list, +): + with pytest.raises(ValueError): + run_engine(step_scan(detectors=[det], params={x_axis: x_args, y_axis: y_args})) + + +@pytest.mark.parametrize("x_args", ([0, 1, 0.1], [-1, 1, 0.1], [0, 10, 1])) +def test_step_rscan( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list[Any], +): + run_engine(step_rscan(detectors=[det], params={x_axis: x_args})) + + +@pytest.mark.parametrize( + "x_args, y_args", + ([[0, 1, 0.25], [0, 0.1]], [[-1, 1, 0.25], [-1, 0.1]], [[0, 10, 2.5], [0, 1]]), +) +def test_step_rscan_with_multiple_axes_and_concurrent_trajectories( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list, + y_axis: Motor, + y_args: list, +): + run_engine(step_rscan(detectors=[det], params={x_axis: x_args, y_axis: y_args})) + + +@pytest.mark.parametrize( + "x_args, y_args", + ( + [[0, 1, 0.25], [0, 2, 0.5]], + [[-1, 1, 0.25], [1, -1, -0.5]], + [[0, 10, 2.5], [0, -10, -2.5]], + ), +) +def test_step_rscan_with_multiple_axes_and_independent_trajectories( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list, + y_axis: Motor, + y_args: list, +): + run_engine(step_rscan(detectors=[det], params={x_axis: x_args, y_axis: y_args})) + + +@pytest.mark.parametrize( + "x_args, y_args", + ( + [[0, 1, 0.25], [0, 2, 0.5]], + [[-1, 1, 0.25], [1, -1, -0.5]], + ), +) +def test_step_rscan_with_multiple_axes_and_independent_trajectories_when_snaking( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list, + y_axis: Motor, + y_args: list, +): + run_engine( + step_rscan( + detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=True + ) + ) + + +@pytest.mark.parametrize( + "x_args, y_args", ([[0, 1, 0.1], [0, 1, 0.1, 1]], [[0, 1, 0.1], [0]]) +) +def test_step_rscan_fails_when_given_incorrect_number_of_params( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_args: list, + y_axis: Motor, + y_args: list, +): + with pytest.raises(ValueError): + run_engine(step_rscan(detectors=[det], params={x_axis: x_args, y_axis: y_args})) From e1308aa7edd351e40b72d3924c1ea397e6745b6a Mon Sep 17 00:00:00 2001 From: Emily Arnold Date: Tue, 25 Nov 2025 12:15:52 +0000 Subject: [PATCH 2/7] add test for incorrect step size --- tests/plans/test_wrapped.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/plans/test_wrapped.py b/tests/plans/test_wrapped.py index 6df6b42914f..57fa3528d2e 100644 --- a/tests/plans/test_wrapped.py +++ b/tests/plans/test_wrapped.py @@ -597,6 +597,12 @@ def test_make_stepped_list_when_given_wrong_number_of_params(): _make_stepped_list(params=[1]) +def test_make_stepped_list_when_given_step_larger_than_range(): + stepped_list, stepped_list_length = _make_stepped_list(params=[1, 2, 3]) + assert stepped_list_length == 2 + assert stepped_list == [1, 2] + + @pytest.mark.parametrize( "x_args, y_args, final_shape, final_length", ( From d7649d4e460939deac93a094d9395ce454c54dbb Mon Sep 17 00:00:00 2001 From: Emily Arnold Date: Wed, 26 Nov 2025 16:36:24 +0000 Subject: [PATCH 3/7] Make snaking default, add check that start!=stop --- src/dodal/plans/wrapped.py | 16 ++++++++++------ tests/plans/test_wrapped.py | 21 +++++++++++++-------- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/src/dodal/plans/wrapped.py b/src/dodal/plans/wrapped.py index 7a9cbb4a07b..6a0f1a1302b 100644 --- a/src/dodal/plans/wrapped.py +++ b/src/dodal/plans/wrapped.py @@ -95,7 +95,7 @@ def num_scan( "numN]}'." ), ], - snake_axes: list | bool | None = None, + snake_axes: list | bool = True, metadata: dict[str, Any] | None = None, ) -> MsgGenerator: """Scan over concurrent or independent multi-motor trajectories. @@ -133,7 +133,7 @@ def num_rscan( "numN]}'." ), ], - snake_axes: list | bool | None = None, + snake_axes: list | bool = True, metadata: dict[str, Any] | None = None, ) -> MsgGenerator: """Scan over concurrent or independent trajectories relative to current position. @@ -183,7 +183,7 @@ def list_scan( ), ], grid: bool = False, - snake_axes: bool = False, # Currently specifying axes to snake is not supported + snake_axes: bool = True, # Currently specifying axes to snake is not supported metadata: dict[str, Any] | None = None, ) -> MsgGenerator: """Scan over concurrent or independent trajectories relative to current position. @@ -222,7 +222,7 @@ def list_rscan( ), ], grid: bool = False, - snake_axes: bool = False, # Currently specifying axes to snake is not supported + snake_axes: bool = True, # Currently specifying axes to snake is not supported metadata: dict[str, Any] | None = None, ) -> MsgGenerator: """Scan over concurrent or independent trajectories relative to current position. @@ -256,6 +256,10 @@ def round_list_elements(stepped_list, step): if len(params) == 3: stop = params[1] step = params[2] + if start == stop: + raise ValueError( + f"Start ({start}) and stop ({stop}) values cannot be the same." + ) if abs(step) > abs(stop - start): step = stop - start step = abs(step) * np.sign(stop - start) @@ -338,7 +342,7 @@ def step_scan( "[startN, stopN, stepN]}'." ), ], - snake_axes: bool = False, # Currently specifying axes to snake is not supported + snake_axes: bool = True, # Currently specifying axes to snake is not supported metadata: dict[str, Any] | None = None, ) -> MsgGenerator: """Scan over multi-motor trajectories with specified step size. @@ -377,7 +381,7 @@ def step_rscan( "[startN, stopN, stepN]}'." ), ], - snake_axes: bool = False, # Currently specifying axes to snake is not supported + snake_axes: bool = True, # Currently specifying axes to snake is not supported metadata: dict[str, Any] | None = None, ) -> MsgGenerator: """Scan over multi-motor trajectories with specified step size. diff --git a/tests/plans/test_wrapped.py b/tests/plans/test_wrapped.py index 57fa3528d2e..2c6bb772346 100644 --- a/tests/plans/test_wrapped.py +++ b/tests/plans/test_wrapped.py @@ -245,7 +245,7 @@ def test_num_scan_with_two_axes_and_independent_trajectories( @pytest.mark.parametrize( "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) ) -def test_num_scan_with_two_axes_and_independent_trajectories_when_snaking( +def test_num_scan_with_two_axes_and_independent_trajectories_when_not_snaking( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -255,7 +255,7 @@ def test_num_scan_with_two_axes_and_independent_trajectories_when_snaking( ): run_engine( num_scan( - detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=True + detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=False ) ) @@ -361,7 +361,7 @@ def test_num_rscan_with_two_axes_and_independent_trajectories( @pytest.mark.parametrize( "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) ) -def test_num_rscan_with_two_axes_and_independent_trajectories_when_snaking( +def test_num_rscan_with_two_axes_and_independent_trajectories_when_not_snaking( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -371,7 +371,7 @@ def test_num_rscan_with_two_axes_and_independent_trajectories_when_snaking( ): run_engine( num_rscan( - detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=True + detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=False ) ) @@ -603,6 +603,11 @@ def test_make_stepped_list_when_given_step_larger_than_range(): assert stepped_list == [1, 2] +def test_make_stepped_list_fails_when_given_equal_start_and_stop_values(): + with pytest.raises(ValueError): + _make_stepped_list(params=[1.1, 1.1, 0.25]) + + @pytest.mark.parametrize( "x_args, y_args, final_shape, final_length", ( @@ -699,7 +704,7 @@ def test_step_scan_with_multiple_axes_and_independent_trajectories( [[-1, 1, 0.25], [1, -1, -0.5]], ), ) -def test_step_scan_with_multiple_axes_and_independent_trajectories_when_snaking( +def test_step_scan_with_multiple_axes_and_independent_trajectories_when_not_snaking( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -709,7 +714,7 @@ def test_step_scan_with_multiple_axes_and_independent_trajectories_when_snaking( ): run_engine( step_scan( - detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=True + detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=False ) ) @@ -780,7 +785,7 @@ def test_step_rscan_with_multiple_axes_and_independent_trajectories( [[-1, 1, 0.25], [1, -1, -0.5]], ), ) -def test_step_rscan_with_multiple_axes_and_independent_trajectories_when_snaking( +def test_step_rscan_with_multiple_axes_and_independent_trajectories_when_not_snaking( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -790,7 +795,7 @@ def test_step_rscan_with_multiple_axes_and_independent_trajectories_when_snaking ): run_engine( step_rscan( - detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=True + detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=False ) ) From 747ff681828b844a4bfcab1a3cee69397ba1641b Mon Sep 17 00:00:00 2001 From: Emily Arnold Date: Tue, 6 Jan 2026 09:50:03 +0000 Subject: [PATCH 4/7] Change wrapped plans from inferred grids to explicit grids --- src/dodal/plans/__init__.py | 12 ++ src/dodal/plans/wrapped.py | 398 +++++++++++++++++++++++++----------- tests/plans/test_wrapped.py | 262 +++++++++++++----------- 3 files changed, 441 insertions(+), 231 deletions(-) diff --git a/src/dodal/plans/__init__.py b/src/dodal/plans/__init__.py index 0197102b824..50b6a976229 100644 --- a/src/dodal/plans/__init__.py +++ b/src/dodal/plans/__init__.py @@ -1,21 +1,33 @@ from .spec_path import spec_scan from .wrapped import ( count, + list_grid_rscan, + list_grid_scan, list_rscan, list_scan, + num_grid_rscan, + num_grid_scan, num_rscan, num_scan, + step_grid_rscan, + step_grid_scan, step_rscan, step_scan, ) __all__ = [ "count", + "list_grid_rscan", + "list_grid_scan", "list_rscan", "list_scan", + "num_grid_rscan", + "num_grid_scan", "num_rscan", "num_scan", "spec_scan", + "step_grid_rscan", + "step_grid_scan", "step_rscan", "step_scan", ] diff --git a/src/dodal/plans/wrapped.py b/src/dodal/plans/wrapped.py index 6a0f1a1302b..de7d95bdf6d 100644 --- a/src/dodal/plans/wrapped.py +++ b/src/dodal/plans/wrapped.py @@ -61,16 +61,29 @@ def count( yield from bp.count(tuple(detectors), num, delay=delay, md=metadata) -def _make_num_scan_args(params: dict[Movable | Motor, list[float | int]]): +def _make_num_scan_args( + params: dict[Movable | Motor, list[float | int]], num: int | None = None +): shape = [] - for param in params: - if len(params[param]) == 3: - shape.append(params[param][-1]) + if num: + shape = [num] + for param in params: + if len(params[param]) == 2: + pass + else: + raise ValueError("You must provide 'start stop' for each motor.") + else: + for param in params: + if len(params[param]) == 3: + shape.append(params[param][-1]) + else: + raise ValueError( + "You must provide 'start stop step' for each motor in a grid scan." + ) + args = [] - for param, movable_num in zip(params, range(len(params)), strict=True): + for param in params: args.append(param) - if movable_num == 0 and len(shape) == 1: - params[param].pop() args.extend(params[param]) return args, shape @@ -88,29 +101,53 @@ def num_scan( params: Annotated[ dict[Movable | Motor, list[float | int]], Field( - description="Dictionary of 'device: paramater' keys. For concurrent " - "trajectories, provide '{movable1: [start1, stop1, num], movable2: [start2," - "stop2], ... , movableN: [startN, stopN]}'. For independent trajectories," - "provide '{movable1: [start1, stop1, num1], ... , movableN: [startN, stopN," - "numN]}'." + description="Dictionary of 'device: parameter' keys. For concurrent " + "trajectories, provide '{movable1: [start1, stop1], movable2: [start2," + "stop2], ... , movableN: [startN, stopN]}'." + ), + ], + num: int | None = None, + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan concurrent single or multi-motor trajector(y/ies). + The scan is defined by number of points along scan trajector(y/ies). + Wraps bluesky.plans.scan(det, *args, num, md=metadata).""" + args, shape = _make_num_scan_args(params, num) + metadata = metadata or {} + metadata["shape"] = shape + + yield from bp.scan(tuple(detectors), *args, num=num, md=metadata) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def num_grid_scan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + params: Annotated[ + dict[Movable | Motor, list[float | int]], + Field( + description="Dictionary of 'device: parameter' keys. For independent " + "trajectories, provide '{movable1: [start1, stop1, num1], ... , movableN: " + "[startN, stopN, numN]}'." ), ], snake_axes: list | bool = True, metadata: dict[str, Any] | None = None, ) -> MsgGenerator: - """Scan over concurrent or independent multi-motor trajectories. - Wraps bluesky.plans.scan(det, *args, num, md=metadata) and - bluesky.plans.grid_scan(det, *args, md=metadata)""" + """Scan independent multi-motor trajectories. + The scan is defined by number of points along scan trajectories. + Wraps bluesky.plans.grid_scan(det, *args, snake_axes, md=metadata).""" args, shape = _make_num_scan_args(params) metadata = metadata or {} metadata["shape"] = shape - if len(shape) == 1: - yield from bp.scan(tuple(detectors), *args, num=shape[0], md=metadata) - elif len(shape) > 1: - yield from bp.grid_scan( - tuple(detectors), *args, snake_axes=snake_axes, md=metadata - ) + yield from bp.grid_scan(tuple(detectors), *args, snake_axes=snake_axes, md=metadata) @attach_data_session_metadata_decorator() @@ -126,41 +163,72 @@ def num_rscan( params: Annotated[ dict[Movable | Motor, list[float | int]], Field( - description="Dictionary of 'device: paramater' keys. For concurrent " - "trajectories, provide '{movable1: [start1, stop1, num], movable2: [start2," - "stop2], ... , movableN: [startN, stopN]}'. For independent trajectories," - "provide '{movable1: [start1, stop1, num1], ... , movableN: [startN, stopN," - "numN]}'." + description="Dictionary of 'device: parameter' keys. For concurrent " + "trajectories, provide '{movable1: [start1, stop1], movable2: [start2," + "stop2], ... , movableN: [startN, stopN]}'." + ), + ], + num: int | None = None, + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan concurrent trajector(y/ies), relative to current position(s). + The scan is defined by number of points along scan trajector(y/ies). + Wraps bluesky.plans.rel_scan(det, *args, num, md=metadata).""" + args, shape = _make_num_scan_args(params, num) + metadata = metadata or {} + metadata["shape"] = shape + + yield from bp.rel_scan(tuple(detectors), *args, num=num, md=metadata) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def num_grid_rscan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + params: Annotated[ + dict[Movable | Motor, list[float | int]], + Field( + description="Dictionary of 'device: parameter' keys. For independent " + "trajectories, provide '{movable1: [start1, stop1, num1], ... , movableN: " + "[startN, stopN, numN]}'." ), ], snake_axes: list | bool = True, metadata: dict[str, Any] | None = None, ) -> MsgGenerator: - """Scan over concurrent or independent trajectories relative to current position. - Wraps bluesky.plans.rel_scan(det, *args, num, md=metadata) and - bluesky.plans.rel_grid_scan(det, *args, md=metadata)""" + """Scan independent trajectories, relative to current positions. + The scan is defined by number of points along scan trajectories. + Wraps bluesky.plans.rel_grid_scan(det, *args, md=metadata).""" args, shape = _make_num_scan_args(params) metadata = metadata or {} metadata["shape"] = shape - if len(shape) == 1: - yield from bp.rel_scan(tuple(detectors), *args, num=shape[0], md=metadata) - elif len(shape) > 1: - yield from bp.rel_grid_scan( - tuple(detectors), *args, snake_axes=snake_axes, md=metadata - ) + yield from bp.rel_grid_scan( + tuple(detectors), *args, snake_axes=snake_axes, md=metadata + ) -def _make_list_scan_args(params: dict[Movable | Motor, list[float | int]], grid: bool): +def _make_list_scan_args( + params: dict[Movable | Motor, list[float | int]], grid: bool | None = None +): shape = [] args = [] - for param, num in zip(params, range(len(params)), strict=True): - if num == 0: - shape.append(len(params[param])) - elif num >= 1 and grid: - shape.append(len(params[param])) + for param in params: + shape.append(len(params[param])) args.append(param) args.append(params[param]) + + if not grid: + shape = list(set(shape)) + if len(shape) > 1: + raise ValueError("Lists of motor positions are not equal in length.") + return args, shape @@ -177,30 +245,54 @@ def list_scan( params: Annotated[ dict[Movable | Motor, list[float | int]], Field( - description="Dictionary of 'device: paramater' keys. For all trajectories, " + description="Dictionary of 'device: parameter' keys. For all trajectories, " + "provide '{movable1: [point1, point2, ... ], movableN: [point1, point2, " + "...]}'. Number of points for each movable must be equal." + ), + ], + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan concurrent single or multi-motor trajector(y/ies). + The scan is defined by providing a list of points for each scan trajectory. + Wraps bluesky.plans.list_scan(det, *args, md=metadata).""" + args, shape = _make_list_scan_args(params=params) + metadata = metadata or {} + metadata["shape"] = shape + + yield from bp.list_scan(tuple(detectors), *args, md=metadata) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def list_grid_scan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + params: Annotated[ + dict[Movable | Motor, list[float | int]], + Field( + description="Dictionary of 'device: parameter' keys. For all trajectories, " "provide '{movable1: [point1, point2, ... ], movableN: [point1, point2, " "...]}'." ), ], - grid: bool = False, snake_axes: bool = True, # Currently specifying axes to snake is not supported metadata: dict[str, Any] | None = None, ) -> MsgGenerator: - """Scan over concurrent or independent trajectories relative to current position. - To scan over concurrent trajectories, grid = False, and independent trajectories, - grid = True. - Wraps bluesky.plans.list_scan(det, *args, num, md=metadata) and - bluesky.plans.list_grid_scan(det, *args, md=metadata)""" - args, shape = _make_list_scan_args(params=params, grid=grid) + """Scan independent trajectories. + The scan is defined by providing a list of points for each scan trajectory. + Wraps bluesky.plans.list_grid_scan(det, *args, md=metadata).""" + args, shape = _make_list_scan_args(params=params, grid=True) metadata = metadata or {} metadata["shape"] = shape - if len(shape) == 1: - yield from bp.list_scan(tuple(detectors), *args, md=metadata) - elif len(shape) > 1: - yield from bp.list_grid_scan( - tuple(detectors), *args, snake_axes=snake_axes, md=metadata - ) + yield from bp.list_grid_scan( + tuple(detectors), *args, snake_axes=snake_axes, md=metadata + ) @attach_data_session_metadata_decorator() @@ -216,30 +308,54 @@ def list_rscan( params: Annotated[ dict[Movable | Motor, list[float | int]], Field( - description="Dictionary of 'device: paramater' keys. For all trajectories, " + description="Dictionary of 'device: parameter' keys. For all trajectories, " + "provide '{movable1: [point1, point2, ... ], movableN: [point1, point2, " + "...]}'. Number of points for each movable must be equal." + ), + ], + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan concurrent trajector(y/ies), relative to current position. + The scan is defined by providing a list of points for each scan trajectory. + Wraps bluesky.plans.rel_list_scan(det, *args, md=metadata).""" + args, shape = _make_list_scan_args(params=params) + metadata = metadata or {} + metadata["shape"] = shape + + yield from bp.rel_list_scan(tuple(detectors), *args, md=metadata) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def list_grid_rscan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + params: Annotated[ + dict[Movable | Motor, list[float | int]], + Field( + description="Dictionary of 'device: parameter' keys. For all trajectories, " "provide '{movable1: [point1, point2, ... ], movableN: [point1, point2, " "...]}'." ), ], - grid: bool = False, snake_axes: bool = True, # Currently specifying axes to snake is not supported metadata: dict[str, Any] | None = None, ) -> MsgGenerator: - """Scan over concurrent or independent trajectories relative to current position. - To scan over concurrent trajectories, grid = False, and independent trajectories, - grid = True. - Wraps bluesky.plans.rel_list_scan(det, *args, num, md=metadata) and - bluesky.plans.rel_list_grid_scan(det, *args, md=metadata)""" - args, shape = _make_list_scan_args(params=params, grid=grid) + """Scan independent trajectories, relative to current positions. + The scan is defined by providing a list of points for each scan trajectory. + Wraps bluesky.plans.rel_list_grid_scan(det, *args, md=metadata).""" + args, shape = _make_list_scan_args(params=params, grid=True) metadata = metadata or {} metadata["shape"] = shape - if len(shape) == 1: - yield from bp.rel_list_scan(tuple(detectors), *args, md=metadata) - elif len(shape) > 1: - yield from bp.rel_list_grid_scan( - tuple(detectors), *args, snake_axes=snake_axes, md=metadata - ) + yield from bp.rel_list_grid_scan( + tuple(detectors), *args, snake_axes=snake_axes, md=metadata + ) def _make_stepped_list( @@ -264,7 +380,7 @@ def round_list_elements(stepped_list, step): step = stop - start step = abs(step) * np.sign(stop - start) stepped_list = np.arange(start=start, stop=stop, step=step).tolist() - if abs((stepped_list[-1] + step) - stop) <= abs(step * 0.01): + if abs((stepped_list[-1] + step) - stop) <= abs(step * 0.05): stepped_list.append(stepped_list[-1] + step) rounded_stepped_list = round_list_elements(stepped_list=stepped_list, step=step) elif len(params) == 2 and num: @@ -279,7 +395,9 @@ def round_list_elements(stepped_list, step): return rounded_stepped_list, len(rounded_stepped_list) -def _make_step_scan_args(params: dict[Movable | Motor, list[float | int]]): +def _make_step_scan_args( + params: dict[Movable | Motor, list[float | int]], grid: bool | None = None +): args = [] shape = [] stepped_list_length = None @@ -297,27 +415,29 @@ def _make_step_scan_args(params: dict[Movable | Motor, list[float | int]]): f"You provided {len(params[param])} parameters, rather than 3." ) elif movable_num >= 1: - if len(params[param]) == 2: - stepped_list, stepped_list_length = _make_stepped_list( - params=params[param], num=stepped_list_length - ) - args.append(param) - args.append(stepped_list) - elif len(params[param]) == 3: - stepped_list, stepped_list_length = _make_stepped_list( - params=params[param] - ) - args.append(param) - args.append(stepped_list) - shape.append(stepped_list_length) + if grid: + if len(params[param]) == 3: + stepped_list, stepped_list_length = _make_stepped_list( + params=params[param] + ) + args.append(param) + args.append(stepped_list) + shape.append(stepped_list_length) + else: + raise ValueError( + f"You provided {len(params[param])} parameters, rather than 3." + ) else: - raise ValueError( - f"You provided {len(params[param])} parameters, rather than 2 or 3." - ) - if (len(args) / len(shape)) not in [2, len(args)]: - raise ValueError( - "Incorrect number of parameters, unsure if scan is concurrent/independent." - ) + if len(params[param]) == 2: + stepped_list, stepped_list_length = _make_stepped_list( + params=params[param], num=stepped_list_length + ) + args.append(param) + args.append(stepped_list) + else: + raise ValueError( + f"You provided {len(params[param])} parameters, rather than 2." + ) return args, shape @@ -335,9 +455,37 @@ def step_scan( params: Annotated[ dict[Movable | Motor, list[float | int]], Field( - description="Dictionary of 'device: paramater' keys. For concurrent " + description="Dictionary of 'device: parameter' keys. For concurrent " "trajectories, provide '{movable1: [start1, stop1, step1], movable2: " - "[start2, step2], ... , movableN: [startN, stepN]}'. For independent " + "[start2, step2], ... , movableN: [startN, stepN]}'." + ), + ], + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan concurrent trajectories with specified step size. + Generates list(s) of points for each trajectory, used with + bluesky.plans.list_scan(det, *args, md=metadata).""" + args, shape = _make_step_scan_args(params) + metadata = metadata or {} + metadata["shape"] = shape + + yield from bp.list_scan(tuple(detectors), *args, md=metadata) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def step_grid_scan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + params: Annotated[ + dict[Movable | Motor, list[float | int]], + Field( + description="Dictionary of 'device: parameter' keys. For independent " "trajectories, provide '{movable1: [start1, stop1, step1], ... , movableN: " "[startN, stopN, stepN]}'." ), @@ -345,20 +493,16 @@ def step_scan( snake_axes: bool = True, # Currently specifying axes to snake is not supported metadata: dict[str, Any] | None = None, ) -> MsgGenerator: - """Scan over multi-motor trajectories with specified step size. - Generates lists of points for each trajectory for - bluesky.plans.list_scan(det, *args, num, md=metadata) and + """Scan independent trajectories with specified step size. + Generates list(s) of points for each trajectory, used with bluesky.plans.list_grid_scan(det, *args, md=metadata).""" - args, shape = _make_step_scan_args(params) + args, shape = _make_step_scan_args(params, grid=True) metadata = metadata or {} metadata["shape"] = shape - if len(shape) == 1: - yield from bp.list_scan(tuple(detectors), *args, md=metadata) - elif len(shape) > 1: - yield from bp.list_grid_scan( - tuple(detectors), *args, snake_axes=snake_axes, md=metadata - ) + yield from bp.list_grid_scan( + tuple(detectors), *args, snake_axes=snake_axes, md=metadata + ) @attach_data_session_metadata_decorator() @@ -374,9 +518,37 @@ def step_rscan( params: Annotated[ dict[Movable | Motor, list[float | int]], Field( - description="Dictionary of 'device: paramater' keys. For concurrent " + description="Dictionary of 'device: parameter' keys. For concurrent " "trajectories, provide '{movable1: [start1, stop1, step1], movable2: " - "[start2, step2], ... , movableN: [startN, stepN]}'. For independent " + "[start2, step2], ... , movableN: [startN, stepN]}'." + ), + ], + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan concurrent trajectories with specified step size, relative to position. + Generates list(s) of points for each trajectory, used with + bluesky.plans.rel_list_scan(det, *args, md=metadata).""" + args, shape = _make_step_scan_args(params) + metadata = metadata or {} + metadata["shape"] = shape + + yield from bp.rel_list_scan(tuple(detectors), *args, md=metadata) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def step_grid_rscan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + params: Annotated[ + dict[Movable | Motor, list[float | int]], + Field( + description="Dictionary of 'device: parameter' keys. For independent " "trajectories, provide '{movable1: [start1, stop1, step1], ... , movableN: " "[startN, stopN, stepN]}'." ), @@ -384,17 +556,13 @@ def step_rscan( snake_axes: bool = True, # Currently specifying axes to snake is not supported metadata: dict[str, Any] | None = None, ) -> MsgGenerator: - """Scan over multi-motor trajectories with specified step size. - Generates lists of points for each trajectory for - bluesky.plans.list_scan(det, *args, num, md=metadata) and + """Scan independent trajectories with specified step size, relative to position. + Generates list(s) of points for each trajectory, used with bluesky.plans.list_grid_scan(det, *args, md=metadata).""" - args, shape = _make_step_scan_args(params) + args, shape = _make_step_scan_args(params, grid=True) metadata = metadata or {} metadata["shape"] = shape - if len(shape) == 1: - yield from bp.rel_list_scan(tuple(detectors), *args, md=metadata) - elif len(shape) > 1: - yield from bp.rel_list_grid_scan( - tuple(detectors), *args, snake_axes=snake_axes, md=metadata - ) + yield from bp.rel_list_grid_scan( + tuple(detectors), *args, snake_axes=snake_axes, md=metadata + ) diff --git a/tests/plans/test_wrapped.py b/tests/plans/test_wrapped.py index 2c6bb772346..5ce7d8aaf86 100644 --- a/tests/plans/test_wrapped.py +++ b/tests/plans/test_wrapped.py @@ -1,5 +1,5 @@ from collections.abc import Sequence -from typing import Any, cast +from typing import cast import pytest from bluesky.protocols import Readable @@ -25,10 +25,16 @@ _make_step_scan_args, _make_stepped_list, count, + list_grid_rscan, + list_grid_scan, list_rscan, list_scan, + num_grid_rscan, + num_grid_scan, num_rscan, num_scan, + step_grid_rscan, + step_grid_scan, step_rscan, step_scan, ) @@ -174,10 +180,10 @@ def test_plan_produces_expected_datums( @pytest.mark.parametrize( - "x_list, y_list, final_shape, final_length", + "x_list, y_list, num, final_shape, final_length", ( - [[0.0, 1.1, 3], [2.2, 3.3], [3], 6], - [[0.0, 1.1, 2], [2.2, 3.3, 3], [2, 3], 8], + [[0.0, 1.1], [2.2, 3.3], 3, [3], 6], + [[0.0, 1.1, 2], [2.2, 3.3, 3], None, [2, 3], 8], ), ) def test_make_num_scan_args( @@ -185,48 +191,59 @@ def test_make_num_scan_args( y_axis: Motor, x_list: list[float | int], y_list: list[float | int], + num: int | None, final_shape: list[int], final_length: int, ): - args, shape = _make_num_scan_args({x_axis: x_list, y_axis: y_list}) + args, shape = _make_num_scan_args({x_axis: x_list, y_axis: y_list}, num=num) assert shape == final_shape assert len(args) == final_length assert args[0] == x_axis -@pytest.mark.parametrize("x_args", ([0.0, 2.2, 5], [1.1, -1.1, 3])) +@pytest.mark.parametrize("x_args, num", ([[0.0, 2.2], 5], [[1.1, -1.1], 3])) def test_num_scan_with_one_axis( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, x_args: list[float | int], + num: int, ): - run_engine(num_scan(detectors=[det], params={x_axis: x_args})) + run_engine(num_scan(detectors=[det], params={x_axis: x_args}, num=num)) @pytest.mark.parametrize( - "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2]], [[0, 1.1, 5], [2.2, 3.3]]) + "x_args, y_args, num", ([[-1.1, 1.1], [2.2, -2.2], 5], [[0, 1.1], [2.2, 3.3], 5]) ) -def test_num_scan_with_two_axes_and_concurrent_trajectories( +def test_num_scan_with_two_axes( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, x_args: list[float | int], y_axis: Motor, y_args: list[float | int], + num: int, ): run_engine( num_scan( detectors=[det], params={x_axis: x_args, y_axis: y_args}, + num=num, ) ) +def test_num_scan_fails_when_given_wrong_number_of_params( + run_engine: RunEngine, det: StandardDetector, x_axis: Motor, y_axis: Motor +): + with pytest.raises(ValueError): + run_engine(num_scan(detectors=[det], params={x_axis: [-1, 1, 5]}, num=5)) + + @pytest.mark.parametrize( - "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) + "x_args, y_args,", ([[-1, 1, 0], [2, 0]], [[-1, 1, 3.5], [-1, 1]]) ) -def test_num_scan_with_two_axes_and_independent_trajectories( +def test_num_scan_fails_when_given_bad_info( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -234,18 +251,19 @@ def test_num_scan_with_two_axes_and_independent_trajectories( y_axis: Motor, y_args: list[float | int], ): - run_engine( - num_scan( - detectors=[det], - params={x_axis: x_args, y_axis: y_args}, + with pytest.raises(ValueError): + run_engine( + num_scan( + detectors=[det], + params={x_axis: x_args, y_axis: y_args}, + ) ) - ) @pytest.mark.parametrize( "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) ) -def test_num_scan_with_two_axes_and_independent_trajectories_when_not_snaking( +def test_num_grid_scan( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -254,38 +272,37 @@ def test_num_scan_with_two_axes_and_independent_trajectories_when_not_snaking( y_args: list[float | int], ): run_engine( - num_scan( - detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=False + num_grid_scan( + detectors=[det], + params={x_axis: x_args, y_axis: y_args}, ) ) -def test_num_scan_fails_when_given_wrong_number_of_params( - run_engine: RunEngine, det: StandardDetector, x_axis: Motor, y_axis: Motor -): - with pytest.raises(ValueError): - run_engine( - num_scan(detectors=[det], params={x_axis: [0, 1.1, 2], y_axis: [1.1]}) - ) - - @pytest.mark.parametrize( - "x_args, y_args,", ([[-1, 1, 0], [2, 0]], [[-1, 1, 3.5], [-1, 1]]) + "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) ) -def test_num_scan_fails_when_given_bad_info( +def test_num_grid_scan_when_not_snaking( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, x_args: list[float | int], y_axis: Motor, y_args: list[float | int], +): + run_engine( + num_grid_scan( + detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=False + ) + ) + + +def test_num_grid_scan_fails_when_given_wrong_number_of_params( + run_engine: RunEngine, det: StandardDetector, x_axis: Motor, y_axis: Motor ): with pytest.raises(ValueError): run_engine( - num_scan( - detectors=[det], - params={x_axis: x_args, y_axis: y_args}, - ) + num_grid_scan(detectors=[det], params={x_axis: [0, 1.1, 2], y_axis: [1.1]}) ) @@ -302,7 +319,7 @@ def test_num_scan_fails_when_asked_to_snake_slow_axis( ): with pytest.raises(ValueError): run_engine( - num_scan( + num_grid_scan( detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=[x_axis], @@ -310,58 +327,60 @@ def test_num_scan_fails_when_asked_to_snake_slow_axis( ) -@pytest.mark.parametrize("x_args", ([0.0, 2.2, 5], [1.1, -1.1, 3])) -def test_num_rscan_with_one_axis( +@pytest.mark.parametrize("x_args, num", ([[0.0, 2.2], 5], [[1.1, -1.1], 3])) +def test_num_rscan( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, x_args: list[float | int], + num: int, ): - run_engine(num_rscan(detectors=[det], params={x_axis: x_args})) + run_engine(num_rscan(detectors=[det], params={x_axis: x_args}, num=num)) @pytest.mark.parametrize( - "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2]], [[0, 1.1, 5], [2.2, 3.3]]) + "x_args, y_args, num", ([[-1.1, 1.1], [2.2, -2.2], 5], [[0, 1.1], [2.2, 3.3], 5]) ) -def test_num_rscan_with_two_axes_and_concurrent_trajectories( +def test_num_rscan_with_two_axes( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, x_args: list[float | int], y_axis: Motor, y_args: list[float | int], + num: int, ): run_engine( - num_rscan( - detectors=[det], - params={x_axis: x_args, y_axis: y_args}, - ) + num_rscan(detectors=[det], params={x_axis: x_args, y_axis: y_args}, num=num) ) @pytest.mark.parametrize( - "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) + "x_args, y_args, num", ([[-1, 1], [2, 0], 0], [[-1, 1], [-1, 1], 3.5]) ) -def test_num_rscan_with_two_axes_and_independent_trajectories( +def test_num_rscan_fails_when_given_bad_info( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, x_args: list[float | int], y_axis: Motor, y_args: list[float | int], + num: int, ): - run_engine( - num_rscan( - detectors=[det], - params={x_axis: x_args, y_axis: y_args}, + with pytest.raises(ValueError): + run_engine( + num_rscan( + detectors=[det], + params={x_axis: x_args, y_axis: y_args}, + num=num, + ) ) - ) @pytest.mark.parametrize( "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) ) -def test_num_rscan_with_two_axes_and_independent_trajectories_when_not_snaking( +def test_num_grid_rscan( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -370,25 +389,17 @@ def test_num_rscan_with_two_axes_and_independent_trajectories_when_not_snaking( y_args: list[float | int], ): run_engine( - num_rscan( - detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=False + num_grid_rscan( + detectors=[det], + params={x_axis: x_args, y_axis: y_args}, ) ) -def test_num_rscan_fails_when_given_wrong_number_of_params( - run_engine: RunEngine, det: StandardDetector, x_axis: Motor, y_axis: Motor -): - with pytest.raises(ValueError): - run_engine( - num_rscan(detectors=[det], params={x_axis: [0, 1.1, 2], y_axis: [1.1]}) - ) - - @pytest.mark.parametrize( - "x_args, y_args,", ([[-1, 1, 0], [2, 0]], [[-1, 1, 3.5], [-1, 1]]) + "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) ) -def test_num_rscan_fails_when_given_bad_info( +def test_num_grid_rscan_when_not_snaking( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -396,19 +407,17 @@ def test_num_rscan_fails_when_given_bad_info( y_axis: Motor, y_args: list[float | int], ): - with pytest.raises(ValueError): - run_engine( - num_rscan( - detectors=[det], - params={x_axis: x_args, y_axis: y_args}, - ) + run_engine( + num_grid_rscan( + detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=False ) + ) @pytest.mark.parametrize( "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) ) -def test_num_rscan_fails_when_asked_to_snake_slow_axis( +def test_num_grid_rscan_fails_when_asked_to_snake_slow_axis( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -418,7 +427,7 @@ def test_num_rscan_fails_when_asked_to_snake_slow_axis( ): with pytest.raises(ValueError): run_engine( - num_rscan( + num_grid_rscan( detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=[x_axis], @@ -428,7 +437,7 @@ def test_num_rscan_fails_when_asked_to_snake_slow_axis( @pytest.mark.parametrize( "x_args, y_args, grid, final_shape, final_length", - ([[0, 1, 2], [3, 4, 5], False, [3], 4], [[0, 1, 2], [3, 4, 5, 6], True, [3, 4], 4]), + ([[0, 1, 2], [3, 4, 5], None, [3], 4], [[0, 1, 2], [3, 4, 5, 6], True, [3, 4], 4]), ) def test_make_list_scan_args( x_axis: Motor, @@ -446,9 +455,20 @@ def test_make_list_scan_args( assert shape == final_shape +def test_make_list_scan_args_fails_when_lists_are_different_lengths( + x_axis: Motor, + y_axis: Motor, +): + with pytest.raises(ValueError): + _make_list_scan_args(params={x_axis: [0, 1, 2], y_axis: [0, 1, 2, 3]}) + + @pytest.mark.parametrize("x_list", ([0, 1, 2, 3], [1.1, 2.2, 3.3])) def test_list_scan( - run_engine: RunEngine, det: StandardDetector, x_axis: Motor, x_list: Any + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_list: list, ): run_engine(list_scan(detectors=[det], params={x_axis: x_list})) @@ -460,7 +480,7 @@ def test_list_scan( [[-1.1, -2.2, -3.3, -4.4, -5.5], [1.1, 2.2, 3.3, 4.4, 5.5]], ), ) -def test_list_scan_with_two_axes_and_concurrent_trajectories( +def test_list_scan_with_two_axes( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -471,7 +491,7 @@ def test_list_scan_with_two_axes_and_concurrent_trajectories( run_engine(list_scan(detectors=[det], params={x_axis: x_list, y_axis: y_list})) -def test_list_scan_with_concurrent_trajectories_fails_with_differnt_list_lengths( +def test_list_scan_fails_with_differnt_list_lengths( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -493,7 +513,7 @@ def test_list_scan_with_concurrent_trajectories_fails_with_differnt_list_lengths [[-1.1, -2.2, -3.3, -4.4, -5.5], [1.1, 2.2, 3.3, 4.4, 5.5]], ), ) -def test_list_scan_with_two_axes_and_independent_trajectories( +def test_list_grid_scan( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -501,14 +521,12 @@ def test_list_scan_with_two_axes_and_independent_trajectories( y_axis: Motor, y_list: list, ): - run_engine( - list_scan(detectors=[det], params={x_axis: x_list, y_axis: y_list}, grid=True) - ) + run_engine(list_grid_scan(detectors=[det], params={x_axis: x_list, y_axis: y_list})) @pytest.mark.parametrize("x_list", ([0, 1, 2, 3], [1.1, 2.2, 3.3])) def test_list_rscan( - run_engine: RunEngine, det: StandardDetector, x_axis: Motor, x_list: Any + run_engine: RunEngine, det: StandardDetector, x_axis: Motor, x_list: list ): run_engine(list_rscan(detectors=[det], params={x_axis: x_list})) @@ -520,7 +538,7 @@ def test_list_rscan( [[-1.1, -2.2, -3.3, -4.4, -5.5], [1.1, 2.2, 3.3, 4.4, 5.5]], ), ) -def test_list_rscan_with_two_axes_and_concurrent_trajectories( +def test_list_rscan_with_two_axes( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -531,7 +549,7 @@ def test_list_rscan_with_two_axes_and_concurrent_trajectories( run_engine(list_rscan(detectors=[det], params={x_axis: x_list, y_axis: y_list})) -def test_list_rscan_with_concurrent_trajectories_fails_with_differnt_list_lengths( +def test_list_rscan_fails_with_differnt_list_lengths( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -553,7 +571,7 @@ def test_list_rscan_with_concurrent_trajectories_fails_with_differnt_list_length [[-1.1, -2.2, -3.3, -4.4, -5.5], [1.1, 2.2, 3.3, 4.4, 5.5]], ), ) -def test_list_rscan_with_two_axes_and_independent_trajectories( +def test_list_grid_rscan( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -562,7 +580,7 @@ def test_list_rscan_with_two_axes_and_independent_trajectories( y_list: list, ): run_engine( - list_rscan(detectors=[det], params={x_axis: x_list, y_axis: y_list}, grid=True) + list_grid_rscan(detectors=[det], params={x_axis: x_list, y_axis: y_list}) ) @@ -577,7 +595,7 @@ def test_list_rscan_with_two_axes_and_independent_trajectories( [2, -2, 0.2], ), ) -def test_make_stepped_list_when_given_three_params(params: list[Any]): +def test_make_stepped_list_when_given_three_params(params: list): stepped_list, stepped_list_length = _make_stepped_list(params=params) assert stepped_list_length == 21 assert stepped_list[0] / stepped_list[-1] == -1 @@ -585,7 +603,7 @@ def test_make_stepped_list_when_given_three_params(params: list[Any]): @pytest.mark.parametrize("params", ([-1, 0.1], [-2, 0.2], [1, -0.1], [2, -0.2])) -def test_make_stepped_list_when_given_two_params(params: list[Any]): +def test_make_stepped_list_when_given_two_params(params: list): stepped_list, stepped_list_length = _make_stepped_list(params=params, num=21) assert stepped_list_length == 21 assert stepped_list[0] / stepped_list[-1] == -1 @@ -609,12 +627,12 @@ def test_make_stepped_list_fails_when_given_equal_start_and_stop_values(): @pytest.mark.parametrize( - "x_args, y_args, final_shape, final_length", + "x_args, y_args, grid, final_shape, final_length", ( - [[0, 1, 0.25], [0, 0.1], [5], 4], - [[0, 1, 0.25], [0, 1, 0.2], [5, 6], 4], - [[0, -1, -0.25], [0, -0.1], [5], 4], - [[0, -1, -0.25], [0, -1, -0.2], [5, 6], 4], + [[0, 1, 0.25], [0, 0.1], None, [5], 4], + [[0, 1, 0.25], [0, 1, 0.2], True, [5, 6], 4], + [[0, -1, -0.25], [0, -0.1], None, [5], 4], + [[0, -1, -0.25], [0, -1, -0.2], True, [5, 6], 4], ), ) def test_make_step_scan_args( @@ -622,10 +640,13 @@ def test_make_step_scan_args( x_args: list, y_axis: Motor, y_args: list, + grid: bool | None, final_shape: list, final_length: int, ): - args, shape = _make_step_scan_args(params={x_axis: x_args, y_axis: y_args}) + args, shape = _make_step_scan_args( + params={x_axis: x_args, y_axis: y_args}, grid=grid + ) assert shape == final_shape assert len(args) == final_length assert args[0] == x_axis @@ -633,12 +654,12 @@ def test_make_step_scan_args( @pytest.mark.parametrize( - "x_args, y_args, z_args", + "x_args, y_args, z_args, grid", ( - [[0, 1], [0, 0.2], [0, 1, 0.5]], - [[0, 1, 0.25], [0, 1, 0.2, 1], [0, 1, 0.5]], - [[0, 1, 0.25], [0, 0.2], [0, 1, 0.5]], - [[0, 1, 0.25], [0, 1, 0.2], [0, 0.5]], + [[0, 1], [0, 0.2], [0, 0.5], None], + [[0, 1, 0.25], [0, 0.2], [0, 1, 0.2, 0.5], None], + [[0, 1, 0.25], [0, 0.2], [0, 1, 0.5], True], + [[0, 1, 0.25], [0, 1, 0.2], [0, 0.5], True], ), ) def test_make_step_scan_args_fails_when_given_incorrect_number_of_parameters( @@ -648,9 +669,12 @@ def test_make_step_scan_args_fails_when_given_incorrect_number_of_parameters( y_args: list, z_axis: Motor, z_args: list, + grid: bool | None, ): with pytest.raises(ValueError): - _make_step_scan_args(params={x_axis: x_args, y_axis: y_args, z_axis: z_args}) + _make_step_scan_args( + params={x_axis: x_args, y_axis: y_args, z_axis: z_args}, grid=grid + ) @pytest.mark.parametrize("x_args", ([0, 1, 0.1], [-1, 1, 0.1], [0, 10, 1])) @@ -658,7 +682,7 @@ def test_step_scan( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list[Any], + x_args: list, ): run_engine(step_scan(detectors=[det], params={x_axis: x_args})) @@ -667,7 +691,7 @@ def test_step_scan( "x_args, y_args", ([[0, 1, 0.25], [0, 0.1]], [[-1, 1, 0.25], [-1, 0.1]], [[0, 10, 2.5], [0, 1]]), ) -def test_step_scan_with_multiple_axes_and_concurrent_trajectories( +def test_step_scan_with_multiple_axes( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -686,7 +710,7 @@ def test_step_scan_with_multiple_axes_and_concurrent_trajectories( [[0, 10, 2.5], [0, -10, -2.5]], ), ) -def test_step_scan_with_multiple_axes_and_independent_trajectories( +def test_step_grid_scan( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -694,7 +718,7 @@ def test_step_scan_with_multiple_axes_and_independent_trajectories( y_axis: Motor, y_args: list, ): - run_engine(step_scan(detectors=[det], params={x_axis: x_args, y_axis: y_args})) + run_engine(step_grid_scan(detectors=[det], params={x_axis: x_args, y_axis: y_args})) @pytest.mark.parametrize( @@ -704,7 +728,7 @@ def test_step_scan_with_multiple_axes_and_independent_trajectories( [[-1, 1, 0.25], [1, -1, -0.5]], ), ) -def test_step_scan_with_multiple_axes_and_independent_trajectories_when_not_snaking( +def test_step_grid_scan_when_not_snaking( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -713,7 +737,7 @@ def test_step_scan_with_multiple_axes_and_independent_trajectories_when_not_snak y_args: list, ): run_engine( - step_scan( + step_grid_scan( detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=False ) ) @@ -722,7 +746,7 @@ def test_step_scan_with_multiple_axes_and_independent_trajectories_when_not_snak @pytest.mark.parametrize( "x_args, y_args", ([[0, 1, 0.1], [0, 1, 0.1, 1]], [[0, 1, 0.1], [0]]) ) -def test_step_scan_fails_when_given_incorrect_number_of_params( +def test_step_grid_scan_fails_when_given_incorrect_number_of_params( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -731,7 +755,9 @@ def test_step_scan_fails_when_given_incorrect_number_of_params( y_args: list, ): with pytest.raises(ValueError): - run_engine(step_scan(detectors=[det], params={x_axis: x_args, y_axis: y_args})) + run_engine( + step_grid_scan(detectors=[det], params={x_axis: x_args, y_axis: y_args}) + ) @pytest.mark.parametrize("x_args", ([0, 1, 0.1], [-1, 1, 0.1], [0, 10, 1])) @@ -739,7 +765,7 @@ def test_step_rscan( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list[Any], + x_args: list, ): run_engine(step_rscan(detectors=[det], params={x_axis: x_args})) @@ -748,7 +774,7 @@ def test_step_rscan( "x_args, y_args", ([[0, 1, 0.25], [0, 0.1]], [[-1, 1, 0.25], [-1, 0.1]], [[0, 10, 2.5], [0, 1]]), ) -def test_step_rscan_with_multiple_axes_and_concurrent_trajectories( +def test_step_rscan_with_multiple_axes( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -767,7 +793,7 @@ def test_step_rscan_with_multiple_axes_and_concurrent_trajectories( [[0, 10, 2.5], [0, -10, -2.5]], ), ) -def test_step_rscan_with_multiple_axes_and_independent_trajectories( +def test_step_grid_rscan( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -775,7 +801,9 @@ def test_step_rscan_with_multiple_axes_and_independent_trajectories( y_axis: Motor, y_args: list, ): - run_engine(step_rscan(detectors=[det], params={x_axis: x_args, y_axis: y_args})) + run_engine( + step_grid_rscan(detectors=[det], params={x_axis: x_args, y_axis: y_args}) + ) @pytest.mark.parametrize( @@ -785,7 +813,7 @@ def test_step_rscan_with_multiple_axes_and_independent_trajectories( [[-1, 1, 0.25], [1, -1, -0.5]], ), ) -def test_step_rscan_with_multiple_axes_and_independent_trajectories_when_not_snaking( +def test_step_grid_rscan_when_not_snaking( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -794,7 +822,7 @@ def test_step_rscan_with_multiple_axes_and_independent_trajectories_when_not_sna y_args: list, ): run_engine( - step_rscan( + step_grid_rscan( detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=False ) ) @@ -803,7 +831,7 @@ def test_step_rscan_with_multiple_axes_and_independent_trajectories_when_not_sna @pytest.mark.parametrize( "x_args, y_args", ([[0, 1, 0.1], [0, 1, 0.1, 1]], [[0, 1, 0.1], [0]]) ) -def test_step_rscan_fails_when_given_incorrect_number_of_params( +def test_step_grid_rscan_fails_when_given_incorrect_number_of_params( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, @@ -812,4 +840,6 @@ def test_step_rscan_fails_when_given_incorrect_number_of_params( y_args: list, ): with pytest.raises(ValueError): - run_engine(step_rscan(detectors=[det], params={x_axis: x_args, y_axis: y_args})) + run_engine( + step_grid_rscan(detectors=[det], params={x_axis: x_args, y_axis: y_args}) + ) From 3d395a798efd4efd99e988b770783597402874f3 Mon Sep 17 00:00:00 2001 From: Emily Arnold Date: Tue, 6 Jan 2026 12:05:48 +0000 Subject: [PATCH 5/7] make tests for wrapped plans more useful --- tests/plans/test_wrapped.py | 536 ++++++++++++++++++++++++++++-------- 1 file changed, 427 insertions(+), 109 deletions(-) diff --git a/tests/plans/test_wrapped.py b/tests/plans/test_wrapped.py index 5ce7d8aaf86..64eb58036a4 100644 --- a/tests/plans/test_wrapped.py +++ b/tests/plans/test_wrapped.py @@ -1,3 +1,4 @@ +from collections import defaultdict from collections.abc import Sequence from typing import cast @@ -16,6 +17,7 @@ AsyncReadable, StandardDetector, ) +from ophyd_async.testing import assert_emitted from pydantic import ValidationError from dodal.devices.motors import Motor @@ -201,37 +203,63 @@ def test_make_num_scan_args( assert args[0] == x_axis -@pytest.mark.parametrize("x_args, num", ([[0.0, 2.2], 5], [[1.1, -1.1], 3])) +@pytest.mark.parametrize("x_list, num", ([[0.0, 2.2], 5], [[1.1, -1.1], 3])) def test_num_scan_with_one_axis( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list[float | int], + x_list: list[float | int], num: int, ): - run_engine(num_scan(detectors=[det], params={x_axis: x_args}, num=num)) + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + + run_engine(num_scan(detectors=[det], params={x_axis: x_list}, num=num)) + + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) @pytest.mark.parametrize( - "x_args, y_args, num", ([[-1.1, 1.1], [2.2, -2.2], 5], [[0, 1.1], [2.2, 3.3], 5]) + "x_list, y_list, num", ([[-1.1, 1.1], [2.2, -2.2], 5], [[0, 1.1], [2.2, 3.3], 5]) ) def test_num_scan_with_two_axes( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list[float | int], + x_list: list[float | int], y_axis: Motor, - y_args: list[float | int], + y_list: list[float | int], num: int, ): + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + run_engine( num_scan( detectors=[det], - params={x_axis: x_args, y_axis: y_args}, + params={x_axis: x_list, y_axis: y_list}, num=num, ) ) + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) + def test_num_scan_fails_when_given_wrong_number_of_params( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, y_axis: Motor @@ -241,61 +269,89 @@ def test_num_scan_fails_when_given_wrong_number_of_params( @pytest.mark.parametrize( - "x_args, y_args,", ([[-1, 1, 0], [2, 0]], [[-1, 1, 3.5], [-1, 1]]) + "x_list, y_list,", ([[-1, 1, 0], [2, 0]], [[-1, 1, 3.5], [-1, 1]]) ) def test_num_scan_fails_when_given_bad_info( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list[float | int], + x_list: list[float | int], y_axis: Motor, - y_args: list[float | int], + y_list: list[float | int], ): with pytest.raises(ValueError): run_engine( num_scan( detectors=[det], - params={x_axis: x_args, y_axis: y_args}, + params={x_axis: x_list, y_axis: y_list}, ) ) @pytest.mark.parametrize( - "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) + "x_list, y_list", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) ) def test_num_grid_scan( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list[float | int], + x_list: list[float | int], y_axis: Motor, - y_args: list[float | int], + y_list: list[float | int], ): + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + num = int(x_list[-1] * y_list[-1]) + run_engine( num_grid_scan( detectors=[det], - params={x_axis: x_args, y_axis: y_args}, + params={x_axis: x_list, y_axis: y_list}, ) ) + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) + @pytest.mark.parametrize( - "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) + "x_list, y_list", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) ) def test_num_grid_scan_when_not_snaking( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list[float | int], + x_list: list[float | int], y_axis: Motor, - y_args: list[float | int], + y_list: list[float | int], ): + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + num = int(x_list[-1] * y_list[-1]) + run_engine( num_grid_scan( - detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=False + detectors=[det], params={x_axis: x_list, y_axis: y_list}, snake_axes=False ) ) + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) + def test_num_grid_scan_fails_when_given_wrong_number_of_params( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, y_axis: Motor @@ -307,149 +363,203 @@ def test_num_grid_scan_fails_when_given_wrong_number_of_params( @pytest.mark.parametrize( - "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) + "x_list, y_list", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) ) def test_num_scan_fails_when_asked_to_snake_slow_axis( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list[float | int], + x_list: list[float | int], y_axis: Motor, - y_args: list[float | int], + y_list: list[float | int], ): with pytest.raises(ValueError): run_engine( num_grid_scan( detectors=[det], - params={x_axis: x_args, y_axis: y_args}, + params={x_axis: x_list, y_axis: y_list}, snake_axes=[x_axis], ) ) -@pytest.mark.parametrize("x_args, num", ([[0.0, 2.2], 5], [[1.1, -1.1], 3])) +@pytest.mark.parametrize("x_list, num", ([[0.0, 2.2], 5], [[1.1, -1.1], 3])) def test_num_rscan( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list[float | int], + x_list: list[float | int], num: int, ): - run_engine(num_rscan(detectors=[det], params={x_axis: x_args}, num=num)) + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + + run_engine(num_rscan(detectors=[det], params={x_axis: x_list}, num=num)) + + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) @pytest.mark.parametrize( - "x_args, y_args, num", ([[-1.1, 1.1], [2.2, -2.2], 5], [[0, 1.1], [2.2, 3.3], 5]) + "x_list, y_list, num", ([[-1.1, 1.1], [2.2, -2.2], 5], [[0, 1.1], [2.2, 3.3], 5]) ) def test_num_rscan_with_two_axes( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list[float | int], + x_list: list[float | int], y_axis: Motor, - y_args: list[float | int], + y_list: list[float | int], num: int, ): + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + run_engine( - num_rscan(detectors=[det], params={x_axis: x_args, y_axis: y_args}, num=num) + num_rscan(detectors=[det], params={x_axis: x_list, y_axis: y_list}, num=num) + ) + + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, ) @pytest.mark.parametrize( - "x_args, y_args, num", ([[-1, 1], [2, 0], 0], [[-1, 1], [-1, 1], 3.5]) + "x_list, y_list, num", ([[-1, 1], [2, 0], 0], [[-1, 1], [-1, 1], 3.5]) ) def test_num_rscan_fails_when_given_bad_info( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list[float | int], + x_list: list[float | int], y_axis: Motor, - y_args: list[float | int], + y_list: list[float | int], num: int, ): with pytest.raises(ValueError): run_engine( num_rscan( detectors=[det], - params={x_axis: x_args, y_axis: y_args}, + params={x_axis: x_list, y_axis: y_list}, num=num, ) ) @pytest.mark.parametrize( - "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) + "x_list, y_list", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) ) def test_num_grid_rscan( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list[float | int], + x_list: list[float | int], y_axis: Motor, - y_args: list[float | int], + y_list: list[float | int], ): + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + num = int(x_list[-1] * y_list[-1]) + run_engine( num_grid_rscan( detectors=[det], - params={x_axis: x_args, y_axis: y_args}, + params={x_axis: x_list, y_axis: y_list}, ) ) + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) + @pytest.mark.parametrize( - "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) + "x_list, y_list", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) ) def test_num_grid_rscan_when_not_snaking( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list[float | int], + x_list: list[float | int], y_axis: Motor, - y_args: list[float | int], + y_list: list[float | int], ): + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + num = int(x_list[-1] * y_list[-1]) + run_engine( num_grid_rscan( - detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=False + detectors=[det], params={x_axis: x_list, y_axis: y_list}, snake_axes=False ) ) + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) + @pytest.mark.parametrize( - "x_args, y_args", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) + "x_list, y_list", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) ) def test_num_grid_rscan_fails_when_asked_to_snake_slow_axis( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list[float | int], + x_list: list[float | int], y_axis: Motor, - y_args: list[float | int], + y_list: list[float | int], ): with pytest.raises(ValueError): run_engine( num_grid_rscan( detectors=[det], - params={x_axis: x_args, y_axis: y_args}, + params={x_axis: x_list, y_axis: y_list}, snake_axes=[x_axis], ) ) @pytest.mark.parametrize( - "x_args, y_args, grid, final_shape, final_length", + "x_list, y_list, grid, final_shape, final_length", ([[0, 1, 2], [3, 4, 5], None, [3], 4], [[0, 1, 2], [3, 4, 5, 6], True, [3, 4], 4]), ) def test_make_list_scan_args( x_axis: Motor, - x_args: list, + x_list: list, y_axis: Motor, - y_args: list, + y_list: list, grid: bool, final_shape: list, final_length: int, ): args, shape = _make_list_scan_args( - params={x_axis: x_args, y_axis: y_args}, grid=grid + params={x_axis: x_list, y_axis: y_list}, grid=grid ) assert len(args) == final_length assert shape == final_shape @@ -470,8 +580,22 @@ def test_list_scan( x_axis: Motor, x_list: list, ): + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + num = int(len(x_list)) + run_engine(list_scan(detectors=[det], params={x_axis: x_list})) + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) + @pytest.mark.parametrize( "x_list, y_list", @@ -488,8 +612,22 @@ def test_list_scan_with_two_axes( y_axis: Motor, y_list: list, ): + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + num = int(len(x_list)) + run_engine(list_scan(detectors=[det], params={x_axis: x_list, y_axis: y_list})) + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) + def test_list_scan_fails_with_differnt_list_lengths( run_engine: RunEngine, @@ -521,15 +659,43 @@ def test_list_grid_scan( y_axis: Motor, y_list: list, ): + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + num = int(len(x_list) * len(y_list)) + run_engine(list_grid_scan(detectors=[det], params={x_axis: x_list, y_axis: y_list})) + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) + @pytest.mark.parametrize("x_list", ([0, 1, 2, 3], [1.1, 2.2, 3.3])) def test_list_rscan( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, x_list: list ): + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + num = int(len(x_list)) + run_engine(list_rscan(detectors=[det], params={x_axis: x_list})) + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) + @pytest.mark.parametrize( "x_list, y_list", @@ -546,8 +712,22 @@ def test_list_rscan_with_two_axes( y_axis: Motor, y_list: list, ): + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + num = int(len(x_list)) + run_engine(list_rscan(detectors=[det], params={x_axis: x_list, y_axis: y_list})) + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) + def test_list_rscan_fails_with_differnt_list_lengths( run_engine: RunEngine, @@ -579,10 +759,24 @@ def test_list_grid_rscan( y_axis: Motor, y_list: list, ): + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + num = int(len(x_list) * len(y_list)) + run_engine( list_grid_rscan(detectors=[det], params={x_axis: x_list, y_axis: y_list}) ) + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) + @pytest.mark.parametrize( "params", @@ -627,7 +821,7 @@ def test_make_stepped_list_fails_when_given_equal_start_and_stop_values(): @pytest.mark.parametrize( - "x_args, y_args, grid, final_shape, final_length", + "x_list, y_list, grid, final_shape, final_length", ( [[0, 1, 0.25], [0, 0.1], None, [5], 4], [[0, 1, 0.25], [0, 1, 0.2], True, [5, 6], 4], @@ -637,15 +831,15 @@ def test_make_stepped_list_fails_when_given_equal_start_and_stop_values(): ) def test_make_step_scan_args( x_axis: Motor, - x_args: list, + x_list: list, y_axis: Motor, - y_args: list, + y_list: list, grid: bool | None, final_shape: list, final_length: int, ): args, shape = _make_step_scan_args( - params={x_axis: x_args, y_axis: y_args}, grid=grid + params={x_axis: x_list, y_axis: y_list}, grid=grid ) assert shape == final_shape assert len(args) == final_length @@ -654,7 +848,7 @@ def test_make_step_scan_args( @pytest.mark.parametrize( - "x_args, y_args, z_args, grid", + "x_list, y_list, z_list, grid", ( [[0, 1], [0, 0.2], [0, 0.5], None], [[0, 1, 0.25], [0, 0.2], [0, 1, 0.2, 0.5], None], @@ -664,182 +858,306 @@ def test_make_step_scan_args( ) def test_make_step_scan_args_fails_when_given_incorrect_number_of_parameters( x_axis: Motor, - x_args: list, + x_list: list, y_axis: Motor, - y_args: list, + y_list: list, z_axis: Motor, - z_args: list, + z_list: list, grid: bool | None, ): with pytest.raises(ValueError): _make_step_scan_args( - params={x_axis: x_args, y_axis: y_args, z_axis: z_args}, grid=grid + params={x_axis: x_list, y_axis: y_list, z_axis: z_list}, grid=grid ) -@pytest.mark.parametrize("x_args", ([0, 1, 0.1], [-1, 1, 0.1], [0, 10, 1])) +@pytest.mark.parametrize( + "x_list, num", ([[0, 1, 0.1], 11], [[-1, 1, 0.1], 21], [[0, 10, 1], 11]) +) def test_step_scan( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list, + x_list: list, + num, ): - run_engine(step_scan(detectors=[det], params={x_axis: x_args})) + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + + run_engine(step_scan(detectors=[det], params={x_axis: x_list})) + + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) @pytest.mark.parametrize( - "x_args, y_args", - ([[0, 1, 0.25], [0, 0.1]], [[-1, 1, 0.25], [-1, 0.1]], [[0, 10, 2.5], [0, 1]]), + "x_list, y_list, num", + ( + [[0, 1, 0.25], [0, 0.1], 5], + [[-1, 1, 0.25], [-1, 0.1], 9], + [[0, 10, 2.5], [0, 1], 5], + ), ) def test_step_scan_with_multiple_axes( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list, + x_list: list, y_axis: Motor, - y_args: list, + y_list: list, + num, ): - run_engine(step_scan(detectors=[det], params={x_axis: x_args, y_axis: y_args})) + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + + run_engine(step_scan(detectors=[det], params={x_axis: x_list, y_axis: y_list})) + + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) @pytest.mark.parametrize( - "x_args, y_args", + "x_list, y_list, num", ( - [[0, 1, 0.25], [0, 2, 0.5]], - [[-1, 1, 0.25], [1, -1, -0.5]], - [[0, 10, 2.5], [0, -10, -2.5]], + [[0, 1, 0.25], [0, 2, 0.5], 25], + [[-1, 1, 0.25], [1, -1, -0.5], 45], + [[0, 10, 2.5], [0, -10, -2.5], 25], ), ) def test_step_grid_scan( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list, + x_list: list, y_axis: Motor, - y_args: list, + y_list: list, + num, ): - run_engine(step_grid_scan(detectors=[det], params={x_axis: x_args, y_axis: y_args})) + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + + run_engine(step_grid_scan(detectors=[det], params={x_axis: x_list, y_axis: y_list})) + + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) @pytest.mark.parametrize( - "x_args, y_args", + "x_list, y_list, num", ( - [[0, 1, 0.25], [0, 2, 0.5]], - [[-1, 1, 0.25], [1, -1, -0.5]], + [[0, 1, 0.25], [0, 2, 0.5], 25], + [[-1, 1, 0.25], [1, -1, -0.5], 45], ), ) def test_step_grid_scan_when_not_snaking( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list, + x_list: list, y_axis: Motor, - y_args: list, + y_list: list, + num, ): + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + run_engine( step_grid_scan( - detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=False + detectors=[det], params={x_axis: x_list, y_axis: y_list}, snake_axes=False ) ) + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) + @pytest.mark.parametrize( - "x_args, y_args", ([[0, 1, 0.1], [0, 1, 0.1, 1]], [[0, 1, 0.1], [0]]) + "x_list, y_list", ([[0, 1, 0.1], [0, 1, 0.1, 1]], [[0, 1, 0.1], [0]]) ) def test_step_grid_scan_fails_when_given_incorrect_number_of_params( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list, + x_list: list, y_axis: Motor, - y_args: list, + y_list: list, ): with pytest.raises(ValueError): run_engine( - step_grid_scan(detectors=[det], params={x_axis: x_args, y_axis: y_args}) + step_grid_scan(detectors=[det], params={x_axis: x_list, y_axis: y_list}) ) -@pytest.mark.parametrize("x_args", ([0, 1, 0.1], [-1, 1, 0.1], [0, 10, 1])) +@pytest.mark.parametrize( + "x_list, num", ([[0, 1, 0.1], 11], [[-1, 1, 0.1], 21], [[0, 10, 1], 11]) +) def test_step_rscan( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list, + x_list: list, + num, ): - run_engine(step_rscan(detectors=[det], params={x_axis: x_args})) + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + + run_engine(step_rscan(detectors=[det], params={x_axis: x_list})) + + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) @pytest.mark.parametrize( - "x_args, y_args", - ([[0, 1, 0.25], [0, 0.1]], [[-1, 1, 0.25], [-1, 0.1]], [[0, 10, 2.5], [0, 1]]), + "x_list, y_list, num", + ( + [[0, 1, 0.25], [0, 0.1], 5], + [[-1, 1, 0.25], [-1, 0.1], 9], + [[0, 10, 2.5], [0, 1], 5], + ), ) def test_step_rscan_with_multiple_axes( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list, + x_list: list, y_axis: Motor, - y_args: list, + y_list: list, + num, ): - run_engine(step_rscan(detectors=[det], params={x_axis: x_args, y_axis: y_args})) + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + + run_engine(step_rscan(detectors=[det], params={x_axis: x_list, y_axis: y_list})) + + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) @pytest.mark.parametrize( - "x_args, y_args", + "x_list, y_list, num", ( - [[0, 1, 0.25], [0, 2, 0.5]], - [[-1, 1, 0.25], [1, -1, -0.5]], - [[0, 10, 2.5], [0, -10, -2.5]], + [[0, 1, 0.25], [0, 2, 0.5], 25], + [[-1, 1, 0.25], [1, -1, -0.5], 45], + [[0, 10, 2.5], [0, -10, -2.5], 25], ), ) def test_step_grid_rscan( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list, + x_list: list, y_axis: Motor, - y_args: list, + y_list: list, + num, ): + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + run_engine( - step_grid_rscan(detectors=[det], params={x_axis: x_args, y_axis: y_args}) + step_grid_rscan(detectors=[det], params={x_axis: x_list, y_axis: y_list}) + ) + + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, ) @pytest.mark.parametrize( - "x_args, y_args", + "x_list, y_list, num", ( - [[0, 1, 0.25], [0, 2, 0.5]], - [[-1, 1, 0.25], [1, -1, -0.5]], + [[0, 1, 0.25], [0, 2, 0.5], 25], + [[-1, 1, 0.25], [1, -1, -0.5], 45], ), ) def test_step_grid_rscan_when_not_snaking( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list, + x_list: list, y_axis: Motor, - y_args: list, + y_list: list, + num, ): + docs = defaultdict(list) + run_engine.subscribe(lambda name, doc: docs[name].append(doc)) + run_engine( step_grid_rscan( - detectors=[det], params={x_axis: x_args, y_axis: y_args}, snake_axes=False + detectors=[det], params={x_axis: x_list, y_axis: y_list}, snake_axes=False ) ) + assert_emitted( + docs, + start=1, + descriptor=1, + stream_resource=2, + stream_datum=num * 2, + event=num, + stop=1, + ) + @pytest.mark.parametrize( - "x_args, y_args", ([[0, 1, 0.1], [0, 1, 0.1, 1]], [[0, 1, 0.1], [0]]) + "x_list, y_list", ([[0, 1, 0.1], [0, 1, 0.1, 1]], [[0, 1, 0.1], [0]]) ) def test_step_grid_rscan_fails_when_given_incorrect_number_of_params( run_engine: RunEngine, det: StandardDetector, x_axis: Motor, - x_args: list, + x_list: list, y_axis: Motor, - y_args: list, + y_list: list, ): with pytest.raises(ValueError): run_engine( - step_grid_rscan(detectors=[det], params={x_axis: x_args, y_axis: y_args}) + step_grid_rscan(detectors=[det], params={x_axis: x_list, y_axis: y_list}) ) From 87c4d7604dc2b7d686a9aa1afef7cb659ccd35b0 Mon Sep 17 00:00:00 2001 From: Emily Arnold Date: Tue, 6 Jan 2026 14:35:46 +0000 Subject: [PATCH 6/7] tidy up, set default wait to True for plan_stubs/wrapped.py --- src/dodal/plan_stubs/wrapped.py | 8 ++-- src/dodal/plans/wrapped.py | 80 ++++++++++++++++++--------------- 2 files changed, 48 insertions(+), 40 deletions(-) diff --git a/src/dodal/plan_stubs/wrapped.py b/src/dodal/plan_stubs/wrapped.py index 1d134c2fb7c..da80b118315 100644 --- a/src/dodal/plan_stubs/wrapped.py +++ b/src/dodal/plan_stubs/wrapped.py @@ -16,7 +16,7 @@ def set_absolute( - movable: Movable[T], value: T, group: Group | None = None, wait: bool = False + movable: Movable[T], value: T, group: Group | None = None, wait: bool = True ) -> MsgGenerator: """ Set a device, wrapper for `bp.abs_set`. @@ -27,7 +27,7 @@ def set_absolute( group (Group | None, optional): The message group to associate with the setting, for sequencing. Defaults to None. wait (bool, optional): The group should wait until all setting is complete - (e.g. a motor has finished moving). Defaults to False. + (e.g. a motor has finished moving). Defaults to True. Returns: MsgGenerator: Plan @@ -39,7 +39,7 @@ def set_absolute( def set_relative( - movable: Movable[T], value: T, group: Group | None = None, wait: bool = False + movable: Movable[T], value: T, group: Group | None = None, wait: bool = True ) -> MsgGenerator: """ Change a device, wrapper for `bp.rel_set`. @@ -50,7 +50,7 @@ def set_relative( group (Group | None, optional): The message group to associate with the setting, for sequencing. Defaults to None. wait (bool, optional): The group should wait until all setting is complete - (e.g. a motor has finished moving). Defaults to False. + (e.g. a motor has finished moving). Defaults to True. Returns: MsgGenerator: Plan diff --git a/src/dodal/plans/wrapped.py b/src/dodal/plans/wrapped.py index de7d95bdf6d..6ab0373a092 100644 --- a/src/dodal/plans/wrapped.py +++ b/src/dodal/plans/wrapped.py @@ -101,9 +101,9 @@ def num_scan( params: Annotated[ dict[Movable | Motor, list[float | int]], Field( - description="Dictionary of 'device: parameter' keys. For concurrent " - "trajectories, provide '{movable1: [start1, stop1], movable2: [start2," - "stop2], ... , movableN: [startN, stopN]}'." + description="Dictionary of 'device: parameter' keys. For concurrent \ + trajectories, provide '{movable1: [start1, stop1], movable2: [start2, \ + stop2], ... , movableN: [startN, stopN]}'." ), ], num: int | None = None, @@ -112,6 +112,7 @@ def num_scan( """Scan concurrent single or multi-motor trajector(y/ies). The scan is defined by number of points along scan trajector(y/ies). Wraps bluesky.plans.scan(det, *args, num, md=metadata).""" + # TODO: move to using Range spec and spec_scan when stable and tested at v1.0 args, shape = _make_num_scan_args(params, num) metadata = metadata or {} metadata["shape"] = shape @@ -132,9 +133,9 @@ def num_grid_scan( params: Annotated[ dict[Movable | Motor, list[float | int]], Field( - description="Dictionary of 'device: parameter' keys. For independent " - "trajectories, provide '{movable1: [start1, stop1, num1], ... , movableN: " - "[startN, stopN, numN]}'." + description="Dictionary of 'device: parameter' keys. For independent \ + trajectories, provide '{movable1: [start1, stop1, num1], ... , movableN: \ + [startN, stopN, numN]}'." ), ], snake_axes: list | bool = True, @@ -143,6 +144,7 @@ def num_grid_scan( """Scan independent multi-motor trajectories. The scan is defined by number of points along scan trajectories. Wraps bluesky.plans.grid_scan(det, *args, snake_axes, md=metadata).""" + # TODO: move to using Range spec and spec_scan when stable and tested at v1.0 args, shape = _make_num_scan_args(params) metadata = metadata or {} metadata["shape"] = shape @@ -163,9 +165,9 @@ def num_rscan( params: Annotated[ dict[Movable | Motor, list[float | int]], Field( - description="Dictionary of 'device: parameter' keys. For concurrent " - "trajectories, provide '{movable1: [start1, stop1], movable2: [start2," - "stop2], ... , movableN: [startN, stopN]}'." + description="Dictionary of 'device: parameter' keys. For concurrent \ + trajectories, provide '{movable1: [start1, stop1], movable2: [start2, \ + stop2], ... , movableN: [startN, stopN]}'." ), ], num: int | None = None, @@ -174,6 +176,7 @@ def num_rscan( """Scan concurrent trajector(y/ies), relative to current position(s). The scan is defined by number of points along scan trajector(y/ies). Wraps bluesky.plans.rel_scan(det, *args, num, md=metadata).""" + # TODO: move to using Range spec and spec_scan when stable and tested at v1.0 args, shape = _make_num_scan_args(params, num) metadata = metadata or {} metadata["shape"] = shape @@ -194,9 +197,9 @@ def num_grid_rscan( params: Annotated[ dict[Movable | Motor, list[float | int]], Field( - description="Dictionary of 'device: parameter' keys. For independent " - "trajectories, provide '{movable1: [start1, stop1, num1], ... , movableN: " - "[startN, stopN, numN]}'." + description="Dictionary of 'device: parameter' keys. For independent \ + trajectories, provide '{movable1: [start1, stop1, num1], ... , movableN: \ + [startN, stopN, numN]}'." ), ], snake_axes: list | bool = True, @@ -205,6 +208,7 @@ def num_grid_rscan( """Scan independent trajectories, relative to current positions. The scan is defined by number of points along scan trajectories. Wraps bluesky.plans.rel_grid_scan(det, *args, md=metadata).""" + # TODO: move to using Range spec and spec_scan when stable and tested at v1.0 args, shape = _make_num_scan_args(params) metadata = metadata or {} metadata["shape"] = shape @@ -245,9 +249,9 @@ def list_scan( params: Annotated[ dict[Movable | Motor, list[float | int]], Field( - description="Dictionary of 'device: parameter' keys. For all trajectories, " - "provide '{movable1: [point1, point2, ... ], movableN: [point1, point2, " - "...]}'. Number of points for each movable must be equal." + description="Dictionary of 'device: parameter' keys. For all trajectories, \ + provide '{movable1: [point1, point2, ... ], movableN: [point1, point2, \ + ...]}'. Number of points for each movable must be equal." ), ], metadata: dict[str, Any] | None = None, @@ -275,9 +279,9 @@ def list_grid_scan( params: Annotated[ dict[Movable | Motor, list[float | int]], Field( - description="Dictionary of 'device: parameter' keys. For all trajectories, " - "provide '{movable1: [point1, point2, ... ], movableN: [point1, point2, " - "...]}'." + description="Dictionary of 'device: parameter' keys. For all trajectories, \ + provide '{movable1: [point1, point2, ... ], movableN: [point1, point2, \ + ...]}'." ), ], snake_axes: bool = True, # Currently specifying axes to snake is not supported @@ -308,9 +312,9 @@ def list_rscan( params: Annotated[ dict[Movable | Motor, list[float | int]], Field( - description="Dictionary of 'device: parameter' keys. For all trajectories, " - "provide '{movable1: [point1, point2, ... ], movableN: [point1, point2, " - "...]}'. Number of points for each movable must be equal." + description="Dictionary of 'device: parameter' keys. For all trajectories, \ + provide '{movable1: [point1, point2, ... ], movableN: [point1, point2, \ + ...]}'. Number of points for each movable must be equal." ), ], metadata: dict[str, Any] | None = None, @@ -338,9 +342,9 @@ def list_grid_rscan( params: Annotated[ dict[Movable | Motor, list[float | int]], Field( - description="Dictionary of 'device: parameter' keys. For all trajectories, " - "provide '{movable1: [point1, point2, ... ], movableN: [point1, point2, " - "...]}'." + description="Dictionary of 'device: parameter' keys. For all trajectories, \ + provide '{movable1: [point1, point2, ... ], movableN: [point1, point2, \ + ...]}'." ), ], snake_axes: bool = True, # Currently specifying axes to snake is not supported @@ -455,9 +459,9 @@ def step_scan( params: Annotated[ dict[Movable | Motor, list[float | int]], Field( - description="Dictionary of 'device: parameter' keys. For concurrent " - "trajectories, provide '{movable1: [start1, stop1, step1], movable2: " - "[start2, step2], ... , movableN: [startN, stepN]}'." + description="Dictionary of 'device: parameter' keys. For concurrent \ + trajectories, provide '{movable1: [start1, stop1, step1], movable2: \ + [start2, step2], ... , movableN: [startN, stepN]}'." ), ], metadata: dict[str, Any] | None = None, @@ -465,6 +469,7 @@ def step_scan( """Scan concurrent trajectories with specified step size. Generates list(s) of points for each trajectory, used with bluesky.plans.list_scan(det, *args, md=metadata).""" + # TODO: move to using Linspace spec and spec_scan when stable and tested at v1.0 args, shape = _make_step_scan_args(params) metadata = metadata or {} metadata["shape"] = shape @@ -485,9 +490,9 @@ def step_grid_scan( params: Annotated[ dict[Movable | Motor, list[float | int]], Field( - description="Dictionary of 'device: parameter' keys. For independent " - "trajectories, provide '{movable1: [start1, stop1, step1], ... , movableN: " - "[startN, stopN, stepN]}'." + description="Dictionary of 'device: parameter' keys. For independent \ + trajectories, provide '{movable1: [start1, stop1, step1], ... , movableN: \ + [startN, stopN, stepN]}'." ), ], snake_axes: bool = True, # Currently specifying axes to snake is not supported @@ -496,6 +501,7 @@ def step_grid_scan( """Scan independent trajectories with specified step size. Generates list(s) of points for each trajectory, used with bluesky.plans.list_grid_scan(det, *args, md=metadata).""" + # TODO: move to using Linspace spec and spec_scan when stable and tested at v1.0 args, shape = _make_step_scan_args(params, grid=True) metadata = metadata or {} metadata["shape"] = shape @@ -518,9 +524,9 @@ def step_rscan( params: Annotated[ dict[Movable | Motor, list[float | int]], Field( - description="Dictionary of 'device: parameter' keys. For concurrent " - "trajectories, provide '{movable1: [start1, stop1, step1], movable2: " - "[start2, step2], ... , movableN: [startN, stepN]}'." + description="Dictionary of 'device: parameter' keys. For concurrent \ + trajectories, provide '{movable1: [start1, stop1, step1], movable2: \ + [start2, step2], ... , movableN: [startN, stepN]}'." ), ], metadata: dict[str, Any] | None = None, @@ -528,6 +534,7 @@ def step_rscan( """Scan concurrent trajectories with specified step size, relative to position. Generates list(s) of points for each trajectory, used with bluesky.plans.rel_list_scan(det, *args, md=metadata).""" + # TODO: move to using Linspace spec and spec_scan when stable and tested at v1.0 args, shape = _make_step_scan_args(params) metadata = metadata or {} metadata["shape"] = shape @@ -548,9 +555,9 @@ def step_grid_rscan( params: Annotated[ dict[Movable | Motor, list[float | int]], Field( - description="Dictionary of 'device: parameter' keys. For independent " - "trajectories, provide '{movable1: [start1, stop1, step1], ... , movableN: " - "[startN, stopN, stepN]}'." + description="Dictionary of 'device: parameter' keys. For independent \ + trajectories, provide '{movable1: [start1, stop1, step1], ... , movableN: \ + [startN, stopN, stepN]}'." ), ], snake_axes: bool = True, # Currently specifying axes to snake is not supported @@ -559,6 +566,7 @@ def step_grid_rscan( """Scan independent trajectories with specified step size, relative to position. Generates list(s) of points for each trajectory, used with bluesky.plans.list_grid_scan(det, *args, md=metadata).""" + # TODO: move to using Linspace spec and spec_scan when stable and tested at v1.0 args, shape = _make_step_scan_args(params, grid=True) metadata = metadata or {} metadata["shape"] = shape From 615f17475ba25291674f435490bad4137c524009 Mon Sep 17 00:00:00 2001 From: Emily Arnold Date: Tue, 6 Jan 2026 16:00:11 +0000 Subject: [PATCH 7/7] fix tests for plan_stubs/wrapped --- tests/plan_stubs/test_wrapped_stubs.py | 40 ++++++++++++++------------ 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/tests/plan_stubs/test_wrapped_stubs.py b/tests/plan_stubs/test_wrapped_stubs.py index cbc77ac9bd2..d2b2c3fa95a 100644 --- a/tests/plan_stubs/test_wrapped_stubs.py +++ b/tests/plan_stubs/test_wrapped_stubs.py @@ -36,33 +36,36 @@ def y_axis() -> SimMotor: def test_set_absolute(x_axis: SimMotor): - assert list(set_absolute(x_axis, 0.5)) == [Msg("set", x_axis, 0.5, group=None)] + msgs = list(set_absolute(x_axis, 0.5, wait=True)) + assert len(msgs) == 2 + assert msgs[0] == Msg("set", x_axis, 0.5, group=ANY) + assert msgs[1] == Msg("wait", group=msgs[0].kwargs["group"]) -def test_set_absolute_with_group(x_axis: SimMotor): - assert list(set_absolute(x_axis, 0.5, group="foo")) == [ - Msg("set", x_axis, 0.5, group="foo") +def test_set_absolute_without_wait(x_axis: SimMotor): + assert list(set_absolute(x_axis, 0.5, wait=False)) == [ + Msg("set", x_axis, 0.5, group=None) ] -def test_set_absolute_with_wait(x_axis: SimMotor): - msgs = list(set_absolute(x_axis, 0.5, wait=True)) +def test_set_absolute_with_group(x_axis: SimMotor): + msgs = list(set_absolute(x_axis, 0.5, group="foo")) assert len(msgs) == 2 - assert msgs[0] == Msg("set", x_axis, 0.5, group=ANY) + assert msgs[0] == Msg("set", x_axis, 0.5, group="foo") assert msgs[1] == Msg("wait", group=msgs[0].kwargs["group"]) -def test_set_absolute_with_group_and_wait(x_axis: SimMotor): - assert list(set_absolute(x_axis, 0.5, group="foo", wait=True)) == [ - Msg("set", x_axis, 0.5, group="foo"), - Msg("wait", group="foo"), +def test_set_absolute_with_group_and_without_wait(x_axis: SimMotor): + assert list(set_absolute(x_axis, 0.5, group="foo", wait=False)) == [ + Msg("set", x_axis, 0.5, group="foo") ] def test_set_relative(x_axis: SimMotor): assert list(set_relative(x_axis, 0.5)) == [ Msg("locate", x_axis), - Msg("set", x_axis, 0.5, group=None), + Msg("set", x_axis, 0.5, group=ANY), + Msg("wait", group=ANY), ] @@ -70,22 +73,21 @@ def test_set_relative_with_group(x_axis: SimMotor): assert list(set_relative(x_axis, 0.5, group="foo")) == [ Msg("locate", x_axis), Msg("set", x_axis, 0.5, group="foo"), + Msg("wait", group="foo"), ] -def test_set_relative_with_wait(x_axis: SimMotor): - msgs = list(set_relative(x_axis, 0.5, wait=True)) - assert len(msgs) == 3 +def test_set_relative_without_wait(x_axis: SimMotor): + msgs = list(set_relative(x_axis, 0.5, wait=False)) + assert len(msgs) == 2 assert msgs[0] == Msg("locate", x_axis) assert msgs[1] == Msg("set", x_axis, 0.5, group=ANY) - assert msgs[2] == Msg("wait", group=msgs[1].kwargs["group"]) -def test_set_relative_with_group_and_wait(x_axis: SimMotor): - assert list(set_relative(x_axis, 0.5, group="foo", wait=True)) == [ +def test_set_relative_with_group_and_without_wait(x_axis: SimMotor): + assert list(set_relative(x_axis, 0.5, group="foo", wait=False)) == [ Msg("locate", x_axis), Msg("set", x_axis, 0.5, group="foo"), - Msg("wait", group="foo"), ]