12
12
from optimus .engines .spark .create import Create
13
13
from optimus .engines .spark .io .jdbc import JDBC
14
14
from optimus .engines .spark .io .load import Load
15
- from optimus .engines .spark .ml .models import ML
16
15
from optimus .engines .spark .spark import Spark
17
16
from optimus .helpers .constants import *
18
17
from optimus .helpers .core import val_to_list
@@ -68,8 +67,6 @@ def __init__(self, session=None, master="local[*]", app_name="optimus", checkpoi
68
67
:type jars: (list[str])
69
68
70
69
"""
71
- self .engine = Engine .SPARK .value
72
- self .client = session
73
70
self .preserve = False
74
71
75
72
if jars is None :
@@ -130,26 +127,41 @@ def __init__(self, session=None, master="local[*]", app_name="optimus", checkpoi
130
127
# logger.print("Spark session")
131
128
Spark .instance = Spark ().load (session )
132
129
130
+ self .client = Spark .instance ._spark
131
+
133
132
# Initialize Spark
134
133
logger .print ("""
135
- ____ __ _
134
+ ____ __ _
136
135
/ __ \____ / /_(_)___ ___ __ _______
137
136
/ / / / __ \/ __/ / __ `__ \/ / / / ___/
138
- / /_/ / /_/ / /_/ / / / / / / /_/ (__ )
139
- \____/ .___/\__/_/_/ /_/ /_/\__,_/____/
140
- /_/
137
+ / /_/ / /_/ / /_/ / / / / / / /_/ (__ )
138
+ \____/ .___/\__/_/_/ /_/ /_/\__,_/____/
139
+ /_/
141
140
""" )
142
-
143
141
logger .print (STARTING_OPTIMUS )
144
-
145
142
logger .print (SUCCESS )
146
143
147
- self .create = Create (self )
144
+ @property
145
+ def F (self ):
146
+ from optimus .engines .spark .functions import SparkFunctions
147
+ return SparkFunctions (self )
148
+
149
+ @property
150
+ def create (self ):
151
+ return Create (self )
148
152
149
153
@property
150
154
def load (self ):
151
155
return Load (self )
152
156
157
+ @property
158
+ def engine (self ):
159
+ return Engine .SPARK .value
160
+
161
+ @property
162
+ def engine_label (self ):
163
+ return EnginePretty .SPARK .value
164
+
153
165
@staticmethod
154
166
def connect (driver = None , host = None , database = None , user = None , password = None , port = None , schema = "public" ,
155
167
oracle_tns = None , oracle_service_name = None , oracle_sid = None , presto_catalog = None ,
@@ -361,7 +373,7 @@ def _create_session(self):
361
373
:return:
362
374
"""
363
375
364
- ## Get python.exe fullpath
376
+ # Get python.exe fullpath
365
377
os .environ ['PYSPARK_PYTHON' ] = sys .executable
366
378
367
379
# Remove duplicated strings
@@ -424,10 +436,10 @@ def compare(df1, df2, method="json"):
424
436
:param method: json or a
425
437
:return:
426
438
"""
427
- if method is "json" :
439
+ if method == "json" :
428
440
diff = DeepDiff (df1 .to_json (), df2 .to_json (), ignore_order = False )
429
441
print (output_json (diff ))
430
- elif method is "collect" :
442
+ elif method == "collect" :
431
443
if df1 .collect () == df2 .collect ():
432
444
print ("Dataframes are equal" )
433
445
return True
@@ -437,12 +449,3 @@ def compare(df1, df2, method="json"):
437
449
438
450
else :
439
451
RaiseIt .type_error (method , ["json" , "collect" ])
440
-
441
- @property
442
- def F (self ):
443
- from optimus .engines .spark .functions import SparkFunctions
444
- return SparkFunctions (self )
445
-
446
- @property
447
- def engine_label (self ):
448
- return EnginePretty .SPARK .value
0 commit comments