Updated the run browser to work with the new database. (#355)
* Added a basic XRF view (no data plotted yet).

* Split the run browser tab into a metadata view.

* Moved the multi-plot tab from main display into its own tab.

* Updated the kafka_client to use the new iconfig topic.

* Moved the lineplot from display to its own view.

* Enabled the run_browser gradient checkbox and added line plot labels.

* Moved the grid plot widget to its own view.

* Fixed tests and linting.

* Fixed imports to work with isort version 6.

* Run browser adds a button that swaps the value and reference signals.

* Removed some extra hinted signals from the energy positioner.

* Added a data key for the 'seq_num' column in the run browser.

* Black, isort.

* Restored live updates to the run browser.
canismarko authored Jan 30, 2025
1 parent 326c4eb commit 393ea57
Showing 32 changed files with 3,119 additions and 1,282 deletions.
81 changes: 80 additions & 1 deletion src/conftest.py
@@ -255,6 +255,85 @@ async def filters(sim_registry):
"energy": {"fields": ["energy_energy", "energy_id_energy_readback"]},
}

data_keys = {
"I0-mcs-scaler-channels-0-net_count": {
"dtype": "number",
"dtype_numpy": "<f8",
"limits": {
"control": {"high": 0.0, "low": 0.0},
"display": {"high": 0.0, "low": 0.0},
},
"object_name": "I0",
"precision": 0,
"shape": [],
"source": "ca://25idcVME:3820:scaler1_netA.A",
"units": "",
},
"I0-mcs-scaler-channels-3-net_count": {
"dtype": "number",
"dtype_numpy": "<f8",
"limits": {
"control": {"high": 0.0, "low": 0.0},
"display": {"high": 0.0, "low": 0.0},
},
"object_name": "I0",
"precision": 0,
"shape": [],
"source": "ca://25idcVME:3820:scaler1_netA.D",
"units": "",
},
"I0-mcs-scaler-elapsed_time": {
"dtype": "number",
"dtype_numpy": "<f8",
"limits": {
"control": {"high": 0.0, "low": 0.0},
"display": {"high": 0.0, "low": 0.0},
},
"object_name": "I0",
"precision": 3,
"shape": [],
"source": "ca://25idcVME:3820:scaler1.T",
"units": "",
},
"I0-net_current": {
"dtype": "number",
"dtype_numpy": "<f8",
"object_name": "I0",
"shape": [],
"source": "soft://I0-net_current(gain,count,clock_count,clock_frequency,counts_per_volt_second)",
"units": "A",
},
"ge_8element": {
"dtype": "array",
"dtype_numpy": "<u4",
"external": "STREAM:",
"object_name": "ge_8element",
"shape": [8, 4096],
"source": "ca://XSP_Ge_8elem:HDF1:FullFileName_RBV",
},
"ge_8element-element0-all_event": {
"dtype": "number",
"dtype_numpy": "<f8",
"external": "STREAM:",
"object_name": "ge_8element",
"shape": [],
"source": "ca://XSP_Ge_8elem:HDF1:FullFileName_RBV",
},
"sim_motor_2": {
"dtype": "number",
"dtype_numpy": "<f8",
"limits": {
"control": {"high": 32000.0, "low": -32000.0},
"display": {"high": 32000.0, "low": -32000.0},
},
"object_name": "sim_motor_2",
"precision": 5,
"shape": [],
"source": "ca://25idc:simMotor:m2.RBV",
"units": "degrees",
},
}

bluesky_mapping = {
"7d1daf1d-60c7-4aa7-a668-d1cd97e5335f": MapAdapter(
{
@@ -266,7 +345,7 @@ async def filters(sim_registry):
}
),
},
metadata={"hints": hints},
metadata={"hints": hints, "data_keys": data_keys},
),
},
metadata={
2 changes: 2 additions & 0 deletions src/firefly/controller.py
@@ -530,6 +530,8 @@ def start_kafka_client(self):
self._kafka_client.start()
except Exception as exc:
log.error(f"Could not start kafka client: {exc}")
else:
log.info("Started kafka client.")

def start_queue_client(self):
try:
4 changes: 2 additions & 2 deletions src/firefly/kafka_client.py
@@ -39,7 +39,7 @@ async def consumer_loop(self):
if self.kafka_consumer is None:
config = load_config()
self.kafka_consumer = AIOKafkaConsumer(
config["queueserver"]["kafka_topic"],
config["kafka"]["topic"],
bootstrap_servers="fedorov.xray.aps.anl.gov:9092",
group_id=str(uuid4()),
value_deserializer=msgpack.loads,
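
The changed lookup implies a new top-level kafka section in the iconfig file. A sketch of the parsed mapping it expects, assuming load_config() returns a nested dict; the topic value is a placeholder, and only the section and key names come from this diff:

# Hypothetical shape of the parsed iconfig mapping after this change.
config = {
    "kafka": {
        "topic": "beamline.bluesky.documents",  # placeholder value
    },
}
topic = config["kafka"]["topic"]  # new lookup
# old lookup, removed by this commit:
# topic = config["queueserver"]["kafka_topic"]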
@@ -74,7 +74,6 @@ def _process_document(self, doc_type, doc):
# Notify clients that a new run has started
uid = doc.get("uid", "")
log.info(f"Received new start UID: {uid}")
print(f"Emitting run start: {uid=}")
self.run_started.emit(uid)
elif doc_type == "descriptor":
# Save the description to reference to later
@@ -88,6 +87,7 @@ def _process_document(self, doc_type, doc):
log.warning("fUnknown descriptor UID {descriptor_uid}")
return
log.info(f"Emitting run updated: {run_uid=}")
print(f"Emitting run updated: {run_uid=}")
self.run_updated.emit(run_uid)
elif doc_type == "stop":
run_uid = doc["run_start"]
65 changes: 64 additions & 1 deletion src/firefly/run_browser/client.py
@@ -2,7 +2,7 @@
import datetime as dt
import logging
import warnings
from collections import OrderedDict
from collections import ChainMap, OrderedDict
from functools import partial
from typing import Mapping, Sequence

@@ -49,6 +49,25 @@ async def stream_names(self):
streams = [stream for streams in all_streams for stream in streams]
return list(set(streams))

async def data_keys(self, stream: str):
aws = [run.data_keys(stream=stream) for run in self.selected_runs]
keys = await asyncio.gather(*aws)
keys = ChainMap(*keys)
# Add a synthetic data key for the sequence-number column
keys["seq_num"] = {
"dtype": "number",
"dtype_numpy": "<i8",
"precision": 0,
"shape": [],
}
return keys

async def data_frames(self, stream: str) -> dict:
"""Return the internal dataframes for selected runs as {uid: dataframe}."""
aws = (run.data(stream=stream) for run in self.selected_runs)
dfs = await asyncio.gather(*aws)
dfs = {run.uid: df for run, df in zip(self.selected_runs, dfs)}
return dfs

async def filtered_nodes(self, filters: Mapping):
case_sensitive = False
log.debug(f"Filtering nodes: {filters}")
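
Stepping back from the diff, a usage sketch for the new data_keys() and data_frames() coroutines above; the client object and its selected runs are assumed, not part of this commit:

async def summarize_stream(client, stream="primary"):
    # data_keys() merges the descriptors of every selected run and
    # adds the synthetic "seq_num" column introduced by this commit.
    keys = await client.data_keys(stream)
    for name, key in keys.items():
        print(name, key.get("dtype"), key.get("shape"))
    # data_frames() maps each run UID to its internal dataframe.
    frames = await client.data_frames(stream)
    for uid, df in frames.items():
        print(uid, len(df), "rows")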
@@ -150,6 +169,22 @@ async def load_all_runs(self, filters: Mapping = {}):
all_runs.append(run_data)
return all_runs

async def hints(self, stream: str = "primary") -> tuple[list, list]:
"""Get hints for this stream, as two lists:
(*independent_hints*, *dependent_hints*).

*independent_hints* are the signals varied by the experiment,
while *dependent_hints* are those measured as a result.
"""
aws = [run.hints(stream) for run in self.selected_runs]
all_hints = await asyncio.gather(*aws)
# Flatten the per-run hint lists
ihints, dhints = zip(*all_hints)
ihints = [hint for hints in ihints for hint in hints]
dhints = [hint for hints in dhints for hint in hints]
return ihints, dhints
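
A sketch of how the two hint lists might seed default plot axes in a view; the function and the seq_num fallback are assumptions, not code from this commit:

async def default_axes(client, stream="primary"):
    ihints, dhints = await client.hints(stream)
    # Independent hints (e.g. motor positions) are natural x-axes;
    # dependent hints (e.g. detector signals) are natural y-axes.
    x_signal = ihints[0] if ihints else "seq_num"
    y_signal = dhints[0] if dhints else None
    return x_signal, y_signal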

async def signal_names(self, stream: str, *, hinted_only: bool = False):
"""Get a list of valid signal names (data columns) for selected runs.
@@ -225,6 +260,34 @@ async def all_signals(self, stream: str, *, hinted_only=False) -> dict:
dfs[run.uid] = df
return dfs

async def dataset(
self,
dataset_name: str,
*,
stream: str,
uids: Sequence[str] | None = None,
) -> Mapping:
"""Produce a dictionary with the n-dimensional datasets for plotting.
The keys of the dictionary are the UIDs for each scan, and
the corresponding value is a pandas dataset with the data for
each signal.
Parameters
==========
uids
If not ``None``, only runs with UIDs listed in this
parameter will be included.
"""
# Build the dataframes
arrays = OrderedDict()
for run in self.selected_runs:
# Get data from the database
arr = await run.dataset(dataset_name, stream=stream)
arrays[run.uid] = arr
return arrays
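
A usage sketch for dataset(), e.g. loading detector frames for the grid or XRF views; the dataset name mirrors the ge_8element fixture above but is otherwise illustrative:

async def load_frames(client):
    arrays = await client.dataset("ge_8element", stream="primary")
    for uid, arr in arrays.items():
        # Each value is the n-dimensional dataset for one run.
        print(uid, getattr(arr, "shape", None))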

async def signals(
self,
x_signal,
