Motivation

In this simple example, I am creating some local data, sending it to the cluster, computing on the cluster and then getting back the results to the client. The whole thing can be written in one line.

def bigComputation(n: Int): Int = n + 1

val result = Vector(1,2,3,4).              //local data in the client
             toSparkRDD.                   //put on the cluster
             map(n => bigComputation(n)).  //execute on the cluster
             collect                       //get back the results to the client

Prior to spark, one could not do this example so easily.

The advantage is simple code that can handle larger loads simply by increasing the cluster size.

Installation on a local machine

  • This shows the installation from source with scala-2.11 on Ubuntu
./dev/change-scala-version.sh 2.11
./make-distribution.sh --name custom-spark --tgz -Psparkr -Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn -Dscala-2.11 -DskipTests

The build time is a few hours.

Cheatsheet

More complex examples

toSparkRDD which converts local data to RDD

object MySparkExtensions{
  import scala.reflect.ClassTag
  import org.apache.spark.rdd.RDD

  class SeqExt[T: ClassTag](v: Seq[T]) {
    import org.apache.spark.SparkContext
    def toSparkRDD(implicit globalSc: SparkContext): RDD[T] = {
      globalSc.parallelize(v)
    }
  }
  implicit def fromSeq[T: ClassTag](v: Seq[T]): SeqExt[T] = new SeqExt(v)
}

implicit val globalSc = sc
import MySparkExtensions._
Vector(1,2,3).toSparkRDD

Data size

Example: put a very large dataset in a Spark RDD

(1 to 200).toSparkRDD.flatMap(x => Seq.range(1, 500000)).cache.count()
//non-spark   (1 to 100000).flatMap(x => Seq.range(1, 20)).size

//unclear why
(1 to 200).toSeq.flatMap(x => Seq.range(1, 500000)).size
16/01/15 15:42:04 ERROR Utils: Uncaught exception in thread driver-heartbeater
java.lang.OutOfMemoryError: GC overhead limit exceeded

[Stage 4:>                                                          (0 + 4) / 4]16/01/15 15:28:47 WARN MemoryStore: Not enough space to cache rdd_10_3 in memory! (computed 102.4 MB so far)
16/01/15 15:28:47 WARN MemoryStore: Not enough space to cache rdd_10_2 in memory! (computed 102.4 MB so far)

/** * Attempt to put the given block in memory store. * * There may not be enough space to fully unroll the iterator in memory, in which case we * optionally drop the values to disk if * (1) the block’s storage level specifies useDisk, and * (2) allowPersistToDisk is true. * * One scenario in which allowPersistToDisk is false is when the BlockManager reads a block * back from disk and attempts to cache it in memory. In this case, we should not persist the * block back on disk again, as it is already in disk store.

submitting continuous jobs to spark (one after another)