# Motivation

In this simple example, I create some local data, send it to the cluster, compute on the cluster, and then fetch the results back to the client. The whole pipeline can be written in one expression.

```scala
def bigComputation(n: Int): Int = n + 1

val result = Vector(1, 2, 3, 4).  // local data in the client
  toSparkRDD.                     // put on the cluster
  map(n => bigComputation(n)).    // execute on the cluster
  collect                         // get the results back to the client
```


Prior to Spark, there was no way to express this example so concisely.

The advantage is simple code that handles larger loads just by increasing the cluster size.

# Installation on a local machine

• This shows a build from source with Scala 2.11 on Ubuntu:

```shell
./dev/change-scala-version.sh 2.11
./make-distribution.sh --name custom-spark --tgz -Psparkr -Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn -Dscala-2.11 -DskipTests
```


The build takes a few hours.

# More complex examples

`toSparkRDD`, which converts local data to an RDD:

```scala
object MySparkExtensions {
  import scala.reflect.ClassTag
  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  // Wraps any Seq and adds a toSparkRDD method that parallelizes it
  // on the cluster via the implicitly available SparkContext.
  class SeqExt[T: ClassTag](v: Seq[T]) {
    def toSparkRDD(implicit globalSc: SparkContext): RDD[T] =
      globalSc.parallelize(v)
  }

  implicit def fromSeq[T: ClassTag](v: Seq[T]): SeqExt[T] = new SeqExt(v)
}
```

```scala
// In the Spark shell, `sc` is the predefined SparkContext.
implicit val globalSc = sc
import MySparkExtensions._
Vector(1, 2, 3).toSparkRDD
```
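For comparison, the extension method is only a thin wrapper over Spark's own API; the call above is equivalent to parallelizing the collection directly. A minimal sketch, assuming a standalone program (the app name and `local[*]` master are placeholders; in the Spark shell, `sc` already exists):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumed local configuration, for illustration only.
val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc   = new SparkContext(conf)

// Equivalent to Vector(1, 2, 3).toSparkRDD
val rdd = sc.parallelize(Vector(1, 2, 3))
rdd.collect()
```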


# Data size

Example: put a very large dataset in a Spark RDD

```scala
// Spark: roughly 100 million elements, cached on the cluster
(1 to 200).toSparkRDD.flatMap(x => Seq.range(1, 500000)).cache.count()

// Non-Spark equivalent at a much smaller scale, for comparison
(1 to 100000).flatMap(x => Seq.range(1, 20)).size

// The full-size computation without Spark fails on the driver (unclear why):
(1 to 200).toSeq.flatMap(x => Seq.range(1, 500000)).size
```

16/01/15 15:42:04 ERROR Utils: Uncaught exception in thread driver-heartbeater

When the cached RDD does not fit in memory, Spark logs a warning:

[Stage 4:>                                                          (0 + 4) / 4]
16/01/15 15:28:47 WARN MemoryStore: Not enough space to cache rdd_10_3 in memory! (computed 102.4 MB so far)

The documentation in Spark's MemoryStore source explains the behavior:

```scala
/**
 * Attempt to put the given block in memory store.
 *
 * There may not be enough space to fully unroll the iterator in memory, in which case we
 * optionally drop the values to disk if
 *   (1) the block's storage level specifies useDisk, and
 *   (2) allowPersistToDisk is true.
 *
 * One scenario in which allowPersistToDisk is false is when the BlockManager reads a block
 * back from disk and attempts to cache it in memory. In this case, we should not persist the
 * block back on disk again, as it is already in disk store.
 */
```
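Given that behavior, one way to avoid losing blocks that do not fit in memory is to persist with a storage level whose `useDisk` flag is set. A minimal sketch, reusing the example above and the `toSparkRDD` extension defined earlier:

```scala
import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK spills blocks that do not fit in memory to disk,
// instead of recomputing them on the next action.
val rdd = (1 to 200).toSparkRDD.flatMap(x => Seq.range(1, 500000))
rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.count()
```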