Source code for PyRDF.backend.Dist

from __future__ import print_function
import PyRDF
from PyRDF.backend.Backend import Backend
from abc import abstractmethod
import glob
import warnings
import ROOT
import numpy


[docs]class Range(object): """ Base class to represent ranges. A range represents a logical partition of the entries of a chain and is the basis for parallelization. First entry of the range (start) is inclusive while the second one is not (end). """
[docs] def __init__(self, start, end, filelist=None, friend_info=None): """ Create an instance of a Range Args: start (int): First entry of the range. end (int): Last entry of the range, which is exclusive. filelist (list, optional): Files where the range of entries belongs to. """ self.start = start self.end = end self.filelist = filelist self.friend_info = friend_info
[docs] def __repr__(self): """Return a string representation of the range composition.""" if self.filelist: return ("(" + str(self.start) + "," + str(self.end) + "), " + str(self.filelist)) else: return "(" + str(self.start) + "," + str(self.end) + ")"
[docs]class FriendInfo(object): """ A simple class to hold information about friend trees. Attributes: friend_names (list): A list with the names of the `ROOT.TTree` objects which are friends of the main `ROOT.TTree`. friend_file_names (list): A list with the paths to the files corresponding to the trees in the `friend_names` attribute. Each element of `friend_names` can correspond to multiple file names. """
[docs] def __init__(self, friend_names=[], friend_file_names=[]): """ Create an instance of FriendInfo Args: friend_names (list): A list containing the treenames of the friend trees. friend_file_names (list): A list containing the file names corresponding to a given treename in friend_names. Each treename can correspond to multiple file names. """ self.friend_names = friend_names self.friend_file_names = friend_file_names
[docs] def __bool__(self): """ Define the behaviour of FriendInfo instance when boolean evaluated. Both lists have to be non-empty in order to return True. Returns: bool: True if both lists are non-empty, False otherwise. """ return bool(self.friend_names) and bool(self.friend_file_names)
[docs] def __nonzero__(self): """ Python 2 dunder method for __bool__. Kept for compatibility. """ return self.__bool__()
[docs]class Dist(Backend): """ Base class for implementing all distributed backends. Attributes: npartitions (int): The number of chunks to divide the dataset in, each chunk is then processed in parallel. supported_operations (list): list of supported RDataFrame operations in a distributed environment. friend_info (PyRDF.Dist.FriendInfo): A class instance that holds information about any friend trees of the main ROOT.TTree """
[docs] def __init__(self, config={}): """ Creates an instance of Dist. Args: config (dict, optional): The config options for the current distributed backend. Default value is an empty python dictionary: :obj:`{}`. """ super(Dist, self).__init__(config) # Operations that aren't supported in distributed backends operations_not_supported = [ 'Mean', 'Max', 'Min', 'Range', 'Take', 'Foreach', 'Reduce', 'Report', 'Aggregate' ] # Remove the value of 'npartitions' from config dict self.npartitions = config.pop('npartitions', None) self.supported_operations = [op for op in self.supported_operations if op not in operations_not_supported] self.friend_info = FriendInfo()
[docs] def get_clusters(self, treename, filelist): """ Extract a list of cluster boundaries for the given tree and files Args: treename (str): Name of the TTree split into one or more files. filelist (list): List of one or more ROOT files. Returns: list: List of tuples defining the cluster boundaries. Each tuple contains four elements: first entry of a cluster, last entry of cluster, offset of the cluster and file where the cluster belongs to. """ import ROOT clusters = [] offset = 0 for filename in filelist: f = ROOT.TFile.Open(str(filename)) t = f.Get(treename) entries = t.GetEntriesFast() it = t.GetClusterIterator(0) start = it() end = 0 while start < entries: end = it() cluster = (start + offset, end + offset, offset, filename) clusters.append(cluster) start = end offset += entries return clusters
def _get_balanced_ranges(self, nentries): """ Builds range pairs from the given values of the number of entries in the dataset and number of partitions required. Each range contains the same amount of entries, except for those cases where the number of entries is not a multiple of the partitions. Args: nentries (int): The number of entries in a dataset. Returns: list: List of :obj:`Range`s objects. """ partition_size = int(nentries / self.npartitions) i = 0 # Iterator ranges = [] remainder = nentries % self.npartitions while i < nentries: # Start value of current range start = i end = i = start + partition_size if remainder: # If the modulo value is not # exhausted, add '1' to the end # of the current range end = i = end + 1 remainder -= 1 ranges.append(Range(start, end)) return ranges def _get_clustered_ranges(self, nentries, treename, filelist, friend_info=FriendInfo()): """ Builds range pairs taking into account the clusters of the dataset. Args: nentries (int): The number of entries in a dataset. treename (str): Name of the tree. filelist (list): List of ROOT files. friend_info (FriendInfo): Information about friend trees. Returns: list: List of :obj:`Range`s objects. """ clusters = self.get_clusters(treename, filelist) numclusters = len(clusters) # Restrict 'npartitions' if it's greater # than number of clusters in the filelist if self.npartitions > numclusters: msg = ("Number of partitions is greater than number of clusters" "in the filelist") msg += "\nUsing {} partition(s)".format(numclusters) warnings.warn(msg, UserWarning, stacklevel=2) self.npartitions = numclusters partSize = numclusters // self.npartitions remainder = numclusters % self.npartitions i = 0 # Iterator ranges = [] entries_to_process = 0 while i < numclusters: index_start = i start = clusters[i][0] i = i + partSize if remainder > 0: i += 1 remainder -= 1 index_end = i if i == numclusters: end = clusters[-1][1] else: end = clusters[i - 1][1] range_files = [] for idx in range(index_start, index_end): current_file = clusters[idx][3] if range_files and range_files[-1] == current_file: continue range_files.append(clusters[idx][3]) offset_first_cluster = clusters[index_start][2] ranges.append(Range(start - offset_first_cluster, end - offset_first_cluster, range_files, friend_info)) entries_to_process += (end - start) return ranges def _get_filelist(self, files): """ Convert single file into list of files and expand globbing Args: files (str, list): String containing name of a single file or list with several file names, both cases may contain globbing characters. Returns: list: list of file names. """ if isinstance(files, str): # Expand globbing excluding remote files remote_prefixes = ("root:", "http:", "https:") if not files.startswith(remote_prefixes): files = glob.glob(files) else: # Convert single file into a filelist files = [files, ] return files
[docs] def build_ranges(self): """ Define two type of ranges based on the arguments passed to the RDataFrame head node. """ if self.npartitions > self.nentries: # Restrict 'npartitions' if it's greater # than 'nentries' self.npartitions = self.nentries if self.treename and self.files: filelist = self._get_filelist(self.files) return self._get_clustered_ranges(self.nentries, self.treename, filelist, self.friend_info) else: return self._get_balanced_ranges(self.nentries)
def _get_friend_info(self, tree): """ Retrieve friend tree names and filenames of a given `ROOT.TTree` object. Args: tree (ROOT.TTree): the ROOT.TTree instance used as an argument to PyRDF.RDataFrame(). ROOT.TChain inherits from ROOT.TTree so it is a valid argument too. Returns: (FriendInfo): A FriendInfo instance with two lists as variables. The first list holds the names of the friend tree(s), the second list holds the file names of each of the trees in the first list, each tree name can correspond to multiple file names. """ friend_names = [] friend_file_names = [] # Get a list of ROOT.TFriendElement objects friends = tree.GetListOfFriends() if not friends: # RDataFrame may have been created with a TTree without # friend trees. return FriendInfo() for friend in friends: friend_tree = friend.GetTree() # ROOT.TTree real_name = friend_tree.GetName() # Treename as string # TChain inherits from TTree if isinstance(friend_tree, ROOT.TChain): cur_friend_files = [ # The title of a TFile is the file name chain_file.GetTitle() for chain_file # Get a list of ROOT.TFile objects in friend_tree.GetListOfFiles() ] else: cur_friend_files = [ friend_tree. GetCurrentFile(). # ROOT.TFile GetName() # Filename as string ] friend_file_names.append(cur_friend_files) friend_names.append(real_name) return FriendInfo(friend_names, friend_file_names)
[docs] def execute(self, generator): """ Executes the current RDataFrame graph in the given distributed environment. Args: generator (PyRDF.CallableGenerator): An instance of :obj:`CallableGenerator` that is responsible for generating the callable function. """ callable_function = generator.get_callable() # Arguments needed to create PyROOT RDF object rdf_args = generator.head_node.args treename = generator.head_node.get_treename() selected_branches = generator.head_node.get_branches() # Avoid having references to the instance inside the mapper initialization = Backend.initialization def mapper(current_range): """ Triggers the event-loop and executes all nodes in the computational graph using the callable. Args: current_range (tuple): A pair that contains the starting and ending values of the current range. Returns: list: This respresents the list of values of all action nodes in the computational graph. """ import ROOT # We have to decide whether to do this in Dist or in subclasses # Utils.declare_headers(worker_includes) # Declare headers if any # Run initialization method to prepare the worker runtime # environment initialization() # Build rdf start = int(current_range.start) end = int(current_range.end) if treename: # Build TChain of files for this range: chain = ROOT.TChain(treename) for f in current_range.filelist: chain.Add(str(f)) # We assume 'end' is exclusive chain.SetCacheEntryRange(start, end) # Gather information about friend trees friend_info = current_range.friend_info if friend_info: # Zip together the treenames of the friend trees and the # respective file names. Each friend treename can have # multiple corresponding friend file names. tree_files_names = zip( friend_info.friend_names, friend_info.friend_file_names ) for friend_treename, friend_filenames in tree_files_names: # Start a TChain with the current friend treename friend_chain = ROOT.TChain(friend_treename) # Add each corresponding file to the TChain for filename in friend_filenames: friend_chain.Add(filename) # Set cache on the same range as the parent TChain friend_chain.SetCacheEntryRange(start, end) # Finally add friend TChain to the parent chain.AddFriend(friend_chain) if selected_branches: rdf = ROOT.ROOT.RDataFrame(chain, selected_branches) else: rdf = ROOT.ROOT.RDataFrame(chain) else: rdf = ROOT.ROOT.RDataFrame(*rdf_args) # PyROOT RDF object # # TODO : If we want to run multi-threaded in a Spark node in # # the future, use `TEntryList` instead of `Range` # rdf_range = rdf.Range(current_range.start, current_range.end) # Output of the callable output = callable_function(rdf, rdf_range=current_range) for i in range(len(output)): # `AsNumpy` and `Snapshot` return respectively `dict` and `list` # that don't have the `GetValue` method. if isinstance(output[i], dict): # Fix class name to 'ndarray' to avoid issues with # Pickle protocol 2 for value in output[i].values(): value.__class__.__name__ = "ndarray" continue if isinstance(output[i], list): continue # FIX ME : RResultPtrs aren't serializable, # because of which we have to manually find # out the types here and copy construct the # values. # The type of the value of the action node value_type = type(output[i].GetValue()) # The `value_type` is required here because, # after a call to `GetValue`, the values die # along with the RResultPtrs output[i] = value_type(output[i].GetValue()) return output def reducer(values_list1, values_list2): """ Merges two given lists of values that were returned by the mapper function for two different ranges. Args: values_list1 (list): A list of computed values for a given entry range in a dataset. values_list2 (list): A list of computed values for a given entry range in a dataset. Returns: list: This is a list of values obtained after merging two given lists. """ import ROOT for i in range(len(values_list1)): # A bunch of if-else conditions to merge two values # Create a global list with all the files of the partial # snapshots if isinstance(values_list1[i], list): values_list1[i].extend(values_list2[i]) elif isinstance(values_list1[i], dict): combined = { key: numpy.concatenate([values_list1[i][key], values_list2[i][key]]) for key in values_list1[i] } values_list1[i] = combined elif (isinstance(values_list1[i], ROOT.TH1) or isinstance(values_list1[i], ROOT.TH2)): # Merging two objects of type ROOT.TH1D or ROOT.TH2D values_list1[i].Add(values_list2[i]) elif isinstance(values_list1[i], ROOT.TGraph): # Prepare a TList tlist = ROOT.TList() tlist.Add(values_list2[i]) # Merge the second graph onto the first num_points = values_list1[i].Merge(tlist) # Check if there was an error in merging if num_points == -1: msg = "Error reducing two result values of type TGraph!" raise Exception(msg) elif isinstance(values_list1[i], float): # Adding values resulting from a Sum() operation # Sum() always returns a float in python values_list1[i] += values_list2[i] elif (isinstance(values_list1[i], int) or isinstance(values_list1[i], long)): # noqa: Python 2 # Adding values resulting from a Count() operation values_list1[i] += values_list2[i] else: msg = ("Type \"{}\" is not supported by the reducer yet!" .format(type(values_list1[i]))) raise NotImplementedError(msg) return values_list1 # Get number of entries in the input dataset using # arguments passed to RDataFrame constructor self.nentries = generator.head_node.get_num_entries() # Retrieve the treename used to initialize the RDataFrame self.treename = generator.head_node.get_treename() # Retrieve the filenames used to initialize the RDataFrame self.files = generator.head_node.get_inputfiles() # Retrieve the ROOT.TTree instance used to initialize the RDataFrame self.tree = generator.head_node.get_tree() # Retrieve info about the friend trees if self.tree: self.friend_info = self._get_friend_info(self.tree) if not self.nentries: # Fall back to local execution # if 'nentries' is '0' msg = ("No entries in the Tree, falling back to local execution!") warnings.warn(msg, UserWarning, stacklevel=2) PyRDF.use("local") from .. import current_backend return current_backend.execute(generator) # Values produced after Map-Reduce values = self.ProcessAndMerge(mapper, reducer) # List of action nodes in the same order as values nodes = generator.get_action_nodes() # Set the value of every action node for node, value in zip(nodes, values): if node.operation.name == "Snapshot": # Retrieve treename from operation args and start TChain snapshot_treename = node.operation.args[0] snapshot_chain = ROOT.TChain(snapshot_treename) # Add partial snapshot files to the chain for filename in value: snapshot_chain.Add(filename) # Create a new rdf with the chain and return that to user snapshot_rdf = PyRDF.RDataFrame(snapshot_chain) node.value = snapshot_rdf else: node.value = value
[docs] @abstractmethod def ProcessAndMerge(self, mapper, reducer): """ Subclasses must define how to run map-reduce functions on a given backend. """ pass
[docs] @abstractmethod def distribute_files(self, includes_list): """ Subclasses must define how to send all files needed for the analysis (like headers and libraries) to the workers. """ pass