Source code for PyRDF.backend.Spark

from __future__ import print_function
from PyRDF.backend.Dist import Dist
from PyRDF.backend.Utils import Utils
from pyspark import SparkConf, SparkContext
from pyspark import SparkFiles
import ntpath  # Filename from path (should be platform-independent)


class Spark(Dist):
    """
    Backend that executes the computational graph using the `Spark` framework
    for distributed execution.
    """

    MIN_NPARTITIONS = 2
    def __init__(self, config={}):
        """
        Creates an instance of the Spark backend class.

        Args:
            config (dict, optional): The config options for the Spark backend.
                The default value is an empty Python dictionary :obj:`{}`.
                :obj:`config` should be a dictionary of Spark configuration
                options and their values, with :obj:`npartitions` as the only
                allowed extra parameter.

                Example::

                    config = {
                        'npartitions': 20,
                        'spark.master': 'myMasterURL',
                        'spark.executor.instances': 10,
                        'spark.app.name': 'mySparkAppName'
                    }

        Note:
            If a SparkContext is already set in the current environment, the
            Spark configuration parameters from :obj:`config` will be ignored
            and the already existing SparkContext will be used.
        """
        super(Spark, self).__init__(config)

        sparkConf = SparkConf().setAll(config.items())
        self.sparkContext = SparkContext.getOrCreate(sparkConf)

        # Set the value of 'npartitions' if it doesn't exist
        self.npartitions = self._get_partitions()
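
    # Illustrative sketch (not part of the original source): constructing the
    # backend directly with a configuration dictionary. The values below are
    # hypothetical. The 'npartitions' key is presumably handled by the Dist
    # base class (it is read in _get_partitions before being reassigned),
    # while the remaining keys are forwarded to SparkConf.
    #
    #   backend = Spark({'npartitions': 4,
    #                    'spark.master': 'local[2]',
    #                    'spark.app.name': 'PyRDF-example'})
    #   print(backend.npartitions)  # -> 4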
    def _get_partitions(self):
        npart = (self.npartitions or
                 self.sparkContext.getConf().get('spark.executor.instances') or
                 Spark.MIN_NPARTITIONS)
        # getConf().get('spark.executor.instances') could return a string
        return int(npart)
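
    # The fallback chain above can be summarised as follows (hypothetical
    # configurations, assuming no SparkContext exists yet in the session):
    #
    #   Spark({'npartitions': 8})                 -> npartitions == 8
    #   Spark({'spark.executor.instances': '4'})  -> npartitions == 4 (string cast to int)
    #   Spark({})                                 -> npartitions == MIN_NPARTITIONS (2)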
    def ProcessAndMerge(self, mapper, reducer):
        """
        Performs map-reduce using the Spark framework.

        Args:
            mapper (function): A function that runs the computational graph
                and returns a list of values.

            reducer (function): A function that merges two lists that were
                returned by the mapper.

        Returns:
            list: A list representing the values of action nodes returned
            after computation (Map-Reduce).
        """
        from PyRDF import includes_headers
        from PyRDF import includes_shared_libraries

        def spark_mapper(current_range):
            """
            Gets the paths to the file(s) in the current executor, then
            declares the headers found.

            Args:
                current_range (tuple): A pair that contains the starting and
                    ending values of the current range.

            Returns:
                function: The map function to be executed on each executor,
                complete with all headers needed for the analysis.
            """
            # Get and declare headers on each worker
            headers_on_executor = [
                SparkFiles.get(ntpath.basename(filepath))
                for filepath in includes_headers
            ]
            Utils.declare_headers(headers_on_executor)

            # Get and declare shared libraries on each worker
            shared_libs_on_ex = [
                SparkFiles.get(ntpath.basename(filepath))
                for filepath in includes_shared_libraries
            ]
            Utils.declare_shared_libraries(shared_libs_on_ex)

            return mapper(current_range)

        ranges = self.build_ranges()  # Get range pairs

        # Build parallel collection
        sc = self.sparkContext
        parallel_collection = sc.parallelize(ranges, self.npartitions)

        # Map-Reduce using Spark
        return parallel_collection.map(spark_mapper).treeReduce(reducer)
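
    # Sketch of the expected callables (hypothetical bodies, shown only to
    # illustrate the signatures used above):
    #
    #   def mapper(current_range):
    #       # Run the computational graph on the entries of `current_range`
    #       # and return one value per booked action.
    #       return list_of_partial_results
    #
    #   def reducer(values1, values2):
    #       # Merge two lists produced by `mapper`, element by element.
    #       return merged_values
    #
    #   results = backend.ProcessAndMerge(mapper, reducer)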
    def distribute_files(self, includes_list):
        """
        Spark supports sending files to the executors via the
        `SparkContext.addFile` method. This method receives as input the path
        to the file (relative to the path of the current python session). The
        file is initially added to the Spark driver and then sent to the
        workers when they are initialized.

        Args:
            includes_list (list): A list consisting of all necessary C++
                files as strings, created by one of the `include` functions
                of the PyRDF API.
        """
        for filepath in includes_list:
            self.sparkContext.addFile(filepath)
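

# Illustrative sketch (not part of the original source): how distributed files
# reach the executors. The header path below is hypothetical.
#
#   backend = Spark({'npartitions': 2})
#   backend.distribute_files(['headers/myHelpers.h'])  # driver side: addFile
#
# On each executor, spark_mapper later resolves the same file with
# SparkFiles.get('myHelpers.h') and declares it before running the mapper.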