Spark
Shortcomings of MapReduce
- Forces the data analysis workflow into map and reduce functions, e.g. a join becomes a map-reduce-map chain
- Relies on reading data from disk for each MapReduce job
- Only a native Java programming interface
Solution
- In-memory caching of data, specified by the user
- Native Python, Scala, and R interfaces
Architecture of Spark
- Spark architecture diagrams (figures not reproduced in these notes)
- Start pyspark:
PYSPARK_DRIVER_PYTHON=ipython pyspark
Resilient Distributed Dataset (RDD)
- Dataset -> created from HDFS, S3, HBase, JSON, text files, a local hierarchy of folders, or by transforming another RDD
- Distributed -> spread across a cluster of machines; divided into partitions, the atomic chunks of data
- Resilient -> recovers from errors, e.g. node failures or slow processes; tracks the history of each partition and re-runs it when needed
integer_RDD = sc.parallelize(range(10), 3)  # 3 -> number of partitions
integer_RDD.collect()
integer_RDD.glom().collect()  # to see how the data is partitioned
- Read text into Spark
# from the local filesystem
text_RDD = sc.textFile("file:///home/cloudera/testfile1")
# from HDFS
text_RDD = sc.textFile("/user/cloudera/input/testfile1")
- Word count in Spark
def split_words(line):
    return line.split()

def create_pair(word):
    return (word, 1)

pairs_RDD = text_RDD.flatMap(split_words).map(create_pair)
pairs_RDD.collect()

def sum_counts(a, b):
    return a + b

wordcounts_RDD = pairs_RDD.reduceByKey(sum_counts)
wordcounts_RDD.collect()
Transformations
- RDDs are immutable
- Never modify an RDD in place
- Transform an RDD into another RDD
- Lazy --> transformations only actually run when an action is called in the final step; nothing is executed in the intermediate steps (see the check below)
text_RDD = sc.textFile("file:///home/cloudera/testfile1")

def lower(line):
    return line.lower()

lower_text_RDD = text_RDD.map(lower)
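None of the lines above trigger any computation; a tiny check of the laziness, reusing the RDD just defined:

lower_text_RDD.collect()  # only now does Spark read the file and apply lower()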
- map: apply a function to each element of the RDD
  - computations are all local on each node
- other transformations:
  - flatMap(func): map, then flatten the output
  - filter(func): keep only the elements for which func is true
  - sample(withReplacement, fraction, seed): get a random fraction of the data
  - coalesce(numPartitions): reduce the number of partitions to numPartitions
- flatMap: any number of outputs per input
def split_words(line):
    return line.split()

words_RDD = text_RDD.flatMap(split_words)
words_RDD.collect()
- filter:
def starts_with_a(word):
    return word.lower().startswith("a")

words_RDD.filter(starts_with_a).collect()
- coalesce:
sc.parallelize(range(10), 4).glom().collect()
sc.parallelize(range(10), 4).coalesce(2).glom().collect()  # e.g. reduce to 2 partitions
- Transformations of (K, V) pairs
def create_pair(word):
    return (word, 1)

pairs_RDD = text_RDD.flatMap(split_words).map(create_pair)
pairs_RDD.collect()
pairs_RDD.groupByKey().collect()
- groupByKey: (K, V) pairs => (K, iterable of all V)
for k, v in pairs_RDD.groupByKey().collect():
    print("Key:", k, "Values:", list(v))
- groupByKey is a wide transformation, unlike the transformations discussed above, which are narrow
- wide transformations require some data to be transferred from one node to another, while in narrow transformations all operations happen locally
- reduceByKey(func): (K, V) pairs => (K, result of reducing all V with func)
- repartition(numPartitions): similar to coalesce, but shuffles all data to increase or decrease the number of partitions to numPartitions (see the sketch below)
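A small sketch of repartition versus coalesce, reusing glom() to inspect the partitions:

rdd = sc.parallelize(range(10), 2)
rdd.repartition(4).glom().collect()  # shuffles the data into 4 partitions
rdd.coalesce(1).glom().collect()     # merges down to 1 partition without a full shuffle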
Directed Acyclic Graph (DAG) Scheduler
- keeps track of dependencies
- tracks lineage
- recovers data by tracing back the execution flow and re-executing the required jobs
- As we write transformations, Spark builds up a DAG; when we call an action, it runs all the tasks in the graph in the required order
- Actions (examples below):
  - collect(): copy all elements to the driver
  - take(n): copy the first n elements
  - reduce(func): aggregate elements with func
  - saveAsTextFile(filename): save to a local file or to HDFS
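A short sketch of these actions applied to the wordcounts_RDD built earlier; the output path is a hypothetical HDFS location:

wordcounts_RDD.take(3)  # first 3 (word, count) pairs
wordcounts_RDD.map(lambda kv: kv[1]).reduce(lambda a, b: a + b)  # total number of words
wordcounts_RDD.saveAsTextFile("/user/cloudera/output/wordcounts")  # hypothetical HDFS path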
Shared Variables
- Shared variables:
  - Broadcast variables
  - Accumulators
- Broadcast variables:
  - Large variables used on all nodes
  - Transferred just once per executor
  - Efficient peer-to-peer transfer
config = sc.broadcast({"order": 3, "filter": True})
config.value
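A small sketch of reading the broadcast value inside a transformation; the filtering rule is made up for illustration:

def keep_long_words(word):
    # executors read the broadcast dict through .value (hypothetical use of "order")
    return len(word) > config.value["order"]

words_RDD.filter(keep_long_words).collect()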
- Accumulators:
  - Common pattern of accumulating into a variable across the cluster
  - Write-only on worker nodes (only the driver can read the value)
accum = sc.accumulator(0)

def test_accum(x):
    accum.add(x)

sc.parallelize([1, 2, 3, 4]).foreach(test_accum)
accum.value
DataFrames
- New API released in February 2015
- Similar to a table in MySQL or a data frame in R (usage sketch below)
- Python DataFrames are up to 5 times faster than Python RDDs
- Support for: Hive, HDFS, MySQL, PostgreSQL, JSON, S3, Parquet, OpenStack...
- YARN-based architecture
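A minimal sketch of the Spark 1.x DataFrame API, assuming a SQLContext and a hypothetical JSON file of customer records:

from pyspark.sql import SQLContext

sqlCtx = SQLContext(sc)
customers_DF = sqlCtx.jsonFile("/user/cloudera/customers.json")  # hypothetical path; read.json() in later versions
customers_DF.printSchema()
customers_DF.select("last", "first").show(5)  # assumes "last" and "first" columns exist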
Tuning with SparkConf
- A class that configures and tunes Spark jobs
- key/value pairs
- can be set in the driver code and at the command line
from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set("spark.executor.memory", "1g")  # change executor memory to 1 GB
sc = SparkContext(conf=conf)
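The command-line counterpart would be the --conf flag of spark-submit (the job script name here is hypothetical):

spark-submit --conf spark.executor.memory=1g wordcount.py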
Machine Learning with MLlib
- Native machine learning framework
- Classification, Regression, Clustering
- Chi-squared, Correlation, Summary Statistics
- Automatic algorithm parallelization
- Pipeline API (train -> test -> eval)
model = LinearRegressionWithSGD.train(data, iterations=100, intercept=True)
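A slightly fuller sketch of the RDD-based MLlib API (Spark 1.x); the training points are made up for illustration:

from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

data = sc.parallelize([
    LabeledPoint(1.0, [1.0]),
    LabeledPoint(2.0, [2.0]),
    LabeledPoint(3.0, [3.0]),
])
model = LinearRegressionWithSGD.train(data, iterations=100, intercept=True)
model.predict([4.0])  # prediction for a new feature vector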
Spark SQL
- Native SQL language
- Hive, DataFrames, JSON, RDDs, etc.
- JDBC server (Java Database Connectivity API)
- UDFs (User-Defined Functions) (Spark SQL and Hive; see the sketch after the example below)
- Columnar storage, predicate pushdown, tuning options
- example:
from pyspark.sql import HiveContext

hiveCtx = HiveContext(sc)
allData = hiveCtx.jsonFile(filein)
allData.registerTempTable("customers")
query1 = hiveCtx.sql("SELECT last, first FROM customers ORDER BY last LIMIT 50")
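Since UDFs are listed above, a small sketch of registering one against the same HiveContext, assuming the Spark 1.x registerFunction API (strlen is a hypothetical UDF name):

hiveCtx.registerFunction("strlen", lambda s: len(s))
hiveCtx.sql("SELECT strlen(last) FROM customers LIMIT 10")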
GraphX
- Native graph processing framework
- Similar to Pregel, Giraph, and GraphLab
- Designed for network-oriented analytics:
  - Twitter analysis
  - PageRank
- 16 times faster than Spark / 60 times faster than MapReduce
- Uses standard Spark RDD transforms
- PageRank example:
graph = GraphLoader.edgeListFile(sc, "followers.txt")
ranks = graph.pageRank(_precision_).vertices  # _precision_ = convergence tolerance
Spark Streaming
- Real-time analytics (similar to Apache Storm)
- Shopping cart suggestions (Amazon)
- "Micro-batch" architecture
- Windowing
- Checkpointing for fault tolerance
- word count example:
lines = ssc.socketTextStream(host, port)
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wc = pairs.reduceByKey(lambda x, y: x + y)
wc.pprint()
ssc.start()
ssc.awaitTermination()
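The example above assumes an existing StreamingContext ssc; a minimal setup sketch with a hypothetical 1-second batch interval and socket source:

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 1)   # 1-second micro-batches (assumed)
host, port = "localhost", 9999  # hypothetical socket source, e.g. fed by: nc -lk 9999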