Merge pull request #1 from MIDS-scaling-up/master #38

Open
wants to merge 26 commits into base hw7-fixed

Commits (26):
26cb334
Merge pull request #1 from MIDS-scaling-up/master
RajeshThallam Jun 15, 2015
3509fb0
Merge pull request #33 from MIDS-scaling-up/hw7-fixed
michaeldye Jun 17, 2015
8577351
added preconditions, etc.
michaeldye Jun 17, 2015
2159967
Merge pull request #34 from michaeldye/content/spark_spam_add
michaeldye Jun 17, 2015
2c806ab
removed redundant preconditions
michaeldye Jun 17, 2015
61a7bbb
Merge pull request #35 from michaeldye/content/remove_reduntant_spam_…
michaeldye Jun 17, 2015
f06a21e
Fixed code to run across the cluster
jredmann Jun 18, 2015
a224afe
Cleaned up week6 lab code
jredmann Jun 18, 2015
88bd47f
Updated README.md to reflect fixed code for week 6 lab
jredmann Jun 18, 2015
2a26ade
Fixed line to run spark job in week 6 lab
jredmann Jun 18, 2015
6f065f3
Update README.md
jon-da-thon Jun 19, 2015
444a3ac
Update README.md
rboberg Jun 19, 2015
da5c850
removed iterables
jon-da-thon Jun 20, 2015
9675fbd
Merge pull request #37 from dyejon/wk7-fixes
jon-da-thon Jun 20, 2015
c855195
First Version
rbraddes Jun 23, 2015
3f0093a
data_xfer_perf lab
michaeldye Jun 24, 2015
59bf51a
added Rsync Investigation lab
michaeldye Jun 24, 2015
66fa116
Merge pull request #40 from michaeldye/content/week7_labs
michaeldye Jun 24, 2015
85f165b
First Version fixed "turn-in" section
rbraddes Jun 24, 2015
a3c6596
Merge pull request #36 from rboberg/patch-2
jon-da-thon Jun 24, 2015
2a75c7b
Merge pull request #39 from MIDS-scaling-up/hw8
rbraddes Jun 24, 2015
cf4f38d
improved directions in rsync lab
michaeldye Jun 24, 2015
fbd2693
Merge pull request #41 from michaeldye/content/lab7_instructions_impr…
michaeldye Jun 24, 2015
e04d01d
added link to week 8 hw
michaeldye Jun 24, 2015
4eff8b6
Merge pull request #42 from michaeldye/content/week8_main_page_link
michaeldye Jun 24, 2015
2835e75
Merge pull request #2 from MIDS-scaling-up/master
RajeshThallam Jun 28, 2015
17 changes: 17 additions & 0 deletions README.md

@@ -47,8 +47,25 @@ _There will be no in-class lab for this assignment_

0. [Apache Spark Introduction](week6/hw/apache_spark_introduction)

### Labs

0. [Machine Learning with Spark and MLLib](week6/labs/Spam)

## Week 7: Object Storage

### Homework

0. [Object Storage](week7/hw)

### Labs

(Complete the following in order)

0. [Data Transfer Performance](week7/labs/data_xfer_perf)
0. [Rsync Investigation](week7/labs/rsync_investigation)

## Week 8: NoSQL

### Homework

0. [NoSQL](week8/hw)
130 changes: 66 additions & 64 deletions week6/labs/Spam/README.md

In most big data analytics processes we develop a model to do the analysis as a series of steps:

1. Data preprocessing
2. Learning algorithm training
3. Learning algorithm testing
4. Model persistence

In this lab we will see a simple version of each of these steps executed in Spark.

The main advantage of using Spark's **_MLLib package_** is that its machine learning algorithms are all **_parallelized already_**. You do not have to write map and reduce functions to make them work in parallel.

## Preconditions

You must have a Spark cluster set up, including the MLLib library (see [Apache Spark Introduction](../../hw/apache_spark_introduction) if you need to set up a Spark cluster). You will also need Python 2.7 with Numpy 1.7 on each machine, and Git on the master. It is assumed that you have had prior exposure to machine learning algorithms (this lab will not cover the details of how these algorithms work).

To install Numpy and Git, use yum:

    $ yum update
    $ yum install numpy

On the master only:

    $ yum install git

You will then need to clone the course GitHub repo:

    $ git clone https://github.com/MIDS-scaling-up/coursework.git

After cloning, you need to copy the data directory, hamster.py, and spamFilter.py to root's home directory:

    $ cp -r coursework/week6/labs/Spam/data ~/
    $ cp coursework/week6/labs/Spam/hamster.py ~/
    $ cp coursework/week6/labs/Spam/spamFilter.py ~/
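
To sanity-check the environment on each node you can print the interpreter and Numpy versions (this assumes python on the PATH is the Python 2.7 install you provisioned):

    $ python --version
    $ python -c "import numpy; print numpy.__version__"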

## Scenario: Spam Filter

The spam data are available here https://spamassassin.apache.org/publiccorpus/20030228_spam_2.tar.bz2

The ham data are available here https://spamassassin.apache.org/publiccorpus/20030228_easy_ham_2.tar.bz2

**EXERCISE:** Run the spamFilter.py script available in the GitHub project. You should see an error rate for the spam filter in the output.

First run hamster.py and then copy the data to the worker nodes. This needs to be done because we do not have a distributed file system or a database set up:

    $ python hamster.py
    $ scp -r data/ root@spark2:/root
    $ scp -r data/ root@spark3:/root

To run Python scripts in Spark use:

    $SPARK_HOME/bin/spark-submit --master spark://spark1:7077 spamFilter.py

Now let's walk through the code in spamFilter.py and see what it's doing.

## Part 1: The Spark Context

The driver program has your program's main method and is executed on the Spark Master. The **_SparkContext_** is the object representing the driver program's connection to the rest of the cluster. In our applications we will need to create a Spark Context like this:

    from pyspark import SparkContext

    sc = SparkContext( appName="Spam Filter")

There are several optional arguments that can be given to the SparkContext constructor, and several helper methods that can be called on it. Read about them [here](https://spark.apache.org/docs/1.0.2/api/python/pyspark.context.SparkContext-class.html) in the pyspark reference.
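
As a minimal sketch of what passing options looks like, the constructor can take a SparkConf object (the master URL and memory value below are assumptions for the cluster built in the Spark homework, not requirements of this lab):

    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setMaster( "spark://spark1:7077" ) \
                      .setAppName( "Spam Filter" ) \
                      .set( "spark.executor.memory", "1g" )
    sc = SparkContext( conf=conf )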


## Part 2: Data Preprocessing

The training and test data need to be in a format that can be easily used to generate feature vectors for training the learning algorithm.

### Consolidating Emails to a Single Data Source ([Data Munging](http://en.wikipedia.org/wiki/Data_wrangling))

The textFile method will make an RDD from a text file. By default it treats each line of the text file as one data point, so we need to transform each email such that it appears as a single line of text (no returns / line feeds) and combine all the emails of a given class (spam or ham) into a single input file for that class. Since we will use this process twice we will define it as a separate function and run it in a separate script, as we did above (hamster.py).

    def makeDataFileFromEmails( dir_path, out_file_path ):
        """
        Iterate over files converting them to a single line
        then write the set of files to a single output file
        """

        with open( out_file_path, 'w' ) as out_file:

            # Iterate over the files in directory 'path'
            for file_name in os.listdir( dir_path ):

                # Open each file for reading
                with open( dir_path + file_name ) as in_file:

                    # Reformat emails as a single line of text
                    text = in_file.read().replace( '\n',' ' ).replace( '\r', ' ' )
                    text = text + "\n"
                    # Write each email out to a single file
                    out_file.write( text )

With larger data sets we would want to load the data into a database and then have Spark interact with the database. SparkSQL is a good way to interact with databases in Spark, which you can read about [here](https://spark.apache.org/docs/1.1.0/sql-programming-guide.html#other-sql-interfaces). NoSQL databases can be interfaced with by configuring the RDD through the conf property; [here](https://github.com/apache/spark/blob/master/examples/src/main/python/cassandra_inputformat.py) is an example.
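
If you do go the SparkSQL route, a rough sketch in the Spark 1.x Python API looks like this (the emails.json file and its text field are hypothetical, not part of this lab):

    from pyspark.sql import SQLContext

    sqlContext = SQLContext( sc )

    # Load a JSON data source; each element of the result behaves like a Row
    emails = sqlContext.jsonFile( "emails.json" )

    # The result is still an RDD, so it can feed MLLib like any other RDD
    email_text = emails.map( lambda row: row.text )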

### Loading Data in Spark

Now that we have the emails consolidated to two single data sources (spam and ham),
we can load the data into Spark RDDs. Once our data is in an RDD we can work on it.

The simplest way to create an RDD is by loading data in from a text file:

    # Read the spam data file created above into an RDD
    spam = sc.textFile( "data/spam.txt" )

    # Read the ham data file created above into an RDD
    ham = sc.textFile( "data/ham.txt" )

There are alternative methods by which we might load the separate data files into an RDD in a way that would not require using the function we wrote. Consider these two functions.

    myRDD = sc.parallelize( some_data_structure )

[parallelize reference](https://spark.apache.org/docs/1.0.2/api/python/pyspark.context.SparkContext-class.html#parallelize)

    myRDD = sc.wholeTextFiles( some_directory )

[wholeTextFiles reference](https://spark.apache.org/docs/1.0.2/api/python/pyspark.context.SparkContext-class.html#wholeTextFiles)

**Be careful:** if you change how you load the data into an RDD, you may also need to change how you process the RDD later.
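
For example, a sketch of the wholeTextFiles route, assuming the raw emails are still sitting in data/spam_2/ on every node (the flattening step stands in for the work hamster.py does):

    # wholeTextFiles returns (filename, contents) pairs, one pair per email,
    # so keep only the contents and collapse each email to a single line
    spam = sc.wholeTextFiles( "data/spam_2/" ) \
             .map( lambda (path, text): text.replace( '\n', ' ' ).replace( '\r', ' ' ) )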


### Feature Generation Techniques in MLLib

Now that our raw data is in an RDD we must generate features from the data to use for the learning algorithm. In this case the raw data is not easily broken into features. The simplest thing to do is vectorize the text using an MLLib class specifically for this, [HashingTF](https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.mllib.feature.HashingTF).

    # Create a HashingTF instance to map email text to vectors of 10,000 features.
    tf = HashingTF(numFeatures = 10000)

    # Each email is split into words, and each word is mapped to one feature.
    spamFeatures = spam.map(lambda email: tf.transform(email.split(" ")))
    hamFeatures = ham.map(lambda email: tf.transform(email.split(" ")))

We now need to pair our data points with labels (spam or ham), encoded as 1 for spam and 0 for ham.

    # Create LabeledPoint datasets for positive (spam) and negative (ham) data points.
    positiveExamples = spamFeatures.map(lambda features: LabeledPoint(1, features))
    negativeExamples = hamFeatures.map(lambda features: LabeledPoint(0, features))
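
If you want to see what these objects look like, you can pull one element back to the driver; this is just a quick inspection step, not part of the filter itself:

    # A hashed feature vector (a sparse vector with 10,000 dimensions)
    print( spamFeatures.first() )

    # The same kind of vector wrapped in a LabeledPoint together with its label
    print( positiveExamples.first() )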

### Build Training and Testing Datasets

Now that our data points are labeled we can combine the spam and ham datasets so that we can then re-split them into training and testing datasets that contain both types. Spark provides a [randomSplit](https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=randomsplit#pyspark.RDD.randomSplit) function to split datasets into randomly selected groups of the original RDDs.

    # Combine positive and negative datasets into one RDD
    data = positiveExamples.union(negativeExamples)

    # Split the data into two RDDs. 70% for training and 30% test data sets
    ( trainingData, testData ) = data.randomSplit( [0.7, 0.3] )
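
If you want the split to be reproducible from run to run, randomSplit also accepts a seed (the value below is arbitrary):

    ( trainingData, testData ) = data.randomSplit( [0.7, 0.3], seed=42 )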

## Part 3: Training the Learning Algorithm

From a programming perspective this is the simplest step. However, in data mining one of the most difficult issues is picking a good learning algorithm for the problem you are trying to solve. Here we use logistic regression.

    # Train the model with the SGD Logistic Regression algorithm.
    model = LogisticRegressionWithSGD.train(trainingData)
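
The train method also accepts optional parameters such as the number of SGD iterations and the step size; the values below are illustrative rather than tuned for this dataset:

    # Run more SGD iterations at the default step size
    model = LogisticRegressionWithSGD.train( trainingData, iterations=200, step=1.0 )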

## Part 4: Testing the Learning Algorithm

Now we test the model, first by making a new RDD of tuples containing the actual label and the predicted label. Remember that testData is an RDD of LabeledPoint objects, so we can use a lambda function to create these tuples by extracting the label for each email and then using the model to predict on the features extracted from the labeled point. The RDD's map function applies this lambda function to each element of the testData RDD.

    labels_and_predictions = testData.map( lambda email: ( email.label, model.predict( email.features ) ) )

Next we need to count up the number of times the model made an incorrect prediction and divide by the total number of test examples to get the error rate:

    error_rate = labels_and_predictions.filter( lambda (val, pred): val != pred ).count() / float(testData.count() )

    print( "Error Rate: " + str( error_rate ) )
There are additional metrics available in MLLib if you are using Scala or Java; [work is being done](https://github.com/apache/spark/blob/master/python/pyspark/mllib/evaluation.py) to make these metrics available in PySpark as well.
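
In the meantime, a few basic metrics can be computed directly from the labels_and_predictions RDD; here is a sketch for precision and recall on the spam class (not part of the lab script):

    # Count the outcomes that matter for precision and recall on the spam (1) class
    true_positives  = labels_and_predictions.filter( lambda (val, pred): val == 1 and pred == 1 ).count()
    false_positives = labels_and_predictions.filter( lambda (val, pred): val == 0 and pred == 1 ).count()
    false_negatives = labels_and_predictions.filter( lambda (val, pred): val == 1 and pred == 0 ).count()

    precision = true_positives / float( true_positives + false_positives )
    recall    = true_positives / float( true_positives + false_negatives )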

**EXERCISE:** There are other learning algorithms provided by MLLib that we could use. Look through the MLLib documentation for support vector machines (SVMs) [here](http://spark.apache.org/docs/1.0.1/api/python/pyspark.mllib.classification.SVMWithSGD-class.html) and change the code to use an SVM instead of logistic regression. Compare the error rates of the two different algorithms.
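
As a hint, the substitution is an import plus a one-line change; a minimal sketch (try the exercise before peeking):

    from pyspark.mllib.classification import SVMWithSGD

    # Train a linear SVM on the same training RDD instead of logistic regression
    model = SVMWithSGD.train( trainingData )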

## Part 5: Model Persistence

Now that we have trained a model with an acceptable error rate, we need to save the model so that we can use it again later without retraining it:

    # Serialize the model for persistence
    pickle.dump( model, open( "spamFilter.pkl", "wb" ))

There is [work being done](https://issues.apache.org/jira/browse/SPARK-1406) to add support in MLLib for [PMML](http://en.wikipedia.org/wiki/Predictive_Model_Markup_Language), which is a serialization format specifically for statistical and machine learning models.
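
To reuse the serialized model later you would unpickle it and call predict on a hashed feature vector. A sketch follows; the sample message is made up, and how cleanly an MLLib model round-trips through pickle can depend on the Spark version:

    import pickle
    from pyspark.mllib.feature import HashingTF

    # Load the pickled model and classify a new, hypothetical message
    model = pickle.load( open( "spamFilter.pkl", "rb" ) )
    tf = HashingTF( numFeatures = 10000 )
    print( model.predict( tf.transform( "win a free prize now".split( " " ) ) ) )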

## Addendum: Original Code

Note the import statements at the beginning and the call to stop on the SparkContext.

    from pyspark.mllib.feature import HashingTF
    from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark import SparkContext
import os
    import pickle


def main():
"""
Driver program for a spam filter using Spark and MLLib
"""


# Create the Spark Context for parallel processing
sc = SparkContext( appName="Spam Filter")

# Load the spam and ham data files into RDDs
spam = sc.textFile( "data/spam.txt" )
ham = sc.textFile( "data/ham.txt" )

# Create a HashingTF instance to map email text to vectors of 10,000 features.
tf = HashingTF(numFeatures = 10000)

# Each email is split into words, and each word is mapped to one feature.
spamFeatures = spam.map(lambda email: tf.transform(email.split(" ")))
hamFeatures = ham.map(lambda email: tf.transform(email.split(" ")))
Expand All @@ -228,11 +226,11 @@ Note the import statements at the beginning and the call to stop on the SparkCon
# Combine positive and negative datasets into one
data = positiveExamples.union(negativeExamples)

    # Split the data into 70% for training and 30% test data sets
( trainingData, testData ) = data.randomSplit( [0.7, 0.3] )

    # Cache the training data to optimize the Logistic Regression
    trainingData.cache()

# Train the model with Logistic Regression using the SGD algorithm.
model = LogisticRegressionWithSGD.train(trainingData)

    # Create tuples of the actual labels and the predicted labels
    labels_and_predictions = testData.map( lambda email: ( email.label, model.predict( email.features ) ) )

# Calculate the error rate as number wrong / total number
error_rate = labels_and_predictions.filter( lambda (val, pred): val != pred ).count() / float(testData.count() )

    # End the Spark Context
sc.stop()

# Print out the error rate
print( "*********** SPAM FILTER RESULTS **********" )
print( "\n" )
print( "Error Rate: " + str( error_rate ) )
print( "\n" )

    # Serialize the model for persistence
pickle.dump( model, open( "spamFilter.pkl", "wb" ) )

if __name__ == "__main__":
main()






35 changes: 35 additions & 0 deletions week6/labs/Spam/hamster.py
@@ -0,0 +1,35 @@
import os

def makeDataFileFromEmails( dir_path, out_file_path ):
"""
Iterate over files converting them to a single line
then write the set of files to a single output file
"""

with open( out_file_path, 'w' ) as out_file:

# Iterate over the files in directory 'path'
for file_name in os.listdir( dir_path ):

# Open each file for reading
with open( dir_path + file_name ) as in_file:

# Reformat emails as a single line of text
text = in_file.read().replace( '\n',' ' ).replace( '\r', ' ' )
text = text + "\n"
# Write each email out to a single file
out_file.write( text )


def main():
"""
    Consolidate the individual spam and ham emails into single data files
"""

# Consolidate the individual email files into a single spam file
# and a single ham file
makeDataFileFromEmails( "data/spam_2/", "data/spam.txt")
makeDataFileFromEmails( "data/easy_ham_2/", "data/ham.txt" )

if __name__ == "__main__":
main()
32 changes: 5 additions & 27 deletions week6/labs/Spam/spamFilter.py
@@ -6,37 +6,11 @@
import pickle


def makeDataFileFromEmails( dir_path, out_file_path ):
"""
Iterate over files converting them to a single line
then write the set of files to a single output file
"""

with open( out_file_path, 'w' ) as out_file:

# Iterate over the files in directory 'path'
for file_name in os.listdir( dir_path ):

# Open each file for reading
with open( dir_path + file_name ) as in_file:

# Reformat emails as a single line of text
text = in_file.read().replace( '\n',' ' ).replace( '\r', ' ' )
text = text + "\n"
# Write each email out to a single file
out_file.write( text )


def main():
"""
Driver program for a spam filter using Spark and MLLib
"""

# Consolidate the individual email files into a single spam file
# and a single ham file
makeDataFileFromEmails( "data/spam_2/", "data/spam.txt")
makeDataFileFromEmails( "data/easy_ham_2/", "data/ham.txt" )

# Create the Spark Context for parallel processing
sc = SparkContext( appName="Spam Filter")

@@ -72,6 +46,11 @@ def main():

# Calculate the error rate as number wrong / total number
error_rate = labels_and_predictions.filter( lambda (val, pred): val != pred ).count() / float(testData.count() )

# End the Spark Context
sc.stop()

# Print out the error rate
print( "*********** SPAM FILTER RESULTS **********" )
print( "\n" )
print( "Error Rate: " + str( error_rate ) )
@@ -80,7 +59,6 @@
    # Serialize the model for persistence
pickle.dump( model, open( "spamFilter.pkl", "wb" ) )

sc.stop()

if __name__ == "__main__":
main()