Skip to content
This repository was archived by the owner on Jul 26, 2019. It is now read-only.

Commit 284b516

Browse files
authored
Merge pull request #2 from jordan-heemskerk/wc.go_hadoop_prep
Wc.go hadoop prep (whoop).
2 parents 4f59cf2 + b519a3f commit 284b516

File tree

37 files changed

+142
-11413
lines changed

37 files changed

+142
-11413
lines changed

GoWorkspace/bin/facebookanalysis

-12.6 MB
Binary file not shown.

GoWorkspace/pkg/linux_amd64/github.com/ajstarks/svgo.a

-723
This file was deleted.
Binary file not shown.

GoWorkspace/pkg/linux_amd64/github.com/golang/freetype.a

-260
This file was deleted.

GoWorkspace/pkg/linux_amd64/github.com/golang/freetype/raster.a

-542
This file was deleted.

GoWorkspace/pkg/linux_amd64/github.com/golang/freetype/truetype.a

-950
This file was deleted.

GoWorkspace/pkg/linux_amd64/github.com/llgcode/draw2d.a

-642
This file was deleted.

GoWorkspace/pkg/linux_amd64/github.com/llgcode/draw2d/draw2dbase.a

-522
This file was deleted.

GoWorkspace/pkg/linux_amd64/github.com/llgcode/draw2d/draw2dimg.a

-497
This file was deleted.

GoWorkspace/pkg/linux_amd64/github.com/vdobler/chart.a

-1,698
This file was deleted.

GoWorkspace/pkg/linux_amd64/github.com/vdobler/chart/imgg.a

-758
This file was deleted.

GoWorkspace/pkg/linux_amd64/github.com/vdobler/chart/svgg.a

-494
This file was deleted.

GoWorkspace/pkg/linux_amd64/github.com/vdobler/chart/txtg.a

-355
This file was deleted.

GoWorkspace/pkg/linux_amd64/golang.org/x/image/draw.a

-1,800
This file was deleted.

GoWorkspace/pkg/linux_amd64/golang.org/x/image/font.a

-163
This file was deleted.
Binary file not shown.
Binary file not shown.

GoWorkspace/pkg/linux_amd64/graphbuilder.a

-472
This file was deleted.

GoWorkspace/pkg/linux_amd64/mapreduce.a

-463
This file was deleted.

GoWorkspace/src/facebookanalysis/README.md

+2-8
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,16 @@ This project....
1515
│   └── wc.go
1616
├── graphbuilder
1717
│   └── graphbuilder.go
18-
└── mapreduce
19-
├── common.go
20-
├── mapreduce.go
21-
├── master.go
22-
├── test_test.go
23-
└── worker.go
2418
```
2519

2620

2721
## Useage:
2822

2923
1. Preprocess your facebook data to create input data file; see ```fbparser/README```
3024

31-
2. Use MapReduce to create prefix trees of all your conversations; One file will be produced per person. Run ```go run wc.go master <PARSED_INPUT_FILE.txt> sequential```
25+
2. Use MapReduce to create prefix trees of all your conversations; One file will be produced per person. Run ```cat input.txt | go run wc.go map | sort | go run wc.go reduce | sort > output.txt```
3226

33-
3. Generate a stacked bar graph via: ```go run wc.go graph stack-hist mrtmp.<PARSED_INPUT_FILE.txt> <Number of friends to evaluate>```
27+
3. Generate a stacked bar graph via: ```go run wc.go graph stack-hist output.txt 3```
3428

3529
* Note: E.g., 'Number of friends' to evalutate is usually 3
3630

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#!/bin/bash
2+
3+
go get ./
4+
go build wc.go
5+
aws s3 cp wc s3://facebook-analysis/bin/wc

GoWorkspace/src/facebookanalysis/fbparser/README.md

+5-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
# Preprocessing with fbparser.py
22

3-
To preprocess data:
4-
```pip install -r requirements.txt```
3+
Installation:
4+
```
5+
./install_deps.sh
6+
pip install -r requirements.txt
7+
```
58

69
# assuming that messages.htm is in the current directory...
710
```python fbparser.py > input_data.txt```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
#!/bin/bash
2+
sudo apt-get install python-dev libxml2-dev libxslt1-dev zlib1g-dev
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
#!/bin/bash
2+
aws emr create-cluster --applications Name=Hadoop --ec2-attributes '{"InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-460d951f","EmrManagedSlaveSecurityGroup":"sg-574b8031","EmrManagedMasterSecurityGroup":"sg-564b8030"}' --service-role EMR_DefaultRole --enable-debugging --release-label emr-4.7.2 --log-uri 's3n://aws-logs-744984309673-us-west-2/elasticmapreduce/' --steps '[{"Args":["hadoop-streaming","-files","s3://facebook-analysis/bin/wc,s3://facebook-analysis/bin/wc","-mapper","wc map","-reducer","wc reduce","-input","s3://facebook-analysis/input/input.txt","-output","s3://facebook-analysis/output_'$1'"],"Type":"CUSTOM_JAR","ActionOnFailure":"TERMINATE_CLUSTER","Jar":"command-runner.jar","Properties":"","Name":"Streaming program"}]' --name 'JordanWCGO' --instance-groups '[{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"m3.xlarge","Name":"Master Instance Group"},{"InstanceCount":2,"InstanceGroupType":"CORE","InstanceType":"m3.xlarge","Name":"Core Instance Group"}]' --auto-terminate --region us-west-2

GoWorkspace/src/facebookanalysis/wc.go

+93-42
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,33 @@
11
package main
22

33
import "os"
4+
import "io/ioutil"
45
import "fmt"
5-
import "mapreduce"
66
import "graphbuilder"
77
import "container/list"
88
import "regexp"
99
import "strings"
1010
import "strconv"
1111
import "log"
1212
import "encoding/json"
13-
import "github.com/fvbock/trie"
13+
import "github.com/jordan-heemskerk/trie"
1414

1515
var pattern *regexp.Regexp
1616

17+
type KeyValue struct {
18+
Key string
19+
Value string
20+
}
21+
1722
type NameTreePair struct {
1823
Total int
1924
PrefixTree string
2025
}
2126

22-
func Map(value string) *list.List {
27+
func Map(value string) {
2328

2429
fs := strings.Split(value,"\n")
2530

26-
var l = new(list.List)
27-
2831
for _, v := range fs {
2932

3033
// Slice with: [wholematch, "Name", "DoW", "HH", "MM"]
@@ -56,22 +59,16 @@ func Map(value string) *list.List {
5659
// build value
5760
v := sHH + MM
5861

59-
l.PushBack(mapreduce.KeyValue{Key: name, Value: v})
62+
fmt.Printf("%s\t%s\n", name, v)
6063

61-
} else {
62-
fmt.Printf("Empty line; Unable to parse a line of the file: " + v)
6364
}
6465
}
65-
/* for k, v := range counts {
66-
l.PushBack(mapreduce.KeyValue{Key: k, Value: strconv.Itoa(v)})
67-
}
68-
*/
6966

70-
return l
7167
}
7268

73-
func Reduce(key string, values *list.List) string {
74-
// Create a new key result trie
69+
func Reduce(key string, values *list.List) {
70+
71+
// Create a new key result trie
7572
t := trie.NewTrie()
7673
total := 0
7774

@@ -81,45 +78,99 @@ func Reduce(key string, values *list.List) string {
8178
t.Add(e.Value.(string))
8279
}
8380

84-
// Key value pair to output; total number of messages and string dump of tree
85-
// The vdobler trie library does not support direct encoding/decoding of prefix trees
86-
// as such, we can't just store a dump of the tree in the struct. We write to a file to be read later
87-
treeFile := "00-" + key + "-Trie"
88-
t.DumpToFile(treeFile)
89-
ntp := NameTreePair{Total: total, PrefixTree: treeFile}
81+
82+
b64_trie, _ := t.ToBase64String()
83+
ntp := NameTreePair{Total: total, PrefixTree: b64_trie}
9084

9185
encoded, err := json.Marshal(ntp)
9286

9387
if err != nil {
94-
fmt.Println("Error: ", err)
88+
log.Fatal(err)
89+
}
90+
91+
kv := KeyValue{Key: key, Value: string(encoded)}
92+
93+
kv_encoded, err := json.Marshal(kv)
94+
95+
if err != nil {
96+
log.Fatal(err)
97+
}
98+
99+
fmt.Printf("%s\n", string(kv_encoded))
100+
}
101+
102+
func MapReduce(operation string) {
103+
104+
// TODO: buffer this
105+
bytes, err := ioutil.ReadAll(os.Stdin)
106+
107+
if err != nil {
108+
log.Fatal(err)
109+
}
110+
111+
input := string(bytes)
112+
113+
if (operation == "map") {
114+
115+
Map(input)
116+
117+
}
118+
119+
if (operation == "reduce") {
120+
121+
fs := strings.Split(input,"\n")
122+
123+
vals := list.New()
124+
125+
current_key := strings.Split(fs[0],"\t")[0]
126+
127+
128+
// Hadoop will sort the output from map phase
129+
// by key first, making this possible
130+
for _, v := range fs {
131+
132+
if len(v) == 0 {
133+
continue
134+
}
135+
136+
split := strings.Split(v, "\t")
137+
k := split[0]
138+
v := split[1]
139+
140+
vals.PushBack(v)
141+
142+
if (k != current_key) {
143+
144+
// Call our old reduce function
145+
Reduce(current_key, vals)
146+
current_key = k
147+
vals = list.New()
148+
149+
}
150+
151+
}
152+
95153
}
96154

97-
return string(encoded)
155+
98156
}
99157

100-
// Mapper can be run in 3 ways:
101-
// 1) Sequential (e.g., go run wc.go master x.txt sequential)
102-
// 2) Master (e.g., go run wc.go master x.txt localhost:7777)
103-
// 3) Worker (e.g., go run wc.go worker localhost:7777 localhost:7778 &)
104-
// Graphbuilder can be built using the base input file name via:
105-
// 1) go run wc.go graph stack-hist x.txt 3
158+
/**
159+
* This program can be run in two ways:
160+
* 1) MapReduce
161+
* a) go run wc.go map
162+
* b) go run wc.go reduce
163+
*
164+
* 2) Graphbuilder can be built using the base input file name via:
165+
* go run wc.go graph stack-hist x.txt 3
166+
*
167+
*/
106168
func main() {
107169
pattern = regexp.MustCompile("(.*);([\\w]*),[\\w]*,[\\w]*,[\\w]*,(\\d*):(\\d*)")
108170

109171
switch len(os.Args) {
110-
case 4:
111-
// Original implementation
112-
if os.Args[1] == "master" {
113-
if os.Args[3] == "sequential" {
114-
mapreduce.RunSingle(5, 3, os.Args[2], Map, Reduce)
115-
} else {
116-
mr := mapreduce.MakeMapReduce(5, 3, os.Args[2], os.Args[3])
117-
// Wait until MR is done
118-
<-mr.DoneChannel
119-
}
120-
} else {
121-
mapreduce.RunWorker(os.Args[2], os.Args[3], Map, Reduce, 100)
122-
}
172+
case 2:
173+
MapReduce(os.Args[1])
123174
case 5:
124175
// Graph building
125176
if os.Args[2] == "stack-hist" {
-1
This file was deleted.
-1
This file was deleted.
This file was deleted.
This file was deleted.
-1
This file was deleted.

GoWorkspace/src/golang.org/x/image

-1
This file was deleted.

GoWorkspace/src/graphbuilder/graphbuilder.go

+33-39
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"strconv"
1414
"strings"
1515

16-
"github.com/fvbock/trie"
16+
"github.com/jordan-heemskerk/trie"
1717
"github.com/ajstarks/svgo"
1818
"github.com/vdobler/chart"
1919
"github.com/vdobler/chart/imgg"
@@ -190,7 +190,7 @@ func BakePie(fileName string) {
190190
}
191191

192192
func TallyHourlyMessages(item ParsedPair) []float64{
193-
itemTrie, _ := trie.LoadFromFile(item.Trie.PrefixTree)
193+
itemTrie, _ := trie.FromBase64String(item.Trie.PrefixTree)
194194

195195
var hourList []float64
196196
for h := 0; h < 24; h++ {
@@ -238,43 +238,37 @@ func BuildGraph(baseFileName string, nFiles string, graphType string) {
238238

239239
var firstPair, secondPair, thirdPair ParsedPair
240240

241-
NFiles, _ := strconv.Atoi(nFiles)
242-
243-
for i:=0; i < NFiles; i++ {
244-
n := strconv.Itoa(i)
245-
246-
fileName := baseFileName + "-res-" + n
247-
248-
// open and read results file
249-
file, _ := os.Open(fileName)
250-
defer file.Close()
251-
252-
scanner := bufio.NewScanner(file)
253-
var line string
254-
255-
for scanner.Scan() {
256-
line = scanner.Text()
257-
258-
var curr ParsedPair
259-
var fp FilePair
260-
var ntp NameTreePair
261-
262-
// read line: {Key, Value} pair
263-
_ = json.Unmarshal([]byte(line), &fp)
264-
curr.Key = fp.Key
265-
266-
// read value's name tree pair
267-
_ = json.Unmarshal([]byte(fp.Value), &ntp)
268-
curr.Trie = ntp
269-
270-
// compare and determine if current trie is within (ordered) top 3
271-
if curr.Trie.Total > firstPair.Trie.Total {
272-
firstPair = curr
273-
} else if curr.Trie.Total > secondPair.Trie.Total {
274-
secondPair = curr
275-
} else if curr.Trie.Total > thirdPair.Trie.Total {
276-
thirdPair = curr
277-
}
241+
fileName := baseFileName
242+
243+
// open and read results file
244+
file, _ := os.Open(fileName)
245+
defer file.Close()
246+
247+
scanner := bufio.NewScanner(file)
248+
var line string
249+
250+
for scanner.Scan() {
251+
line = scanner.Text()
252+
253+
var curr ParsedPair
254+
var fp FilePair
255+
var ntp NameTreePair
256+
257+
// read line: {Key, Value} pair
258+
_ = json.Unmarshal([]byte(line), &fp)
259+
curr.Key = fp.Key
260+
261+
// read value's name tree pair
262+
_ = json.Unmarshal([]byte(fp.Value), &ntp)
263+
curr.Trie = ntp
264+
265+
// compare and determine if current trie is within (ordered) top 3
266+
if curr.Trie.Total > firstPair.Trie.Total {
267+
firstPair = curr
268+
} else if curr.Trie.Total > secondPair.Trie.Total {
269+
secondPair = curr
270+
} else if curr.Trie.Total > thirdPair.Trie.Total {
271+
thirdPair = curr
278272
}
279273
}
280274

0 commit comments

Comments
 (0)