Skip to content

Commit

Permalink
Merge pull request #281 from coderxio/issue-270
Browse files Browse the repository at this point in the history
all changes necessary to implement rxnorm indexes
  • Loading branch information
jrlegrand authored May 1, 2024
2 parents 9895df8 + 4d99d16 commit 8c67909
Show file tree
Hide file tree
Showing 7 changed files with 375 additions and 2 deletions.
330 changes: 330 additions & 0 deletions airflow/config/airflow.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,330 @@
# [core] — global Airflow settings: DAG discovery, executor, concurrency limits.
[core]
dags_folder = /opt/airflow/dags
hostname_callable = airflow.utils.net.getfqdn
default_timezone = utc
# LocalExecutor runs tasks as subprocesses on one host (no Celery/K8s workers);
# the [celery]/[kubernetes_executor] sections below are therefore inert.
executor = LocalExecutor
# Max task instances running concurrently across the whole installation.
parallelism = 32
max_active_tasks_per_dag = 16
# Newly discovered DAGs start paused until enabled in the UI.
dags_are_paused_at_creation = True
max_active_runs_per_dag = 16
load_examples = False
plugins_folder = /opt/airflow/plugins
execute_tasks_new_python_interpreter = False
# NOTE(review): fernet_key is unset, so connection passwords and variables are
# stored unencrypted in the metadata DB — confirm this is acceptable for this
# deployment.
#fernet_key =
donot_pickle = True
# seconds
dagbag_import_timeout = 30.0
dagbag_import_error_tracebacks = True
dagbag_import_error_traceback_depth = 2
# seconds
dag_file_processor_timeout = 50
task_runner = StandardTaskRunner
#default_impersonation =
#security =
unit_test_mode = False
enable_xcom_pickling = False
# Regex of classes allowed during deserialization (airflow-internal only).
allowed_deserialization_classes = airflow\..*
# seconds
killed_task_cleanup_time = 60
dag_run_conf_overrides_params = True
dag_discovery_safe_mode = True
dag_ignore_file_syntax = regexp
default_task_retries = 0
# seconds
default_task_retry_delay = 300
default_task_weight_rule = downstream
#default_task_execution_timeout =
min_serialized_dag_update_interval = 30
compress_serialized_dags = False
min_serialized_dag_fetch_interval = 10
max_num_rendered_ti_fields_per_task = 30
check_slas = True
xcom_backend = airflow.models.xcom.BaseXCom
lazy_load_plugins = True
lazy_discover_providers = True
hide_sensitive_var_conn_fields = True
#sensitive_var_conn_names =
default_pool_task_slot_count = 128
max_map_length = 1024
daemon_umask = 0o077

# [database] — metadata DB connection (host "postgres" matches the compose service).
[database]
# NOTE(review): credentials embedded in plaintext — fine for local dev; in any
# shared deployment inject via AIRFLOW__DATABASE__SQL_ALCHEMY_CONN instead.
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
sql_engine_encoding = utf-8
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
# seconds before a pooled connection is recycled
sql_alchemy_pool_recycle = 1800
# Test connections on checkout to survive DB restarts.
sql_alchemy_pool_pre_ping = True
#sql_alchemy_schema =
load_default_connections = True
max_db_retries = 3

# [logging] — local file logging only (remote_logging off).
# NOTE: this file is read by configparser with interpolation, so literal
# percent signs in format strings must be written as %%.
[logging]
base_log_folder = /opt/airflow/logs
remote_logging = False
#remote_log_conn_id =
#google_key_path =
#remote_base_log_folder =
encrypt_s3_logs = False
logging_level = INFO
#celery_logging_level =
fab_logging_level = WARNING
#logging_config_class =
colored_console_log = True
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
dag_processor_log_target = file
dag_processor_log_format = [%%(asctime)s] [SOURCE:DAG_PROCESSOR] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
log_formatter_class = airflow.utils.log.timezone_aware.TimezoneAware
#task_log_prefix_template =
# Jinja template ({{ }}) rendered by Airflow; {%% %%} are escaped Jinja tags.
log_filename_template = dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{%% if ti.map_index >= 0 %%}map_index={{ ti.map_index }}/{%% endif %%}attempt={{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
task_log_reader = task
#extra_logger_names =
worker_log_server_port = 8793

# [metrics] — StatsD metrics export (disabled).
[metrics]
statsd_on = False
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
#statsd_allow_list =
#stat_name_handler =
statsd_datadog_enabled = False
#statsd_datadog_tags =

# [secrets] — no external secrets backend configured; connections/variables
# come from the metadata DB and environment.
[secrets]
#backend =
#backend_kwargs =

# [cli] — CLI talks to the local API client.
[cli]
api_client = airflow.api.client.local_client
endpoint_url = http://localhost:8080

# [debug] — fail_fast aborts a debug run on first task failure when True.
[debug]
fail_fast = False

# [api] — REST API: session auth only, experimental API disabled.
[api]
enable_experimental_api = False
auth_backends = airflow.api.auth.backend.session
maximum_page_limit = 100
fallback_page_limit = 100
#google_oauth2_audience =
#google_key_path =
#access_control_allow_headers =
#access_control_allow_methods =
#access_control_allow_origins =

# [lineage] — no lineage backend configured.
[lineage]
#backend =

# [atlas] — Apache Atlas integration (unused; defaults kept for completeness).
[atlas]
sasl_enabled = False
#host =
port = 21000
#username =
#password =

# [operators] — defaults applied to operators that don't set these themselves.
[operators]
default_owner = airflow
default_cpus = 1
# MB
default_ram = 512
# MB
default_disk = 512
default_gpus = 0
default_queue = default
allow_illegal_arguments = False

# [hive] — Hive provider defaults (unused here).
[hive]
#default_hive_mapred_queue =

# [webserver] — Airflow UI served by gunicorn on port 8080.
[webserver]
base_url = http://localhost:8080
default_ui_timezone = UTC
web_server_host = 0.0.0.0
web_server_port = 8080
#web_server_ssl_cert =
#web_server_ssl_key =
session_backend = database
# seconds
web_server_master_timeout = 120
# seconds
web_server_worker_timeout = 120
worker_refresh_batch_size = 1
# seconds
worker_refresh_interval = 6000
reload_on_plugin_change = False
# SECURITY(review): this key signs Flask session cookies and is committed to
# version control — rotate it and supply via AIRFLOW__WEBSERVER__SECRET_KEY
# for any non-local deployment.
secret_key = 9laH5A+16Kut0POYw/430w==
# gunicorn workers
workers = 4
worker_class = sync
# "-" sends gunicorn access/error logs to stdout/stderr (container-friendly).
access_logfile = -
error_logfile = -
#access_logformat =
expose_config = False
expose_hostname = True
expose_stacktrace = False
dag_default_view = grid
dag_orientation = LR
log_fetch_timeout_sec = 5
log_fetch_delay_sec = 2
log_auto_tailing_offset = 30
log_animation_speed = 1000
hide_paused_dags_by_default = False
page_size = 100
# Leading "#" here is part of the CSS color value, not a comment (Airflow's
# configparser has inline comments disabled).
navbar_color = #fff
default_dag_run_display_number = 25
enable_proxy_fix = False
proxy_fix_x_for = 1
proxy_fix_x_proto = 1
proxy_fix_x_host = 1
proxy_fix_x_port = 1
proxy_fix_x_prefix = 1
cookie_secure = False
cookie_samesite = Lax
default_wrap = False
x_frame_enabled = True
show_recent_stats_for_completed_runs = True
update_fab_perms = True
# minutes (43200 = 30 days)
session_lifetime_minutes = 43200
instance_name_has_markup = False
# seconds
auto_refresh_interval = 3
warn_deployment_exposure = True
audit_view_excluded_events = gantt,landing_times,tries,duration,calendar,graph,grid,tree,tree_data

# [email] — task failure/retry notifications go through SMTP below.
[email]
email_backend = airflow.utils.email.send_email_smtp
email_conn_id = smtp_default
default_email_on_retry = True
default_email_on_failure = True

# [smtp] — NOTE(review): points at localhost:25; no MTA is defined in this
# stack, so notification emails will presumably fail silently — verify.
[smtp]
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
smtp_port = 25
smtp_mail_from = [email protected]
# seconds
smtp_timeout = 30
smtp_retry_limit = 5

# [sentry] — Sentry error reporting disabled.
[sentry]
sentry_on = False
#sentry_dsn =

# The executor sections below are unused while [core] executor = LocalExecutor;
# they are retained with defaults so switching executors needs only one edit.
[local_kubernetes_executor]
kubernetes_queue = kubernetes

[celery_kubernetes_executor]
kubernetes_queue = kubernetes

[celery]
celery_app_name = airflow.executors.celery_executor
worker_concurrency = 16
worker_prefetch_multiplier = 1
worker_enable_remote_control = True
broker_url = redis://redis:6379/0
flower_host = 0.0.0.0
#flower_url_prefix =
flower_port = 5555
#flower_basic_auth =
sync_parallelism = 0
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
ssl_active = False
#ssl_key =
#ssl_cert =
#ssl_cacert =
pool = prefork
# seconds
operation_timeout = 1.0
task_track_started = True
# seconds
task_adoption_timeout = 600
stalled_task_timeout = 0
task_publish_max_retries = 3
worker_precheck = False

# Intentionally empty — no extra broker transport options.
[celery_broker_transport_options]

# [dask] — Dask executor defaults (unused).
[dask]
cluster_address = 127.0.0.1:8786
#tls_ca =
#tls_cert =
#tls_key =

# [scheduler] — scheduling loop tuning; all intervals are in seconds unless noted.
[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
# -1 = run the scheduler loop forever
num_runs = -1
scheduler_idle_sleep_time = 1
# Minimum seconds between re-parses of the same DAG file.
min_file_process_interval = 30
parsing_cleanup_interval = 60
dag_dir_list_interval = 300
print_stats_interval = 30
pool_metrics_interval = 5.0
scheduler_health_check_threshold = 30
enable_health_check = False
scheduler_health_check_server_port = 8974
orphaned_tasks_check_interval = 300.0
child_process_log_directory = /opt/airflow/logs/scheduler
scheduler_zombie_task_threshold = 300
zombie_detection_interval = 10.0
catchup_by_default = True
ignore_first_depends_on_past_by_default = True
max_tis_per_query = 512
# NOTE(review): row-level locking off is only safe with a single scheduler —
# do not run HA schedulers with this setting.
use_row_level_locking = False
max_dagruns_to_create_per_loop = 10
max_dagruns_per_loop_to_schedule = 20
schedule_after_task_execution = True
parsing_processes = 2
file_parsing_sort_mode = modified_time
standalone_dag_processor = False
max_callbacks_per_loop = 20
dag_stale_not_seen_duration = 600
use_job_schedule = True
allow_trigger_in_future = False
trigger_timeout_check_interval = 15

# [triggerer] — max deferred triggers one triggerer process will run.
[triggerer]
default_capacity = 1000

# [kerberos] — Kerberos ticket renewal defaults (unused in this stack).
[kerberos]
ccache = /tmp/airflow_krb5_ccache
principal = airflow
# seconds
reinit_frequency = 3600
kinit_path = kinit
keytab = airflow.keytab
forwardable = True
include_ip = True

# [elasticsearch] — remote task-log reading via Elasticsearch (disabled).
[elasticsearch]
#host =
log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}
end_of_log_mark = end_of_log
#frontend =
write_stdout = False
json_format = False
json_fields = asctime, filename, lineno, levelname, message
host_field = host
offset_field = offset

[elasticsearch_configs]
use_ssl = False
verify_certs = True

# [kubernetes_executor] — unused while executor = LocalExecutor.
[kubernetes_executor]
#pod_template_file =
#worker_container_repository =
#worker_container_tag =
namespace = default
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
in_cluster = True
#kube_client_request_args =
#delete_option_kwargs =
enable_tcp_keepalive = True
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
verify_ssl = True
worker_pods_pending_timeout = 300
worker_pods_pending_timeout_check_interval = 120
worker_pods_queued_check_interval = 60
worker_pods_pending_timeout_batch_size = 100

# [sensors] — default sensor timeout: 604800 s = 7 days.
[sensors]
default_timeout = 604800
16 changes: 16 additions & 0 deletions airflow/dags/rxnorm/load_rxnconso.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,19 @@ blank TEXT

-- Bulk-load RXNCONSO.RRF (pipe-delimited, no header) into the lake table.
-- The file path is Jinja-templated by Airflow from the extract task's XCom.
COPY sagerx_lake.rxnorm_rxnconso FROM '{{ ti.xcom_pull(task_ids='extract') }}/rrf/RXNCONSO.RRF' CSV DELIMITER '|' ENCODING 'UTF8' ESCAPE E'\b' QUOTE E'\b';
-- ESCAPE and QUOTE are set to a dummy character (backspace) to effectively
-- disable CSV quoting/escaping, since RRF data is raw pipe-delimited text.

-- Indexes on the columns most commonly filtered/joined downstream.
-- IF NOT EXISTS guards reruns where the table is not dropped first.
CREATE INDEX IF NOT EXISTS rxnconso_str
ON sagerx_lake.rxnorm_rxnconso(str);


CREATE INDEX IF NOT EXISTS rxnconso_rxcui
ON sagerx_lake.rxnorm_rxnconso(rxcui);


CREATE INDEX IF NOT EXISTS rxnconso_tty
ON sagerx_lake.rxnorm_rxnconso(tty);


CREATE INDEX IF NOT EXISTS rxnconso_code
ON sagerx_lake.rxnorm_rxnconso(code);
2 changes: 1 addition & 1 deletion airflow/dags/rxnorm/load_rxncui.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ DROP TABLE IF EXISTS sagerx_lake.rxnorm_rxncui CASCADE;

-- Bulk-load RXNCUI.RRF (pipe-delimited, no header) into the lake table.
-- The file path is Jinja-templated by Airflow from the extract task's XCom.
-- Fixes: added the missing space between the path literal and CSV, and the
-- "QOUTE" typo in the comment.
COPY sagerx_lake.rxnorm_rxncui
FROM '{{ ti.xcom_pull(task_ids='extract') }}/rrf/RXNCUI.RRF' CSV DELIMITER '|' ENCODING 'UTF8' ESCAPE E'\b' QUOTE E'\b';
-- ESCAPE and QUOTE are set to a dummy character (backspace) to effectively
-- disable CSV quoting/escaping, since RRF data is raw pipe-delimited text.
9 changes: 9 additions & 0 deletions airflow/dags/rxnorm/load_rxnrel.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,12 @@ CREATE TABLE sagerx_lake.rxnorm_rxnrel (
-- Bulk-load RXNREL.RRF (pipe-delimited, no header) into the lake table.
-- The file path is Jinja-templated by Airflow from the extract task's XCom.
COPY sagerx_lake.rxnorm_rxnrel
FROM '{{ ti.xcom_pull(task_ids='extract') }}/rrf/RXNREL.RRF' CSV DELIMITER '|' ENCODING 'UTF8' ESCAPE E'\b' QUOTE E'\b';
-- ESCAPE and QUOTE are set to a dummy character (backspace) to effectively
-- disable CSV quoting/escaping, since RRF data is raw pipe-delimited text.

-- Indexes on the relationship columns used for downstream joins.
-- IF NOT EXISTS guards reruns where the table is not dropped first.
CREATE INDEX IF NOT EXISTS rxnrel_rxcui1
ON sagerx_lake.rxnorm_rxnrel(rxcui1);

CREATE INDEX IF NOT EXISTS rxnrel_rxcui2
ON sagerx_lake.rxnorm_rxnrel(rxcui2);

CREATE INDEX IF NOT EXISTS rxnrel_rela
ON sagerx_lake.rxnorm_rxnrel(rela);
11 changes: 10 additions & 1 deletion airflow/dags/rxnorm/load_rxnrxnatomarchive.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,13 @@ CREATE TABLE sagerx_lake.rxnorm_rxnatomarchive (

-- Bulk-load RXNATOMARCHIVE.RRF (pipe-delimited, no header) into the lake table.
-- The file path is Jinja-templated by Airflow from the extract task's XCom.
COPY sagerx_lake.rxnorm_rxnatomarchive
FROM '{{ ti.xcom_pull(task_ids='extract') }}/rrf/RXNATOMARCHIVE.RRF' CSV DELIMITER '|' ENCODING 'UTF8' ESCAPE E'\b' QUOTE E'\b';
-- ESCAPE and QUOTE are set to a dummy character (backspace) to effectively
-- disable CSV quoting/escaping, since RRF data is raw pipe-delimited text.

-- Indexes for looking up archived atoms by RXAUI/RXCUI and by the concept
-- they were merged into.
-- Fix: these were previously named rxnrel_* (copy-pasted from load_rxnrel.sql)
-- even though they live on rxnorm_rxnatomarchive; renamed to match the table.
-- IF NOT EXISTS guards reruns where the table is not dropped first.
CREATE INDEX IF NOT EXISTS rxnatomarchive_rxaui
ON sagerx_lake.rxnorm_rxnatomarchive(rxaui);

CREATE INDEX IF NOT EXISTS rxnatomarchive_rxcui
ON sagerx_lake.rxnorm_rxnatomarchive(rxcui);

CREATE INDEX IF NOT EXISTS rxnatomarchive_merged_to_rxcui
ON sagerx_lake.rxnorm_rxnatomarchive(merged_to_rxcui);
Loading

0 comments on commit 8c67909

Please sign in to comment.