Skip to content

Commit

Permalink
Use filename from movefiles (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
judoole committed Jun 23, 2022
1 parent 773b63a commit ead2932
Showing 1 changed file with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,8 @@ def __init__(self,
self.single_file = single_file
if self.single_file:
self._source_objects = [XCOM_TOP_FILE]
self.configuration = {**self.configuration, 'filename': XCOM_TOP_FILE}
else:
self._source_objects = f"{{{{ ti.xcom_pull(task_ids='{SENSOR_TASK_ID}') }}}}"
self.configuration = {**self.configuration,
'filenames': f"{{{{ ti.xcom_pull(task_ids='{SENSOR_TASK_ID}') }}}}"}

def create_sensor(self) -> BaseSensorOperator:
return GCSObjectsWithPrefixExistenceSensor(
Expand All @@ -76,7 +73,7 @@ def create_sensor(self) -> BaseSensorOperator:
)

def create_downstream_sensor(self) -> Union[TaskMixin, Sequence[TaskMixin]]:
return GoogleCloudStorageCopyOperator(
move_files = GoogleCloudStorageCopyOperator(
task_id="move_files",
source_bucket=self.bucket,
source_objects=self._source_objects,
Expand All @@ -87,6 +84,16 @@ def create_downstream_sensor(self) -> Union[TaskMixin, Sequence[TaskMixin]]:
delegate_to=self.delegate_to
)

# Add filename(s) to the dag_conf
if self.single_file:
self.configuration = {**self.configuration,
'filename': "{{ ti.xcom_pull(task_ids='move_files')[0] }}"}
else:
self.configuration = {**self.configuration,
'filenames': "{{ ti.xcom_pull(task_ids='move_files') }}"}

return move_files

def default_doc_md(self):
return f"""# Triggering DAG for {self.trigger_dag_id}
Expand Down

0 comments on commit ead2932

Please sign in to comment.