"""Examples of the pandas UDF variants in PySpark: Series-to-Series,
iterator of Series, iterator of multiple Series, struct input, and
Series-to-scalar (grouped aggregate)."""
from typing import Iterator, Tuple

import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.pandas.functions import pandas_udf

from common import create_spark_session


# Series-to-Series pandas UDF: each call receives one Arrow batch of rows as
# pandas Series and must return a Series of the same length.
@pandas_udf(returnType=T.IntegerType())
def trip_duration(start_time: pd.Series, end_time: pd.Series) -> pd.Series:
    # Debug prints: show how the input column is delivered in batches.
    print(start_time)
    print(end_time)
    # Use .dt.total_seconds() rather than .dt.seconds: the latter returns only
    # the seconds component of the timedelta and wraps for trips of a day or more.
    result = (end_time - start_time).dt.total_seconds().astype("int32")
    print(result)
    print("!=====================================!")
    return result
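

# A minimal sketch of calling the same UDF from Spark SQL: pandas UDFs can be
# registered like ordinary UDFs. The view name "trips" is hypothetical, for
# illustration only.
def register_trip_duration_for_sql(spark: SparkSession):
    spark.udf.register("trip_duration", trip_duration)
    # spark.sql("SELECT trip_duration(tpep_pickup_datetime, tpep_dropoff_datetime) AS duration FROM trips")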


# Iterator-of-multiple-Series variant: the UDF receives an iterator over
# (start_times, end_times) batch tuples and yields one result Series per batch.
@pandas_udf(returnType=T.IntegerType())
def trip_duration_alt(start_and_end_times: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for start_times, end_times in start_and_end_times:
        print(start_times)
        print(end_times)
        print("!=====================================!")
        yield (end_times - start_times).dt.total_seconds().astype("int32")
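

# The practical payoff of the iterator form is amortizing expensive one-time
# setup across batches. A minimal sketch; the setup step is a hypothetical
# placeholder, not part of this project.
@pandas_udf(returnType=T.IntegerType())
def trip_duration_with_setup(batches: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    # Expensive one-time setup would go here (runs once per task, not once per
    # batch), e.g. loading a lookup table or opening a connection.
    for start_times, end_times in batches:
        yield (end_times - start_times).dt.total_seconds().astype("int32")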


def calculate_trip_duration(spark: SparkSession):
    trips_data = spark.read.parquet("data/ny_taxi_trips/*.parquet").limit(20).repartition(5)
    result_data = trips_data.withColumn(
        "duration", trip_duration_alt(F.col("tpep_pickup_datetime"), F.col("tpep_dropoff_datetime"))
    )
    result_data.show(truncate=False)


# Series-to-scalar (grouped aggregate) pandas UDF: receives all values of a
# group as one Series and returns a single scalar.
@pandas_udf(T.DoubleType())
def average_udf(values: pd.Series) -> float:
    return values.mean()


def calculate_avg_trip_duration(spark: SparkSession):
    trips_data = spark.read.parquet("data/ny_taxi_trips/*.parquet").limit(20).repartition(5)
    result_data = trips_data.withColumn(
        "duration", trip_duration_alt(F.col("tpep_pickup_datetime"), F.col("tpep_dropoff_datetime"))
    ).groupby("VendorID").agg(average_udf(F.col("duration")).alias("avg_duration"))
    result_data.show(truncate=False)
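

# Series-to-scalar pandas UDFs can also be used as window functions. A minimal
# sketch, assuming the same trips schema as above:
def calculate_avg_fare_per_vendor(spark: SparkSession):
    from pyspark.sql import Window

    trips_data = spark.read.parquet("data/ny_taxi_trips/*.parquet").limit(20)
    vendor_window = Window.partitionBy("VendorID")
    trips_data.withColumn(
        "vendor_avg_fare", average_udf(F.col("total_amount")).over(vendor_window)
    ).show(truncate=False)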


# A struct column arrives at a pandas UDF as a pandas DataFrame, one column
# per struct field.
@pandas_udf(returnType=T.FloatType())
def tip_percent_of_fare(fares_breakdown: pd.DataFrame) -> pd.Series:
    print(fares_breakdown)
    result = fares_breakdown["tip_amount"] / fares_breakdown["total_amount"] * 100
    print("!============================================!")
    return result


def calculate_tip_percent(spark: SparkSession):
    trips_data = spark.read.parquet("data/ny_taxi_trips/*.parquet").limit(20).repartition(5)
    result_data = (
        trips_data
        .withColumn("fares", F.struct("total_amount", "tip_amount"))
        .withColumn("tip_percent", tip_percent_of_fare(F.col("fares")))
    )
    result_data.show(truncate=False)
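

# The reverse direction also works in Spark 3.x: a pandas UDF can return a
# struct by returning a pandas DataFrame whose columns match the declared
# fields. A minimal sketch, for illustration only:
@pandas_udf(returnType="tip_amount double, tip_percent double")
def tip_summary(fares_breakdown: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({
        "tip_amount": fares_breakdown["tip_amount"],
        "tip_percent": fares_breakdown["tip_amount"] / fares_breakdown["total_amount"] * 100,
    })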


# Iterator-of-Series variant: a single column delivered batch by batch;
# num_series counts how many batches each task receives.
@pandas_udf(returnType=T.TimestampType())
def add_5_min(time_iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    num_series = 0
    print("!--------------------------------------!")
    for item in time_iterator:
        num_series += 1
        print(num_series)
        print(item)
        yield item + pd.Timedelta(minutes=5)


def add_5_min_test(spark: SparkSession):
    trips_data = spark.read.parquet("data/ny_taxi_trips/*.parquet").limit(20).repartition(5)
    result_data = (
        trips_data
        .withColumn("five_min_after_start_time", add_5_min(F.col("tpep_pickup_datetime")))
    )
    result_data.show(truncate=False)
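

# DataFrame.mapInPandas is a related tool when whole rows, not single columns,
# need batch processing. A minimal sketch under the same schema assumptions:
def add_5_min_with_map_in_pandas(spark: SparkSession):
    trips_data = spark.read.parquet("data/ny_taxi_trips/*.parquet").limit(20)

    def shift_pickup(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
        for batch in batches:
            batch["tpep_pickup_datetime"] = batch["tpep_pickup_datetime"] + pd.Timedelta(minutes=5)
            yield batch

    trips_data.mapInPandas(shift_pickup, trips_data.schema).show(truncate=False)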


if __name__ == '__main__':
    spark = create_spark_session("Pandas UDF tutorials")
    # calculate_trip_duration(spark=spark)
    # calculate_tip_percent(spark=spark)
    # add_5_min_test(spark=spark)
    calculate_avg_trip_duration(spark=spark)