### CNS*2020 Tutorial #6

## Methods from Data Science for Model Simulation, Analysis, and Visualization

**Organizers:** Cengiz Gunay and Anca Doloc-Mihu

**Schedule:** 7-10pm Berlin time on July 18, 2020

1. *C Gunay*, "From High Performance Computing to Hadoop and Spark"
2. *A Doloc-Mihu*, "High-dimensional data visualizations"
3. *H Dinh, J Walton, and A Morariu*, "Analysim.tech: A data sharing site for crowdsourcing analysis of parameter-search datasets"

(each hands-on session is 50 min, followed by a 10 min break)
### CNS*2020 Tutorial #6 - Session 1

## From High Performance Computing to Hadoop and Spark

#### Cengiz Gunay, July 18, 2020
### Data!

- Prediction: the world will have 44 _zettabytes_ by year 2020`$^*$`
- Scale: mega, giga, tera, peta, exa, zetta (`$10^{21}$`)

Data producers:

- NY Stock Exchange: 4-5 terabytes/day
- Facebook: 7 petabytes/month
- Ancestry.com: 10 petabytes
- Internet Archive: 18 petabytes
- Computational neuroscience: ???

`$^*$` Hadoop: The Definitive Guide (2015); seems to match [current data](https://techjury.net/blog/big-data-statistics/#gref)
### Large-scale simulations and data analysis in computational neuroscience

Workflows in historical order:

1. High-performance computing clusters (SGE, PBS, NSG, ...)
2. Cloud deployment of virtual machines (AWS, GCP, Azure, ...)
3. Specialized distributed computing platforms (Hadoop, Spark, ...)
### High-performance computing (HPC)

Classical *scientific computing* environment:

- Cluster of similar computers
- Submit "jobs"; scheduler software runs them on available nodes
- Results can be collected on network drives, or gathered at the end

Cons:

- Cluster is usually administered by a central authority
- Can't install custom programs into the OS, except in your home folder
- Must compete with other users and wait your turn in the queue
- Errors are reported with a delay
### Cloud deployment of virtual machines

Popular and "easy" method offered by several vendors:

- Amazon Web Services, Google Cloud Platform, Microsoft Azure, etc.
- Freedom to deploy your own virtual machine (VM) "images"
- Can quickly create copies of the same machine
- No waiting, but you have to pay

Cons:

- More expensive than the HPC approach
- More administrative work needed to orchestrate VMs
- Not efficient, because it involves creating whole machines
### Data science field has parallel computing solutions

Not unique to computational neuroscience:

- Large data requires more than one machine
- Parallelization is painful
- Load balancing is a problem; otherwise you wait for the slowest node

A general solution:

- The **MapReduce** algorithm, implemented in **Apache Hadoop**
- Has been widely adopted in industry
- Hadoop comes with an ecosystem of tools: YARN, HDFS, Pig, Spark
### Your turn!

Type your research use case for MapReduce into the Zoom chat. Identify steps for:

- **map**: extract information from each item
- **reduce**: combine/process mapped information to produce output

**Example:** *map* to select a parameter set, and *reduce* to average it (see the sketch below)
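For instance, the parameter-averaging example above might look like this as a plain, single-machine Python sketch (the `records` list and field meanings are made up for illustration):

```python
from functools import reduce

# Hypothetical parameter-search results: (parameter set id, measured spike rate)
records = [(1, 12.0), (2, 7.5), (1, 14.0), (3, 9.1), (1, 13.0)]

# map: extract the values that belong to parameter set 1
mapped = [rate for pset, rate in records if pset == 1]

# reduce: combine the mapped values into a single output (here, their average)
total = reduce(lambda a, b: a + b, mapped)
print(total / len(mapped))  # 13.0
```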
### What is MapReduce? An example

A weather dataset from White, Chapter 2 (*Hadoop: The Definitive Guide*):

- Live streaming data from weather stations all around the world
- Each row is one reading from one station at one point in time
### Back to the command-line: inspect the data

```bash
% ls raw/1990 | head
010010-99999-1990.gz
010014-99999-1990.gz
010015-99999-1990.gz
010016-99999-1990.gz
010017-99999-1990.gz
010030-99999-1990.gz
010040-99999-1990.gz
010080-99999-1990.gz
010100-99999-1990.gz
010150-99999-1990.gz
```
- Many small files; can be analyzed sequentially with `awk`:

```bash
% ./max_temperature.sh
1901    317
1902    244
1903    289
1904    256
1905    283
...
```
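For comparison, the same sequential scan could be sketched in plain Python; the fixed-width NCDC column offsets follow the Java mapper shown later in this session, and the paths are hypothetical:

```python
import glob
import gzip

def max_temperature(year_dir):
    """Scan every gzipped station file for one year and return the maximum reading."""
    max_temp = None
    for path in glob.glob(year_dir + "/*.gz"):
        with gzip.open(path, "rt") as f:
            for line in f:
                temp = int(line[87:92])   # signed temperature, tenths of a degree C
                quality = line[92]
                if temp != 9999 and quality in "01459":
                    max_temp = temp if max_temp is None else max(max_temp, temp)
    return max_temp

# e.g.: print(1990, max_temperature("raw/1990"))
```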
### Enter Map+Reduce

- Can be partitioned to run on parallel hardware:
### Map in Java: (ID, row of text) `$\Rightarrow$` (year, temp)

```java
public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}
```
### Reduce: (year, [temps]) `$\Rightarrow$` (year, max temp)

```java
public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}
```
### Putting it all together

```java
public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```
### A sample run

```bash
% export HADOOP_CLASSPATH=hadoop-examples.jar
% hadoop MaxTemperature input/ncdc/sample.txt output
14/09/16 09:48:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/09/16 09:48:40 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
14/09/16 09:48:40 INFO input.FileInputFormat: Total input paths to process : 1
14/09/16 09:48:40 INFO mapreduce.JobSubmitter: number of splits:1
14/09/16 09:48:40 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local26392882_0001
14/09/16 09:48:40 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
14/09/16 09:48:40 INFO mapreduce.Job: Running job: job_local26392882_0001
14/09/16 09:48:40 INFO mapred.LocalJobRunner: OutputCommitter set in config null
14/09/16 09:48:40 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Waiting for map tasks
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Starting task: attempt_local26392882_0001_m_000000_0
14/09/16 09:48:40 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
14/09/16 09:48:40 INFO mapred.LocalJobRunner:
14/09/16 09:48:40 INFO mapred.Task: Task:attempt_local26392882_0001_m_000000_0 is done. And is in the process of committing
14/09/16 09:48:40 INFO mapred.LocalJobRunner: map
14/09/16 09:48:40 INFO mapred.Task: Task 'attempt_local26392882_0001_m_000000_0' done.
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Finishing task: attempt_local26392882_0001_m_000000_0
14/09/16 09:48:40 INFO mapred.LocalJobRunner: map task executor complete.
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Waiting for reduce tasks
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Starting task: attempt_local26392882_0001_r_000000_0
14/09/16 09:48:40 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/16 09:48:40 INFO mapred.Merger: Merging 1 sorted segments
14/09/16 09:48:40 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 50 bytes
14/09/16 09:48:40 INFO mapred.Merger: Merging 1 sorted segments
14/09/16 09:48:40 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 50 bytes
14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/16 09:48:40 INFO mapred.Task: Task:attempt_local26392882_0001_r_000000_0 is done. And is in the process of committing
14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/16 09:48:40 INFO mapred.Task: Task attempt_local26392882_0001_r_000000_0 is allowed to commit now
14/09/16 09:48:40 INFO output.FileOutputCommitter: Saved output of task 'attempt...local26392882_0001_r_000000_0' to file:/Users/tom/book-workspace/hadoop-book/output/_temporary/0/task_local26392882_0001_r_000000
14/09/16 09:48:40 INFO mapred.LocalJobRunner: reduce > reduce
14/09/16 09:48:40 INFO mapred.Task: Task 'attempt_local26392882_0001_r_000000_0' done.
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Finishing task: attempt_local26392882_0001_r_000000_0
14/09/16 09:48:40 INFO mapred.LocalJobRunner: reduce task executor complete.
14/09/16 09:48:41 INFO mapreduce.Job: Job job_local26392882_0001 running in uber mode : false
14/09/16 09:48:41 INFO mapreduce.Job:  map 100% reduce 100%
14/09/16 09:48:41 INFO mapreduce.Job: Job job_local26392882_0001 completed successfully
14/09/16 09:48:41 INFO mapreduce.Job: Counters: 30
	File System Counters
		FILE: Number of bytes read=377168
		FILE: Number of bytes written=828464
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
	Map-Reduce Framework
		Map input records=5
		Map output records=5
		Map output bytes=45
		Map output materialized bytes=61
		Input split bytes=129
		Combine input records=0
		Combine output records=0
		Reduce input groups=2
		Reduce shuffle bytes=61
		Reduce input records=5
		Reduce output records=2
		Spilled Records=10
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=39
		Total committed heap usage (bytes)=226754560
	File Input Format Counters
		Bytes Read=529
	File Output Format Counters
		Bytes Written=29
```
### HDFS: Hadoop Distributed Filesystem

- Optimized to work on parallel hardware:
### HDFS _local_ data replication

- Data shipped out to each machine via _input splits_ for Map tasks
### HDFS input _splits_ merged into output _part_
### Workflow with multiple Reduce tasks

- Output of *map* can be used for different *reduce* steps:
### Reduce step can be omitted

- Sometimes *map* alone is enough for "embarrassingly parallel" simulations! (see the sketch below)
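A purely local illustration of the idea (outside Hadoop): an embarrassingly parallel parameter sweep needs only a *map*, one independent job per parameter set. The `run_simulation` function and parameter grid below are hypothetical stand-ins:

```python
from multiprocessing import Pool

def run_simulation(params):
    """Stand-in for one independent model run; returns (params, result)."""
    g_na, g_k = params
    return (params, g_na / g_k)  # placeholder computation

if __name__ == "__main__":
    param_grid = [(g_na, g_k) for g_na in (100, 120) for g_k in (20, 36)]
    with Pool() as pool:
        # map only: each simulation is independent, so there is nothing to reduce
        results = pool.map(run_simulation, param_grid)
    print(results)
```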
### Hadoop is dead. Long live Hadoop!

- Hadoop created an ecosystem of projects:
  - Avro: Data serialization system
  - Flume: Work with data streams
  - Sqoop: Interface with traditional relational DBs
  - Hive: SQL queries converted to MapReduce
  - [Pig](https://www.slideshare.net/kevinweil/hadoop-pig-and-twitter-nosql-east-2009/): Hadoop processing with a custom high-level language
  - Parquet: Columnar storage for nested data
  - Crunch: High-level API for using Hadoop
  - [Kafka](https://kafka.apache.org/): Distributed streaming platform
  - [Spark](https://spark.apache.org/): Another distributed computing framework
- We will give a brief intro to Spark
### Spark

- Around since 2009
- 10x-20x faster than Hadoop
- Written in Scala, runs on the JVM
- Can be programmed in Scala, Java, and Python
### Practice time! Connect to our Spark server

- See our [Sched page](https://cns2020online.sched.com/event/a1f62dac60f1e6d93a34724500d3ff66) for the IP address and downloads

On Mac and Linux:

- Download `spark-key` from [Sched](https://cns2020online.sched.com/event/a1f62dac60f1e6d93a34724500d3ff66)
- Open a terminal and run:

```bash
$ ssh -i spark-key cnsuser@<IP address>
```

On Windows:

- Download `spark-windows.ppk` from [Sched](https://cns2020online.sched.com/event/a1f62dac60f1e6d93a34724500d3ff66)
- Download [PuTTY](https://www.chiark.greenend.org.uk/~sgtatham/putty/) and follow these [instructions](https://devops.ionos.com/tutorials/use-ssh-keys-with-putty-on-windows/#connect-to-server-with-private-key) to use the ppk key file
- [Open an SSH connection](https://www.ssh.com/ssh/putty/windows/) to the IP address with username "cnsuser"
### Practice time! (Cont)

Once connected to the Spark server, run:

```bash
$ docker exec -it klt-spark-wwym bin/pyspark
```

which should result in:

```bash
Python 3.6.11 (default, Jul 15 2020, 18:47:56)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0
      /_/

Using Python version 3.6.11 (default, Jul 15 2020 18:47:56)
SparkSession available as 'spark'.
>>>
```
### Spark works with _resilient distributed datasets_ (RDDs)

- Try these on the `pyspark` command line
- A line count program example:

```python
>>> lines = sc.textFile("data/tutorial.md")  # Create an RDD called lines
>>> lines.count()  # Count the number of items in this RDD
67
>>> lines.first()  # First item in this RDD, i.e. first line of tutorial.md
'---'
```

- Parallel operation is completely transparent!
- `sc` is the `SparkContext` _driver_ to access Spark and create RDDs
- The RDD is broken into pieces to run `count()` in parallel
### Inside Spark
- Try another example with a custom filter function:

```python
>>> lines = sc.textFile("data/tutorial.md")
>>> sparkLines = lines.filter(lambda line: "Spark" in line)
>>> sparkLines.first()
'tools, such as [Apache Spark](https://spark.apache.org/). These tools'
```
### Map-Reduce operations generalize to Transformation-Action with RDDs

_Transformations_ create new RDDs; e.g.:

```python
>>> sparkLines = lines.filter(lambda line: "Spark" in line)
```

_Actions_ calculate results from RDDs; e.g.:

```python
>>> sparkLines.first()
```
### RDD transformations: `map()` and `filter()`
```python
>>> inputRDD = sc.parallelize([1, 2, 3, 4])  # Create a numeric input RDD
```

```python
>>> outputRDD = inputRDD.map(lambda x: x * x)  # MAP
```

```python
>>> outputRDD = inputRDD.filter(lambda x: x != 1)  # FILTER
```

```python
>>> outputRDD.collect()  # Show all results
```
### RDD transformations (Continued)

- If `map()` needs to produce multiple outputs for each input, use `flatMap()` instead
- Set operations (see the examples after this list):
  - `distinct()`
  - `union()`
  - `intersection()`
  - `subtract()`
  - `cartesian()` -> expensive
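A quick illustration of these transformations at the `pyspark` prompt (the values are arbitrary, and result ordering may differ since RDDs are unordered):

```python
>>> a = sc.parallelize([1, 2, 3, 3])
>>> b = sc.parallelize([3, 4, 5])
>>> a.flatMap(lambda x: [x, -x]).collect()  # multiple outputs per input
[1, -1, 2, -2, 3, -3, 3, -3]
>>> a.distinct().collect()
[1, 2, 3]
>>> a.union(b).collect()  # keeps duplicates
[1, 2, 3, 3, 3, 4, 5]
>>> a.intersection(b).collect()
[3]
>>> a.subtract(b).collect()
[1, 2]
```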
### RDD actions

- The most common is `reduce()`: takes two elements and returns one

```python
sum = inputRDD.reduce(lambda x, y: x + y)
```

- `fold()` also takes a _zero_ value for initialization

```python
sum = inputRDD.fold(0, lambda x, y: x + y)
```

- `aggregate()` asks for accumulation and combine functions. Example that keeps a running sum and count of elements to calculate an average value:

```python
sumCount = inputRDD.aggregate((0, 0),
                              (lambda acc, value: (acc[0] + value, acc[1] + 1)),
                              (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
average = sumCount[0] / float(sumCount[1])
```
### RDD actions (continued)

- `collect()`: Return all elements
- `count()`: Number of elements
- `countByValue()`: Number of times each element occurs in the RDD (a histogram)
- and more... (see the examples below)
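For example, again at the `pyspark` prompt (output ordering may vary):

```python
>>> rdd = sc.parallelize(["a", "b", "a", "c", "a"])
>>> rdd.collect()
['a', 'b', 'a', 'c', 'a']
>>> rdd.count()
5
>>> rdd.countByValue()
defaultdict(<class 'int'>, {'a': 3, 'b': 1, 'c': 1})
```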
### RDDs can act as DataFrames!

- Can read/write data in JSON, CSV, Hive tables, and Parquet
- A _DataFrame_ holds semi-structured data and supports SQL-like querying

```python
df = spark.read.json("examples/src/main/resources/people.json")  # also see read.csv()

# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# Select only the "name" column
df.select("name").show()

# Select people older than 21
df.filter(df['age'] > 21).show()
```
### Spark can run SQL queries in parallel

- The SQL interface can be used inside or outside Spark (e.g. via a JDBC connection or from Tableau)

```python
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
```
### A Spark-ling challenge for you!

Load our CSV file with measurements from model neuron simulations:

```python
df = spark.read.csv("data/AnalySim.csv")  # watch for capitalization

# show info
df.printSchema()
df.show()
```

Question:

- Use [RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html) and [DataFrame](https://spark.apache.org/docs/latest/sql-getting-started.html) operations to calculate one of min, max, average, or standard deviation for one column, or an arithmetic operation between multiple columns to find a ratio, slope, etc. (a starter sketch follows below)
- You can also filter rows based on column constraints.
- Post your code and results on [NeuroStars](https://neurostars.org/t/cns-2020-tutorial-t6-methods-from-data-science-for-model-simulation-analysis-and-visualization/13337) or Zoom.
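As a starting point, a DataFrame-based aggregation might look like the sketch below. The column name `"col_name"` is a placeholder for one of the actual columns in the CSV, and you may need the `header=True` / `inferSchema=True` options depending on how the file is formatted:

```python
from pyspark.sql import functions as F

df = spark.read.csv("data/AnalySim.csv", header=True, inferSchema=True)

# Aggregate a single (hypothetical) numeric column
df.select(F.min("col_name"), F.max("col_name"),
          F.avg("col_name"), F.stddev("col_name")).show()

# Filter rows on a column constraint, then aggregate
df.filter(df["col_name"] > 0).agg(F.avg("col_name")).show()
```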
### If you want to install Spark locally

- Installation can get complex; [Hadoop and Spark](https://spark.apache.org/docs/latest/#downloading) come bundled together
- You can also just download the Docker image from [bitnami/spark](https://hub.docker.com/r/bitnami/spark/) and run it like in our example above.
### Closing notes for Spark
Spark also provides:

- Real-time processing via streaming (Chapter 10)
- Machine learning library _MLlib_ (Chapter 11)

More resources:

- [Book's GitHub repo with examples](https://github.com/databricks/learning-spark)
- [Spark API documentation](https://spark.apache.org/docs/latest/)
- [Spark Quick Start](https://spark.apache.org/docs/latest/quick-start.html)
- [RDD Programming Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
### Questions and comments?

- The conversation can continue if you post on our [NeuroStars page](https://neurostars.org/t/cns-2020-tutorial-t6-methods-from-data-science-for-model-simulation-analysis-and-visualization/13337).
- Please fill out [our survey](https://ggc.az1.qualtrics.com/jfe/form/SV_023P8dEuPo8G8N7) before leaving the tutorial
- Thank you for coming to our tutorial! :)