MapReduce Frameworks Lab

From CSE231 Wiki

Credit for this assignment: Finn Voichick and Dennis Cosgrove

Motivation

Dealing with big data is Hansel-level hot right now. We will build two implementations to better understand the inner workings of Hadoop- and Spark-like frameworks. Your frameworks will actually be more general than just MapReduce. The Matrix implementation gives us experience with dividing the work up into thread-confined tasks whose results are then combined.

Background

Wikipedia article on MapReduce

As the name suggests, MapReduce refers to the process of mapping and then reducing some stream of data. At its core, a MapReduce algorithm transforms a list of one kind of item, collects the transformed items, and reduces them down to a single value per key using some computation. As you can probably tell, the concept of MapReduce is extremely general and can apply to a wide range of problems. In this assignment, we will use MapReduce to simulate the Facebook mutual friends algorithm (finding the mutual friends between all friend pairs) as well as to pinpoint the offending well in the 1854 Soho Cholera Outbreak. For more information on the general concept of MapReduce, refer to the Wikipedia article.

Java Advice

Parameterized Type Array Tip

Creating arrays of parameterized types in Java is madness inducing. Some details are available in Java Generics Restrictions. The example below creates an array of List<String>. The @SuppressWarnings annotation is optional.

@SuppressWarnings("unchecked")
List<String>[] result = new List[length];
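
The tip above can be combined with the later warning about arrays starting out full of null. The following is a minimal sketch, not course-provided code: the class name ParameterizedArraySketch and the helper createListArray are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class ParameterizedArraySketch {
    // Hypothetical helper: creates an array of List<String> and fills every slot.
    @SuppressWarnings("unchecked")
    static List<String>[] createListArray(int length) {
        // "new List<String>[length]" does not compile, so the raw type is used
        // and the resulting unchecked warning is suppressed.
        List<String>[] result = new List[length];
        // A freshly created array of objects holds only null,
        // so each index is given its own list before anything is added.
        for (int i = 0; i < result.length; i++) {
            result[i] = new ArrayList<>();
        }
        return result;
    }

    public static void main(String[] args) {
        List<String>[] lists = createListArray(3);
        lists[1].add("hello");
        System.out.println(lists[1]);
    }
}
```

Skipping the fill loop would make the call lists[1].add("hello") fail with a NullPointerException.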

Use map.entrySet()

Prefer map.entrySet() over map.keySet() followed by looking up each value with map.get(key).

Use the appropriate version of forall

There are many overloaded versions of forall, including versions for a range of ints from min to maxExclusive, for an array, and for an Iterable. Choose the correct one for each situation, where "correct" is often the one that produces the cleanest code.

Mistakes To Avoid

Warning: Arrays (and matrices) of objects are initially filled with null. You must fill them with instances.
Warning: Ensure that your Slices studio conforms to spec.

Code To Use

To allow our frameworks to work well with JDK8 Streams, we employ a couple of standard interfaces rather than creating our own custom ones.

BiConsumer

We use the standard BiConsumer interface, with its accept(t, u) method, in place of a mythical, custom MapContext interface with an emit(key, value) method.

public interface BiConsumer<T, U> {
    // invoke accept with each key and value you wish to emit
    void accept(T t, U u);
}
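
To make the "accept in place of emit" idea concrete, here is a minimal sketch of a word-count style mapper that emits through a BiConsumer. The class EmitSketch, its methods, and the use of Map.Entry as a stand-in for a key-value pair are assumptions for illustration, not part of the course code.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

class EmitSketch {
    // Hypothetical mapper: emits (word, 1) for each whitespace-separated word.
    static void mapWords(String text, BiConsumer<String, Integer> keyValuePairConsumer) {
        for (String word : text.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                // accept plays the role that emit(key, value) would play
                keyValuePairConsumer.accept(word.toLowerCase(), 1);
            }
        }
    }

    // The caller decides what "emitting" means; here each pair is stored in a list.
    static List<Map.Entry<String, Integer>> collectEmitted(String text) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        mapWords(text, (key, value) -> emitted.add(new SimpleEntry<>(key, value)));
        return emitted;
    }
}
```

Because the mapper only sees a BiConsumer, the same mapper can feed a list, a map, or any other destination the framework chooses.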

Collector

Read the javadoc for Collector.
Check out Collector_MapReduce_Studio#Background.

IndexedRange

class IndexedRange

This class has everything you need for n-way split problems, specifically getSliceIndexId(), getMinInclusive(), and getMaxExclusive().

Slices

class Slices

createNSlices(C[] data, int numSlices), which returns a List of the slices

HashUtils

toIndex(key, N)

MultiWrapMap

class MultiWrapMap

A Path To Victory

1. Int_Sum_MapReduce_Apps_Studio
2. Collector_MapReduce_Studio
3. Mutual_Friends_MapReduce_Application
4. #Optional_Warm_Up
5. #Bottlenecked_MapReduce_Framework
6. Cholera_MapReduce_Application
7. #Matrix_MapReduce_Framework

The Core Questions

What are the tasks?
What is the data?
Is the data mutable?
If so, how is it shared?

Optional Warm Up

WordCountConcreteStaticMapReduce

class: WordCountConcreteStaticMapReduce.java
methods: mapAll, accumulateAll, finishAll
package: mapreduce.framework.warmup.wordcount
source folder: src/main/java

Mapper

map (Provided)

static void map(TextSection textSection, BiConsumer<String, Integer> keyValuePairConsumer) {
    mapper.map(textSection, keyValuePairConsumer);
}

Reducer

createMutableContainer (Provided)

static List<Integer> createMutableContainer() {
    return collector.supplier().get();
}

accumulate (Provided)

static void accumulate(List<Integer> list, int v) {
    collector.accumulator().accept(list, v);
}

combine (Provided)

static List<Integer> combine(List<Integer> a, List<Integer> b) {
    return collector.combiner().apply(a, b);
}

reduce (Provided)

static int reduce(List<Integer> list) {
    return collector.finisher().apply(list);
}
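
The four provided methods each delegate to one of the four functions of a Collector. As a rough illustration of how those pieces fit together, the sketch below builds a collector with the standard Collector.of factory and then calls each part by hand. The collector itself (a List<Integer> container summed at the end) and the names CollectorPartsSketch, summingViaList, and demo are assumptions for illustration; the collector used by the warm-up may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;

class CollectorPartsSketch {
    // Assumed collector: gathers Integers in a List, then sums them.
    static Collector<Integer, List<Integer>, Integer> summingViaList() {
        return Collector.<Integer, List<Integer>, Integer>of(
                ArrayList::new,                        // supplier: new mutable container
                List::add,                             // accumulator: add one value
                (a, b) -> { a.addAll(b); return a; },  // combiner: merge two containers
                list -> {                              // finisher: container to result
                    int sum = 0;
                    for (int v : list) {
                        sum += v;
                    }
                    return sum;
                });
    }

    static int demo() {
        Collector<Integer, List<Integer>, Integer> collector = summingViaList();
        // Two containers, as two independent tasks might build them.
        List<Integer> a = collector.supplier().get();
        collector.accumulator().accept(a, 1);
        collector.accumulator().accept(a, 1);
        List<Integer> b = collector.supplier().get();
        collector.accumulator().accept(b, 1);
        // Combine first, then finish.
        List<Integer> combined = collector.combiner().apply(a, b);
        return collector.finisher().apply(combined);
    }
}
```

Here demo() accumulates three ones across two containers, so the finished result is 3.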

Framework

mapAll

mapAll can be performed in parallel. A task should be created for each item in the input array. Each task should accept the emitted (key, value) pairs and store them in its own List to avoid data races. These lists make up the array which is returned (one list for each item in the input array).

method: List<KeyValuePair<String, Integer>>[] mapAll(TextSection[] input) (parallel implementation required)

Warning: When first created, arrays of Objects are filled with null. You will need to assign a new List to each array index before you start adding key-value pairs.
Warning: Reminder: our course libraries consistently specify max to be exclusive. This includes the parallel forall loop.
Tip: You are encouraged to utilize the provided map method.

accumulateAll

method: static Map<String, List<Integer>> accumulateAll(List<KeyValuePair<String, Integer>>[] mapAllResults) (sequential implementation only)

Tip: You are encouraged to utilize the provided createMutableContainer and accumulate methods.

finishAll

method: static Map<String, Integer> finishAll(Map<String, List<Integer>> accumulateAllResult) (parallel implementation required)

Tip: You are encouraged to utilize the provided reduce method.
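
The accumulateAll and finishAll stages described above can be sketched with plain JDK classes. This is only an illustration under stated assumptions: Map.Entry stands in for the course's key-value pair type, a parallel stream stands in for the course's parallel forall, and the class WordCountStagesSketch with its demo method is hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class WordCountStagesSketch {
    // Sequential: group every emitted value under its key.
    static Map<String, List<Integer>> accumulateAll(List<Map.Entry<String, Integer>>[] mapAllResults) {
        Map<String, List<Integer>> result = new HashMap<>();
        for (List<Map.Entry<String, Integer>> list : mapAllResults) {
            for (Map.Entry<String, Integer> pair : list) {
                // Create the container the first time a key is seen, then add to it.
                result.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        return result;
    }

    // Parallel: each entry is reduced independently of the others.
    static Map<String, Integer> finishAll(Map<String, List<Integer>> accumulateAllResult) {
        // A concurrent map is used because several threads write results at once.
        Map<String, Integer> result = new ConcurrentHashMap<>();
        accumulateAllResult.entrySet().parallelStream().forEach(entry -> {
            int sum = 0;
            for (int v : entry.getValue()) {
                sum += v;
            }
            result.put(entry.getKey(), sum);
        });
        return result;
    }

    @SuppressWarnings("unchecked")
    static Map<String, Integer> demo() {
        List<Map.Entry<String, Integer>>[] mapAllResults = new List[2];
        mapAllResults[0] = new ArrayList<>();
        mapAllResults[0].add(new SimpleEntry<>("a", 1));
        mapAllResults[0].add(new SimpleEntry<>("b", 1));
        mapAllResults[1] = new ArrayList<>();
        mapAllResults[1].add(new SimpleEntry<>("a", 1));
        return finishAll(accumulateAll(mapAllResults));
    }
}
```

Note that finishAll iterates over entrySet() rather than keySet() plus get(key), in line with the Java advice earlier on this page.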

Testing Your Solution

Correctness

class: WarmUpWordCountMapReduceTestSuite.java
package: mapreduce
source folder: src/test/java

MutualFriendsConcreteStaticMapReduce

class: MutualFriendsConcreteStaticMapReduce.java
methods: map, reduceCreateList, reduceAccumulate, reduceCombine, reduceFinish, mapAll, accumulateAll, finishAll
package: mapreduce.framework.warmup.friends
source folder: src/main/java

Mapper

map

method: static void map(Account account, BiConsumer, Set> keyValuePairConsumer) (sequential implementation only)

Reducer

createMutableContainer

method: static List> createMutableContainer() (sequential implementation only)

accumulate

method: static void accumulate(List> list, Set v) (sequential implementation only)

combine

method: static void combine(List> a, List> b) (sequential implementation only)

reduce

method: static AccountIdMutableContainer reduce(List> list) (sequential implementation only)

Framework

mapAll

method: static List, Set>>[] mapAll(Account[] input) (parallel implementation required)

Warning: When first created, arrays of Objects are filled with null. You will need to assign a new List to each array index before you start adding key-value pairs.
Warning: Reminder: our course libraries consistently specify max to be exclusive. This includes the parallel forall loop.
Tip: You are encouraged to utilize the map method you implemented.

accumulateAll

method: static Map, List>> accumulateAll(List, Set>>[] mapAllResults) (sequential implementation only)

Tip: You are encouraged to utilize the reduceCreateList and reduceAccumulate methods you implemented.

finishAll

method: static Map, MutualFriendIds> finishAll(Map, List>> accumulateAllResult) (parallel implementation required)

Tip: You are encouraged to utilize the reduceFinish method you implemented.
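
For orientation, the general mutual-friends idea behind the mapper and reducer above can be sketched with plain ints and sets. Everything here is an assumption for illustration: the course's Account and account-id types are replaced by int ids, the pair key is encoded as a "low-high" string, and the names MutualFriendsSketch and pairKey are hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;

class MutualFriendsSketch {
    // Hypothetical key: the two ids in a fixed order, so (1,2) and (2,1) match.
    static String pairKey(int a, int b) {
        return Math.min(a, b) + "-" + Math.max(a, b);
    }

    // map: for each friend, emit (pair of the two ids, this account's friend set).
    static void map(int accountId, Set<Integer> friendIds,
            BiConsumer<String, Set<Integer>> keyValuePairConsumer) {
        for (int friendId : friendIds) {
            keyValuePairConsumer.accept(pairKey(accountId, friendId), friendIds);
        }
    }

    // reduce: the mutual friends of a pair are the intersection of the
    // friend sets that were accumulated under that pair's key.
    static Set<Integer> reduce(List<Set<Integer>> friendSets) {
        if (friendSets.isEmpty()) {
            return new HashSet<>();
        }
        Set<Integer> mutual = new HashSet<>(friendSets.get(0));
        for (Set<Integer> other : friendSets.subList(1, friendSets.size())) {
            mutual.retainAll(other);
        }
        return mutual;
    }
}
```

Each friend pair receives one friend set from each of its two accounts, so the reducer typically intersects exactly two sets.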

Testing Your Solution

Correctness

class: WarmUpMutualFriendsMapReduceTestSuite.java
package: mapreduce
source folder: src/test/java

Required Lab

Bottlenecked MapReduce Framework

class: BottleneckedMapReduceFramework.java
methods: mapAll, accumulateAll, finishAll
package: mapreduce.framework.lab.bottlenecked
source folder: src/main/java

Navigate to the BottleneckedMapReduceFramework.java class and there will be three methods for you to complete: mapAll, accumulateAll, and finishAll. These frameworks are meant to be extremely general and applied to more specific uses of MapReduce. Whereas the warm-ups for this lab serve to prepare you to build this required section of the lab, this bottlenecked framework is in many ways a warm-up for the matrix implementation.

mapAll

NOTE: If you struggle to get through this method, you are strongly encouraged to try the warm-ups.

method: List<KeyValuePair<K, V>>[] mapAll(E[] input) (parallel implementation required)

Warning: When first created, arrays of Objects are filled with null. You will need to assign a new List to each array index before you start adding key-value pairs.
Warning: Reminder: our course libraries consistently specify max to be exclusive. This includes the parallel forall loop.

With this method, you will map all of the elements of an array of data into a new array of equal size consisting of Lists of key-value pairs. We will leverage the Mapper, which is a field (instance variable) on this BottleneckedMapReduceFramework instance. Invoke the mapper's map method with an element of the input array and a BiConsumer which accepts each key and value passed to it and adds a KeyValuePair to its List. This list should then be stored in the array of lists you previously defined, completing the mapping stage of MapReduce. This should all be done in parallel.

Hint: You should create an array of lists equal in size to the original array. Each list will contain all of the emitted (key, value) pairs for its item.
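
The shape of the mapAll stage described above can be sketched with plain JDK classes. This is a sketch under stated assumptions rather than the lab solution: the nested Mapper interface is a hypothetical stand-in for the course's Mapper, Map.Entry stands in for KeyValuePair, and a parallel IntStream stands in for the course's parallel forall.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.IntStream;

class MapAllSketch {
    // Hypothetical stand-in for the course Mapper interface.
    interface Mapper<E, K, V> {
        void map(E item, BiConsumer<K, V> keyValuePairConsumer);
    }

    @SuppressWarnings("unchecked")
    static <E, K, V> List<Map.Entry<K, V>>[] mapAll(Mapper<E, K, V> mapper, E[] input) {
        // One list per input item; the array starts out full of null.
        List<Map.Entry<K, V>>[] result = new List[input.length];
        // One task per index; the upper bound of range is exclusive.
        IntStream.range(0, input.length).parallel().forEach(i -> {
            // Each task owns its list, so no two tasks touch the same data.
            List<Map.Entry<K, V>> list = new ArrayList<>();
            mapper.map(input[i], (k, v) -> list.add(new SimpleEntry<>(k, v)));
            result[i] = list;
        });
        return result;
    }
}
```

A possible use, with a hypothetical word-splitting mapper:

```java
MapAllSketch.Mapper<String, String, Integer> words =
        (line, consumer) -> { for (String w : line.split(" ")) consumer.accept(w, 1); };
List<Map.Entry<String, Integer>>[] lists = MapAllSketch.mapAll(words, new String[] { "a b", "c" });
```

Each task writes only to its own index of the array, which is what keeps the parallel loop free of data races.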

accumulateAll

method: Map<K, A> accumulateAll(List<KeyValuePair<K, V>>[] mapAllResults) (sequential implementation only)

This middle step is often excluded in more advanced MapReduce applications. It is the only step of this framework that must be completed sequentially, which makes it the bottleneck. In the matrix framework implementation, we will do away with this step altogether for the sake of performance.

In this method, you will take in the array of lists you previously created and accumulate the key-value pairs in the lists into a newly defined map. To do so, you must make use of the Collector provided to you. More specifically, access the accumulator in the collector by calling the accumulator() method, and have it accept each value into the container associated with its key in the map. You probably noticed that the method must return a map of <K, A>, which differs from the <K, V> generics fed into the method. The framework is designed this way because the data emitted by the mapping stage is collected into a mutable container before reaching the finish/reduce stage. If a key has no associated container in the map yet, create one using the Collector's supplier, accessed with the supplier() method.

finishAll

method: Map<K, R> finishAll(Map<K, A> accumulateAllResult) (parallel implementation required)

This final step reduces the accumulated data and returns the final map in its reduced form. Again, you may notice that the method returns a map of <K, R> instead of the <K, A> which was returned by the accumulateAll method. This happens for the same reason as in accumulateAll: the framework is designed to handle cases in which the reduced data differs in type from the accumulated data. To reduce the data down, use the map returned from the accumulateAll stage and put the results of the reduction into a new map. The provided Collector will come in extremely handy for this stage, more specifically the finisher, which can be accessed using the finisher() method.
This step should run in parallel and will probably be the easiest of the three methods.

Matrix MapReduce Framework

class: MatrixMapReduceFramework.java
methods: mapAndAccumulateAll, combineAndFinishAll
package: mapreduce.framework.lab.matrix
source folder: src/main/java

Navigate to the MatrixMapReduceFramework.java class and there will be two methods for you to complete: mapAndAccumulateAll and combineAndFinishAll. These frameworks are meant to be extremely general and applied to more specific uses of MapReduce.

The matrix framework is much more complex than the bottlenecked framework, but it boosts performance by grouping the map and accumulate stages so that everything can run in parallel. It does so by slicing the given data into mapTaskCount slices and assigning a reduce task number to each entry using the HashUtils toIndex() method. This, in effect, creates a matrix of dictionaries, hence the name of the framework. In the combineAndFinishAll stage, the matrix comes in handy by allowing us to go directly down the columns of the matrix (as each key is essentially grouped into a bucket), combining and reducing elements all in one. This concept was explained in more depth during class.

mapAndAccumulateAll

method: Map<K, A>[][] mapAndAccumulateAll(E[] input) (parallel implementation required)

In this stage, you will map and accumulate a given array of data into a matrix of dictionaries. This method should run in parallel while performing the map and accumulate portions of the bottlenecked framework (which we recommend you complete prior to embarking on this mission). As mentioned previously, the input should be sliced into mapTaskCount IndexedRanges and then mapped and accumulated into the appropriate dictionary in the matrix. Although you could slice up the data into chunks yourself, we require using the same algorithm as the IndexedRange and Slices classes introduced earlier in the course.
This will allow us to provide better feedback so that you can pinpoint bugs sooner. What is the best way to perform an identical algorithm to your Slices studio? Use your Slices studio, of course.

For each slice, the mapper should map the input into its appropriate cell in the matrix and accumulate it into that specific dictionary. Essentially, you will need to nest the actions of the accumulate method inside the mapper. In order to find where the input should go in the matrix, remember that each slice keeps track of its index id and HashUtils has a toIndex method. Which is applicable to the row and which is applicable to the column?

Hint: The number of rows should match the number of slices.

combineAndFinishAll

method: Map<K, R> combineAndFinishAll(Map<K, A>[][] input) (parallel implementation required)

In this stage, you will take the matrix you just completed and combine all of the separate rows down to one array. Afterward, you will convert this combined array of maps into one final map. This method should run in parallel.

As mentioned previously, you should go directly down the matrix to access the same bucket across the different slices you created in the mapAndAccumulateAll step. For all of the maps in a column, you should go through each entry and combine it down into one row. You will need to make use of the Collector's finisher again, but you will also need to make use of the combiner. You can access the Collector's combiner using the combiner() method. Although the combine step differs from the bottlenecked framework, the finish step should mirror what you did previously.

Hint: You can use the provided MultiWrapMap class to return the final row as a valid output. You should also combine before you finish.

Testing Your Solution

Correctness

There is a top-level test suite composed of sub test suites, which can be invoked separately when you want to focus on one part of the assignment.
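
Independently of the provided suites, it can help to reason about the matrix layout on a tiny input by hand. The sketch below is one way to do that with plain JDK classes, specialized to counting words. It rests on several assumptions: toIndex here is a hypothetical stand-in for HashUtils.toIndex(key, N), the even split is a simplification (the lab requires your Slices studio algorithm), a parallel IntStream stands in for forall, and the finisher is the identity because the accumulated count is already the result.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;

class MatrixSketch {
    // Assumed stand-in for HashUtils.toIndex(key, n): a bucket in [0, n).
    static int toIndex(Object key, int n) {
        return Math.floorMod(key.hashCode(), n);
    }

    // Rows correspond to map tasks (slices); columns correspond to reduce buckets.
    @SuppressWarnings("unchecked")
    static Map<String, Integer>[][] mapAndAccumulateAll(String[] words, int mapTaskCount, int reduceTaskCount) {
        Map<String, Integer>[][] matrix = new Map[mapTaskCount][reduceTaskCount];
        IntStream.range(0, mapTaskCount).parallel().forEach(row -> {
            // Each task fills and then writes only its own row.
            for (int col = 0; col < reduceTaskCount; col++) {
                matrix[row][col] = new HashMap<>();
            }
            // Simplified even split of [0, words.length) into mapTaskCount ranges.
            int min = (int) ((long) row * words.length / mapTaskCount);
            int max = (int) ((long) (row + 1) * words.length / mapTaskCount);
            for (int i = min; i < max; i++) {
                int col = toIndex(words[i], reduceTaskCount);
                matrix[row][col].merge(words[i], 1, Integer::sum);
            }
        });
        return matrix;
    }

    @SuppressWarnings("unchecked")
    static Map<String, Integer> combineAndFinishAll(Map<String, Integer>[][] matrix) {
        int columns = matrix.length == 0 ? 0 : matrix[0].length;
        Map<String, Integer>[] perColumn = new Map[columns];
        IntStream.range(0, columns).parallel().forEach(col -> {
            // Go straight down one column; a given key only ever lives in one column.
            Map<String, Integer> combined = new HashMap<>();
            for (Map<String, Integer>[] row : matrix) {
                for (Map.Entry<String, Integer> e : row[col].entrySet()) {
                    combined.merge(e.getKey(), e.getValue(), Integer::sum);
                }
            }
            perColumn[col] = combined;
        });
        // The columns hold disjoint keys, so they can simply be merged at the end.
        Map<String, Integer> result = new HashMap<>();
        for (Map<String, Integer> column : perColumn) {
            result.putAll(column);
        }
        return result;
    }
}
```

In this sketch the slice index selects the row and the hash of the key selects the column, which is why no two column tasks ever see the same key.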

class: FrameworksLabTestSuite.java
package: mapreduce.framework.lab
source folder: src/test/java

Bottlenecked

class: BottleneckedFrameworkTestSuite.java
package: mapreduce.framework.lab.bottlenecked
source folder: src/test/java

MapAll

class: BottleneckedFrameworkTestSuite.java
package: mapreduce.framework.lab.bottlenecked
source folder: src/test/java

AccumulateAll

class: BottleneckedAccumulateAllTestSuite.java
package: mapreduce.framework.lab.bottlenecked
source folder: src/test/java

FinishAll

class: BottleneckedFinishAllTestSuite.java
package: mapreduce.framework.lab.bottlenecked
source folder: src/test/java

Holistic

class: BottleneckedHolisticTestSuite.java
package: mapreduce.framework.lab.bottlenecked
source folder: src/test/java

Matrix

class: MatrixFrameworkTestSuite.java
package: mapreduce.framework.lab.matrix
source folder: src/test/java

MapAccumulateAll

class: MatrixMapAccumulateAllTestSuite.java
package: mapreduce.framework.lab.matrix
source folder: src/test/java

CombineFinishAll

class: MatrixCombineFinishAllTestSuite.java
package: mapreduce.framework.lab.matrix
source folder: src/test/java

Holistic

class: MatrixHolisticTestSuite.java
package: mapreduce.framework.lab.matrix
source folder: src/test/java

Rubric

As always, please make sure to cite your work appropriately.

Total points: 100

Bottlenecked framework subtotal: 40
Correct mapAll (10)
Correct accumulateAll (20)
Correct finishAll (10)

Matrix framework subtotal: 60
Correct mapAndAccumulateAll (30)
Correct combineAndFinishAll (30)

Retrieved from "https://classes.engineering.wustl.edu/cse231/core/index.php?title=MapReduce_Frameworks_Lab&oldid=3174"

This page was last modified on 5 November 2021, at 20:21.