Skip to content

Commit

Permalink
Update luigi metabase project files.
Browse files Browse the repository at this point in the history
  • Loading branch information
javainthinking committed Sep 3, 2021
1 parent 24a277a commit b25ab6e
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 50 deletions.
16 changes: 15 additions & 1 deletion luigi_metabase/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,27 @@ Password: N1cetest
Business Owner user: [email protected]
Password: N1cetest

```
cd ~/adbpg_client_package/bin
./psql -hpgm-3nsa364dun8rza5k168190.pg.rds.aliyuncs.com -p1921 -Udemo sales_dw
\i ~/opensource_with_apsaradb/luigi_metabase/sales_dw_ddl.sql
```

Execute the command to start luigi daemon. Once it is up and running, navigate to ``http://<ECS_EIP>:8082/``

Execute the command to start luigi daemon.

```
luigid
```

Once it is up and running, navigate to ``http://<ECS_EIP>:8082/``

```
cd ~/opensource_with_apsaradb/luigi_metabase
PYTHONPATH='.' luigi --module data_pipeline CompleteDataDumpLoad --date 2018-03-30
```

```
cd ~/adbpg_client_package/bin
./psql -hpgm-3nsa364dun8rza5k168190.pg.rds.aliyuncs.com -p1921 -Udemo sales_dw
Expand Down
98 changes: 49 additions & 49 deletions luigi_metabase/data_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def run(self):
countries[country.name] = country.alpha_2

data['Country_Code'] = data.country.map(countries)
data.rename(columns={"customerid": "Customer_ID", "country": "Country_Name"}, inplace=True)
data.rename(columns={"customerid": "customer_id", "country": "country_name"}, inplace=True)

data.to_csv(self.input()[0].path + "_processed", encoding="utf-8", header=False, index=None)

Expand Down Expand Up @@ -118,8 +118,8 @@ def run(self):
"""
data = pandas.read_csv(self.input()[1].path)
data.invoicedate = pandas.to_datetime(data.invoicedate)
data.rename(columns={"invoiceno": "Invoice_No", "stockcode": "Stock_Code",
"invoicedate": "Invoice_Date", "customerid": "Customer_Id"}, inplace=True)
data.rename(columns={"invoiceno": "invoice_no", "stockcode": "stock_code",
"invoicedate": "invoice_date", "customerid": "customer_id"}, inplace=True)

data.to_csv(self.input()[1].path + "_processed", encoding="utf-8", header=False, index=None)

Expand Down Expand Up @@ -159,16 +159,16 @@ def run(self):

data.invoicedate = pandas.to_datetime(data.invoicedate)
data_splitted = pandas.DataFrame({
"Invoice_Date": data.invoicedate,
"DayOfWeek": data.invoicedate.dt.dayofweek,
"Year": data.invoicedate.dt.year,
"Month": data.invoicedate.dt.month,
"Day": data.invoicedate.dt.day,
"Hour": data.invoicedate.dt.hour,
"Minute": data.invoicedate.dt.minute,
"DayOfYear": data.invoicedate.dt.dayofyear,
"Week": data.invoicedate.dt.week,
"Quarter": data.invoicedate.dt.quarter
"invoice_date": data.invoicedate,
"dayofweek": data.invoicedate.dt.dayofweek,
"year": data.invoicedate.dt.year,
"month": data.invoicedate.dt.month,
"day": data.invoicedate.dt.day,
"hour": data.invoicedate.dt.hour,
"minute": data.invoicedate.dt.minute,
"dayofyear": data.invoicedate.dt.dayofyear,
"week": data.invoicedate.dt.week,
"quarter": data.invoicedate.dt.quarter
})

data_splitted.to_csv(self.input()[1].path + "_time", sep="\t", encoding="utf-8", header=False, index=None)
Expand Down Expand Up @@ -205,7 +205,7 @@ def run(self):

data.description.replace('\s+', ' ', regex=True, inplace=True)
data.description = data.description.str.strip()
data.rename(columns={"stockcode": "Stock_Code", "unitprice": "Unit_Price"}, inplace=True)
data.rename(columns={"stockcode": "stock_code", "unitprice": "unit_price"}, inplace=True)

data.to_csv(self.input()[2].path + "_processed", sep="\t", encoding="utf-8", header=False, index=None)

Expand Down Expand Up @@ -233,9 +233,9 @@ class CustomerInfoLoading(luigi.contrib.postgres.CopyToTable):
table = Config.customer_info_table
column_separator = ","

columns = [("Customer_ID", "INT"),
("Country", "TEXT"),
("Country_Code", "TEXT")]
columns = [("customer_id", "INT"),
("country", "TEXT"),
("country_code", "TEXT")]

def requires(self):
"""
Expand All @@ -258,11 +258,11 @@ class InvoiceLoading(luigi.contrib.postgres.CopyToTable):
table = Config.invoice_table
column_separator = ","

columns = [("Invoice_No", "TEXT"),
("Stock_Code", "TEXT"),
("Quantity", "INT"),
("Invoice_Date", "TEXT"),
("Customer_Id", "INT")]
columns = [("invoice_no", "TEXT"),
("stock_code", "TEXT"),
("quantity", "INT"),
("invoice_date", "TEXT"),
("customer_id", "INT")]

def requires(self):
"""
Expand All @@ -285,16 +285,16 @@ class InvoiceTimeLoading(luigi.contrib.postgres.CopyToTable):
table = Config.invoice_time_table
column_separator = "\t"

columns = [("Day", "INT"),
("DayOfWeek", "INT"),
("DayOfYear", "INT"),
("Hour", "INT"),
("Invoice_Date", "TEXT"),
("Minute", "INT"),
("Month", "TEXT"),
("Quarter", "INT"),
("WeekOfYear", "INT"),
("Year", "INT")]
columns = [("day", "INT"),
("dayofweek", "INT"),
("dayofyear", "INT"),
("hour", "INT"),
("invoice_date", "TEXT"),
("minute", "INT"),
("month", "TEXT"),
("quarter", "INT"),
("weekofyear", "INT"),
("year", "INT")]

def requires(self):
"""
Expand All @@ -317,9 +317,9 @@ class ProductInfoLoading(luigi.contrib.postgres.CopyToTable):
table = Config.product_info_table
column_separator = "\t"

columns = [("Stock_Code", "TEXT"),
("Description", "TEXT"),
("Unit_Price", "FLOAT")]
columns = [("stock_code", "TEXT"),
("description", "TEXT"),
("unit_price", "FLOAT")]

def requires(self):
"""
Expand Down Expand Up @@ -455,15 +455,15 @@ class AssociationRulesLoading(luigi.contrib.postgres.CopyToTable):
table = Config.association_rules_table
column_separator = ","

columns = [("Antecedants", "TEXT"),
("Consequents", "TEXT"),
("Antecedent_Support", "FLOAT"),
("Consequent_Support", "FLOAT"),
("Support", "FLOAT"),
("Confidence", "FLOAT"),
("Lift", "FLOAT"),
("Leverage", "FLOAT"),
("Conviction", "FLOAT")]
columns = [("antecedants", "TEXT"),
("consequents", "TEXT"),
("antecedent_support", "FLOAT"),
("consequent_support", "FLOAT"),
("support", "FLOAT"),
("confidence", "FLOAT"),
("lift", "FLOAT"),
("leverage", "FLOAT"),
("conviction", "FLOAT")]

def requires(self):
"""
Expand All @@ -486,11 +486,11 @@ class OutliersLoading(luigi.contrib.postgres.CopyToTable):
table = Config.outliers_table
column_separator = ","

columns = [("Invoice_No", "TEXT"),
("Stock_Code", "TEXT"),
("Quantity", "INT"),
("Invoice_Date", "TEXT"),
("Customer_Id", "INT")]
columns = [("invoice_no", "TEXT"),
("stock_code", "TEXT"),
("quantity", "INT"),
("invoice_date", "TEXT"),
("customer_id", "INT")]

def requires(self):
"""
Expand Down

0 comments on commit b25ab6e

Please sign in to comment.