### CNS*2021 Satellite Tutorial SA4

## Methods from Data Science for Model Simulation, Analysis, and Visualization

**Schedule:** 8-11:30am New York time on June 30, 2021

1. *Cengiz Gunay*, "From High Performance Computing to Hadoop and Spark"
2. *Anca Doloc-Mihu*, "High-dimensional data visualizations"

(Lectures followed by hands-on practice and a 15 min break)
### CNS*2021 Tutorial SA4 - Session 1

## From High Performance Computing to Hadoop and Spark

#### Cengiz Gunay, June 30, 2021
### Data!

- Prediction: the world will hold 44 _zettabytes_ of data by the year 2020`$^*$`
- Scale: mega, giga, tera, peta, exa, zetta (`$10^{21}$`)

Data producers:

- NY Stock Exchange: 4-5 terabytes/day
- Facebook: 7 petabytes/month
- Ancestry.com: 10 petabytes
- Internet Archive: 18 petabytes
- Computational neuroscience: ???

`$^*$` Hadoop: The Definitive Guide (2015); seems to match [current data](https://techjury.net/blog/big-data-statistics/#gref)
### Large-scale simulations and data analysis in computational neuroscience

Typical needs:

- Running a large number of simulations (e.g. parameter searches)
- Analyzing large batches of data
- Others?

Workflows commonly used:

1. High-performance computing clusters (SGE, PBS, NSG, ...)
2. Cloud deployment of virtual machines (AWS, GCP, Azure, ...)
3. Specialized distributed computing platforms (Hadoop, Spark, ...)
### High-performance computing (HPC)

Classical *scientific computing* environment:

- Cluster of similar computers
- Submit "jobs"; scheduler software runs them on available nodes
- Results are written to network drives, or collected at the end

Cons:

- Cluster is usually administered by a central authority
- Can't install custom programs into the OS, except in your home folder
- Must compete with other users and wait your turn in the queue
- Errors only surface after a delay
### Cloud deployment of virtual machines

Popular and "easy" method offered by several vendors:

- Amazon Web Services, Google Cloud Platform, Microsoft Azure, etc.
- Freedom to deploy your own virtual machine (VM) "images"
- Can quickly create copies of the same machine
- No waiting, but you have to pay

Cons:

- More expensive than the HPC approach
- More administrative work needed to orchestrate VMs
- Not efficient, because it involves provisioning whole machines
### The data science field has parallel computing solutions

Our problem is not unique to computational neuroscience:

- Large data requires more than one machine
- Parallelization is painful
- Load balancing is a problem; otherwise you wait for the slowest node

A general solution:

- The **MapReduce** algorithm, implemented in **Apache Hadoop**
- Divides data and processing across physical machines
- Based on a 2004 paper from Google, widely adopted in industry
### A MapReduce example for finding average spike rates

Let's assume our input files look like this:

```
neuronID trialnum numspikes
n678 1 12
n554 3 0
n554 4 2
n678 2 15
...
```

What would the Map and Reduce steps look like?

- Map: Associate neuron ID with number of spikes: `(n678=>12)`, ...
- Reduce: Average for each neuron ID: `n678=>avg(12,15)`, ...
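A minimal plain-Python sketch of the idea (not actual Hadoop code; the function names are ours for illustration):

```python
# Map: one input line -> one (key, value) pair
def map_step(line):
    neuron_id, trial, numspikes = line.split()
    return (neuron_id, int(numspikes))

# Reduce: all values collected for one key -> one result
def reduce_step(neuron_id, spike_counts):
    return (neuron_id, sum(spike_counts) / len(spike_counts))

# The framework groups map outputs by key before reducing:
# map_step("n678 1 12")         -> ('n678', 12)
# reduce_step('n678', [12, 15]) -> ('n678', 13.5)
```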
### What are the advantages of using MapReduce?

- Works with large files without any special handling
- Parallelizes the task across multiple machines to cut down processing time
- Parallel operation is automatic!

But...

- The algorithm must be converted into MapReduce form
### Data is distributed automatically on the Hadoop Distributed Filesystem (HDFS)

- Optimized to work on parallel hardware:
### HDFS _local_ data replication

- Data is shipped out to each machine via _input splits_ for Map tasks
### HDFS input _splits_ merged into output _part_
### Workflow with multiple Reduce tasks

- Output of *map* can be used for different *reduce* steps:
### Back to the spike rates example with multiple reduce tasks

Keep a single map step:

- Map: Associate neuron ID with number of spikes: `(n678=>12)`, ...

Add multiple reduce steps:

1. Reduce: **Average** for each neuron ID: `n678=>avg(12,15)`, ...
2. Reduce: **Max** for each neuron ID: `n678=>max(12,15)`, ...
3. Reduce: **Mode** for each neuron ID: `n678=>mode(12,15)`, ...

...
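Jumping ahead to the `pyspark` shell used later in this tutorial, one possible sketch of reusing a single mapped dataset for several reductions (`sc` is the SparkContext; the toy values come from the example above):

```python
# One map step: (neuronID, numspikes) pairs
pairs = sc.parallelize([("n678", 12), ("n554", 0), ("n554", 2), ("n678", 15)])

# Reduce 1: max per neuron ID
maxes = pairs.reduceByKey(max)

# Reduce 2: average per neuron ID (sum and count, then divide)
sums_counts = pairs.mapValues(lambda v: (v, 1)) \
                   .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
averages = sums_counts.mapValues(lambda s: s[0] / float(s[1]))

maxes.collect()     # [('n678', 15), ('n554', 2)]   (order may vary)
averages.collect()  # [('n678', 13.5), ('n554', 1.0)]
```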
### The Reduce step can be omitted

- Sometimes *map* alone is enough for "embarrassingly parallel" simulations!
### Example without a reduce step

Assume input files with maximal conductance parameters for a model:

```
gmaxNa gmaxK1 gmaxK2 ...
5e-9 1.2e-9 1.4e-9 ...
6e-9 1.2e-9 1.4e-9 ...
7e-9 1.2e-9 1.4e-9 ...
...
```

What would a Map step look like?

- Map: run a simulation with the parameter values, and output a results file
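As a preview of the Spark interface we use later, a possible map-only sketch (the input file name and the body of `run_simulation` are hypothetical placeholders for your own model code):

```python
def run_simulation(line):
    # Parse one parameter row and run the model with it (placeholder logic)
    gmaxNa, gmaxK1, gmaxK2 = [float(v) for v in line.split()[:3]]
    # ... call your simulator here and collect its measurements ...
    return (gmaxNa, gmaxK1, gmaxK2, "results go here")

params = sc.textFile("data/params.txt")                   # hypothetical input file
rows = params.filter(lambda l: not l.startswith("gmax"))  # skip the header row
results = rows.map(run_simulation)                        # each row runs in parallel
results.saveAsTextFile("output/simulations")              # no reduce step needed
```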
### Hadoop is dead. Long live Hadoop!

- Hadoop created an ecosystem of projects:
  - Avro: Data serialization system
  - Flume: Work with data streams
  - Sqoop: Interface with traditional relational DBs
  - Hive: SQL queries converted to MapReduce
  - [Pig](https://www.slideshare.net/kevinweil/hadoop-pig-and-twitter-nosql-east-2009/): Hadoop processing with a custom high-level language
  - Parquet: Columnar storage for nested data
  - Crunch: High-level API for using Hadoop
  - [Kafka](https://kafka.apache.org/): Distributed streaming platform
  - [Spark](https://spark.apache.org/): Another distributed computing framework
- We will give a brief intro to Spark
### Spark Features

- Developed in 2009, written in Scala, running on the JVM
- Speed:
  - Up to 100 times faster than Hadoop
  - In-memory storage of intermediate results
- Multiple languages supported:
  - Python, Scala, R, Java
- Advanced analytics:
  - SQL queries, streaming data
  - Machine learning, graph algorithms
### Spark and Hadoop Differences

Hadoop:

- Batch processing
- Reads data from HDFS and writes intermediate results back to HDFS
- Final results are also written to HDFS

Spark:

- Can read data from streams, the local file system, HDFS, or other distributed file systems
- Keeps intermediate results in memory
### Spark Components
### Resilient Distributed Datasets (RDD)

- Fundamental data structure of Spark
- Immutable distributed collection of objects
- Divided into logical partitions
  - Can be computed on different nodes
- Can contain any data type of Python, Java, or Scala
- Can contain User Defined Types (UDTs)
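A quick look at partitions, as a minimal sketch from the `pyspark` shell introduced in the practice section below (`sc` is the SparkContext the shell provides):

```python
rdd = sc.parallelize(range(8), numSlices=4)  # split the data into 4 partitions
rdd.getNumPartitions()   # -> 4
rdd.glom().collect()     # elements grouped by partition,
                         # e.g. [[0, 1], [2, 3], [4, 5], [6, 7]]
```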
### Spark SQL

- Integrates relational processing
- Supports queries in SQL and HQL (Hive Query Language)
- Supports various data sources
  - CSV, text, JSON, etc.
- Libraries
  - DataSource API
  - DataFrame API
  - Interpreter and Optimizer
  - SQL Service
### Spark Streaming

- Process real-time streaming data
- Fundamental stream unit is the DStream
  - A series of RDDs used to process real-time data
  - Can be viewed as a continually growing table
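A minimal DStream sketch, assuming a text source is available on a local socket (the port number here is arbitrary); this is not part of the hands-on exercise below:

```python
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, batchDuration=5)      # 5-second micro-batches
lines = ssc.socketTextStream("localhost", 9999)  # DStream: a series of RDDs

# Word counts recomputed for every batch
counts = (lines.flatMap(lambda l: l.split())
               .map(lambda w: (w, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()             # start receiving and processing
ssc.awaitTermination()  # block until stopped
```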
### MLlib (Machine Learning Library)

- Tools
  - ML algorithms
    - Classification, regression, clustering, collaborative filtering
  - Featurization
    - Feature extraction, transformations, and dimensionality reduction
  - Pipelines
    - Constructing and training ML pipelines
  - Persistence
    - Saving and loading algorithms, models, and pipelines
  - Utilities
    - Linear algebra, statistics, data handling
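A minimal `pyspark.ml` pipeline sketch (the column names, values, and save path are made up for illustration):

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Tiny made-up training set: two conductances and a binary label
train_df = spark.createDataFrame(
    [(5e-9, 1.2e-9, 0.0), (7e-9, 1.4e-9, 1.0)],
    ["gmaxNa", "gmaxK1", "label"])

assembler = VectorAssembler(inputCols=["gmaxNa", "gmaxK1"], outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label")

model = Pipeline(stages=[assembler, lr]).fit(train_df)     # construct and train
model.write().overwrite().save("models/example-pipeline")  # persistence
```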
### GraphX

- API for graphs and graph-parallel computation
- Extends RDD with a Resilient Distributed Property Graph
- Can have parallel edges to define multiple relationships between vertices

### GraphFrames

- Similar to GraphX but built on top of DataFrames
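A GraphFrames sketch; this assumes the external `graphframes` package has been added to the Spark session (it is not bundled with Spark), and the neuron names are made up:

```python
from graphframes import GraphFrame

# Vertices need an "id" column; edges need "src" and "dst" columns
vertices = spark.createDataFrame(
    [("n554", "interneuron"), ("n678", "pyramidal")], ["id", "type"])
edges = spark.createDataFrame(
    [("n554", "n678", "synapse")], ["src", "dst", "relationship"])

g = GraphFrame(vertices, edges)
g.inDegrees.show()   # graph queries are ordinary DataFrame operations
```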
### DataSet and DataFrame

- DataSet
  - Distributed collection of data
  - Strong typing
  - Available in Scala and Java
- DataFrame
  - A DataSet organized into named columns
  - Represents the DataSet as rows
  - Available in Scala, Java, Python, and R
### Practice time! Connect to our Spark server

- See our [Sched page](https://cns2021online.sched.com/event/kFFq/sa4-methods-from-data-science-for-model-simulation-analysis-and-visualization) for the IP address and downloads

On Mac and Linux:

- Download `spark-key` from [Sched](https://cns2021online.sched.com/event/kFFq/sa4-methods-from-data-science-for-model-simulation-analysis-and-visualization)
- Open a terminal and run:

```bash
$ ssh -i spark-key cnsuser@<IP address>
```

On Windows:

- Download `spark-windows.ppk` from [Sched](https://cns2021online.sched.com/event/kFFq/sa4-methods-from-data-science-for-model-simulation-analysis-and-visualization)
- Download [PuTTY](https://www.chiark.greenend.org.uk/~sgtatham/putty/) and follow the [instructions](https://devops.ionos.com/tutorials/use-ssh-keys-with-putty-on-windows/#connect-to-server-with-private-key) to use the ppk key file
- [Open an SSH connection](https://www.ssh.com/ssh/putty/windows/) to the IP address with username "cnsuser"
### Practice time! (Cont)

Once connected to the Spark server, run:

```bash
$ docker exec -it klt-spark-wwym bin/pyspark
```

which should result in:

```bash
Python 3.6.11 (default, Jul 15 2020, 18:47:56)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0
      /_/

Using Python version 3.6.11 (default, Jul 15 2020 18:47:56)
SparkSession available as 'spark'.
>>>
```
### Spark works with _resilient distributed datasets_ (RDDs)

- Try these on the `pyspark` command line: a line count program example:

```python
>>> lines = sc.textFile("data/tutorial.md")  # Create an RDD called lines
>>> lines.count()  # Count the number of items in this RDD
67
>>> lines.first()  # First item in this RDD, i.e. first line of tutorial.md
'---'
```

- Parallel operation is completely transparent!
  - `sc` is the `SparkContext` _driver_ used to access Spark and create RDDs
  - The RDD is broken into pieces to run `count()` in parallel
### Inside Spark
- Try another example with a custom filter function:

```python
>>> sparkLines = lines.filter(lambda line: "Spark" in line)  # like grep on Hadoop
>>> sparkLines.count()  # a lower number than before
>>> sparkLines.first()
'tools, such as [Apache Spark](https://spark.apache.org/). These tools'
```
### Map-Reduce operations generalize to Transformations and Actions on RDDs

_Transformations_ create new RDDs; e.g.:

```python
>>> sparkLines = lines.filter(lambda line: "Spark" in line)
```

_Actions_ calculate results from RDDs; e.g.:

```python
>>> sparkLines.first()
```
### RDD transformations: `map()` and `filter()`
```python
>>> inputRDD = sc.parallelize([1, 2, 3, 4])  # Create a numeric input RDD
```

```python
>>> outputRDD = inputRDD.map(lambda x: x * x)  # MAP
```

```python
>>> outputRDD.collect()  # Show all results
```

```python
>>> outputRDD = inputRDD.filter(lambda x: x != 1)  # FILTER, run collect() again
```
### RDD transformations (Continued)

- If `map()` would produce multiple outputs for each input, use `flatMap()` instead
- Set operations:
  - `distinct()`
  - `union()`
  - `intersection()`
  - `subtract()`
  - `cartesian()` -> expensive
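A few quick examples of these from the `pyspark` shell (a sketch using made-up toy data):

```python
a = sc.parallelize(["n678 12", "n554 2"])
a.flatMap(lambda line: line.split()).collect()  # one input -> many outputs:
                                                # ['n678', '12', 'n554', '2']
b = sc.parallelize(["n554 2", "n999 7"])
a.union(b).distinct().count()   # 3 distinct lines
a.intersection(b).collect()     # ['n554 2']
a.subtract(b).collect()         # ['n678 12']
a.cartesian(b).count()          # all pairs (4 here); expensive on big RDDs!
```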
### RDD actions

- `reduce()`: Takes two inputs and outputs one; the result is a single value

```python
>>> sum = inputRDD.reduce(lambda x, y: x + y)
>>> sum
10
```

- `fold()`: Similar, but also takes a _zero_ value for initialization

```python
sum = inputRDD.fold(0, lambda x, y: x + y)
```

- `aggregate()`: takes accumulation and combine functions. Example that keeps a running sum and count of elements to calculate an average value:

```python
sumCount = inputRDD.aggregate((0, 0),
                              (lambda acc, value: (acc[0] + value, acc[1] + 1)),
                              (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
average = sumCount[0] / float(sumCount[1])
```
### RDD actions (continued)

- `collect()`: Return all elements
- `count()`: Number of elements
- `countByValue()`: Number of times each element occurs in the RDD (a histogram)
- and more...
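For example (a sketch with toy data on the `pyspark` command line):

```python
rdd = sc.parallelize([1, 2, 2, 3])
rdd.collect()        # [1, 2, 2, 3]
rdd.count()          # 4
rdd.countByValue()   # {1: 1, 2: 2, 3: 1} (returned as a dict-like object)
rdd.take(2)          # [1, 2] -- just the first two elements
```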
### RDDs can act as DataFrames!

- Can read/write data in JSON, CSV, Hive tables, and Parquet
- A _DataFrame_ holds semi-structured data and supports SQL-like querying

```python
df = spark.read.json("examples/src/main/resources/people.json")  # also see read.csv()

# Display the contents of the DataFrame on stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# Select only the "name" column
df.select("name").show()

# Select people older than 21
df.filter(df['age'] > 21).show()
```
### Spark can run SQL queries in parallel

- SQL interface can be used inside or outside Spark
  (e.g. via a JDBC connection or from Tableau)

```python
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
```
### Spark DataFrames can be converted to and from Pandas DataFrames

- [Pandas](https://pandas.pydata.org/) provides the standard DataFrame object in Python
- Enable Apache Arrow in Spark [to use Pandas objects](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html)

```python
import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()
```
### A Spark-ling challenge for you!

Load our CSV file with measurements from model neuron simulations:

```python
df = spark.read.csv("data/AnalySim.csv", header="true")  # case sensitive

# show info
df.printSchema()
df.show()
```

Question:

- Use [RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html) and [DataFrame](https://spark.apache.org/docs/latest/sql-getting-started.html) operations to calculate one of min, max, average, or standard deviation for one column, or an arithmetic operation between multiple columns to find a ratio, slope, etc.
- You can also filter rows based on column constraints.
- Post your code and results on the Discord `#sa4` channel or on Zoom.
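If you get stuck, here is the general pattern for one possible answer (the column name `spikeRate` is hypothetical; use a real column from the output of `df.printSchema()`):

```python
from pyspark.sql import functions as F

col = F.col("spikeRate").cast("double")   # CSV columns are read as strings
df.select(F.min(col), F.max(col), F.avg(col), F.stddev(col)).show()
df.filter(col > 10).show()                # rows matching a column constraint
```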
### If you want to install Spark locally

- Installation can get complex; [Hadoop and Spark](https://spark.apache.org/docs/latest/#downloading) come bundled together in the official download
- You can also just download and run the Docker image from [bitnami/spark](https://hub.docker.com/r/bitnami/spark/), and use it like in our example above
### Closing notes for Spark
Spark also provides:

- Real-time processing via streaming (Chapter 10)
- Machine learning library _MLlib_ (Chapter 11)

More resources:

- [Book's Github repo with examples](https://github.com/databricks/learning-spark)
- [Spark API documentation](https://spark.apache.org/docs/latest/)
- [Spark Quick Start](https://spark.apache.org/docs/latest/quick-start.html)
- [RDD Programming Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
### Questions and comments?

- The conversation can continue if you post on the Discord `#sa4` channel or leave comments on our [tutorial website](https://cengique.github.io/cns2020-tutorials-website/2021/06/30/cns2021-tutorial-data-science-methods.html).
- Please fill out [our survey](https://ggc.az1.qualtrics.com/jfe/form/SV_b9R3rk4dE3Fxx7E) before leaving the tutorial
- Thank you for coming to our tutorial! :)
### What is MapReduce? An example

A weather dataset from White (Hadoop: The Definitive Guide), Chapter 2:

- Live streaming data from weather stations all around the world
- Each row is one reading from one station at a given time
### Back to the command line: inspect the data

```bash
% ls raw/1990 | head
010010-99999-1990.gz
010014-99999-1990.gz
010015-99999-1990.gz
010016-99999-1990.gz
010017-99999-1990.gz
010030-99999-1990.gz
010040-99999-1990.gz
010080-99999-1990.gz
010100-99999-1990.gz
010150-99999-1990.gz
```
- Many small files; can be analyzed sequentially with `awk`:

```bash
% ./max_temperature.sh
1901 317
1902 244
1903 289
1904 256
1905 283
...
```
### Enter Map+Reduce: Can be partitioned to run on parallel hardware
### Map in Java: (ID, row of text) `$\Rightarrow$` (year, temp)

```java
public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}
```
### Reduce: (year, [temps]) `$\Rightarrow$` (year, max temp)

```java
public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {

    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}
```
### Putting it all together

```java
public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```
### A sample run

```bash
% export HADOOP_CLASSPATH=hadoop-examples.jar
% hadoop MaxTemperature input/ncdc/sample.txt output
14/09/16 09:48:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/09/16 09:48:40 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
14/09/16 09:48:40 INFO input.FileInputFormat: Total input paths to process : 1
14/09/16 09:48:40 INFO mapreduce.JobSubmitter: number of splits:1
14/09/16 09:48:40 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local26392882_0001
14/09/16 09:48:40 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
14/09/16 09:48:40 INFO mapreduce.Job: Running job: job_local26392882_0001
14/09/16 09:48:40 INFO mapred.LocalJobRunner: OutputCommitter set in config null
14/09/16 09:48:40 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Waiting for map tasks
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Starting task: attempt_local26392882_0001_m_000000_0
14/09/16 09:48:40 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
14/09/16 09:48:40 INFO mapred.LocalJobRunner:
14/09/16 09:48:40 INFO mapred.Task: Task:attempt_local26392882_0001_m_000000_0 is done. And is in the process of committing
14/09/16 09:48:40 INFO mapred.LocalJobRunner: map
14/09/16 09:48:40 INFO mapred.Task: Task 'attempt_local26392882_0001_m_000000_0' done.
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Finishing task: attempt_local26392882_0001_m_000000_0
14/09/16 09:48:40 INFO mapred.LocalJobRunner: map task executor complete.
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Waiting for reduce tasks
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Starting task: attempt_local26392882_0001_r_000000_0
14/09/16 09:48:40 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/16 09:48:40 INFO mapred.Merger: Merging 1 sorted segments
14/09/16 09:48:40 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 50 bytes
14/09/16 09:48:40 INFO mapred.Merger: Merging 1 sorted segments
14/09/16 09:48:40 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 50 bytes
14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/16 09:48:40 INFO mapred.Task: Task:attempt_local26392882_0001_r_000000_0 is done. And is in the process of committing
14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/16 09:48:40 INFO mapred.Task: Task attempt_local26392882_0001_r_000000_0 is allowed to commit now
14/09/16 09:48:40 INFO output.FileOutputCommitter: Saved output of task 'attempt...local26392882_0001_r_000000_0' to file:/Users/tom/book-workspace/hadoop-book/output/_temporary/0/task_local26392882_0001_r_000000
14/09/16 09:48:40 INFO mapred.LocalJobRunner: reduce > reduce
14/09/16 09:48:40 INFO mapred.Task: Task 'attempt_local26392882_0001_r_000000_0' done.
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Finishing task: attempt_local26392882_0001_r_000000_0
14/09/16 09:48:40 INFO mapred.LocalJobRunner: reduce task executor complete.
14/09/16 09:48:41 INFO mapreduce.Job: Job job_local26392882_0001 running in uber mode : false
14/09/16 09:48:41 INFO mapreduce.Job:  map 100% reduce 100%
14/09/16 09:48:41 INFO mapreduce.Job: Job job_local26392882_0001 completed successfully
14/09/16 09:48:41 INFO mapreduce.Job: Counters: 30
	File System Counters
		FILE: Number of bytes read=377168
		FILE: Number of bytes written=828464
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
	Map-Reduce Framework
		Map input records=5
		Map output records=5
		Map output bytes=45
		Map output materialized bytes=61
		Input split bytes=129
		Combine input records=0
		Combine output records=0
		Reduce input groups=2
		Reduce shuffle bytes=61
		Reduce input records=5
		Reduce output records=2
		Spilled Records=10
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=39
		Total committed heap usage (bytes)=226754560
	File Input Format Counters
		Bytes Read=529
	File Output Format Counters
		Bytes Written=29
```