
50. Introduction to Spark

Adopted from work by Steve Phelps: https://github.com/phelps-sg/python-bigdata This work is licensed under the Creative Commons Attribution 4.0 International license agreement.

50.1. Reference

!pip install pyspark
Collecting pyspark
  Downloading pyspark-3.0.1.tar.gz (204.2MB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1

50.2. Overview

  • History

  • Data Structures

  • Using Apache Spark with Python

50.3. History

  • Apache Spark was originally open-sourced in 2010, and version 1.0 was released in 2014.

  • It was originally developed by Matei Zaharia as a class project, and later a PhD dissertation, at the University of California, Berkeley.

  • In contrast to Hadoop, Apache Spark:

    • is easy to install and configure.

    • provides a much more natural iterative workflow.

50.4. Resilient Distributed Datasets (RDD)

  • The fundamental abstraction of Apache Spark is a read-only, parallel, distributed, fault-tolerant collection called a resilient distributed dataset (RDD).

  • When working with Apache Spark we iteratively apply functions to every element of these collections in parallel to produce new RDDs.

  • For the most part, you can think of and use RDDs much like distributed dataframes.

50.5. Resilient Distributed Datasets (RDD)

  • Properties of resilient distributed datasets (RDDs):

    • The data is distributed across nodes in a cluster of computers.

    • No data is lost if a single node fails.

    • Data is typically stored in HBase tables, or HDFS files.

    • The map and reduce functions can work in parallel across different keys, or different elements of the collection.

  • The underlying framework (e.g. Hadoop or Apache Spark) allocates data and processing to different nodes, without any intervention from the programmer.

50.6. Word Count Example

  • In this simple example, the input is a set of URLs, and each record is a document.

  • Problem: Compute how many times each word occurs across the data set.

50.7. Word Count: Map

The input to \(\operatorname{map}\) is a mapping:

  • Key: URL

  • Value: Contents of document
    \(\left< document1, to \; be \; or \; not \; to \; be \right>\)

  • In this example, our \(\operatorname{map}\) function will process a given URL and produce a mapping with the word as key and 1 as value:
    \(\left< word, 1 \right>\)

  • So our original data-set will be transformed to (a pure-Python sketch of this step follows the list):

    \(\left< to, 1 \right>\) \(\left< be, 1 \right>\) \(\left< or, 1 \right>\) \(\left< not, 1 \right>\) \(\left< to, 1 \right>\) \(\left< be, 1 \right>\)
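
The following is a minimal pure-Python sketch of this map step; the helper name word_count_map and the sample record are illustrative, not part of any library.

def word_count_map(record):
    # Take a (url, contents) pair and emit a (word, 1) pair for every word.
    url, contents = record
    return [(word, 1) for word in contents.split()]

word_count_map(("document1", "to be or not to be"))
[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]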

50.8. Word Count: Reduce

  • The reduce operation groups values according to their key, and then performs a reduce on each key.

  • Because the collections are partitioned across different storage units, Map-Reduce folds the data in a way that minimises data-copying across the cluster.

  • Data in different partitions are reduced separately in parallel.

  • The final result is a reduce of the reduced data in each partition.

  • Therefore it is very important that our operator is both commutative and associative.

  • In our case the function is the + operator, which yields the result below (a pure-Python sketch of the reduce step follows):

    \(\left< be, 2 \right>\)
    \(\left< not, 1 \right>\)
    \(\left< or, 1 \right>\)
    \(\left< to, 2 \right>\)
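
A matching pure-Python sketch of the reduce step, applied to the pairs produced by the map step above; the helper name word_count_reduce is illustrative.

from itertools import groupby

def word_count_reduce(pairs):
    # Sort so that identical words are adjacent, group by word, then add the counts.
    pairs = sorted(pairs)
    return [(word, sum(n for _, n in group)) for word, group in groupby(pairs, key=lambda kv: kv[0])]

word_count_reduce([('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)])
[('be', 2), ('not', 1), ('or', 1), ('to', 2)]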

50.9. Map-Reduce on a Cluster of Computers

  • The code we have written so far will not allow us to exploit parallelism from multiple computers in a cluster.

  • Developing such a framework would be a very large software engineering project.

  • There are existing frameworks we can use, such as Hadoop Map-Reduce and Apache Spark.

  • This notebook covers Apache Spark.

50.10. Apache Spark

  • Apache Spark provides an object-oriented library for processing data on the cluster.

  • It provides objects which represent resilient distributed datasets (RDDs).

  • RDDs behave a bit like Python collections (e.g. lists).

  • However:

    • the underlying data is distributed across the nodes in the cluster, and

    • the collections are immutable.

50.11. Apache Spark and Map-Reduce

  • We process the data by using higher-order functions to map RDDs onto new RDDs.

  • Each instance of an RDD has at least two methods corresponding to the Map-Reduce workflow:

    • map

    • reduceByKey

  • These methods work in the same way as the corresponding functions we defined earlier to work with the standard Python collections.

  • There are also additional RDD methods in the Apache Spark API including ones for SQL.

50.12. Word-count in Apache Spark

words = "to be or not to be".split()
words
['to', 'be', 'or', 'not', 'to', 'be']

50.12.1. The SparkContext class

  • When working with Apache Spark we invoke methods on an object which is an instance of the pyspark.context.SparkContext class.

  • Typically (such as when running on Databricks) an instance of this object will be created automatically for you and assigned to the variable sc.

  • The parallelize method in SparkContext can be used to turn any ordinary Python collection into an RDD.

# Don't execute this on Databricks, where sc is created automatically.
# To be used if executing via Docker or a local installation of pyspark.
import pyspark
sc = pyspark.SparkContext('local[*]')
words_rdd = sc.parallelize(words)
words_rdd
ParallelCollectionRDD[1] at readRDDFromFile at PythonRDD.scala:262

50.12.2. Mapping an RDD

  • Now when we invoke the map or reduceByKey methods on words_rdd we can set up a parallel processing computation across the cluster.

word_tuples_rdd = words_rdd.map(lambda x: (x, 1))
word_tuples_rdd
PythonRDD[2] at RDD at PythonRDD.scala:53

50.12.3. Collecting the RDD

  • Notice that we do not have a result yet.

  • The computation is not performed until we request the final result to be collected.

  • We do this by invoking the collect() method.

  • Be careful with the collect method, as all data you are collecting must fit in memory.

  • The take method is similar to collect, but only returns the first \(n\) elements.

word_tuples_rdd.collect()
[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]
word_tuples_rdd.take(4)
[('to', 1), ('be', 1), ('or', 1), ('not', 1)]

50.12.4. Reducing an RDD

  • However, we require additional processing to reduce the data using the word key.

word_counts_rdd = word_tuples_rdd.reduceByKey(lambda x, y: x + y)
word_counts_rdd
PythonRDD[9] at RDD at PythonRDD.scala:53
  • Now we request the final result:

word_counts = word_counts_rdd.collect()
word_counts
[('to', 2), ('be', 2), ('or', 1), ('not', 1)]

50.12.5. Lazy evaluation

  • It is only when we invoke collect() that the processing is performed on the cluster.

  • Invoking collect() will cause both the map and reduceByKey operations to be performed.

  • If the resulting collection is very large then this can be an expensive operation.

word_counts_rdd.take(2)
[('to', 2), ('be', 2)]
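
To see the laziness directly, the sketch below adds an artificial delay to every element: defining the transformation returns instantly, and the delay is only paid when an action such as collect() runs. The slow_identity helper is illustrative and assumes the words_rdd defined above.

import time

def slow_identity(x):
    time.sleep(0.01)   # simulate an expensive per-element computation
    return x

lazy_rdd = words_rdd.map(slow_identity)   # returns immediately; no work has been done yet
lazy_rdd.collect()                        # only now is slow_identity applied to every element
['to', 'be', 'or', 'not', 'to', 'be']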

50.12.6. Connecting Map and Reduce in a Single Command

  • We can string the map and reduceByKey calls together.

  • Nothing is executed until the result is collected.

text = "to be or not to be".split()
rdd = sc.parallelize(text)
counts = rdd.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
counts.collect()
[('to', 2), ('be', 2), ('or', 1), ('not', 1)]

50.13. Additional RDD transformations

  • Apache Spark offers many more methods for operating on collections of tuples over and above the standard Map-Reduce framework:
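
For example, flatMap, filter and sortByKey can be combined with the map/reduceByKey pattern above. This is a minimal sketch in which the sample lines and variable names are illustrative.

lines_rdd = sc.parallelize(["to be or not to be", "that is the question"])
words_rdd2 = lines_rdd.flatMap(lambda line: line.split())     # split every line into words
long_words = words_rdd2.filter(lambda word: len(word) > 2)    # keep only words longer than two characters
counts = long_words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
counts.sortByKey().collect()
[('not', 1), ('question', 1), ('that', 1), ('the', 1)]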

50.14. Creating an RDD from a text file

  • In the previous example, we created an RDD from a Python collection.

  • This is not typically how we would work with big data.

  • More commonly we would create an RDD corresponding to data in an HBase table, or an HDFS file.

  • The following example creates an RDD from a text file on the native filesystem (ext4):

    • With bigger data, you would use an HDFS file, but the principle is the same.

  • Each element of the RDD corresponds to a single line of text.

genome = sc.textFile('../input/iris.csv')
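
A minimal sketch of inspecting such an RDD, assuming the iris.csv file above exists at that path; the field split is illustrative for a comma-separated file.

genome.take(3)                                        # first three lines of iris.csv, each as one string
genome.count()                                        # total number of lines in the file
fields = genome.map(lambda line: line.split(','))     # split each line into a list of fields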

50.15. Calculating \(\pi\) using Spark

  • We can estimate an approximate value for \(\pi\) using the following Monte-Carlo method:

  1. Inscribe a circle in a square

  2. Randomly generate points in the square

  3. Determine the number of points in the square that are also in the circle

  4. Let \(r\) be the number of points in the circle divided by the number of points in the square, then \(\pi \approx 4 r\).

  • Note that the more points generated, the better the approximation

import numpy as np

def sample(p):
    # x, y are the coordinates of a random point in the unit square
    x, y = np.random.random(), np.random.random()
    # Because the circle has radius 1, the point is inside it when x*x + y*y < 1
    return 1 if x*x + y*y < 1 else 0

NUM_SAMPLES = 1000000

count = sc.parallelize(range(0, NUM_SAMPLES)).map(sample) \
             .reduce(lambda a, b: a + b)
# r is the fraction of sampled points that fall inside the circle, so pi is approximately 4*r
r = float(count) / float(NUM_SAMPLES)
r
print ("Pi is approximately %f" % (4.0 * r))
Pi is approximately 3.138136