Back to Contents

Spark



Shortcomings of MapReduce

  • Forces the data analysis workflow into map and reduce functions; other operations (e.g. a join) must be expressed as chains such as map-reduce-map

  • Relies on reading data from disk for each MapReduce job

  • Only a native Java programming interface



Solution

  • In-memory caching of data, specified by the user

  • Native Python, Scala, and R interfaces

    (Figure: Spark performance comparison)



Architecture of Spark

  • Spark Architecture Diagrams:

    (Figures: Spark architecture diagrams)

  • Start pyspark

    PYSPARK_DRIVER_PYTHON=ipython pyspark
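
  • Once the shell starts, a SparkContext is already available as sc; a quick sanity check (a sketch, assuming a local install or the Cloudera VM):

    sc.version                          # Spark version string
    sc.master                           # e.g. "local[*]" or "yarn-client"
    sc.parallelize(range(5)).count()    # should return 5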
    



Resilient Distributed Dataset

  • Dataset -> data loaded from HDFS, S3, HBase, JSON, text files, or a local hierarchy of folders, or created by transforming another RDD

  • Distributed -> distributed across a cluster of machines; divided into partitions, the atomic chunks of data

  • Resilient -> recovers from errors, e.g. node failures or slow processes; tracks the history (lineage) of each partition and re-runs it when needed

    integer_RDD = sc.parallelize(range(10), 3)
    #3 -> number of partitions

    integer_RDD.collect()
    integer_RDD.glom().collect()  # to see how the data is partitioned
  • Read text into Spark

    # from the local filesystem
    text_RDD = sc.textFile("file:///home/cloudera/testfile1")

    # from HDFS
    text_RDD = sc.textFile("/user/cloudera/input/testfile1")
  • Wordcount in Spark

    def split_words(line):
        return line.split()

    def create_pair(word):
        return (word, 1)

    pairs_RDD = text_RDD.flatMap(split_words).map(create_pair)
    pairs_RDD.collect()

    def sum_counts(a, b):
        return a + b

    wordcounts_RDD = pairs_RDD.reduceByKey(sum_counts)
    wordcounts_RDD.collect()
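
  • A quick way to inspect the result (a small sketch, using the test file from above):

    wordcounts_RDD.take(5)                                                # peek at a few (word, count) pairs
    wordcounts_RDD.sortBy(lambda pair: pair[1], ascending=False).take(5)  # most frequent words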



Transformation

  • RDDs are immutable
  • Never modify an RDD in place
  • Transform an RDD into another RDD
  • Lazy --> transformations are only computed when an action is finally called; the intermediate steps only record the plan, nothing is executed yet

    text_RDD = sc.textFile("file:///home/cloudera/testfile1")

    def lower(line):
        return line.lower()

    lower_text_RDD = text_RDD.map(lower)
    
  • map: apply a function to each element of the RDD

  • computations are all local to each node
  • other transformations:
    • flatMap(func) - map then flatten out output
    • filter(func) - keep only elements where func is true
    • sample(withReplacement, fraction, seed) - take a random sample of the data
    • coalesce(numPartitions) - reduce the number of partitions to numPartitions
  • flatMap: any number of outputs per input

    def split_words(line):
        return line.split()

    words_RDD = text_RDD.flatMap(split_words)
    words_RDD.collect()
    
  • filter: keep only elements for which the function returns True

    def starts_with_a(word):
        return word.lower().startswith("a")

    words_RDD.filter(starts_with_a).collect()

  • coalesce: reduce the number of partitions

    sc.parallelize(range(10), 4).glom().collect()
    sc.parallelize(range(10), 4).coalesce(2).glom().collect()  # 2 chosen for illustration

  • Transformations of (K, V) pairs

    def create_pair(word):
        return (word, 1)

    pairs_RDD = text_RDD.flatMap(split_words).map(create_pair)
    pairs_RDD.collect()
    pairs_RDD.groupByKey().collect()
    
  • groupByKey: (K, V) pairs => (K, iterable of all V)

    for k, v in pairs_RDD.groupByKey().collect():
        print("Key: %s, Values: %s" % (k, list(v)))

  • groupByKey is a wide transformation, unlike the others discussed above, which were narrow transformations

  • wide transformations require some data to be transferred from one node to another, while in narrow transformations all operations happen locally on each node

  • reduceByKey(func) : (K,V) pairs => (K, result of reduction by func on all V)

  • repartition(numPartitions) : similar to coalesce, but shuffles all data to increase or decrease the number of partitions to numPartitions
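
  • A small sketch contrasting the two count approaches on the pairs RDD from above (both give the same result, but reduceByKey combines values locally before the shuffle), followed by repartition vs. coalesce:

    # word counts, two ways
    pairs_RDD.reduceByKey(lambda a, b: a + b).collect()
    pairs_RDD.groupByKey().map(lambda kv: (kv[0], sum(kv[1]))).collect()

    # changing the number of partitions
    rdd = sc.parallelize(range(10), 2)
    rdd.repartition(4).glom().collect()   # full shuffle into 4 partitions
    rdd.coalesce(1).glom().collect()      # merge down to 1 partition, avoiding a full shuffle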



Directed Acyclic Graph (DAG) Scheduler

  • keeps dependencies

  • tracks lineage

  • recovers data by tracing back the execution flow and re-executing required jobs

  • As we write transformations, Spark builds up a DAG; when we call an action, it runs all the tasks in the graph in the required order

  • Actions:

    • collect() - copy all elements to the driver
    • take(n) - copy first n elements
    • reduce(func) - aggregate elements with func
    • saveAsTextFile(filename) - save to a local file or to HDFS
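
  • For example, with the RDDs defined earlier (the output path is just a placeholder):

    integer_RDD.take(3)                      # [0, 1, 2]
    integer_RDD.reduce(lambda a, b: a + b)   # 45
    wordcounts_RDD.saveAsTextFile("file:///home/cloudera/wordcount_out")  # one part-file per partition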



Shared Variables

  • Spark provides two kinds of shared variables:

    1. Broadcast Variables
    2. Accumulators
  • Broadcast Variables:

    • Large variables used in all nodes
    • Transferred just once per executor
    • Efficient peer-to-peer transfer

      config = sc.broadcast({"order": 3, "filter": True})
      config.value

  • Accumulators

    • Common pattern of accumulating to a variable across the cluster
    • Write-only on worker nodes; only the driver can read the value

      accum = sc.accumulator(0)

      def test_accum(x):
          accum.add(x)

      sc.parallelize([1, 2, 3, 4]).foreach(test_accum)

      accum.value
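
  • A typical broadcast use case is a small lookup table shared by every task (a sketch; the lookup dictionary here is made up):

    # shipped once per executor instead of once per task
    lookup = sc.broadcast({"a": 1, "b": 2, "c": 3})

    def translate(word):
        return lookup.value.get(word, 0)   # read the broadcast value inside the task

    sc.parallelize(["a", "b", "c", "d"]).map(translate).collect()   # [1, 2, 3, 0]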



DataFrames

  • new API released in February 2015

  • Similar to a table in MySQL or a data frame in R

  • Python DataFrames can be up to 5 times faster than Python RDDs

  • Support for: Hive, HDFS, MySQL, PostgreSQL, JSON, S3, Parquet, OpenStack...

  • YARN-based architecture (Figure: Spark architecture on YARN)
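
  • A minimal DataFrame sketch with the API of that release (assumes a SQLContext and an example JSON file of customer records with last, first, and age fields):

    from pyspark.sql import SQLContext

    sqlContext = SQLContext(sc)
    df = sqlContext.jsonFile("file:///home/cloudera/customers.json")  # one JSON object per line (example path)
    df.printSchema()
    df.select("last", "first").show(5)
    df.filter(df["age"] > 30).count()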



Tuning with SparkConf

  • A class that configures and tunes Spark jobs

  • key/value pairs

  • can be set in the driver code or on the command line (example below)

    from pyspark import SparkConf, SparkContext

    conf = SparkConf()
    conf.set("spark.executor.memory", "1g")  # change executor memory to 1 GB
    sc = SparkContext(conf=conf)
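
  • The same setting can also be passed on the command line when submitting a job (my_job.py is a placeholder script name):

    spark-submit --executor-memory 1g my_job.py
    # equivalently, as a generic key/value pair:
    spark-submit --conf spark.executor.memory=1g my_job.py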
    



Machine Learning with MLlib

  • Native Machine Learning Framework

  • Classification, Regression, Clustering

  • Chi-Squared, Correlation, Summary Stats

  • Automatic Algorithm Parallelization

  • Pipeline API (train -> test -> eval)

    model = LinearRegressionWithSGD.train(
            data,
            iterations = 100,
            intercept = True)
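
  • The call above expects an RDD of labeled points; a minimal end-to-end sketch (the training values below are made up):

    from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

    # hypothetical training set: a label followed by a one-feature vector
    data = sc.parallelize([
        LabeledPoint(0.0, [0.0]),
        LabeledPoint(1.0, [1.0]),
        LabeledPoint(2.0, [2.0]),
    ])

    model = LinearRegressionWithSGD.train(data, iterations=100, intercept=True)
    model.predict([1.5])   # predict the label for a new feature vector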
    

Spark SQL

  • Native SQL Language

  • Works with Hive, DataFrames, JSON, RDDs, etc.

  • JDBC Server (Java database connectivity API)

  • UDFs (User-Defined Functions) (Spark SQL and Hive)

  • Columnar storage, Predicate Pushdowns, Tuning options

  • example:

    from pyspark.sql import HiveContext

    hiveCtx = HiveContext(sc)
    allData = hiveCtx.jsonFile(filein)
    allData.registerTempTable("customers")

    query1 = hiveCtx.sql(
        "SELECT last, first FROM customers ORDER BY last LIMIT 50")
    

GraphX

  • Native Graph Processing Framework

  • Similar to Pregel, Giraph and GraphLab

  • Designed for network-oriented analytics:

    • Twitter Analysis
    • PageRank
  • 16 times faster than plain Spark and 60 times faster than MapReduce on graph workloads

  • Uses standard Spark RDD transforms (see the sketch after the PageRank example below)

  • PageRank example:

    graph = GraphLoader.edgeListFile(
                sc, "followers.txt")
    
    ranks = graph.pageRank(_precision_).vertices
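
  • The example above uses the Scala GraphX API. Since GraphX builds on standard RDD transforms, roughly the same computation can be sketched with plain PySpark RDDs (tiny made-up edge list, fixed iteration count; this is not the GraphX implementation):

    # follower edges as (src, dst) pairs
    edges = sc.parallelize([("a", "b"), ("b", "c"), ("c", "a"), ("a", "c")])

    links = edges.groupByKey().cache()        # src -> iterable of destinations
    ranks = links.mapValues(lambda _: 1.0)    # start every vertex at rank 1.0

    for _ in range(10):                       # fixed number of iterations
        contribs = links.join(ranks).flatMap(
            lambda kv: [(dst, kv[1][1] / len(kv[1][0])) for dst in kv[1][0]])
        ranks = contribs.reduceByKey(lambda a, b: a + b) \
                        .mapValues(lambda r: 0.15 + 0.85 * r)

    ranks.collect()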
    

Spark Streaming

  • Real-time analytics (similar to Apache Storm)

  • Shopping Cart Suggestions (Amazon)

  • "Micro-Batch" architecture

  • Windowing

  • Checkpoint for fault tolerance

  • word count example:

    from pyspark.streaming import StreamingContext

    # 1-second micro-batches (batch interval chosen as an example)
    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream(host, port)
    words = lines.flatMap(lambda line: line.split(" "))

    pairs = words.map(lambda word: (word, 1))
    wc = pairs.reduceByKey(lambda x, y: x + y)

    wc.pprint()

    ssc.start()
    ssc.awaitTermination()
    



Back to Contents