diff --git a/airflow/dags/reporting_database_generation/generate-elastic-database.py b/airflow/dags/reporting_database_generation/generate-elastic-database.py
index 6bf43202..99f7b797 100644
--- a/airflow/dags/reporting_database_generation/generate-elastic-database.py
+++ b/airflow/dags/reporting_database_generation/generate-elastic-database.py
@@ -419,6 +419,104 @@ class SQLTemplatedPythonOperator(PythonOperator):
     }
 )
 
+# Incremental text-table tasks: each one hands a single 13_xx "*_increment.sql" script to
+# validate_query.validate_and_execute. Airflow requires every task_id to be non-empty and unique in the DAG.
+g_brf_sum_delta = SQLTemplatedPythonOperator(
+    task_id='g_brf_sum_delta',
+    python_callable=validate_query.validate_and_execute,
+    dag=elastic_prep_dag,
+    op_kwargs={
+        'filename': '13_01_g_brf_sum_text_increment.sql'
+    },
+    templates_dict={
+        'source_sql': '13_01_g_brf_sum_text_increment.sql'
+    }
+)
+
+g_claims_delta = SQLTemplatedPythonOperator(
+    task_id='g_claims_delta',
+    python_callable=validate_query.validate_and_execute,
+    dag=elastic_prep_dag,
+    op_kwargs={
+        'filename': '13_02_g_claim_increment.sql'
+    },
+    templates_dict={
+        'source_sql': '13_02_g_claim_increment.sql'
+    }
+)
+
+g_detail_desc_delta = SQLTemplatedPythonOperator(
+    task_id='g_detail_desc_delta',
+    python_callable=validate_query.validate_and_execute,
+    dag=elastic_prep_dag,
+    op_kwargs={
+        'filename': '13_03_g_detail_desc_text_increment.sql'
+    },
+    templates_dict={
+        'source_sql': '13_03_g_detail_desc_text_increment.sql'
+    }
+)
+
+g_draw_desc_delta = SQLTemplatedPythonOperator(
+    task_id='g_draw_desc_delta',
+    python_callable=validate_query.validate_and_execute,
+    dag=elastic_prep_dag,
+    op_kwargs={
+        'filename': '13_04_g_draw_desc_text_increment.sql'
+    },
+    templates_dict={
+        'source_sql': '13_04_g_draw_desc_text_increment.sql'
+    }
+)
+
+pg_brf_sum_delta = SQLTemplatedPythonOperator(
+    task_id='pg_brf_sum_delta',
+    python_callable=validate_query.validate_and_execute,
+    dag=elastic_prep_dag,
+    op_kwargs={
+        'filename': '13_05_pg_brf_sum_text_increment.sql'
+    },
+    templates_dict={
+        'source_sql': '13_05_pg_brf_sum_text_increment.sql'
+    }
+)
+
+pg_claims_delta = SQLTemplatedPythonOperator(
+    task_id='pg_claims_delta',
+    python_callable=validate_query.validate_and_execute,
+    dag=elastic_prep_dag,
+    op_kwargs={
+        'filename': '13_06_pg_claim_increment.sql'
+    },
+    templates_dict={
+        'source_sql': '13_06_pg_claim_increment.sql'
+    }
+)
+
+pg_detail_desc_delta = SQLTemplatedPythonOperator(
+    task_id='pg_detail_desc_delta',
+    python_callable=validate_query.validate_and_execute,
+    dag=elastic_prep_dag,
+    op_kwargs={
+        'filename': '13_07_pg_detail_desc_text_increment.sql'
+    },
+    templates_dict={
+        'source_sql': '13_07_pg_detail_desc_text_increment.sql'
+    }
+)
+
+pg_draw_desc_delta = SQLTemplatedPythonOperator(
+    task_id='pg_draw_desc_delta',
+    python_callable=validate_query.validate_and_execute,
+    dag=elastic_prep_dag,
+    op_kwargs={
+        'filename': '13_08_pg_draw_desc_text_increment.sql'
+    },
+    templates_dict={
+        'source_sql': '13_08_pg_draw_desc_text_increment.sql'
+    }
+)
+
 elastic_patent_db_qa = PythonOperator(
     task_id='elastic_patent_DB_QA',
     python_callable=run_elastic_db_qa,
@@ -468,39 +566,37 @@ class SQLTemplatedPythonOperator(PythonOperator):
     endpoint_patent_GI_table,fcitation_endpoint_fcitation_table,otherreference_endpoint_otherreference_table,
     relapptext_endpoint_relapptext_table,patentcitation_endpoint_patentcitation_table,applicationcitation_endpoint_applicationcitation_table]
 
-
-
-operator_sequence_groups['publications_endpoint'] =[endpoint_publications_assignee,endpoint_publications_inventor,endpoint_publications_cpc,
+operator_sequence_groups['publications_endpoint'] = [endpoint_publications_assignee,endpoint_publications_inventor,endpoint_publications_cpc,
     endpoint_publications_gi,endpoint_publications_us_parties, endpoint_rel_app_text_pgpub]
 
 
+operator_sequence_groups['g_text_endpoints'] = [g_brf_sum_delta, g_claims_delta, g_detail_desc_delta, g_draw_desc_delta]
+operator_sequence_groups['pg_text_endpoints'] = [pg_brf_sum_delta, pg_claims_delta, pg_detail_desc_delta, pg_draw_desc_delta]
+
 for operator in operator_sequence_groups['first_step']:
     operator.set_upstream(db_creation)
+    # Make each 'first_step' operator an upstream dependency of elastic_patent_db_qa
+    elastic_patent_db_qa.set_upstream(operator)
+
 
 for operator in operator_sequence_groups['endpoint_patent_steps']:
     operator.set_upstream(endpoint_patent_patents_table)
-
+    # Make each 'endpoint_patent_steps' operator an upstream dependency of elastic_patent_db_qa
+    elastic_patent_db_qa.set_upstream(operator)
 endpoint_publications_publication_views.set_upstream(endpoint_publications_publication)
 for operator in operator_sequence_groups['publications_endpoint']:
     operator.set_upstream(endpoint_publications_publication_views)
+    # Make each 'publications_endpoint' operator an upstream dependency of elastic_pgpubs_db_qa
+    elastic_pgpubs_db_qa.set_upstream(operator)
 
-for operator in operator_sequence_groups['publications_endpoint']:
-    operator.set_upstream(endpoint_publications_publication_views)
-
-# Set elastic_patent_db_qa upstream to each operator in 'publications_endpoint' group
-for operator in operator_sequence_groups['publications_endpoint']:
-    elastic_patent_db_qa.set_upstream(operator)
-
-# Set elastic_patent_db_qa upstream to each operator in 'endpoint_patent_steps' group
-for operator in operator_sequence_groups['endpoint_patent_steps']:
+for operator in operator_sequence_groups['g_text_endpoints']:
+    operator.set_upstream(db_creation)
     elastic_patent_db_qa.set_upstream(operator)
 
-# Set elastic_patent_db_qa upstream to each operator in 'first_step' group
-for operator in operator_sequence_groups['first_step']:
-    elastic_patent_db_qa.set_upstream(operator)
+for operator in operator_sequence_groups['pg_text_endpoints']:
+    operator.set_upstream(db_creation)
+    elastic_pgpubs_db_qa.set_upstream(operator)
 
-elastic_pgpubs_db_qa.set_upstream(elastic_patent_db_qa)
-
 # elastic tasks with crosswalk dependencies
 endpoint_patent_views.set_upstream(generate_crosswalk_task)
 endpoint_publications_publication_views.set_upstream(generate_crosswalk_task)