# sparkly with new spark.R
# Housekeeping: clear the workspace and set the working directory
ls()
rm(list = ls())
gc()
getwd()
setwd("C:/Users/KOGENTIX/Documents")

# install.packages("sparklyr")
library(sparklyr)

# connect to a local Spark instance
sc <- spark_connect(master = "local")
# install.packages(c("nycflights13", "Lahman"))
library(dplyr)

# copy some local data sets into Spark
iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights", overwrite = TRUE)
batting_tbl <- copy_to(sc, Lahman::Batting, "batting", overwrite = TRUE)
src_tbls(sc)
# filter by departure delay and print the first few records
flights_tbl %>% filter(dep_delay == 2)
library(DBI)
iris_preview <- dbGetQuery(sc, "SELECT * FROM iris LIMIT 10")
iris_preview
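# Optional sketch (not in the original walkthrough): the same preview can be
# written with dplyr verbs instead of SQL; head() on a Spark table is
# translated to a LIMIT query.
iris_tbl %>% head(10) %>% collect()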
# copy mtcars into Spark
mtcars_tbl <- copy_to(sc, mtcars, overwrite = TRUE)

# transform our data set, and then partition into 'training' and 'test'
partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  mutate(cyl8 = cyl == 8) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 1099)

# fit a linear model to the training dataset
fit <- partitions$training %>%
  ml_linear_regression(mpg ~ wt + cyl)
fit
summary(fit)
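# Optional sketch (not in the original walkthrough): score the held-out 'test'
# partition and compute RMSE on the driver. Assumes ml_predict(), the current
# sparklyr scoring function, and its default 'prediction' output column.
test_scored <- ml_predict(fit, partitions$test) %>% collect()
sqrt(mean((test_scored$mpg - test_scored$prediction) ^ 2))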
# apply an arbitrary R function to each partition of the iris Spark table
spark_apply(iris_tbl, function(data) {
  data[1:4] + rgamma(1, 2)
})
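# Optional sketch (not in the original walkthrough): spark_apply() can also run
# the R function once per group via its 'group_by' argument; here, row counts
# per Species (illustrative only).
spark_apply(
  iris_tbl,
  function(data) data.frame(n = nrow(data)),
  group_by = "Species"
)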
# fit a k-means clustering model with three centers on the petal measurements
kmeans_model <- iris_tbl %>%
  ml_kmeans(k = 3, features = c("Petal_Width", "Petal_Length"))
# print our model fit
print(kmeans_model)
# predict the associated class
predicted <- ml_predict(kmeans_model, iris_tbl) %>%
  collect()
table(predicted$Species, predicted$prediction)
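# Optional sketch (not in the original walkthrough): inspect the fitted cluster
# centers; this assumes the ml_model object stores them in a 'centers' element,
# as shown in the sparklyr MLlib guide.
kmeans_model$centers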
pca_model <- tbl(sc, "iris") %>%
  select(-Species) %>%
  ml_pca()
print(pca_model)
# fit a random forest classifier to predict Species from the petal measurements
rf_model <- iris_tbl %>%
  ml_random_forest(Species ~ Petal_Length + Petal_Width, type = "classification")

rf_predict <- ml_predict(rf_model, iris_tbl) %>%
  ft_string_indexer("Species", "Species_idx") %>%
  collect()
table(rf_predict$Species_idx, rf_predict$prediction)
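# Optional sketch (not in the original walkthrough): tree ensembles report
# relative variable importances; assumes ml_feature_importances(), available in
# recent sparklyr releases.
ml_feature_importances(rf_model)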
# partition the iris table and regress Petal_Length on Petal_Width
partitions <- tbl(sc, "iris") %>%
  sdf_partition(training = 0.75, test = 0.25, seed = 1099)

fit <- partitions$training %>%
  ml_linear_regression(Petal_Length ~ Petal_Width)

# mean squared error of the fitted model's predictions on a given data set
estimate_mse <- function(df) {
  ml_predict(fit, df) %>%
    mutate(resid = Petal_Length - prediction) %>%
    summarize(mse = mean(resid ^ 2)) %>%
    collect()
}
sapply(partitions, estimate_mse)
# ------------------------------------------------------------------
# Second example: bucketize 'cyl', fit a linear model, and plot
# predicted vs. actual mpg
library(magrittr)
library(sparklyr)
library(dplyr)
library(ggplot2)

# sc <- spark_connect(master = "local", version = "1.6.1")
mtcars_tbl <- copy_to(sc, mtcars, "mtcars", overwrite = TRUE)
# transform our data set, and then partition into 'training' and 'test'
partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  ft_bucketizer("cyl", "cyl8", splits = c(0, 8, 12)) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 888)

# fit a linear model to the training dataset
fit <- partitions$training %>%
  ml_linear_regression(mpg ~ wt + cyl)
# summarize the model
summary(fit)
# score the test data with the fitted model
pred <- ml_predict(fit, partitions$test) %>%
  collect()
# Plot the predicted versus actual mpg
ggplot(pred, aes(x = mpg, y = prediction)) +
  geom_abline(lty = "dashed", col = "red") +
  geom_point() +
  theme(plot.title = element_text(hjust = 0.5)) +
  coord_fixed(ratio = 1) +
  labs(
    x = "Actual Fuel Consumption",
    y = "Predicted Fuel Consumption",
    title = "Predicted vs. Actual Fuel Consumption"
  )
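# Optional: disconnect from the local Spark session once the analysis is done.
spark_disconnect(sc)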