A Bit More Parallelism - Princeton University Computer Science


A Bit More Parallelism
COS 326
David Walker
Princeton University
slides copyright 2013-2015 David Walker and Andrew W. Appel
permission granted to reuse these slides for non-commercial educational purposes

Last Time: Parallel Collections
The parallel sequence abstraction is powerful: tabulate, nth, length, map, split, treeview, scan
- used to implement prefix-sum (a clever 2-phase implementation)
- used to implement filters, sorting

PARALLEL COLLECTIONS IN THE "REAL WORLD"

Big Data
If Google wants to index all the web pages (or images or gmails or google docs or ...) in the world, they have a lot of work to do.
- Same with Facebook for all the facebook pages/entries
- Same with Twitter
- Same with Amazon
- Same with ...
Many of these tasks come down to map, filter, fold, reduce, scan.

Google Map-Reduce
Google MapReduce (2004): a fault tolerant, massively parallel functional programming paradigm
- based on our friends "map" and "reduce"
- Hadoop is the open-source variant
- Database people complain that they have been doing it for a while ... but it was hard to define
Fun stats circa 2012:
- Big clusters are 4000 nodes
- Facebook had 100 PB in Hadoop
- TritonSort (UCSD) sorts 900GB/minute on a 52-node, 800-disk hadoop cluster
[Slide shows the first page of the paper: "MapReduce: Simplified Data Processing on Large Clusters", Jeffrey Dean and Sanjay Ghemawat, Google, Inc., to appear in OSDI 2004]

Data Model & Operations
Map-reduce operates over collections of key-value pairs
- millions of files (eg: web pages) drawn from the file system and parsed in parallel by many machines
The map-reduce engine is parameterized by 3 functions, which roughly speaking do this:

  map     : key1 * value1        -> (key2 * value2) list
  combine : key2 * (value2 list) -> value2 option
  reduce  : key2 * (value2 list) -> key3 * (value3 list)

combine is optional: often used to compress data before transfer from a mapper machine to a reducer machine
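As a concrete (purely sequential) sketch of this data model, here is word count expressed with functions of those shapes in OCaml. The driver `run_job` is hypothetical, a single-machine stand-in for the grouping and shuffling the real engine does across many machines.

```ocaml
(* A minimal, single-machine sketch of the map/reduce data model applied
   to word count. The "engine" (run_job) is hypothetical: the real system
   shards, sorts, and shuffles intermediate pairs across machines. *)

(* map: doc-id * contents -> (word, count) pairs *)
let map (_key : string) (value : string) : (string * int) list =
  List.map (fun w -> (w, 1)) (String.split_on_char ' ' value)

(* reduce: word * partial counts -> word * total *)
let reduce (key : string) (values : int list) : string * int =
  (key, List.fold_left (+) 0 values)

(* Hypothetical driver: group intermediate pairs by key, then reduce. *)
let run_job (docs : (string * string) list) : (string * int) list =
  let pairs = List.concat_map (fun (k, v) -> map k v) docs in
  let tbl = Hashtbl.create 16 in
  List.iter
    (fun (k, v) ->
      let old = try Hashtbl.find tbl k with Not_found -> [] in
      Hashtbl.replace tbl k (v :: old))
    pairs;
  Hashtbl.fold (fun k vs acc -> reduce k vs :: acc) tbl []

let () =
  let counts = run_job [("doc1", "a b a"); ("doc2", "b a")] in
  List.iter (fun (w, n) -> Printf.printf "%s %d\n" w n)
    (List.sort compare counts)
```

The `combine` step is omitted here; in the sketch it would pre-sum each mapper's pairs before they reach `run_job`'s grouping table.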

Architecture
[diagram: input data feeds map tasks with local storage, whose results feed reduce tasks that write the output data, over a distributed file system]

Iterative Jobs are Common
[diagram: the output data of one job becomes the input data of the next, with the working set carried between iterations]

The Control Plane
[diagram: input data partitioned across the machines of the cluster]

Jobs, Tasks and Attempts
- A single job is split in to many tasks
- Each task may include many calls to map and reduce
- Workers are long-running processes that are assigned many tasks
- Multiple workers may attempt the same task
  - each invocation of the same task is called an attempt
  - the first worker to finish "wins"
- Why have multiple machines attempt the same task?
  - machines will fail
    - approximately speaking: 5% of high-end disks fail/year
    - if you have 1000 machines: 1 failure per week
    - repeated failures become the common case
  - machines can partially fail or be slow for some reason
    - reducers can't start until all mappers complete
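The failure estimate above is just arithmetic; a quick sketch, assuming one disk per machine and the 5% annual failure rate from the slide:

```ocaml
(* Back-of-the-envelope failure math: 1000 machines, one disk each,
   ~5% annual disk failure rate. *)
let machines = 1000.
let annual_failure_rate = 0.05
let failures_per_year = machines *. annual_failure_rate  (* 50 per year *)
let failures_per_week = failures_per_year /. 52.         (* just under 1 per week *)

let () =
  Printf.printf "expect ~%.0f failures/year, ~%.2f failures/week\n"
    failures_per_year failures_per_week
```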

The Flow of Information
[diagram: heartbeats, job configuration, tasks to start, and "completed OK" messages flow between the control plane and the workers]

A Modern Software Stack
[diagram, layered: Workload Manager; High-level scripting language; Distributed Execution Engine; Distributed key-value store; Cluster Node]

Sort-of Functional Programming in Java
Hadoop interfaces:

  interface Mapper<K1,V1,K2,V2> {
    public void map(K1 key, V1 value,
                    OutputCollector<K2,V2> output);
  }

  interface Reducer<K2,V2,K3,V3> {
    public void reduce(K2 key, Iterator<V2> values,
                       OutputCollector<K3,V3> output);
  }

Word Count in Java

  class WordCountMap implements Map {
    public void map(DocID key, List<String> values,
                    OutputCollector<String,Integer> output) {
      for (String s : values)
        output.collect(s, 1);
    }
  }

  class WordCountReduce {
    public void reduce(String key, Iterator<Integer> values,
                       OutputCollector<String,Integer> output) {
      int count = 0;
      for (int v : values)
        count += 1;
      output.collect(key, count);
    }
  }

PLEASE RELAX
AND FOR THE SAKE OF HYGIENE, WIPE THE JAVA CODE OFF YOUR BRAIN

ASSIGNMENT #7: IMPLEMENTING AND USING PARALLEL COLLECTIONS

US Census Queries
End goal: develop a system for efficiently computing US population queries by geographic region

Assignment 7
[diagram, layered: applications (inverted index) built on libraries (concurrency primitives) built on hardware (unix processes)]

map-reduce API for Assignment 7

  tabulate (f: int -> 'a) (n: int) : 'a seq   Create a seq of length n; element i holds f(i)
  seq_of_array : 'a array -> 'a seq           Create a sequence from an array
  array_of_seq : 'a seq -> 'a array           Create an array from a sequence
  iter (f: 'a -> unit) : 'a seq -> unit       Apply f to each element in order. Useful for debugging.
  length : 'a seq -> int                      Return the length of the sequence
  empty : unit -> 'a seq                      Return the empty sequence
  cons : 'a -> 'a seq -> 'a seq               (nondestructively) cons a new element on the beginning
  singleton : 'a -> 'a seq                    Return the sequence with a single element
  append : 'a seq -> 'a seq -> 'a seq         (nondestructively) concatenate two sequences
  nth : 'a seq -> int -> 'a                   Get the nth value in the sequence. Indexing is zero-based.
  map (f: 'a -> 'b) : 'a seq -> 'b seq        Map the function f over a sequence
  reduce (f: 'a -> 'a -> 'a) (base: 'a) : 'a seq -> 'a
                                              Fold a function f over the sequence.
                                              f must be associative, and base must be the unit for f. Parallel
  mapreduce : ('a -> 'b) -> ('b -> 'b -> 'b) -> 'b -> 'a seq -> 'b
                                              Combine the map and reduce functions. Parallel
  flatten : 'a seq seq -> 'a seq              flatten [[a0;a1]; [a2;a3]] = [a0;a1;a2;a3]
  repeat (x: 'a) (n: int) : 'a seq            repeat x 4 = [x;x;x;x]
  zip : ('a seq * 'b seq) -> ('a * 'b) seq    zip [a0;a1] [b0;b1;b2] = [(a0,b0);(a1,b1)]
  split : 'a seq -> int -> 'a seq * 'a seq    split [a0;a1;a2;a3] 1 = ([a0],[a1;a2;a3])
  scan : ('a -> 'a -> 'a) -> 'a -> 'a seq -> 'a seq
                                              scan f b [a0;a1;a2;...] = [f b a0; f (f b a0) a1; f (f (f b a0) a1) a2; ...]
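To show how the API composes, here is a sketch that models 'a seq as a plain list with sequential reference implementations of a few operations; the module itself is not shown on the slide, so all of these bodies are stand-ins (the real assignment implements them in parallel), but the signatures and semantics follow the table above.

```ocaml
(* Sequential reference model of part of the Assignment 7 sequence API,
   with 'a seq modeled as a list. Bodies are illustrative stand-ins. *)
type 'a seq = 'a list

let tabulate (f : int -> 'a) (n : int) : 'a seq = List.init n f
let map = List.map
let reduce (f : 'a -> 'a -> 'a) (base : 'a) (s : 'a seq) : 'a =
  List.fold_left f base s
let mapreduce (inject : 'a -> 'b) (combine : 'b -> 'b -> 'b) (base : 'b)
    (s : 'a seq) : 'b =
  reduce combine base (map inject s)
(* scan f b [a0;a1;...] = [f b a0; f (f b a0) a1; ...] as in the table *)
let scan (f : 'a -> 'a -> 'a) (base : 'a) (s : 'a seq) : 'a seq =
  List.rev
    (fst (List.fold_left
            (fun (acc, b) x -> let b' = f b x in (b' :: acc, b'))
            ([], base) s))

let () =
  (* sum of squares of 0..4 via mapreduce: 0+1+4+9+16 = 30 *)
  let n = mapreduce (fun x -> x * x) (+) 0 (tabulate (fun i -> i) 5) in
  (* prefix sums of [1;2;3;4] via scan: [1;3;6;10] *)
  let p = scan (+) 0 [1; 2; 3; 4] in
  Printf.printf "%d [%s]\n" n (String.concat ";" (List.map string_of_int p))
```

Note how `mapreduce` is just `reduce` after `map`; the parallel versions fuse the two passes to avoid materializing the intermediate sequence.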

Processes
[diagram: process 1 and process 2 have separate address spaces (no shared data) and communicate over a channel (a pipe)]

Need-to-know Info
- Processes are managed by your operating system
- Processes share time executing on available cores
- Processes have separate address spaces, so communication occurs by:
  - serializing data (converting complex data to a sequence of bits)
  - writing data to a buffer
  - reading data out of the buffer on the other side
  - deserializing the data
- Cost is relative to the amount of data transferred
  - minimizing data transfers is an important performance consideration
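Since the cost scales with the bytes transferred, it can help to measure what serialization actually produces. A small sketch using OCaml's Marshal module; the exact byte counts depend on the OCaml version, so none are stated here.

```ocaml
(* Measure how many bytes Marshal produces for a value: a rough proxy
   for the cost of sending that value between processes over a pipe. *)
let marshalled_size (v : 'a) : int =
  String.length (Marshal.to_string v [])

let () =
  let small = [1; 2; 3] in
  let big = Array.to_list (Array.init 10_000 (fun i -> i)) in
  Printf.printf "small list: %d bytes\nbig list: %d bytes\n"
    (marshalled_size small) (marshalled_size big)
```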

Unix (Linux) pipe(), fork(), exec()
(Standard Unix, C-language calling sequences)

  int pipe(int fd[2]);
    /* now can read from file descriptor fd[0], write to fd[1] */

  int fork(void);
    /* creates a new OS process;
       in child, returns 0; in parent, returns process id of child */

  int execve(char *filename, char *argv[], char *envp[]);
    /* overwrite this process with a new execution of filename(argv);
       if execve returns at all, then it must have failed */

Typical use of pipe, fork, exec
What you write at the shell prompt:

  cat foo | grep abc

What the shell does (simplified; one learns this in COS 217):

  int fd[2]; int pid1, pid2;
  pipe(fd);                     /* fd 0: standard in; fd 1: standard out */
  pid1 = fork();
  if (pid1) { /* in the parent */
    close(fd[0]); close(1); dup2(fd[1],1); close(fd[1]);
    exec("/bin/cat", "foo");
  } else { /* in the child */
    close(fd[1]); close(0); dup2(fd[0],0); close(fd[0]);
    exec("/bin/grep", "abc");
  }

Typical use of pipe, fork, exec
(the same shell example and code as the previous slide, with commentary:)
pipe is a beautiful functional abstraction, isn't it? It hides all this garbage so I don't have to think about it!!

Processes in OCaml
Create a child process using fork : unit -> int
- creates two processes, identical except for the return value of fork ()

standard use:

  match fork () with
  | 0 -> (* ... child process code ... *)
  | pid -> (* ... parent process code ... *)

Copies of data are made when either parent or child writes to the data.

Interprocess Communication via Pipes
- A pipe is a first-in, first-out queue
- Data (a sequence of bytes) may be written on one end of the pipe and read out the other
  - writes block after the underlying buffer is filled but not yet read
  - reads block until data appears to be read
  - bad idea to read and write the same pipe in the same process!
- Creating a pipe:
  pipe : unit -> file_descr * file_descr
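A minimal runnable sketch of these semantics with OCaml's Unix module: the parent writes a few bytes into the pipe, the child blocks on the read until they arrive. The structure (who reads, who writes, the message) is illustrative, not from the assignment.

```ocaml
(* One process writes into the pipe; the other reads it out. A pipe is
   one-directional: to talk both ways you would use one pipe per
   direction. Requires the unix library. *)
let () =
  let (fd_in, fd_out) = Unix.pipe () in
  match Unix.fork () with
  | 0 ->
      (* child: keep only the read end *)
      Unix.close fd_out;
      let buf = Bytes.create 16 in
      let n = Unix.read fd_in buf 0 16 in   (* blocks until data appears *)
      print_string (Bytes.sub_string buf 0 n);
      Unix.close fd_in;
      exit 0
  | _pid ->
      (* parent: keep only the write end *)
      Unix.close fd_in;
      let msg = Bytes.of_string "hello" in
      ignore (Unix.write fd_out msg 0 (Bytes.length msg));
      Unix.close fd_out;
      ignore (Unix.wait ())
```

Closing the unused ends matters: if the parent kept `fd_out` open forever, a reader could never see end-of-file on that pipe.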

Futures via Processes
future interface:

  type 'a future
  val future : ('a -> 'b) -> 'a -> 'b future
  val force : 'a future -> 'a

future f x runs f x in a child process; the result of f x is serialized and sent through a pipe back to the parent, where force retrieves it.

Futures via Processes
representation:

  type 'a future = {
    fd  : file_descr;  (* pipe endpoint read by parent *)
    pid : int          (* process id of the child *)
  }

Futures via Processes

  let future (f: 'a -> 'b) (x: 'a) : 'b future =
    let (fin, fout) = pipe () in        (* create pipe to communicate *)
    match fork () with                  (* fork child *)
    | 0 ->
        (* child uses the output (fout) and closes the input (fin);
           it executes the future function, then marshals the results,
           sending them over the pipe ... and then terminates, its job
           complete *)
        close fin;
        let oc = out_channel_of_descr fout in
        Marshal.to_channel oc (f x) [Marshal.Closures];
        Pervasives.exit 0
    | cid ->
        (* parent uses the input (fin) and closes the output (fout);
           it completes the routine immediately, keeping the future data
           structure around to force later *)
        close fout;
        { fd = fin; pid = cid }

Marshalling
(see caml/libref/Marshal.html; in Java this is called "serialize")

  module Marshal : sig ... end

Marshaling of data structures.
This module provides functions to encode arbitrary data structures as sequences of bytes, which can then be written on a file or sent over a pipe or network connection. The bytes can then be read back later, possibly in another process, and decoded back into a data structure. The format for the byte sequences is compatible across all machines for a given version of OCaml.

Warning: marshaling is currently not type-safe. The type of marshaled data is not transmitted along with the value of the data, making it impossible to check that the data read back possesses the type expected by the context. In particular, the result type of the Marshal.from_* functions is given as 'a, but this is misleading: the returned OCaml value does not possess type 'a for all 'a; it has one, unique type which cannot be determined at compile-time. The programmer should explicitly give the expected type of the returned value, using the following syntax: (Marshal.from_channel chan : type). Anything can happen at run-time if the object in the file does not belong to the given type.

Futures via Processes

  let force (f: 'a future) : 'a =
    let ic = in_channel_of_descr f.fd in
    let res = ((Marshal.from_channel ic) : 'a) in  (* read the data from
                                                      the future's pipe *)
    close f.fd;                                    (* close the file
                                                      descriptor *)
    match waitpid [] f.pid with
    | (_, WEXITED 0) -> res
    | _ -> failwith "process failed to terminate in force"
    (* wait until the child terminates; prevents a "fork bomb"
       (other techniques could be used here) *)
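Putting the two halves together, here is a self-contained, runnable rendering of future and force with a small usage example. The explicit flush, the workload function, and the two-way split of the range are my additions, not from the slides.

```ocaml
(* Self-contained version of the slides' future/force, plus a usage
   example: two calls run in separate child processes, then both results
   are forced in the parent. Requires the unix library. *)
type 'a future = { fd : Unix.file_descr; pid : int }

let future (f : 'a -> 'b) (x : 'a) : 'b future =
  let (fin, fout) = Unix.pipe () in
  match Unix.fork () with
  | 0 ->
      Unix.close fin;
      let oc = Unix.out_channel_of_descr fout in
      Marshal.to_channel oc (f x) [Marshal.Closures];
      flush oc;                      (* make sure the bytes hit the pipe *)
      exit 0
  | cid ->
      Unix.close fout;
      { fd = fin; pid = cid }

let force (f : 'a future) : 'a =
  let ic = Unix.in_channel_of_descr f.fd in
  let res = ((Marshal.from_channel ic) : 'a) in
  close_in ic;
  match Unix.waitpid [] f.pid with
  | (_, Unix.WEXITED 0) -> res
  | _ -> failwith "process failed to terminate in force"

let () =
  (* hypothetical workload: sum the squares of two halves in parallel *)
  let sum_squares (lo, hi) =
    let s = ref 0 in
    for i = lo to hi do s := !s + i * i done;
    !s
  in
  let a = future sum_squares (1, 500) in
  let b = future sum_squares (501, 1000) in
  Printf.printf "%d\n" (force a + force b)
```

Both children run concurrently between the two `future` calls and the two `force` calls; forcing in either order gives the same answer.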

Costs of "fork"
Futures enable a rather simple communication pattern: parent <-> worker.
But the cost of starting up a process and communicating data back and forth is high.
- The Unix "fork" system call copies the entire address space into the child process. That includes all the closures and heap data structures in your entire program!
- The operating system does it lazily, using virtual-memory paging. That means this pattern:

    if (fork()) { parent } else { exec(); }

  does not pay a price; it does no copying.
- But the pattern on the previous slides has no "exec();" call.

Another problem with "fork"
(the future implementation from the previous slides)

Parent process and child process must share memory!
- This is possible on two different cores of the same multicore chip
- Sometimes possible with two chips on the same circuit board
- Not scalable to massive parallelism in the data center!

Message Passing
Futures enable a rather simple communication pattern: parent <-> worker.
But the cost of starting up a process and communicating data back and forth is high.
Instead: spawn 1 worker and have it do many tasks
- (the implementation of futures could be optimized to reuse 1 process for many futures)

Message Passing
Also: when creating the worker (with "fork"), don't send data at the same time! No need to share memory; the "fork" can be remote, on another machine (in the data center).
Instead: spawn 1 worker and have it do many tasks
- (the implementation of futures could be optimized to reuse 1 process for many futures)
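One way the "one worker, many tasks" idea can be sketched: a long-lived child process loops, reading marshalled requests from one pipe and writing marshalled replies to another. All names here (`spawn_worker`, the option-based shutdown protocol) are illustrative, not from the assignment.

```ocaml
(* Sketch: a long-lived worker serves many tasks over a pair of pipes.
   The parent sends Some x requests; the worker applies f and replies.
   A None request tells the worker to shut down. Requires unix. *)
let spawn_worker (f : 'a -> 'b) : ('a option -> unit) * (unit -> 'b) =
  let (req_r, req_w) = Unix.pipe () in
  let (rep_r, rep_w) = Unix.pipe () in
  match Unix.fork () with
  | 0 ->
      (* worker: loop over requests until told to stop *)
      Unix.close req_w; Unix.close rep_r;
      let ic = Unix.in_channel_of_descr req_r in
      let oc = Unix.out_channel_of_descr rep_w in
      let rec loop () =
        match (Marshal.from_channel ic : 'a option) with
        | None -> exit 0
        | Some x ->
            Marshal.to_channel oc (f x) [];
            flush oc;
            loop ()
      in
      loop ()
  | _pid ->
      (* parent: return send/receive functions over the two pipes *)
      Unix.close req_r; Unix.close rep_w;
      let oc = Unix.out_channel_of_descr req_w in
      let ic = Unix.in_channel_of_descr rep_r in
      let send req = Marshal.to_channel oc req []; flush oc in
      let recv () = (Marshal.from_channel ic : 'b) in
      (send, recv)

let () =
  let (send, recv) = spawn_worker (fun x -> x * x) in
  send (Some 6); Printf.printf "%d\n" (recv ());  (* task 1 *)
  send (Some 7); Printf.printf "%d\n" (recv ());  (* task 2, same process *)
  send None                                       (* shut the worker down *)
```

The startup cost of fork is now paid once and amortized over many tasks; each subsequent task costs only the marshalling of its request and reply.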

History: Shared Memory vs. Message-Passing
In 1968 and 1973, Dijkstra and Hoare described the principles of shared-memory computing with semaphores (locks, mutual exclusion).
In 1978, a new paradigm, "Communicating Sequential Processes" (CSP), was introduced. CSP uses synchronous channels with no shared memory. Nicer than that Dijkstra-Hoare shared memory stuff.
CSP was invented by C. Antony R. Hoare (1934-), based on ideas from Edsger W. Dijkstra (1930-2002).

Commu

