Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions data_scraper/core/ci_logs_scraper.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
class CILogsRecord(TypedDict):
"""CILogs data point."""
url: str
topic: str
test_name: str
text: str
components: list[str]
kind: str
Expand All @@ -37,7 +37,7 @@ def get_records(self, documents: list[dict]) -> list[CILogsRecord]:
for document in documents:
ci_logs_records.append({
"url": document["url"],
"topic": document["test_name"],
"test_name": document["test_name"],
"text": document["traceback"],
"components": [],
"kind": "zuul_jobs",
Expand Down
15 changes: 9 additions & 6 deletions data_scraper/core/scraper.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ def record_postprocessing(self, record: dict) -> None:
after it has been created but before storing in vectorDB."""
raise NotImplementedError

def store_records(self, records: list, recreate: bool = True) -> None:
def store_records(self,
records: list,
record_fields_for_key: tuple[str, ...],
recreate: bool = True) -> None:
"""Process text and store embeddings in database."""
vector_size = self.get_embedding_dimension()

Expand All @@ -107,9 +110,9 @@ def store_records(self, records: list, recreate: bool = True) -> None:
raise IOError

for record in tqdm(records, desc="Processing embeddings"):
if record['url']:
record_id = str(uuid.uuid5(uuid.NAMESPACE_URL, record["url"]))
else:
combined_key = "_".join([record[field] for field in record_fields_for_key])
record_id = str(uuid.uuid5(uuid.NAMESPACE_URL, combined_key))
if not record['url']:
LOG.error("Missing required URL field")
continue

Expand Down Expand Up @@ -148,7 +151,7 @@ def get_records(self, documents: List[Dict]) -> list[dict]:
"""Convert raw data into list of dictionaries."""
raise NotImplementedError

def run(self):
def run(self, record_fields_for_key=("url",)):
"""Main execution method."""
documents = self.get_documents()
if not documents:
Expand All @@ -159,7 +162,7 @@ def run(self):
records = self.cleanup_records(records)

# Process and store embeddings
self.store_records(records, self.config["recreate_collection"])
self.store_records(records, record_fields_for_key, self.config["recreate_collection"])

# Print final stats
stats = self.db_manager.get_collection_stats(self.config["db_collection_name"])
Expand Down
2 changes: 1 addition & 1 deletion data_scraper/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def ci_logs_scraper() -> None:

# when json is ready, proceed with tracebacks and store them to QdrantDB
scraper = CILogsScraper(config_args)
scraper.run()
scraper.run(("url", "test_name"))


def solutions_scraper() -> None:
Expand Down
4 changes: 2 additions & 2 deletions data_scraper/processors/ci_logs_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@

# Test path constants
TEST_OPERATOR_PATH = "logs/controller-0/ci-framework-data/tests/test_operator"
TEMPEST_TEST_PATTERN = "tempest-tests"
TOBIKO_TEST_PATTERN = "tobiko-tests"
TEMPEST_TEST_PATTERN = "tempest-"
TOBIKO_TEST_PATTERN = "tobiko-"

async def fetch_with_gssapi(url, params=None, timeout=30.0):
"""
Expand Down
Loading