Scala And The JVM For Big Data: Lessons From Spark


Scala and the JVM for Big Data: Lessons from Spark. Dean Wampler, lightbend.com, @deanwampler. Copyright 2014-2019, All Rights Reserved.

Spark

A Distributed Computing Engine on the JVM

[Diagram: a cluster of nodes, each node holding one partition of a Resilient Distributed Dataset (RDD).]

Productivity? Very concise, elegant, functional APIs: Scala, Java, Python, R, and SQL!

Productivity? Interactive shell (REPL): Scala, Python, R, and SQL.

Notebooks: Jupyter, Spark Notebook, Zeppelin, Beaker, Databricks.


Example: Inverted Index

[Diagram: Web Crawl: crawled pages such as wikipedia.org/hadoop ("Hadoop provides MapReduce and HDFS ...") and wikipedia.org/hbase ("HBase stores data in HDFS ...") are written as (path, contents) records to an index.]

[Diagram: Compute Inverted Index: the crawl index of (path, contents) records is transformed ("Miracle!!") into an inverse index of (word, (path, count)) records, e.g. (hive, (./hive, 1), ...).]

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val sparkContext = new SparkContext(master, "Inverted Index")
sparkContext.textFile("/path/to/input").
  map { line =>
    val array = line.split(",", 2)
    (array(0), array(1))                              // (id, content)
  }.flatMap {
    case (id, content) =>
      toWords(content).map(word => ((word, id), 1))   // toWords not shown
  }.reduceByKey(_ + _).
  map {
    case ((word, id), n) => (word, (id, n))
  }.groupByKey.
  mapValues { seq =>
    sortByCount(seq)   // Sort the value seq by count, descending (sortByCount not shown).
  }.saveAsTextFile("/path/to/output")

The same code, stepped through on the following slides with the intermediate RDD types called out:
After textFile: RDD[String], e.g. "./hadoop, Hadoop provides ..."
After flatMap and reduceByKey: RDD[((String, String), Int)], e.g. ((Hadoop, ./hadoop), 20)
After groupByKey: RDD[(String, Iterable[(String, Int)])], e.g. (Hadoop, Seq((./hadoop, 20), ...))

Productivity? Intuitive API: a dataflow of steps (textFile, map, flatMap, reduceByKey, groupByKey, saveAsTextFile), inspired by the Scala collections and functional programming.

Performance? Lazy API: combines steps into "stages" and can cache intermediate data.
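To make the laziness concrete, here is a minimal sketch (the input path and app name are placeholders, not from the talk): transformations only record the dataflow, nothing runs until an action such as count or take, and cache() keeps the intermediate RDD in memory so the second action reuses it.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("lazy-demo"))   // master set via spark-submit

// Transformations: nothing executes yet, Spark only builds the lineage of stages.
val counts = sc.textFile("/path/to/input")
  .flatMap(line => line.split("""\W+"""))
  .map(word => (word.toLowerCase, 1))
  .reduceByKey(_ + _)
  .cache()                       // keep this intermediate RDD in memory once materialized

// Actions trigger execution; the second action reuses the cached data.
println(counts.count())
counts.take(10).foreach(println)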


Higher-Level APIs

SQL: Datasets/DataFrames

Example:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Queries").getOrCreate()

val flights = spark.read.parquet("./flights")
val planes  = spark.read.parquet("./planes")
flights.createOrReplaceTempView("flights")
planes.createOrReplaceTempView("planes")
flights.cache(); planes.cache()

val planes_for_flights1 = spark.sql("""
  SELECT * FROM flights f
  JOIN planes p ON f.tailNum = p.tailNum
  LIMIT 100""")

val planes_for_flights2 = flights.join(planes,
  flights("tailNum") === planes("tailNum")).limit(100)

The next slides repeat this example to highlight two points. First, both the SQL query and the fluent join return another Dataset. Second, the join condition flights("tailNum") === planes("tailNum") is not an "arbitrary" anonymous function, but a "Column" instance, which Spark can analyze and optimize.
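A quick way to convince yourself that both forms land in the same optimizer (this check is not part of the original slides) is to ask each Dataset for its plans:

// Both the SQL string and the Column-based join go through the Catalyst optimizer.
planes_for_flights1.explain(true)   // prints parsed, analyzed, optimized, and physical plans
planes_for_flights2.explain(true)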

Performance? The Dataset API has the same performance for all languages: Scala, Java, Python, R, and SQL!

join(right: Dataset[_], joinExprs: Column): DataFrame
groupBy(cols: Column*): RelationalGroupedDataset
orderBy(sortExprs: Column*): Dataset[T]
select(cols: Column*): Dataset[...]
where(condition: Column): Dataset[T]
limit(n: Int): Dataset[T]
intersect(other: Dataset[T]): Dataset[T]
sample(withReplacement: Boolean, fraction, seed)
drop(col: Column): DataFrame
map[U](f: T => U): Dataset[U]
flatMap[U](f: T => Traversable[U]): Dataset[U]
foreach(f: T => Unit): Unit
take(n: Int): Array[Row]
count(): Long
distinct(): Dataset[T]
agg(exprs: Map[String, String]): DataFrame
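As a small illustration of chaining a few of these methods on the flights DataFrame from the earlier example (the column names here are hypothetical, not taken from the talk's data):

import org.apache.spark.sql.functions._

// Delayed-flight counts per carrier, expressed entirely with Column expressions.
val delayedByCarrier = flights
  .where(col("arrDelay") > 15)
  .groupBy(col("carrier"))
  .agg(count("*").as("numDelayed"))
  .orderBy(col("numDelayed").desc)

delayedByCarrier.take(5).foreach(println)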


Structured Streaming

[Diagram: DStream (discretized stream): events arriving over time are batched into RDDs (Time 1 RDD, Time 2 RDD, Time 3 RDD, Time 4 RDD), with sliding windows of 3 RDD batches (#1, #2).]
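The diagram describes the DStream (Spark Streaming) model; a sketch of windowing in that API, with placeholder host, port, and durations (not from the talk):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("windowed-counts")
val ssc  = new StreamingContext(conf, Seconds(10))    // one RDD per 10-second batch

val lines = ssc.socketTextStream("localhost", 9999)   // hypothetical source

// Word counts over a sliding window of 3 batches (30s), sliding one batch at a time.
val windowedCounts = lines
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

windowedCounts.print()
ssc.start()
ssc.awaitTermination()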

ML/MLlib

K-Means. Machine Learning requires: iterative training of models, and good linear algebra performance.
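For example, a hedged sketch with Spark's spark.ml API (it assumes a DataFrame called dataset that already has a "features" vector column; the names and parameters are illustrative, not from the talk):

import org.apache.spark.ml.clustering.KMeans

// Iteratively fit k = 3 cluster centers; each iteration is a distributed linear-algebra pass.
val kmeans = new KMeans().setK(3).setMaxIter(20).setSeed(1L)
val model  = kmeans.fit(dataset)

model.clusterCenters.foreach(println)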

GraphX

PageRank. Graph algorithms require: incremental traversal, and efficient edge and node representations.
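As a sketch of what that looks like in GraphX (assuming a SparkContext sc; the edge-list path is a placeholder):

import org.apache.spark.graphx.GraphLoader

// Load a graph from a "srcId dstId" edge list and run PageRank to convergence.
val graph = GraphLoader.edgeListFile(sc, "/path/to/edges.txt")
val ranks = graph.pageRank(0.0001).vertices   // (vertexId, rank) pairs

ranks.sortBy(_._2, ascending = false).take(10).foreach(println)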

Foundation: The JVM

20 years of DevOps. Lots of Java devs.

Tools and Libraries: Akka, Breeze, Algebird, Spire & Cats, Axle, ...

Big Data Ecosystem

But it's not perfect.

Richer data libraries in Python & R.

Garbage Collection

GC Challenges: Typical Spark heaps are 10s-100s of GB, uncommon for "generic", non-data services.

GC Challenges: Too many cached RDDs leads to huge old-generation garbage. Billions of objects means long GC pauses.

Tuning GC. Best for Spark: -XX:+UseG1GC, -XX:-ResizePLAB, -Xms..., -Xmx..., -XX:InitiatingHeapOccupancyPercent=..., -XX:ConcGCThreads=... (see the Databricks blog post on tuning Java garbage collection for Spark applications).
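For executors these flags are normally passed through Spark's configuration rather than directly on the JVM command line; a sketch with placeholder values (tune them for your own heap and workload):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("gc-tuned-job")
  .set("spark.executor.memory", "32g")   // executor heap size is set here, not via -Xms/-Xmx
  .set("spark.executor.extraJavaOptions",
    "-XX:+UseG1GC -XX:-ResizePLAB " +
    "-XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=20")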

JVM Object Model

Java Objects? "abcd": 4 bytes for raw UTF8, right? In fact it takes 48 bytes for the Java object: a 12-byte header, 8 bytes for the hash code, 20 bytes of array overhead, and 8 bytes for the UTF-16 chars.
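You can check this kind of overhead yourself with the SizeEstimator utility that shows up later in this talk; the exact numbers depend on the JVM version and flags such as compressed oops.

import org.apache.spark.util.SizeEstimator

println(SizeEstimator.estimate("abcd"))                    // one small String: tens of bytes
println(SizeEstimator.estimate(Array.fill(1000)("abcd")))  // an array of references to it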

[Diagram: Arrays: val myArray: Array[String]; each slot holds a reference to a separately allocated String object ("zeroth", ...).]

[Diagram: Class instances: val person: Person, with fields name: String ("Buck Trends"), age: Int (29), and addr: Address, the references pointing to other heap objects.]

[Diagram: Hash Maps: an array of hash-code buckets (h/c1 ... h/c4) whose entries point to separately allocated key and value objects ("a key", "a value").]

Improving Performance. Why obsess about this? Spark jobs are CPU bound: Improve network I/O? 2% better. Improve disk I/O? 20% better.

What changed? Faster hardware (compared to 2000): 10Gb/s networks, SSDs.

What changed? Smarter use of I/O: pruning unneeded data sooner, caching more effectively, efficient formats like Parquet.

What changed? But more CPU use today: more serialization, more compression, more hashing (joins, group-bys).

Improving Performance. To improve performance, we need to focus on the CPU: better algorithms, sure, but also optimizing the use of memory.

Project Tungsten: an initiative to greatly improve Dataset/DataFrame performance.

Goals

Reduce References: fewer, bigger objects to GC, and fewer cache misses. [Diagram: the same Array[String], Person instance, and hash map layouts shown earlier, with their many small objects and reference hops highlighted as the overhead to eliminate.]

Less Expression Overhead. sql("SELECT a + b FROM table"). Evaluating expressions billions of times incurs: virtual function calls, boxing/unboxing, branching (if statements, etc.).

Implementation

Object Encoding. New CompactRow type: a null bit set (1 bit/field), values (8 bytes/field), and variable-length data, with the fixed-width slots holding offsets to the variable-length data. Compute hashCode and equals on the raw bytes.

[Diagram: Compare the val person: Person object graph (name: String "Buck Trends", age: Int 29, addr: Address, each reference pointing to another heap object) with the compact row layout: null bit set (1 bit/field), values (8 bytes/field), offsets to variable-length data.]

[Diagram: BytesToBytesMap: hash codes (h/c1 ... h/c4) index into a Tungsten memory page where the keys and values (k1, v1, k2, v2, ...) are packed contiguously. Compare with the standard hash map, where each key ("a key") and value ("a value") is a separately allocated object reached through references.]

Memory Management: some allocations are off heap, via sun.misc.Unsafe.
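A rough sketch of what off-heap allocation through sun.misc.Unsafe looks like (illustrative only, not Spark's actual internals; outside the JDK the instance has to be obtained reflectively):

import sun.misc.Unsafe

// Unsafe.getUnsafe() is restricted to bootstrap classes, so fetch the singleton via reflection.
val field = classOf[Unsafe].getDeclaredField("theUnsafe")
field.setAccessible(true)
val unsafe = field.get(null).asInstanceOf[Unsafe]

val address = unsafe.allocateMemory(16)   // 16 bytes outside the garbage-collected heap
unsafe.putLong(address, 42L)
println(unsafe.getLong(address))
unsafe.freeMemory(address)                // manual lifetime management, no GC involvement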

Less Expression Overhead. sql("SELECT a + b FROM table"). Solution: generate custom byte code. Spark 1.X: for subexpressions. Spark 2.0: for whole queries.


No Value Types (planned for Java 9 or 10).

case class Timestamp(epochMillis: Long) {
  override def toString: String = { ... }
  def add(delta: TimeDelta): Timestamp = { /* return new shifted time */ }
  ...
}

Don't allocate on the heap; just push the primitive Long on the stack. (scalac does this now.)
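The Scala feature alluded to here is value classes: a sketch (simplified to a Long delta instead of the TimeDelta type on the slide) where scalac erases the wrapper and passes the underlying Long directly in most call sites.

// A value class: exactly one val parameter, extends AnyVal.
case class Timestamp(epochMillis: Long) extends AnyVal {
  def add(deltaMillis: Long): Timestamp = Timestamp(epochMillis + deltaMillis)
}

val t = Timestamp(1514764800000L).add(60 * 1000)   // typically no Timestamp object is allocated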

Long operations aren't atomic, according to the JVM spec.

No Unsigned Types. What's factorial(-1)?

Arrays Indexed with Ints: byte arrays are limited to 2GB!

scala> val N = 1100*1000*1000
N: Int = 1100000000   // 1.1 billion
scala> val array = Array.fill[Short](N)(0)
array: Array[Short] = Array(0, 0, ...)
scala> import org.apache.spark.util.SizeEstimator
scala> SizeEstimator.estimate(array)
res3: Long = 2200000016   // 2.2GB

scala> val b = sc.broadcast(array)
b: org.apache.spark.broadcast.Broadcast[Array[Short]] = ...
scala> SizeEstimator.estimate(b)
res0: Long = 2368
scala> sc.parallelize(0 until 100000).
         map(i => b.value(i))
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
  at java.util.Arrays.copyOf(...)
  ...

But wait. I actually lied to you.

Spark handles large broadcast variables by breaking them into blocks.

Scala REPL

java.lang.OutOfMemoryError: Requested array size exceeds VM limit
  at java.util.Arrays.copyOf(...)
  at java.io.ByteArrayOutputStream.write(...)
  at java.io.ObjectOutputStream.writeObject(...)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(...)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(...)
  at org.apache.spark.rdd.RDD.map(...)

Reading the trace from the bottom up: we pass the closure i => b.value(i) to RDD.map, which "cleans" the closure, i.e. checks that it is serializable, which it does by serializing it to a byte array, which requires copying an array. What array? val array = Array.fill[Short](N)(0).

Why did this happen?

You write:
scala> val array = Array.fill[Short](N)(0)
scala> val b = sc.broadcast(array)
scala> sc.parallelize(0 until 100000).map(i => b.value(i))

Scala compiles this REPL session into nested wrapper objects:

class $iwC extends Serializable {
  val array = Array.fill[Short](N)(0)
  val b = sc.broadcast(array)
  class $iwC extends Serializable {
    sc.parallelize(...).map(i => b.value(i))
  }
}

So this closure over "b" sucks in the whole wrapper object, including the original array!

Lightbend is investigating re-engineering the REPL.

Workarounds

Transient is often all you need:
scala> @transient val array = Array.fill[Short](N)(0)
scala> ...

object Data {   // Encapsulate in objects!
  val N = 1100*1000*1000
  val array = Array.fill[Short](N)(0)
  val getB = sc.broadcast(array)
}

object Work {
  def run(): Unit = {
    val b = Data.getB                                    // local ref!
    val rdd = sc.parallelize(...).map(i => b.value(i))   // only needs b
    rdd.take(10).foreach(println)
  }
}

Why Scala? See the longer version of this talk at polyglotprogramming.com/talks

polyglotprogramming.com/talks

lightbend.com, @deanwampler. Questions?

Bonus Material: You can find an extended version of this talk with more details at polyglotprogramming.com/talks

