aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm/simulator/rfnoc_graph.py
diff options
context:
space:
mode:
authorSamuel O'Brien <sam.obrien@ni.com>2020-07-21 14:20:24 -0500
committerAaron Rossetto <aaron.rossetto@ni.com>2020-10-08 07:44:12 -0500
commitee9085a494d6f5030e49f5a47aff6a84008e0852 (patch)
tree1234c4a6aa5c88f3df290c1586e69e4a91d24f7d /mpm/python/usrp_mpm/simulator/rfnoc_graph.py
parentf54a22c60a0cbe990c9d3892f4c565d64226196b (diff)
downloaduhd-ee9085a494d6f5030e49f5a47aff6a84008e0852.tar.gz
uhd-ee9085a494d6f5030e49f5a47aff6a84008e0852.tar.bz2
uhd-ee9085a494d6f5030e49f5a47aff6a84008e0852.zip
sim: Simulator CHDR Parsing and RFNoC Graph
This commit adds a simulated RFNoC Graph to the simulator. It is also able to process management and control packets which can traverse the graph and read from simulated registers. Stub callbacks for creating streams have been provided but are not implemented yet. Signed-off-by: Samuel O'Brien <sam.obrien@ni.com>
Diffstat (limited to 'mpm/python/usrp_mpm/simulator/rfnoc_graph.py')
-rw-r--r--mpm/python/usrp_mpm/simulator/rfnoc_graph.py554
1 files changed, 554 insertions, 0 deletions
diff --git a/mpm/python/usrp_mpm/simulator/rfnoc_graph.py b/mpm/python/usrp_mpm/simulator/rfnoc_graph.py
new file mode 100644
index 000000000..42210ab6e
--- /dev/null
+++ b/mpm/python/usrp_mpm/simulator/rfnoc_graph.py
@@ -0,0 +1,554 @@
+#
+# Copyright 2020 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""This module includes all of the components necessary to simulate the
+configuration of a NoC Core and NoC Blocks.
+
+This module handles and responds to Management and Ctrl Packets. It
+also instantiates the registers and acts as an interface between
+the chdr packets on the network and the registers.
+"""
+from enum import IntEnum
+from uhd.chdr import PacketType, MgmtOp, MgmtOpCode, MgmtOpNodeInfo, \
+ CtrlStatus, CtrlOpCode, MgmtOpCfg, MgmtOpSelDest
+from .noc_block_regs import NocBlockRegs, NocBlock, StreamEndpointPort, NocBlockPort
+from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED
+
+class StreamSpec:
+ """This class carries the configuration parameters of a Tx stream.
+
+ total_samples, is_continuous, and packet_samples come from the
+ radio registers (noc_block_regs.py)
+
+ sample_rate comes from an rpc to the daughterboard (through the
+ set_sample_rate method in chdr_sniffer.py)
+
+ dst_epid comes from the source stream_ep
+
+ addr comes from the xport passed through when routing to dst_epid
+ """
+ LOW_MASK = 0xFFFFFFFF
+ HIGH_MASK = (0xFFFFFFFF) << 32
+ def __init__(self):
+ self.total_samples = 0
+ self.is_continuous = True
+ self.packet_samples = None
+ self.sample_rate = None
+ self.dst_epid = None
+ self.addr = None
+
+ def set_num_words_lo(self, low):
+ """Set the low 32 bits of the total_samples field"""
+ self.total_samples = (self.total_samples & (StreamSpec.HIGH_MASK)) \
+ | (low & StreamSpec.LOW_MASK)
+
+ def set_num_words_hi(self, high):
+ """Set the high 32 bits of the total_samples field"""
+ self.total_samples = (self.total_samples & StreamSpec.LOW_MASK) \
+ | ((high & StreamSpec.LOW_MASK) << 32)
+
+ def seconds_per_packet(self):
+ """Calculates how many seconds should be between each packet
+ transmit
+ """
+ assert self.packet_samples != 0
+ assert self.sample_rate != 0
+ return self.packet_samples / self.sample_rate
+
+ def __str__(self):
+ return "StreamSpec{{total_samples: {}, is_continuous: {}, packet_samples: {}," \
+ " sample_rate: {}, dst_epid: {}, addr: {}}}" \
+ .format(self.total_samples, self.is_continuous, self.packet_samples,
+ self.sample_rate, self.dst_epid, self.addr)
+
+def to_iter(index_func, length):
+ """Allows looping over an indexed object in a for-each loop"""
+ for i in range(length):
+ yield index_func(i)
+
+def _swap_src_dst(packet, payload):
+ """Swap the src_epid and the dst_epid of a packet"""
+ header = packet.get_header()
+ our_epid = header.dst_epid
+ header.dst_epid = payload.src_epid
+ payload.src_epid = our_epid
+ packet.set_header(header)
+
+class NodeType(IntEnum):
+ """The type of a node in a NoC Core
+
+ RTS is a magic value used to determine when to return a packet to
+ the sender
+ """
+ INVALID = 0
+ XBAR = 1
+ STRM_EP = 2
+ XPORT = 3
+ RTS = 0xFF
+
+RETURN_TO_SENDER = (0, NodeType.RTS, 0)
+
+class Node:
+ """Represents a node in a RFNoCGraph
+
+ These objects are usually constructed by the caller of RFNoC Graph.
+ Initially, they have references to other nodes as indexes of the
+ list of nodes. The graph calls graph_init and from_index so this
+ node can initialize their references to node_ids instead.
+
+ Subclasses can override _handle_<packet_type>_packet methods, and
+ _handle_default_packet is provided, which will receive packets whose
+ specific methods are not overridden
+ """
+ def __init__(self, node_inst):
+ """node_inst should start at 0 and increase for every Node of
+ the same type that is constructed
+ """
+ self.device_id = None
+ self.node_inst = node_inst
+ self.log = None
+
+ def graph_init(self, log, device_id, **kwargs):
+ """This method is called to initialize the Node Graph"""
+ self.device_id = device_id
+ self.log = log
+
+ def get_id(self):
+ """Return the NodeID (device_id, node_type, node_inst)"""
+ return (self.device_id, self.get_type(), self.node_inst)
+
+ def get_type(self):
+ """Returns the NodeType of this node"""
+ raise NotImplementedError
+
+ def handle_packet(self, packet, **kwargs):
+ """Processes a packet
+
+ The return value should be a node_id or RETURN_TO_SENDER
+ """
+ packet_type = packet.get_header().pkt_type
+ next_node = None
+ if packet_type == PacketType.MGMT:
+ next_node = self._handle_mgmt_packet(packet, **kwargs)
+ elif packet_type == PacketType.CTRL:
+ next_node = self._handle_ctrl_packet(packet, **kwargs)
+ elif packet_type in (PacketType.DATA_W_TS, PacketType.DATA_NO_TS):
+ next_node = self._handle_data_packet(packet, **kwargs)
+ elif packet_type == PacketType.STRS:
+ next_node = self._handle_strs_packet(packet, **kwargs)
+ elif packet_type == PacketType.STRC:
+ next_node = self._handle_strc_packet(packet, **kwargs)
+ else:
+ raise RuntimeError("Invalid Enum Value for PacketType: {}".format(packet_type))
+ if next_node == NotImplemented:
+ next_node = self._handle_default_packet(packet, **kwargs)
+ return next_node
+
+ def _handle_mgmt_packet(self, packet, **kwargs):
+ return NotImplemented
+
+ def _handle_ctrl_packet(self, packet, **kwargs):
+ return NotImplemented
+
+ def _handle_data_packet(self, packet, **kwargs):
+ return NotImplemented
+
+ def _handle_strc_packet(self, packet, **kwargs):
+ return NotImplemented
+
+ def _handle_strs_packet(self, packet, **kwargs):
+ return NotImplemented
+
+ def _handle_default_packet(self, packet, **kwargs):
+ raise RuntimeError("{} has no operation defined for a {} packet"
+ .format(self.__class__, packet.get_header().pkt_type))
+
+ def from_index(self, nodes):
+ """Initialize this node's indexes to block_id references"""
+ pass
+
+ def info_response(self, extended_info):
+ """Generate a node info response MgmtOp"""
+ return MgmtOp(
+ op_payload=MgmtOpNodeInfo(self.device_id, self.get_type(),
+ self.node_inst, extended_info),
+ op_code=MgmtOpCode.INFO_RESP
+ )
+
+class XportNode(Node):
+ """Represents an Xport node
+
+ When an Advertise Management op is received, the address and
+ src_epid of the packet is placed in self.addr_map
+ """
+ def __init__(self, node_inst):
+ super().__init__(node_inst)
+ self.downstream = None
+ self.addr_map = {}
+
+ def get_type(self):
+ return NodeType.XPORT
+
+ def _handle_mgmt_packet(self, packet, addr, **kwargs):
+ send_upstream = False
+ payload = packet.get_payload_mgmt()
+ our_hop = payload.pop_hop()
+ packet.set_payload(payload)
+ for op in to_iter(our_hop.get_op, our_hop.get_num_ops()):
+ if op.op_code == MgmtOpCode.INFO_REQ:
+ payload.get_hop(0).add_op(self.info_response(0))
+ elif op.op_code == MgmtOpCode.RETURN:
+ send_upstream = True
+ _swap_src_dst(packet, payload)
+ elif op.op_code == MgmtOpCode.NOP:
+ pass
+ elif op.op_code == MgmtOpCode.ADVERTISE:
+ self.log.info("Advertise: {} | EPID:{} -> EPID:{}"
+ .format(self.get_id(), payload.src_epid,
+ packet.get_header().dst_epid))
+ self.log.info("addr_map updated: EPID:{} -> {}".format(payload.src_epid, addr))
+ self.addr_map[payload.src_epid] = addr
+ else:
+ raise NotImplementedError(op.op_code)
+ self.log.trace("Xport {} processed hop:\n{}"
+ .format(self.node_inst, our_hop))
+ packet.set_payload(payload)
+ if send_upstream:
+ return RETURN_TO_SENDER
+ else:
+ return self.downstream
+
+ def _handle_default_packet(self, packet, **kwargs):
+ return self.downstream
+
+class XbarNode(Node):
+ """Represents a crossbar node
+
+ self.routing_table stores a mapping of dst_epids to xbar ports
+
+ port numbers start from 0. xports are addressed first
+ stream ep ports are then addressed where
+ the index of the first ep = len(xport_ports)
+
+ NOTE: UHD inteprets the node_inst of a xbar as the port which
+ it is connected to. This differs from its handling of node_inst
+ for other types of nodes.
+ """
+ def __init__(self, node_inst, ports, xport_ports):
+ super().__init__(node_inst)
+ self.nports = len(ports) + len(xport_ports)
+ self.nports_xport = len(xport_ports)
+ self.ports = xport_ports + ports
+ self.routing_table = {}
+
+ def get_type(self):
+ return NodeType.XBAR
+
+ def _handle_mgmt_packet(self, packet, **kwargs):
+ send_upstream = False
+ destination = None
+ payload = packet.get_payload_mgmt()
+ our_hop = payload.pop_hop()
+ for op in to_iter(our_hop.get_op, our_hop.get_num_ops()):
+ if op.op_code == MgmtOpCode.INFO_REQ:
+ payload.get_hop(0).add_op(self.info_response(
+ (self.nports_xport << 8) | self.nports))
+ elif op.op_code == MgmtOpCode.RETURN:
+ send_upstream = True
+ _swap_src_dst(packet, payload)
+ elif op.op_code == MgmtOpCode.NOP:
+ pass
+ elif op.op_code == MgmtOpCode.ADVERTISE:
+ self.log.info("Advertise: {}".format(self.get_id()))
+ elif op.op_code == MgmtOpCode.CFG_WR_REQ:
+ cfg = op.get_op_payload()
+ cfg = MgmtOpCfg.parse(cfg)
+ self.routing_table[cfg.addr] = cfg.data
+ self.log.debug("Xbar {} routing changed: {}"
+ .format(self.node_inst, self.routing_table))
+ elif op.op_code == MgmtOpCode.SEL_DEST:
+ cfg = op.get_op_payload()
+ cfg = MgmtOpSelDest.parse(cfg)
+ dest_port = cfg.dest
+ destination = self.ports[dest_port]
+ else:
+ raise NotImplementedError(op.op_code)
+ self.log.trace("Xbar {} processed hop:\n{}"
+ .format(self.node_inst, our_hop))
+ packet.set_payload(payload)
+ if send_upstream:
+ return RETURN_TO_SENDER
+ elif destination is not None:
+ return destination
+
+ def _handle_default_packet(self, packet, **kwargs):
+ dst_epid = packet.get_header().dst_epid
+ if dst_epid not in self.routing_table:
+ raise RuntimeError("Xbar no destination for packet (dst_epid: {})".format(dst_epid))
+ return self.ports[self.routing_table[dst_epid]]
+
+ def from_index(self, nodes):
+ """This iterates through the list of nodes and sets the
+ reference in self.ports to the node's id
+ """
+ for i in range(len(self.ports)):
+ nodes_index = self.ports[i]
+ node = nodes[nodes_index]
+ self.ports[i] = node.get_id()
+ if node.__class__ is XportNode:
+ node.downstream = self.get_id()
+ elif node.__class__ is StreamEndpointNode:
+ node.upstream = self.get_id()
+
+class StreamEndpointNode(Node):
+ """Represents a Stream endpoint node
+
+ This class contains a StreamEpRegs object. To clarify, management
+ packets access these registers, while control packets access the
+ registers of the noc_blocks which are held in the RFNoCGraph and
+ passed into handle_packet as the regs parameter
+ """
+ def __init__(self, node_inst):
+ super().__init__(node_inst)
+ self.epid = node_inst
+ self.dst_epid = None
+ self.upstream = None
+ self.sep_config = None
+ # These 4 aren't configurable right now
+ self.has_data = True
+ self.has_ctrl = True
+ self.input_ports = 2
+ self.output_ports = 2
+ self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid,
+ self.status_callback_out, self.status_callback_in, 4, 4 * 8000)
+
+ def status_callback_out(self, status):
+ """Called by the ep_regs when on a write to the
+ REG_OSTRM_CTRL_STATUS register
+ """
+ if status.cfg_start:
+ # This only creates a new TxWorker if the cfg_start flag is set
+ self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid))
+ self.sep_config(self, True)
+ return STRM_STATUS_FC_ENABLED
+
+ def status_callback_in(self, status):
+ """Called by the ep_regs on a write to the
+ REG_OSTRM_CTRL_STATUS register
+ """
+ # This always triggers the graph to create a new RxWorker
+ self.sep_config(self, False)
+ return STRM_STATUS_FC_ENABLED
+
+ def graph_init(self, log, device_id, sep_config, **kwargs):
+ super().graph_init(log, device_id)
+ self.ep_regs.log = log
+ self.sep_config = sep_config
+
+ def get_type(self):
+ return NodeType.STRM_EP
+
+ def get_epid(self):
+ return self.epid
+
+ def set_epid(self, epid):
+ self.epid = epid
+
+ def set_dst_epid(self, dst_epid):
+ self.dst_epid = dst_epid
+
+ def _handle_mgmt_packet(self, packet, regs, **kwargs):
+ send_upstream = False
+ payload = packet.get_payload_mgmt()
+ our_hop = payload.pop_hop()
+ for op in to_iter(our_hop.get_op, our_hop.get_num_ops()):
+ if op.op_code == MgmtOpCode.INFO_REQ:
+ ext_info = 0
+ ext_info |= (1 if self.has_ctrl else 0) << 0
+ ext_info |= (1 if self.has_data else 0) << 1
+ ext_info |= (self.input_ports & 0x3F) << 2
+ ext_info |= (self.output_ports & 0x3F) << 8
+ payload.get_hop(0).add_op(self.info_response(ext_info))
+ elif op.op_code == MgmtOpCode.RETURN:
+ send_upstream = True
+ _swap_src_dst(packet, payload)
+ elif op.op_code == MgmtOpCode.NOP:
+ pass
+ elif op.op_code == MgmtOpCode.CFG_RD_REQ:
+ request = MgmtOpCfg.parse(op.get_op_payload())
+ value = self.ep_regs.read(request.addr)
+ payload.get_hop(0).add_op(MgmtOp(MgmtOpCode.CFG_RD_RESP,
+ MgmtOpCfg(request.addr, value)))
+ elif op.op_code == MgmtOpCode.CFG_WR_REQ:
+ request = MgmtOpCfg.parse(op.get_op_payload())
+ self.ep_regs.write(request.addr, request.data)
+ else:
+ raise NotImplementedError("op_code {} is not implemented for "
+ "StreamEndpointNode".format(op.op_code))
+ self.log.trace("Stream Endpoint {} processed hop:\n{}"
+ .format(self.node_inst, our_hop))
+ packet.set_payload(payload)
+ if send_upstream:
+ return RETURN_TO_SENDER
+
+ def _handle_ctrl_packet(self, packet, regs, **kwargs):
+ payload = packet.get_payload_ctrl()
+ _swap_src_dst(packet, payload)
+ if payload.status != CtrlStatus.OKAY:
+ raise RuntimeError("Control Status not OK: {}".format(payload.status))
+ if payload.op_code == CtrlOpCode.READ:
+ payload.is_ack = True
+ payload.set_data([regs.read(payload.address)])
+ elif payload.op_code == CtrlOpCode.WRITE:
+ payload.is_ack = True
+ regs.write(payload.address, payload.get_data()[0])
+ else:
+ raise NotImplementedError("Unknown Control OpCode: {}".format(payload.op_code))
+ packet.set_payload(payload)
+ return RETURN_TO_SENDER
+
+class RFNoCGraph:
+ """This class holds all of the nodes of the NoC core and the Noc
+ blocks.
+
+ It serves as an interface between the ChdrSniffer and the
+ individual blocks/nodes.
+ """
+ def __init__(self, graph_list, log, device_id, create_tx_func,
+ stop_tx_func, send_strc_func, create_rx_func):
+ self.log = log.getChild("Graph")
+ self.send_strc = send_strc_func
+ self.device_id = device_id
+ self.stream_spec = StreamSpec()
+ self.create_tx = create_tx_func
+ self.stop_tx = stop_tx_func
+ self.create_rx = create_rx_func
+ num_stream_ep = 0
+ for node in graph_list:
+ if node.__class__ is StreamEndpointNode:
+ num_stream_ep += 1
+ node.graph_init(self.log, device_id, sep_config=self.prepare_stream_ep)
+ # These must be done sequentially so that device_id is initialized on all nodes
+ # before from_index is called on any node
+ for node in graph_list:
+ node.from_index(graph_list)
+ self.graph_map = {node.get_id(): node
+ for node in graph_list}
+ # For now, just use one radio block and hardcode it to the first stream endpoint
+ radio = NocBlock(1 << 16, 2, 2, 512, 1, 0x12AD1000, 16)
+ adj_list = [
+ (StreamEndpointPort(0, 0), NocBlockPort(0, 0)),
+ (StreamEndpointPort(0, 1), NocBlockPort(0, 1)),
+ (NocBlockPort(0, 0), StreamEndpointPort(0, 0)),
+ (NocBlockPort(0, 1), StreamEndpointPort(0, 1))
+ ]
+ self.regs = NocBlockRegs(self.log, 1 << 16, True, 1, [radio], num_stream_ep, 1, 0xE320,
+ adj_list, 8, 1, self.get_stream_spec, self.radio_tx_cmd,
+ self.radio_tx_stop)
+
+ def radio_tx_cmd(self, sep_block_id):
+ """Triggers the creation of a TxWorker in the ChdrSniffer using
+ the current stream_spec.
+
+ This method transforms the sep_block_id into an epid useable by
+ the transmit code
+ """
+ # TODO: Use the port
+ sep_blk, sep_port = sep_block_id
+ # The NoC Block index for stream endpoints is the inst + 1
+ # See rfnoc_graph.cpp:rfnoc_graph_impl#_init_sep_map()
+ sep_inst = sep_blk - 1
+ sep_id = (self.get_device_id(), NodeType.STRM_EP, sep_inst)
+ stream_ep = self.graph_map[sep_id]
+ self.stream_spec.dst_epid = stream_ep.dst_epid
+ self.stream_spec.addr = self.dst_to_addr(stream_ep)
+ self.log.info("Streaming with StreamSpec:")
+ self.log.info(str(self.stream_spec))
+ self.create_tx(stream_ep.epid, self.stream_spec)
+ self.stream_spec = StreamSpec()
+
+ def radio_tx_stop(self, sep_block_id):
+ """Triggers the destuction of a TxWorker in the ChdrSniffer
+
+ This method transforms the sep_block_id into an epid useable by
+ the transmit code
+ """
+ sep_blk, sep_port = sep_block_id
+ sep_inst = sep_blk - 1
+ # The NoC Block index for stream endpoints is the inst + 1
+ # See rfnoc_graph.cpp:rfnoc_graph_impl#_init_sep_map()
+ sep_id = (self.get_device_id(), NodeType.STRM_EP, sep_inst)
+ src_epid = self.graph_map[sep_id].epid
+ self.stop_tx(src_epid)
+
+ def get_device_id(self):
+ return self.device_id
+
+ def prepare_stream_ep(self, stream_ep, is_tx):
+ """This is called by a stream_ep when it receives a status update
+
+ Depending on whether it was a tx or rx status, it will either
+ trigger an strc packet or an rx stream
+ """
+ if is_tx:
+ addr = self.dst_to_addr(stream_ep)
+ self.send_strc(stream_ep, addr)
+ else:
+ self.create_rx(stream_ep.epid)
+
+ def change_spp(self, spp):
+ self.stream_spec.packet_samples = spp
+
+ def find_ep_by_id(self, epid):
+ """Find a Stream Endpoint which identifies with epid"""
+ for node in self.graph_map.values():
+ if node.__class__ is StreamEndpointNode:
+ if node.epid == epid:
+ return node
+
+ # Fixme: This doesn't support intra-device connections
+ # i.e. connecting nodes by connecting two internal stream endpoints
+ #
+ # That would require looking at the adjacency list if we end up at
+ # another stream endpoint instead of an xport
+ def dst_to_addr(self, src_ep):
+ """This function traverses backwards through the node graph,
+ starting from src_ep and taking the path traveled by a packet
+ heading for src_ep.dst_epid. When it encounters an xport, it
+ returns the address associated with the dst_epid in the xport's
+ addr_map
+ """
+ current_node = src_ep
+ dst_epid = src_ep.dst_epid
+ while current_node.__class__ != XportNode:
+ if current_node.__class__ == StreamEndpointNode:
+ current_node = self.graph_map[current_node.upstream]
+ else:
+ # current_node is a xbar
+ port = current_node.routing_table[dst_epid]
+ upstream_id = current_node.ports[port]
+ current_node = self.graph_map[upstream_id]
+ return current_node.addr_map[dst_epid]
+
+ def handle_packet(self, packet, xport_input, addr):
+ """Given a chdr_packet, the id of an xport node to serve as an
+ entry point, and a source address, send the packet through the
+ node graph.
+ """
+ node_id = xport_input
+ response_packet = None
+ while node_id is not None:
+ if node_id[1] == NodeType.RTS:
+ response_packet = packet
+ break
+ node = self.graph_map[node_id]
+ # If the node returns a value, it is the node id of the
+ # node the packet should be passed to next
+ # or RETURN_TO_SENDER
+ node_id = node.handle_packet(packet, regs=self.regs, addr=addr)
+ return response_packet
+
+ def get_stream_spec(self):
+ return self.stream_spec