Spark Examples
Spark is built around distributed datasets that support types of parallel operations: transformations, which are lazy and yield another distributed dataset (e.g., map, filter, and join), and actions, which force the computation of a dataset and return a result (e.g., count). The following examples show off some of the available operations and features.Text Search
In this example, we search through the error messages in a log file:
val file = spark.textFile( "hdfs://...") val errors = file. filter( line => line.contains("ERROR")) // Count all the errors errors. count() // Count errors mentioning MySQL errors. filter( line => line.contains("MySQL")). count() // Fetch the MySQL errors as an array of strings errors. filter( line => line.contains("MySQL")). collect()
The red code fragments are Scala function literals (closures) that get passed automatically to the cluster. The blue ones are Spark operations.
In-Memory Text Search
Spark can cache datasets in memory to speed up reuse. In the example above, we can load just the error messages in RAM using:
errors. cache()
After the first action that uses errors, later ones will be much faster.
Word Count
In this example, we use a few more transformations to build a dataset of (String, Int) pairs called counts and then save it to a file.
val file = spark.textFile( "hdfs://...") val counts = file. flatMap( line => line.split(" ")) . map( word => (word, 1)) . reduceByKey( _ + _) counts. saveAsTextFile( "hdfs://...")
Estimating Pi
Spark can also be used for compute-intensive tasks. This code estimates π by "throwing darts" at a circle. We pick random points in the unit square ((0, 0) to (1,1)) and see how many fall in the unit circle. The fraction should be π / 4, so we use this to get our estimate.
val count = spark.parallelize(1 to NUM_SAMPLES). map( i => val x = Math.random val y = Math.random if (x*x + y*y < 1) 1.0 else 0.0). reduce( _ + _) println( "Pi is roughly " + 4 * count / NUM_SAMPLES)
Logistic Regression
This is an iterative machine learning algorithm that seeks to find the best hyperplane that separates two sets of points in a multi-dimensional feature space. It can be used to classify messages into spam vs non-spam, for example. Because the algorithm applies the same MapReduce operation repeatedly to the same dataset, it benefits greatly from caching the input data in RAM across iterations.
val points = spark.textFile(...). map(parsePoint). cache() var w = Vector.random(D) // current separating plane for (i <- 1 to ITERATIONS) { val gradient = points. map( p => (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x ). reduce( _ + _) w -= gradient } println( "Final separating plane: " + w)
Note that w gets shipped automatically to the cluster with every map call.
The graph below compares the performance of this Spark program against a Hadoop implementation on 30 GB of data on an 80-core cluster, showing the benefit of in-memory caching: