from PyRDF.RDataFrame import RDataFrame # noqa
from PyRDF.RDataFrame import RDataFrameException # noqa
from PyRDF.CallableGenerator import CallableGenerator # noqa
from PyRDF.backend.Local import Local
from PyRDF.backend.Backend import Backend
from PyRDF.backend.Utils import Utils
import os
import logging
import sys
from pyspark import SparkContext
current_backend = Local()
includes_headers = set() # All headers included in the analysis
includes_shared_libraries = set() # All shared libraries included
includes_files = set() # All other generic files included
logger = logging.getLogger(__name__)
[docs]def create_logger(level="WARNING", log_path="./PyRDF.log"):
"""PyRDF basic logger"""
logger = logging.getLogger(__name__)
level = getattr(logging, level)
logger.setLevel(level)
format_string = ("%(levelname)s: %(name)s[%(asctime)s]: %(message)s")
formatter = logging.Formatter(format_string)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
if log_path:
file_handler = logging.FileHandler(log_path)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
return logger
[docs]def use(backend_name, conf={}):
"""
Allows the user to choose the execution backend.
Args:
backend_name (str): This is the name of the chosen backend.
conf (str, optional): This should be a dictionary with
necessary configuration parameters. Its default value is an empty
dictionary {}.
"""
future_backends = [
"dask"
]
global current_backend
if backend_name in future_backends:
msg = "This backend environment will be considered in the future !"
raise NotImplementedError(msg)
elif backend_name == "local":
current_backend = Local(conf)
# Retrieve current Spark context if present and stop it
cur_context = SparkContext.getOrCreate()
cur_context.stop()
elif backend_name == "spark":
from PyRDF.backend.Spark import Spark
current_backend = Spark(conf)
else:
msg = "Incorrect backend environment \"{}\"".format(backend_name)
raise Exception(msg)
def _get_paths_set_from_string(path_string):
"""
Retrieves paths to files (directory or single file) from a string.
Args:
path_string (str): The string to the path of the file or directory
to be recursively searched for files.
Returns:
set: The set with all paths returned from the directory, or a set
with only the path of the string.
"""
logger.debug("Retrieving paths from {}".format(path_string))
if os.path.isdir(path_string):
# Create a set with all the headers in the directory
paths_set = {
os.path.join(rootpath, filename)
for rootpath, dirs, filenames
in os.walk(path_string)
for filename
in filenames
}
logger.debug("\nInitial path: {} \nPaths retrieved: {}".format(
path_string,
paths_set
))
return paths_set
elif os.path.isfile(path_string):
# Convert to set if this is a string
logger.debug("File path retrieved: {}".format(path_string))
return {path_string}
def _check_pcm_in_library_path(shared_library_path):
"""
Retrieves paths to shared libraries and pcm file(s) in a directory.
Args:
shared_library_path (str): The string to the path of the file or
directory to be recursively searched for files.
Returns:
list, list: Two lists, the first with all paths to pcm files, the
second with all paths to shared libraries.
"""
all_paths = _get_paths_set_from_string(
shared_library_path
)
pcm_paths = {
filepath
for filepath in all_paths
if filepath.endswith(".pcm")
}
shared_library_formats = (".so", ".dll", ".dylib")
libraries_path = {
filepath
for filepath in all_paths
if filepath.endswith(shared_library_formats)
}
return pcm_paths, libraries_path
[docs]def include_shared_libraries(shared_libraries_paths):
"""
Includes the C++ shared libraries to be declared before execution.
Each library is also declared on the current running session. If any pcm
file is present in the same folder as the shared libraries, the function
will try to retrieve them (and distribute them if working on a distributed
backend).
Args:
shared_libraries_paths (str, iter): A string or an iterable (such as a
list, set...) containing the paths to all necessary C++ shared
libraries as strings. This function accepts both paths to the
libraries themselves and paths to directories containing the
libraries.
"""
global current_backend, includes_shared_libraries
libraries_to_include = set()
pcm_to_include = set()
if isinstance(shared_libraries_paths, str):
pcm_to_include, libraries_to_include = _check_pcm_in_library_path(
shared_libraries_paths
)
else:
for path_string in shared_libraries_paths:
pcm, libraries = _check_pcm_in_library_path(
path_string
)
libraries_to_include.update(libraries)
pcm_to_include.update(pcm)
# If not on the local backend, distribute files to executors
if not isinstance(current_backend, Local):
current_backend.distribute_files(libraries_to_include)
current_backend.distribute_files(pcm_to_include)
# Declare the shared libraries in ROOT
Utils.declare_shared_libraries(libraries_to_include)
# Finally, add everything to the includes set
includes_shared_libraries.update(includes_shared_libraries)
[docs]def send_generic_files(files_paths):
"""
Sends to the workers the generic files needed by the user.
Args:
files_paths (str, iter): Paths to the files to be sent to the
distributed workers.
"""
global current_backend, includes_files
files_to_include = set()
if isinstance(files_paths, str):
files_to_include.update(_get_paths_set_from_string(files_paths))
else:
for path_string in files_paths:
files_to_include.update(_get_paths_set_from_string(path_string))
# If not on the local backend, distribute files to executors
if not isinstance(current_backend, Local):
current_backend.distribute_files(files_to_include)
[docs]def initialize(fun, *args, **kwargs):
"""
Set a function that will be executed as a first step on every backend before
any other operation. This method also executes the function on the current
user environment so changes are visible on the running session.
This allows users to inject and execute custom code on the worker
environment without being part of the RDataFrame computational graph.
Args:
fun (function): Function to be executed.
*args (list): Variable length argument list used to execute the
function.
**kwargs (dict): Keyword arguments used to execute the function.
"""
Backend.register_initialization(fun, *args, **kwargs)