diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..540914d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+.env
+filtered/__pycache__
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..2588741
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,26 @@
+FROM --platform=linux/x86_64 ubuntu:24.04
+
+ENV DEBIAN_FRONTEND=noninteractive
+ENV TZ=UTC
+
+RUN apt-get update && \
+    apt-get install -y \
+    wget \
+    xz-utils \
+    bzip2 \
+    git \
+    python3-pip \
+    python3 \
+    && apt-get install -y software-properties-common \
+    && apt-get clean \
+    && rm -rf /var/lib/apt/lists/*
+
+COPY requirements.txt .
+
+RUN pip install -r requirements.txt --break-system-packages
+
+COPY filtered/ ./filtered/
+
+# Only the system python3 (3.12 on Ubuntu 24.04) is installed above, and pip
+# installed the requirements against it, so the worker must run under python3.
+CMD ["python3", "-m", "celery", "-A", "filtered.worker", "worker", "--loglevel=info", "--concurrency=1"]
\ No newline at end of file
diff --git a/filtered/__init__.py b/filtered/__init__.py
new file mode 100644
index 0000000..e305d91
--- /dev/null
+++ b/filtered/__init__.py
@@ -0,0 +1,4 @@
+from .distributed import *
+from .filter import *
+from .split import *
+from .worker import *
\ No newline at end of file
diff --git a/filtered/distributed.py b/filtered/distributed.py
new file mode 100644
index 0000000..12caa63
--- /dev/null
+++ b/filtered/distributed.py
@@ -0,0 +1,86 @@
+from distributask.distributask import create_from_config
+from .filter import read_json_in_batches
+from .worker import run_job
+from tqdm import tqdm
+import time
+
+if __name__ == "__main__":
+
+    input_filename = "datasets/cap3d_captions.json"
+    batch_size = 10000
+
+    distributask = create_from_config()
+
+    max_price = 0.25
+    max_nodes = 25
+    docker_image = "antbaez/filter-worker:latest"
+    module_name = "filtered.worker"
+
+    redis_client = distributask.get_redis_connection()
+
+    rented_nodes = distributask.rent_nodes(
+        max_price, max_nodes, docker_image, module_name
+    )
+    print("Total nodes rented: ", len(rented_nodes))
+
+    distributask.register_function(run_job)
+
+    while True:
+        user_input = input("press r when workers are ready: ")
+        if user_input == "r":
+            break
+
+    total_batches = 0
+
+    print("Sending tasks")
+    tasks = []
+
+    json_batches = list(read_json_in_batches(input_filename, batch_size))
+    print(f"number of batches: {len(json_batches)}")
+
+    for batch in json_batches:
+        total_batches += 1
+
+        print(total_batches)
+        task = distributask.execute_function(
+            "run_job", {"batch_index": total_batches, "batch": batch}
+        )
+
+        tasks.append(task)
+
+    print("Tasks sent. Starting monitoring")
Starting monitoring") + + inactivity_log = {node["instance_id"]: 0 for node in rented_nodes} + + start_time = time.time() + with tqdm(total=len(tasks), unit="task") as pbar: + while not all(task.ready() for task in tasks): + + current_tasks = sum([task.ready() for task in tasks]) + pbar.update(current_tasks - pbar.n) + + time.sleep(1) + + current_time = time.time() + if current_time - start_time > 60: + start_time = time.time() + + for node in rented_nodes: + log_response = distributask.get_node_log(node) + if log_response.status_code == 200: + try: + last_msg = log_response.text.splitlines()[-1] + if ("Task complete" in last_msg and inactivity_log[node["instance_id"]] == 0): + inactivity_log[node["instance_id"]] = 1 + elif ("Task complete" in last_msg and inactivity_log[node["instance_id"]] == 1): + distributask.terminate_nodes([node]) + print("node terminated") + else: + inactivity_log[node["instance_id"]] == 0 + except: + pass diff --git a/filtered/filter.py b/filtered/filter.py index 584960a..44ab912 100644 --- a/filtered/filter.py +++ b/filtered/filter.py @@ -18,7 +18,7 @@ def detect_objects(self, caption): if not objects: objects = self._extract_noun_phrases(caption) - print("These are the objects:", objects) + # print("These are the objects:", objects) return objects def _extract_noun_phrases(self, text): @@ -147,12 +147,12 @@ def test_caption_filtering(): if total_filtered_count >= write_batch_size or current_batch == total_batches: write_filtered_json(output_filename, filtered_data, first_batch=first_batch, last_batch=(current_batch == total_batches)) - print(f"Wrote batch {current_batch}/{total_batches} with {total_filtered_count} filtered captions") + # print(f"Wrote batch {current_batch}/{total_batches} with {total_filtered_count} filtered captions") filtered_data = {} total_filtered_count = 0 first_batch = False - print("Filtering and writing completed.") + # print("Filtering and writing completed.") # Optionally, you can keep the test function call if you want to run tests # test_caption_filtering() \ No newline at end of file diff --git a/filtered/worker.py b/filtered/worker.py new file mode 100644 index 0000000..c59cc18 --- /dev/null +++ b/filtered/worker.py @@ -0,0 +1,27 @@ +import sys +from .filter import filter_captions, write_filtered_json + +def run_job(batch_index, batch): + + if len(str(batch_index)) == 1: + batch_num = f"0{batch_index}" + else: + batch_num = f"{batch_index}" + + output_filename = f"batch_{batch_num}" + + filtered_batch = filter_captions(batch) + write_filtered_json(output_filename, filtered_batch) + + distributask.upload_file(output_filename) + + return "Task complete" + + +if __name__ == "__main__" or any("celery" in arg for arg in sys.argv): + from distributask.distributask import create_from_config + + distributask = create_from_config() + distributask.register_function(run_job) + + celery = distributask.app \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..8b3b0ef --- /dev/null +++ b/requirements.txt @@ -0,0 +1,10 @@ +requests +fsspec +celery +redis +huggingface_hub +python-dotenv +omegaconf +tqdm +gliner +distributask