Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions src/dodal/devices/electron_analyser/abstract/base_driver_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@
DualEnergySource,
EnergySource,
)
from dodal.devices.electron_analyser.enums import EnergyMode
from dodal.devices.electron_analyser.enums import EnergyMode, SelectedSource
from dodal.devices.electron_analyser.util import to_binding_energy
from dodal.devices.fast_shutter import (
AbstractFastShutter,
DualFastShutter,
SelectedDevice,
)


class AbstractAnalyserDriverIO(
Expand All @@ -53,6 +58,7 @@ def __init__(
psu_mode_type: type[TPsuMode],
pass_energy_type: type[TPassEnergy],
energy_source: EnergySource | DualEnergySource,
shutter: AbstractFastShutter | None,
name: str = "",
) -> None:
"""
Expand Down Expand Up @@ -88,6 +94,11 @@ def __init__(
self._calculate_total_intensity, spectrum=self.spectrum
)
self.energy_source = energy_source
self.shutter = shutter

self._cached_excitation_energy = soft_signal_rw(
float, initial_value=0, units="eV"
)

with self.add_children_as_readables(StandardReadableFormat.CONFIG_SIGNAL):
# Read once per scan after data acquired
Expand Down Expand Up @@ -121,7 +132,7 @@ def __init__(
self._calculate_binding_energy_axis,
"eV",
energy_axis=self.energy_axis,
excitation_energy=self.energy_source.energy,
excitation_energy=self.energy_source.energy, # Need to check this actaully takes the correct source...
energy_mode=self.energy_mode,
)
self.angle_axis = self._create_angle_axis_signal(prefix)
Expand Down Expand Up @@ -151,15 +162,32 @@ async def set(self, region: TAbstractBaseRegion):
"""
if isinstance(self.energy_source, DualEnergySource):
self.energy_source.selected_source.set(region.excitation_energy_source)

if isinstance(self.shutter, DualFastShutter):
selected_shutter = (
SelectedDevice.DEVICE1
if region.excitation_energy_source == SelectedSource.SOURCE1
else SelectedDevice.DEVICE2
)
self.shutter.selected_shutter.set(selected_shutter)

excitation_energy = await self.energy_source.energy.get_value()

shutter = self.shutter
if shutter is not None:
await shutter.set(shutter.close_state)

# Switch to kinetic energy as epics doesn't support BINDING.
ke_region = region.switch_energy_mode(EnergyMode.KINETIC, excitation_energy)
await self._set_region(ke_region)

# Set the true energy mode from original region so binding_energy_axis can be
# calculated correctly.
await self.energy_mode.set(region.energy_mode)

if shutter is not None:
await shutter.set(shutter.open_state)

@abstractmethod
async def _set_region(self, ke_region: TAbstractBaseRegion):
"""
Expand Down
98 changes: 76 additions & 22 deletions src/dodal/devices/fast_shutter.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,33 @@
from abc import abstractmethod
from typing import TypeVar

from bluesky.protocols import Movable
from ophyd_async.core import (
AsyncStatus,
EnumTypes,
Reference,
StandardReadable,
StrictEnum,
soft_signal_rw,
)
from ophyd_async.epics.core import epics_signal_rw

StrictEnumT = TypeVar("StrictEnumT", bound=EnumTypes)


class GenericFastShutter(StandardReadable, Movable[StrictEnumT]):
class AbstractFastShutter(StandardReadable, Movable[StrictEnumT]):
def __init__(self, open_state: StrictEnumT, close_state: StrictEnumT, name: str):
self.open_state = open_state
self.close_state = close_state
super().__init__(name)

@abstractmethod
@AsyncStatus.wrap
async def set(self, state: StrictEnumT) -> None:
""""""


class GenericFastShutter(AbstractFastShutter[StrictEnumT]):
"""
Basic enum device specialised for a fast shutter with configured open_state and
close_state so it is generic enough to be used with any device or plan without
Expand All @@ -38,32 +54,70 @@ def __init__(
open_state: The enum value that corresponds with opening the shutter.
close_state: The enum value that corresponds with closing the shutter.
"""
self.open_state = open_state
self.close_state = close_state
with self.add_children_as_readables():
self.state = epics_signal_rw(type(self.open_state), pv)
super().__init__(name)
super().__init__(open_state, close_state, name)

@AsyncStatus.wrap
async def set(self, value: StrictEnumT) -> None:
await self.state.set(value)
async def set(self, state: StrictEnumT) -> None:
await self.state.set(state)

async def is_open(self) -> bool:
"""
Checks to see if shutter is in open_state. Should not be used directly inside a
plan. A user should use the following instead in a plan:

from bluesky import plan_stubs as bps
is_open = yield from bps.rd(shutter.state) == shutter.open_state
"""
return await self.state.get_value() == self.open_state
class SelectedDevice(StrictEnum):
DEVICE1 = "device1"
DEVICE2 = "device2"

async def is_closed(self) -> bool:
"""
Checks to see if shutter is in close_state. Should not be used directly inside a
plan. A user should use the following instead in a plan:

from bluesky import plan_stubs as bps
is_closed = yield from bps.rd(shutter.state) == shutter.close_state
"""
return await self.state.get_value() == self.close_state
T = TypeVar("T")


def get_obj_from_selected_device(
selected_device: SelectedDevice, selected_source1_obj: T, selected_source2_obj: T
) -> T:
match selected_device:
case SelectedDevice.DEVICE1:
return selected_source1_obj
case SelectedDevice.DEVICE2:
return selected_source2_obj


class DualFastShutter(AbstractFastShutter[StrictEnumT]):
def __init__(
self, shutter1: GenericFastShutter, shutter2: GenericFastShutter, name: str
):
if shutter1.open_state is not shutter2.open_state:
raise Exception("")
if shutter1.close_state is not shutter2.close_state:
raise Exception("")

open_state = shutter1.open_state
close_state = shutter1.close_state

self.shutter1_ref = Reference(shutter1)
self.shutter2_ref = Reference(shutter2)

self.selected_shutter = soft_signal_rw(
SelectedDevice, initial_value=SelectedDevice.DEVICE1
)
super().__init__(open_state, close_state, name)

async def get_active_shutter(self) -> GenericFastShutter:
return get_obj_from_selected_device(
await self.selected_shutter.get_value(),
selected_source1_obj=self.shutter1_ref(),
selected_source2_obj=self.shutter2_ref(),
)

async def get_inactive_shutter(self) -> GenericFastShutter:
return get_obj_from_selected_device(
await self.selected_shutter.get_value(),
selected_source1_obj=self.shutter2_ref(),
selected_source2_obj=self.shutter1_ref(),
)

@AsyncStatus.wrap
async def set(self, state: StrictEnumT):
inactive_shutter = await self.get_inactive_shutter()
await inactive_shutter.set(self.close_state)
active_shutter = await self.get_active_shutter()
await active_shutter.set(state)
Loading