Skip to content
This repository was archived by the owner on Jul 26, 2019. It is now read-only.

Wc.go hadoop prep #2

Merged
merged 9 commits into from
Jul 25, 2016
Binary file removed GoWorkspace/bin/facebookanalysis
Binary file not shown.
723 changes: 0 additions & 723 deletions GoWorkspace/pkg/linux_amd64/github.com/ajstarks/svgo.a

This file was deleted.

Binary file not shown.
260 changes: 0 additions & 260 deletions GoWorkspace/pkg/linux_amd64/github.com/golang/freetype.a

This file was deleted.

542 changes: 0 additions & 542 deletions GoWorkspace/pkg/linux_amd64/github.com/golang/freetype/raster.a

This file was deleted.

950 changes: 0 additions & 950 deletions GoWorkspace/pkg/linux_amd64/github.com/golang/freetype/truetype.a

This file was deleted.

642 changes: 0 additions & 642 deletions GoWorkspace/pkg/linux_amd64/github.com/llgcode/draw2d.a

This file was deleted.

522 changes: 0 additions & 522 deletions GoWorkspace/pkg/linux_amd64/github.com/llgcode/draw2d/draw2dbase.a

This file was deleted.

497 changes: 0 additions & 497 deletions GoWorkspace/pkg/linux_amd64/github.com/llgcode/draw2d/draw2dimg.a

This file was deleted.

1,698 changes: 0 additions & 1,698 deletions GoWorkspace/pkg/linux_amd64/github.com/vdobler/chart.a

This file was deleted.

758 changes: 0 additions & 758 deletions GoWorkspace/pkg/linux_amd64/github.com/vdobler/chart/imgg.a

This file was deleted.

494 changes: 0 additions & 494 deletions GoWorkspace/pkg/linux_amd64/github.com/vdobler/chart/svgg.a

This file was deleted.

355 changes: 0 additions & 355 deletions GoWorkspace/pkg/linux_amd64/github.com/vdobler/chart/txtg.a

This file was deleted.

1,800 changes: 0 additions & 1,800 deletions GoWorkspace/pkg/linux_amd64/golang.org/x/image/draw.a

This file was deleted.

163 changes: 0 additions & 163 deletions GoWorkspace/pkg/linux_amd64/golang.org/x/image/font.a

This file was deleted.

Binary file not shown.
Binary file not shown.
472 changes: 0 additions & 472 deletions GoWorkspace/pkg/linux_amd64/graphbuilder.a

This file was deleted.

463 changes: 0 additions & 463 deletions GoWorkspace/pkg/linux_amd64/mapreduce.a

This file was deleted.

10 changes: 2 additions & 8 deletions GoWorkspace/src/facebookanalysis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,16 @@ This project....
│   └── wc.go
├── graphbuilder
│   └── graphbuilder.go
└── mapreduce
├── common.go
├── mapreduce.go
├── master.go
├── test_test.go
└── worker.go
```


## Usage:

1. Preprocess your facebook data to create input data file; see ```fbparser/README```

2. Use MapReduce to create prefix trees of all your conversations; One file will be produced per person. Run ```go run wc.go master <PARSED_INPUT_FILE.txt> sequential```
2. Use MapReduce to create prefix trees of all your conversations; one JSON line per person is written to the output file. Run ```cat input.txt | go run wc.go map | sort | go run wc.go reduce | sort > output.txt```

3. Generate a stacked bar graph via: ```go run wc.go graph stack-hist mrtmp.<PARSED_INPUT_FILE.txt> <Number of friends to evaluate>```
3. Generate a stacked bar graph via: ```go run wc.go graph stack-hist output.txt 3```

* Note: the 'Number of friends' to evaluate is usually 3

Expand Down
5 changes: 5 additions & 0 deletions GoWorkspace/src/facebookanalysis/deploy.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash
# Fetch dependencies, build the wc binary, and upload it to
# s3://facebook-analysis/bin/wc (the path start_cluster.sh ships to the
# Hadoop streaming job via -files).

# Abort on the first failing command (and on unset variables / failed pipeline
# stages) so a broken `go get` or `go build` can never be followed by an
# upload of a stale or missing binary.
set -euo pipefail

go get ./
go build wc.go
aws s3 cp wc s3://facebook-analysis/bin/wc
7 changes: 5 additions & 2 deletions GoWorkspace/src/facebookanalysis/fbparser/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# Preprocessing with fbparser.py

To preprocess data:
```pip install -r requirements.txt```
Installation:
```
./install_deps.sh
pip install -r requirements.txt
```

# assuming that messages.htm is in the current directory...
```python fbparser.py > input_data.txt```
Expand Down
2 changes: 2 additions & 0 deletions GoWorkspace/src/facebookanalysis/fbparser/install_deps.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/bash
# Install the system development packages (Python, libxml2, libxslt and zlib
# headers) with apt-get; requires sudo.
# NOTE(review): presumably these are the native build dependencies of the
# Python packages in requirements.txt, which the README installs with pip
# right after this script -- confirm against requirements.txt.
sudo apt-get install python-dev libxml2-dev libxslt1-dev zlib1g-dev
2 changes: 2 additions & 0 deletions GoWorkspace/src/facebookanalysis/start_cluster.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/bash
# Launch an auto-terminating EMR cluster that runs the wc binary as a Hadoop
# streaming job ("wc map" as mapper, "wc reduce" as reducer) over
# s3://facebook-analysis/input/input.txt.
#
# Usage: ./start_cluster.sh <output-suffix>
#   Results are written to s3://facebook-analysis/output_<output-suffix>.

set -euo pipefail

# Require the suffix explicitly: without it every run would target the same
# "output_" prefix, and an unquoted expansion would split on whitespace and
# corrupt the --steps JSON argument.
output_suffix="${1:?usage: $0 <output-suffix>}"

aws emr create-cluster --applications Name=Hadoop --ec2-attributes '{"InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-460d951f","EmrManagedSlaveSecurityGroup":"sg-574b8031","EmrManagedMasterSecurityGroup":"sg-564b8030"}' --service-role EMR_DefaultRole --enable-debugging --release-label emr-4.7.2 --log-uri 's3n://aws-logs-744984309673-us-west-2/elasticmapreduce/' --steps '[{"Args":["hadoop-streaming","-files","s3://facebook-analysis/bin/wc,s3://facebook-analysis/bin/wc","-mapper","wc map","-reducer","wc reduce","-input","s3://facebook-analysis/input/input.txt","-output","s3://facebook-analysis/output_'"$output_suffix"'"],"Type":"CUSTOM_JAR","ActionOnFailure":"TERMINATE_CLUSTER","Jar":"command-runner.jar","Properties":"","Name":"Streaming program"}]' --name 'JordanWCGO' --instance-groups '[{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"m3.xlarge","Name":"Master Instance Group"},{"InstanceCount":2,"InstanceGroupType":"CORE","InstanceType":"m3.xlarge","Name":"Core Instance Group"}]' --auto-terminate --region us-west-2
135 changes: 93 additions & 42 deletions GoWorkspace/src/facebookanalysis/wc.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
package main

import "os"
import "io/ioutil"
import "fmt"
import "mapreduce"
import "graphbuilder"
import "container/list"
import "regexp"
import "strings"
import "strconv"
import "log"
import "encoding/json"
import "github.com/fvbock/trie"
import "github.com/jordan-heemskerk/trie"

var pattern *regexp.Regexp

type KeyValue struct {
Key string
Value string
}

type NameTreePair struct {
Total int
PrefixTree string
}

func Map(value string) *list.List {
func Map(value string) {

fs := strings.Split(value,"\n")

var l = new(list.List)

for _, v := range fs {

// Slice with: [wholematch, "Name", "DoW", "HH", "MM"]
Expand Down Expand Up @@ -56,22 +59,16 @@ func Map(value string) *list.List {
// build value
v := sHH + MM

l.PushBack(mapreduce.KeyValue{Key: name, Value: v})
fmt.Printf("%s\t%s\n", name, v)

} else {
fmt.Printf("Empty line; Unable to parse a line of the file: " + v)
}
}
/* for k, v := range counts {
l.PushBack(mapreduce.KeyValue{Key: k, Value: strconv.Itoa(v)})
}
*/

return l
}

func Reduce(key string, values *list.List) string {
// Create a new key result trie
func Reduce(key string, values *list.List) {

// Create a new key result trie
t := trie.NewTrie()
total := 0

Expand All @@ -81,45 +78,99 @@ func Reduce(key string, values *list.List) string {
t.Add(e.Value.(string))
}

// Key value pair to output; total number of messages and string dump of tree
// The vdobler trie library does not support direct encoding/decoding of prefix trees
// as such, we can't just store a dump of the tree in the struct. We write to a file to be read later
treeFile := "00-" + key + "-Trie"
t.DumpToFile(treeFile)
ntp := NameTreePair{Total: total, PrefixTree: treeFile}

b64_trie, _ := t.ToBase64String()
ntp := NameTreePair{Total: total, PrefixTree: b64_trie}

encoded, err := json.Marshal(ntp)

if err != nil {
fmt.Println("Error: ", err)
log.Fatal(err)
}

kv := KeyValue{Key: key, Value: string(encoded)}

kv_encoded, err := json.Marshal(kv)

if err != nil {
log.Fatal(err)
}

fmt.Printf("%s\n", string(kv_encoded))
}

// MapReduce runs one phase of the streaming job, selected by operation.
//
// All of standard input is read up front. For "map" the input is handed to
// Map. For "reduce" the input is expected to be tab-separated "key\tvalue"
// lines already sorted by key (Hadoop streaming, or `sort` in a local
// pipeline, provides this); consecutive lines sharing a key are collected and
// passed to Reduce once per key. Any other operation is a no-op.
func MapReduce(operation string) {

	// TODO: buffer this
	raw, err := ioutil.ReadAll(os.Stdin)
	if err != nil {
		log.Fatal(err)
	}

	input := string(raw)

	switch operation {
	case "map":
		Map(input)

	case "reduce":
		var currentKey string
		vals := list.New()

		for _, line := range strings.Split(input, "\n") {
			if len(line) == 0 {
				continue
			}

			// Everything up to the first tab is the key, the rest is the value.
			fields := strings.SplitN(line, "\t", 2)
			if len(fields) != 2 {
				// Report on stderr and skip instead of panicking on fields[1].
				log.Printf("skipping malformed record (no tab): %q", line)
				continue
			}
			k, v := fields[0], fields[1]

			// Flush the finished group BEFORE adding the new record, so the
			// first value of a new key is not credited to the previous key.
			if vals.Len() > 0 && k != currentKey {
				Reduce(currentKey, vals)
				vals = list.New()
			}

			currentKey = k
			vals.PushBack(v)
		}

		// The last key has no following key change to trigger its flush.
		if vals.Len() > 0 {
			Reduce(currentKey, vals)
		}
	}
}

// Mapper can be run in 3 ways:
// 1) Sequential (e.g., go run wc.go master x.txt sequential)
// 2) Master (e.g., go run wc.go master x.txt localhost:7777)
// 3) Worker (e.g., go run wc.go worker localhost:7777 localhost:7778 &)
// Graphbuilder can be built using the base input file name via:
// 1) go run wc.go graph stack-hist x.txt 3
/**
* This program can be run in two ways:
* 1) MapReduce
* a) go run wc.go map
* b) go run wc.go reduce
*
* 2) Graphbuilder can be built using the base input file name via:
* go run wc.go graph stack-hist x.txt 3
*
*/
func main() {
pattern = regexp.MustCompile("(.*);([\\w]*),[\\w]*,[\\w]*,[\\w]*,(\\d*):(\\d*)")

switch len(os.Args) {
case 4:
// Original implementation
if os.Args[1] == "master" {
if os.Args[3] == "sequential" {
mapreduce.RunSingle(5, 3, os.Args[2], Map, Reduce)
} else {
mr := mapreduce.MakeMapReduce(5, 3, os.Args[2], os.Args[3])
// Wait until MR is done
<-mr.DoneChannel
}
} else {
mapreduce.RunWorker(os.Args[2], os.Args[3], Map, Reduce, 100)
}
case 2:
MapReduce(os.Args[1])
case 5:
// Graph building
if os.Args[2] == "stack-hist" {
Expand Down
1 change: 0 additions & 1 deletion GoWorkspace/src/github.com/ajstarks/svgo
Submodule svgo deleted from 672fe5
1 change: 0 additions & 1 deletion GoWorkspace/src/github.com/fvbock/trie
Submodule trie deleted from 1d4023
1 change: 0 additions & 1 deletion GoWorkspace/src/github.com/golang/freetype
Submodule freetype deleted from 38b4c3
1 change: 0 additions & 1 deletion GoWorkspace/src/github.com/llgcode/draw2d
Submodule draw2d deleted from 13548b
1 change: 0 additions & 1 deletion GoWorkspace/src/github.com/vdobler/chart
Submodule chart deleted from 293b01
1 change: 0 additions & 1 deletion GoWorkspace/src/golang.org/x/image
Submodule image deleted from c34825
72 changes: 33 additions & 39 deletions GoWorkspace/src/graphbuilder/graphbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"strconv"
"strings"

"github.com/fvbock/trie"
"github.com/jordan-heemskerk/trie"
"github.com/ajstarks/svgo"
"github.com/vdobler/chart"
"github.com/vdobler/chart/imgg"
Expand Down Expand Up @@ -190,7 +190,7 @@ func BakePie(fileName string) {
}

func TallyHourlyMessages(item ParsedPair) []float64{
itemTrie, _ := trie.LoadFromFile(item.Trie.PrefixTree)
itemTrie, _ := trie.FromBase64String(item.Trie.PrefixTree)

var hourList []float64
for h := 0; h < 24; h++ {
Expand Down Expand Up @@ -238,43 +238,37 @@ func BuildGraph(baseFileName string, nFiles string, graphType string) {

var firstPair, secondPair, thirdPair ParsedPair

NFiles, _ := strconv.Atoi(nFiles)

for i:=0; i < NFiles; i++ {
n := strconv.Itoa(i)

fileName := baseFileName + "-res-" + n

// open and read results file
file, _ := os.Open(fileName)
defer file.Close()

scanner := bufio.NewScanner(file)
var line string

for scanner.Scan() {
line = scanner.Text()

var curr ParsedPair
var fp FilePair
var ntp NameTreePair

// read line: {Key, Value} pair
_ = json.Unmarshal([]byte(line), &fp)
curr.Key = fp.Key

// read value's name tree pair
_ = json.Unmarshal([]byte(fp.Value), &ntp)
curr.Trie = ntp

// compare and determine if current trie is within (ordered) top 3
if curr.Trie.Total > firstPair.Trie.Total {
firstPair = curr
} else if curr.Trie.Total > secondPair.Trie.Total {
secondPair = curr
} else if curr.Trie.Total > thirdPair.Trie.Total {
thirdPair = curr
}
fileName := baseFileName

// open and read results file
file, _ := os.Open(fileName)
defer file.Close()

scanner := bufio.NewScanner(file)
var line string

for scanner.Scan() {
line = scanner.Text()

var curr ParsedPair
var fp FilePair
var ntp NameTreePair

// read line: {Key, Value} pair
_ = json.Unmarshal([]byte(line), &fp)
curr.Key = fp.Key

// read value's name tree pair
_ = json.Unmarshal([]byte(fp.Value), &ntp)
curr.Trie = ntp

// compare and determine if current trie is within (ordered) top 3
if curr.Trie.Total > firstPair.Trie.Total {
firstPair = curr
} else if curr.Trie.Total > secondPair.Trie.Total {
secondPair = curr
} else if curr.Trie.Total > thirdPair.Trie.Total {
thirdPair = curr
}
}

Expand Down
Loading