Source code for bbndb.auspex

import numpy as np
import os
from copy import deepcopy
import datetime
import networkx as nx

from sqlalchemy import Column, DateTime, String, Boolean, Float, Integer, LargeBinary, ForeignKey, func, JSON, PickleType
from sqlalchemy.orm import relationship, backref, validates
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy import inspect
from IPython.display import HTML, display
from .session import Base

__current_pipeline__ = None

[docs]class Connection(Base): __tablename__ = "connection" id = Column(Integer, primary_key=True) node1_id = Column(Integer, ForeignKey("nodeproxy.id")) node2_id = Column(Integer, ForeignKey("nodeproxy.id")) node1_name = Column(String, nullable=False, default="source") node2_name = Column(String, nullable=False, default="sink") pipeline_name = Column(String, nullable=False) time = Column(DateTime)
[docs]class NodeProxy(Base): __tablename__ = "nodeproxy" id = Column(Integer, primary_key=True) label = Column(String) qubit_name = Column(String) node_type = Column(String(50)) hash_val = Column(Integer, nullable=False) connection_to = relationship("Connection", uselist=False, backref='node1', foreign_keys="[Connection.node1_id]") connection_from = relationship("Connection", uselist=False, backref='node2', foreign_keys="[Connection.node2_id]") __mapper_args__ = { 'polymorphic_identity':'channel', 'polymorphic_on':node_type } def __init__(self, **kwargs): self.hash_val = int(np.random.randint(np.iinfo(np.uint32).max, dtype=np.uint32)) super().__init__(**kwargs)
[docs] def print(self, show=True): table_code = "" label = self.label if self.label else "Unlabeled" inspr = inspect(self) for c in list(self.__mapper__.columns): if c.name not in ["id", "label", "qubit_name", "node_type"]: hist = getattr(inspr.attrs, c.name).history dirty = "Yes" if hist.has_changes() else "" if c.name == "kernel_data": table_code += f"<tr><td>{c.name}</td><td>Binary Data of length {len(self.kernel)}</td><td>{dirty}</td></tr>" else: table_code += f"<tr><td>{c.name}</td><td>{getattr(self,c.name)}</td><td>{dirty}</td></tr>" html = f"<b>{self.node_type}</b> ({self.qubit_name}) <i>{label}</i></br><table style='{{padding:0.5em;}}'><tr><th>Attribute</th><th>Value</th><th>Changes?</th></tr><tr>{table_code}</tr></table>" if show: display(HTML(html)) else: return html
[docs]class NodeMixin(object): @declared_attr def id(cls): return Column(Integer, ForeignKey("nodeproxy.id"), primary_key=True) @declared_attr def __mapper_args__(cls): return {'polymorphic_identity': cls.__name__.lower()} @declared_attr def __tablename__(cls): return cls.__name__.lower() def __repr__(self): return str(self) def __str__(self): return f"{self.__class__.__name__}('{self.label}')"
[docs]class FilterProxy(NodeMixin, NodeProxy): """docstring for FilterProxy""" def __init__(self, **kwargs): global __current_pipeline__ # with db_session: super(FilterProxy, self).__init__(**kwargs) self.pipelineMgr = __current_pipeline__
[docs] def add(self, filter_obj, connector_out="source", connector_in="sink"): # if not self.pipeline: # raise Exception("This filter does not correspond to any experiment. Please file a bug report.") if not self.pipelineMgr: raise Exception("This filter does not know about any pipeline manager.") filter_obj.pipelineMgr = self.pipelineMgr filter_obj.qubit_name = self.qubit_name self.pipelineMgr.meas_graph.add_node(filter_obj.hash_val, node_obj=filter_obj) self.pipelineMgr.meas_graph.add_edge(self.hash_val, filter_obj.hash_val, connector_in=connector_in, connector_out=connector_out) self.pipelineMgr.session.add(filter_obj) return filter_obj
[docs] def drop(self): desc = list(nx.algorithms.dag.descendants(self.pipelineMgr.meas_graph, str(self))) desc.append(str(self)) for n in desc: # n.exp = None n = self.pipelineMgr.meas_graph.nodes[n]['node_obj'] self.pipelineMgr.session.expunge(n) self.pipelineMgr.meas_graph.remove_nodes_from(desc)
[docs] def node_label(self): label = self.label if self.label else "" return f"{self.__class__.__name__} {label} ({self.qubit_name})"
def __repr__(self): return f"{self.__class__.__name__} {self.label} ({self.qubit_name})" def __str__(self): return f"{self.__class__.__name__} {self.label} ({self.qubit_name})" def __getitem__(self, key): ss = list(self.pipelineMgr.meas_graph.successors(self.hash_val)) label_matches = [self.pipelineMgr.meas_graph.nodes[s]['node_obj'] for s in ss if self.pipelineMgr.meas_graph.nodes[s]['node_obj'].label == key] class_matches = [self.pipelineMgr.meas_graph.nodes[s]['node_obj'] for s in ss if self.pipelineMgr.meas_graph.nodes[s]['node_obj'].__class__.__name__ == key] if len(label_matches) == 1: return label_matches[0] elif len(class_matches) == 1: return class_matches[0] else: raise ValueError("Could not find suitable filter by label or by class")
[docs]class Demodulate(FilterProxy, NodeMixin): """Digital demodulation and filtering to select a signal at a particular frequency component. This filter does the following: 1. First stage decimating filter on data 2. Take product of result with with reference signal at demodulation frequency 3. Second stage decimating filter on result to boost n_bandwidth 4. Final channel selecting filter at n_bandwidth/2 If an axis name is supplied to `follow_axis` then the filter will demodulate at the freqency `axis_frequency_value - follow_freq_offset` otherwise it will demodulate at `frequency`. Note that the filter coefficients are still calculated with respect to the `frequency` paramter, so it should be chosen accordingly when `follow_axis` is defined.""" # Demodulation frequency id = Column(Integer, ForeignKey("filterproxy.id"), primary_key=True) frequency = Column(Float, default=10e6, nullable=False)
[docs] @validates('frequency') def validate_frequency(self, key, value): assert (value >= -10.0e9) and (value <= 10.0e9) return value
# Filter bandwidth bandwidth = Column(Float, default=5e6, nullable=False)
[docs] @validates('bandwidth') def validate_bandwidth(self, key, value): assert (value >= 0.0) and (value <= 100.0e6) return value
# Let the demodulation frequency follow an axis' value (useful for sweeps) follow_axis = Column(String) # Name of the axis to follow # Offset of the actual demodulation from the followed axis follow_freq_offset = Column(Float) # Offset # Maximum data reduction factor decimation_factor = Column(Integer, default=4, nullable=False)
[docs] @validates('decimation_factor') def validate_decimation_factor(self, key, value): assert value in range(1, 101) return value
[docs]class Average(FilterProxy, NodeMixin): """Takes data and collapses along the specified axis. Threshold is used for state identification.""" id = Column(Integer, ForeignKey("filterproxy.id"), primary_key=True) axis = Column(String, default="averages") threshold = Column(Float, default=0.5)
[docs]class Framer(FilterProxy, NodeMixin): """Emit out data in increments defined by the specified axis.""" id = Column(Integer, ForeignKey("filterproxy.id"), primary_key=True) axis = Column(String, nullable=False)
[docs]class FidelityKernel(FilterProxy, NodeMixin): """Calculates the single shot fidelity from given input""" id = Column(Integer, ForeignKey("filterproxy.id"), primary_key=True) save_kernel = Column(Boolean, nullable=False, default=False) optimal_integration_time = Column(Boolean, nullable=False, default=False) set_threshold = Column(Boolean, nullable=False, default=True) zero_mean = Column(Boolean, nullable=False, default=False) logistic_regression = Column(Boolean, nullable=False, default=False) tolerance = Column(Float, nullable=False, default=1.0e-3)
[docs]class Integrate(FilterProxy, NodeMixin): id = Column(Integer, ForeignKey("filterproxy.id"), primary_key=True) """Integrate with a given kernel or using a simple boxcar. Kernel will be padded/truncated to match record length""" # Use a boxcar (simple) or use a kernel specified by the kernel_filename parameter simple_kernel = Column(Boolean, default=True, nullable=False) # File in which to find the kernel data, or raw python string to be evaluated kernel = Column(String) # DC bias bias = Column(Float, default=0.0, nullable=False) # For a simple kernel, where does the boxcar start box_car_start = Column(Float, default=0.0, nullable=False)
[docs] @validates('box_car_start') def validate_box_car_start(self, key, value): assert (value >= 0.0) and (value <= 1e-4) return value
# For a simple kernel, where does the boxcar stop box_car_stop = Column(Float, default=100.0e-9, nullable=False)
[docs] @validates('box_car_stop') def validate_box_car_stop(self, key, value): assert (value >= 0.0) and (value <= 1e-4) return value
# Built in frequency for demodulation demod_frequency = Column(Float, default=0.0, nullable=False)
[docs]class OutputProxy(FilterProxy, NodeMixin): id = Column(Integer, ForeignKey("filterproxy.id"), primary_key=True)
[docs]class Display(OutputProxy, NodeMixin): """Create a plot tab within the plotting interface.""" # Should we plot in 1D or 2D? 0 means choose the largest possible. id = Column(Integer, ForeignKey("outputproxy.id"), primary_key=True) plot_dims = Column(Integer, default=0, nullable=False)
[docs] @validates('plot_dims') def validate_plot_dims(self, key, value): assert value in [0,1,2] return value
# Allowed values are "real", "imag", "real/imag", "amp/phase", "quad" # TODO: figure out how to validate these in pony plot_mode = Column(String, default="quad", nullable=False)
[docs]class Write(OutputProxy, NodeMixin): """Writes data to file.""" id = Column(Integer, ForeignKey("outputproxy.id"), primary_key=True) filename = Column(String, default = "output.auspex", nullable=False) groupname = Column(String, default = "main", nullable=False) add_date = Column(Boolean, default = False, nullable=False)
# save_settings = Column(Boolean, default = True, nullable=False)
[docs]class Buffer(OutputProxy, NodeMixin): """Saves data in a buffer""" id = Column(Integer, ForeignKey("outputproxy.id"), primary_key=True) max_size = Column(Integer) def __init__(self, **kwargs): if "groupname" in kwargs: kwargs.pop("groupname") super(Buffer, self).__init__(**kwargs)
[docs]class StreamSelect(NodeMixin, NodeProxy): """docstring for FilterProxy""" id = Column(Integer, ForeignKey("nodeproxy.id"), primary_key=True) dsp_channel = Column(Integer, default=0, nullable=False) stream_type = Column(String, default='raw', nullable=False)
[docs] @validates('stream_type') def validate_stream_type(self, key, value): assert (value in ["raw", "demodulated", "integrated", "averaged"]) return value
dsp_channel = Column(Integer, default=1) if_freq = Column(Float, default=0.0, nullable=False) kernel_data = Column(LargeBinary) # Binary string of np.complex128 kernel_bias = Column(Float, default=0.0, nullable=False) threshold = Column(Float, default=0.0, nullable=False) threshold_invert = Column(Boolean, default=False, nullable=False) def kernel(): doc = "The kernel as represented by a numpy complex128 array, or the name of a kernel file." def fget(self): if self.kernel_data: return np.frombuffer(self.kernel_data, dtype=np.complex128) else: return np.empty((0,), dtype=np.complex128) def fset(self, value): if isinstance(value, np.ndarray): self.kernel_data = value.astype(np.complex128).tobytes() elif isinstance(value, str): try: from auspex.config import KernelDir except ImportError: KernelDir = "./" logger.warning("Could not load auspex config, loading kernel from current directory.") kpath = os.path.join(KernelDir, value) if os.path.exists(kpath): kdata = np.loadtxt(kpath, dtype=complex, converters={0: lambda s: complex(s.decode().replace('+-', '-'))}) self.kernel_data = kdata.astype(np.complex128).tobytes() else: raise ValueError(f"Unable to open kernel file: {kpath}.") else: raise ValueError(f"Unable to load kernel for stream selector {self.id} -- please provide a path or numpy array.") return locals() kernel = property(**kernel()) def __init__(self, pipelineMgr=None, **kwargs): global __current_pipeline__ super(StreamSelect, self).__init__(**kwargs) self.pipelineMgr = pipelineMgr __current_pipeline__ = pipelineMgr self.digitizer_settings = None self.available_streams = None
[docs] def add(self, filter_obj, connector_out="source", connector_in="sink"): # if not self.pipeline: # raise Exception("This qubit does not correspond to any experiment. Please file a bug report.") if not self.pipelineMgr: raise Exception("This qubit does not know about any pipeline manager.") filter_obj.qubit_name = self.qubit_name filter_obj.pipelineMgr = self.pipelineMgr # filter_obj.exp = self.pipeline if self.hash_val not in self.pipelineMgr.meas_graph.nodes(): self.pipelineMgr.meas_graph.add_node(self.hash_val, node_obj=self) self.pipelineMgr.meas_graph.add_node(filter_obj.hash_val, node_obj=filter_obj) self.pipelineMgr.meas_graph.add_edge(self.hash_val, filter_obj.hash_val, connector_in=connector_in, connector_out=connector_out) self.pipelineMgr.session.add(filter_obj) return filter_obj
[docs] def clear_pipeline(self): """Remove all nodes coresponding to the qubit""" desc = nx.algorithms.dag.descendants(self.pipelineMgr.meas_graph, self.hash_val) for n in desc: # n.exp = None n = self.pipelineMgr.meas_graph.nodes[n]['node_obj'] self.pipelineMgr.session.expunge(n) self.pipelineMgr.meas_graph.remove_nodes_from(desc)
[docs] def create_default_pipeline(self, average=True, buffers=False): Output = Buffer if buffers else Write if average: if self.stream_type.lower() == "raw": self.add(Demodulate()).add(Integrate()).add(Average()).add(Output(groupname=self.qubit_name+'-raw_int')) if self.stream_type.lower() == "demodulated": self.add(Integrate()).add(Average()).add(Output(groupname=self.qubit_name+'-demod_int')) if self.stream_type.lower() == "integrated": self.add(Average()).add(Output(groupname=self.qubit_name+'-main')) if self.stream_type.lower() == "averaged": self.add(Output(groupname=self.qubit_name+'-avg')) else: if self.stream_type.lower() == "raw": self.add(Demodulate()).add(Integrate()).add(Output(groupname=self.qubit_name+'-raw_int')) if self.stream_type.lower() == "demodulated": self.add(Integrate()).add(Output(groupname=self.qubit_name+'-demod_int')) if self.stream_type.lower() == "integrated": self.add(Output(groupname=self.qubit_name+'-main')) if self.stream_type.lower() == "averaged": raise Exception("Cannot have averaged stream without averaging?!")
[docs] def show_pipeline(self): desc = list(nx.algorithms.dag.descendants(self.pipelineMgr.meas_graph, self.hash_val)) + [self.hash_val] subgraph = self.pipelineMgr.meas_graph.subgraph(desc) return self.pipelineMgr.show_pipeline(subgraph=subgraph)
[docs] def show_connectivity(self): pass
[docs] def node_label(self): return self.__repr__()
def __repr__(self): return f"StreamSelect for {self.qubit_name}" def __str__(self): return f"StreamSelect for {self.qubit_name}" def __getitem__(self, key): ss = list(self.pipelineMgr.meas_graph.successors(self.hash_val)) label_matches = [self.pipelineMgr.meas_graph.nodes[s]['node_obj'] for s in ss if self.pipelineMgr.meas_graph.nodes[s]['node_obj'].label == key] class_matches = [self.pipelineMgr.meas_graph.nodes[s]['node_obj'] for s in ss if self.pipelineMgr.meas_graph.nodes[s]['node_obj'].__class__.__name__ == key] if len(label_matches) == 1: return label_matches[0] elif len(class_matches) == 1: return class_matches[0] else: raise ValueError("Could not find suitable filter by label or by class")
stream_hierarchy = [Demodulate, Integrate, Average, OutputProxy]