Source code for PyRDF.CallableGenerator

class CallableGenerator(object):
    """
    Class that generates a callable to parse a PyRDF graph.

    Attributes:
        head_node: Head node of a PyRDF graph.
    """

    def __init__(self, head_node):
        """
        Creates a new `CallableGenerator`.

        Args:
            head_node: Head node of a PyRDF graph.
        """
        self.head_node = head_node

    def get_action_nodes(self, node_py=None):
        """
        Recurses through PyRDF graph and collects the PyRDF node objects.

        Args:
            node_py (optional): The current state's PyRDF node. If `None`, it
                takes the value of `self.head_node`.

        Returns:
            list: A list of the action nodes of the graph in DFS order, which
            coincides with the order of execution in the callable function.
        """
        return_nodes = []

        if node_py is None:
            # In the first recursive state, just set the
            # current PyRDF node as the head node. The head node carries no
            # operation of its own, so it is never collected.
            node_py = self.head_node
        elif (node_py.operation.is_action() or
                node_py.operation.is_instant_action()):
            # Collect all action nodes in order to return them
            return_nodes.append(node_py)

        for n in node_py.children:
            # Recurse through children and attach their action nodes
            return_nodes.extend(self.get_action_nodes(n))

        return return_nodes

    @staticmethod
    def _get_range_snapshot_path(filename, rdf_range):
        """
        Builds the output path of a partial (per-range) Snapshot.

        A trailing ".root" extension is stripped from `filename` (only when it
        is really the suffix, so directory names containing ".root" are left
        intact) and the inclusive entry boundaries of the range are appended.

        Args:
            filename (str): The Snapshot output path requested by the user.
            rdf_range: Object exposing `start` and `end` attributes; `end` is
                treated as exclusive.

        Returns:
            str: A path of the form "<stem>_<start>_<end - 1>.root".
        """
        suffix = ".root"
        if filename.endswith(suffix):
            stem = filename[:-len(suffix)]
        else:
            stem = filename
        return "{}_{}_{}.root".format(stem, rdf_range.start, rdf_range.end - 1)

    def get_callable(self):
        """
        Converts a given graph into a callable and returns the same.

        Returns:
            function: The callable that takes in a PyROOT RDataFrame object
            and executes all operations from the PyRDF graph on it,
            recursively.
        """
        # Prune the graph to check user references
        self.head_node.graph_prune()

        def mapper(node_cpp, node_py=None, rdf_range=None):
            """
            The callable that recurses through the PyRDF nodes and executes
            operations from a starting (PyROOT) RDF node.

            Args:
                node_cpp: The current state's ROOT CPP node. Initially this
                    should be given in as a PyROOT RDataFrame object.
                node_py (optional): The current state's PyRDF node. If `None`,
                    it takes the value of `self.head_node`.
                rdf_range (optional): The current range of the RDataFrame to
                    run the analysis on. This is a helper parameter for the
                    analysis in a distributed environment. The range is
                    applied once, at the head node; every operation below it
                    inherits it through its parent node.

            Returns:
                list: A list of :obj:`ROOT.RResultPtr` objects in DFS order of
                their corresponding actions in the graph. For a Snapshot run
                on a range, the entry is instead a one-element list holding
                the path of the partial snapshot file.
            """
            return_vals = []

            if node_py is None:
                # In the first recursive state, just set the
                # current PyRDF node as the head node
                node_py = self.head_node
                # Restrict the input to the requested range only here: deeper
                # recursion levels never use a Range node built on their
                # parent, so creating one there would only book dead nodes.
                if rdf_range is not None:
                    parent_node = node_cpp.Range(rdf_range.start,
                                                 rdf_range.end)
                else:
                    parent_node = node_cpp
            else:
                # Execute the current operation using the output of the parent
                # node (node_cpp)
                operation = node_py.operation
                rdf_operation = getattr(node_cpp, operation.name)

                # Work on a copy of the arguments so the PyRDF graph is never
                # modified; otherwise running the callable again on another
                # range would keep appending range suffixes to the filename.
                args = list(operation.args)
                is_range_snapshot = (rdf_range is not None and
                                     operation.name == "Snapshot")
                if is_range_snapshot:
                    # Create a partial snapshot on the current range
                    path_with_range = self._get_range_snapshot_path(
                        args[1], rdf_range)
                    args[1] = path_with_range

                pyroot_node = rdf_operation(*args, **operation.kwargs)

                # The result is a pyroot object which is stored together with
                # the pyrdf node. This binds the pyroot object lifetime to the
                # pyrdf node, so both nodes will be kept alive as long as there
                # is a valid reference pointing to the pyrdf node.
                node_py.pyroot_node = pyroot_node

                # The new pyroot_node becomes the parent_node for the next
                # recursive call
                parent_node = pyroot_node

                if operation.is_action() or operation.is_instant_action():
                    # Collect all action nodes in order to return them.
                    # If it's a distributed snapshot return only path to
                    # the file with the partial snapshot
                    if is_range_snapshot:
                        return_vals.append([path_with_range])
                    else:
                        return_vals.append(pyroot_node)

            for n in node_py.children:
                # Recurse through children and attach their output
                return_vals.extend(
                    mapper(parent_node, node_py=n, rdf_range=rdf_range))

            return return_vals

        return mapper