@@ -5,18 +5,16 @@
 import logging
 import logging.config
 from pathlib import Path
-from typing import Generator, Optional, Sequence, Tuple, Union, cast
+from typing import Optional, Sequence, Tuple, Union, cast
 
-import numpy as np
 import click
+import numpy as np
 from cloudpathlib import S3Path
 from tqdm.auto import tqdm
-from cpr_data_access.parser_models import ParserOutput, PDFTextBlock, CONTENT_TYPE_HTML, CONTENT_TYPE_PDF
+from cpr_data_access.parser_models import ParserOutput
 
 from src.index.opensearch import populate_opensearch
-from src.index_mapping import COMMON_FIELDS
-from src import config
-from src.utils import filter_on_block_type
+from src.index.vespa import populate_vespa
 
 LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
 DEFAULT_LOGGING = {
@@ -41,120 +39,6 @@
 logging.config.dictConfig(DEFAULT_LOGGING)
 
 
-def get_metadata_dict(task: ParserOutput) -> dict:
-    """
-    Get key-value pairs for metadata fields: fields which are not required for search.
-
-    :param task: task from the document parser
-    :return dict: key-value pairs for metadata fields
-    """
-
-    task_dict = {
-        **{k: v for k, v in task.dict().items() if k != "document_metadata"},
-        **{f"document_{k}": v for k, v in task.document_metadata.dict().items()},
-    }
-    task_dict["document_name_and_slug"] = f"{task.document_name} {task.document_slug}"
-    required_fields = [field for fields in COMMON_FIELDS.values() for field in fields]
-
-    return {k: v for k, v in task_dict.items() if k in required_fields}
-
-
-def get_core_document_generator(
-    tasks: Sequence[ParserOutput], embedding_dir_as_path: Union[Path, S3Path]
-) -> Generator[dict, None, None]:
-    """
-    Generator for core documents to index
-
-    Documents to index are those with fields `for_search_document_name` and
-    `for_search_document_description`.
-
-    :param tasks: list of tasks from the document parser
-    :param embedding_dir_as_path: directory containing embeddings .npy files.
-        These are named with IDs corresponding to the IDs in the tasks.
-    :yield Generator[dict, None, None]: generator of Opensearch documents
-    """
-
-    for task in tasks:
-        all_metadata = get_metadata_dict(task)
-        embeddings = np.load(str(embedding_dir_as_path / f"{task.document_id}.npy"))
-
-        # Generate document name doc
-        yield {
-            **{"for_search_document_name": task.document_name},
-            **all_metadata,
-        }
-
-        # Generate document description doc
-        yield {
-            **{"for_search_document_description": task.document_description},
-            **all_metadata,
-            **{"document_description_embedding": embeddings[0, :].tolist()},
-        }
-
-
-def get_text_document_generator(
-    tasks: Sequence[ParserOutput],
-    embedding_dir_as_path: Union[Path, S3Path],
-    translated: Optional[bool] = None,
-    content_types: Optional[Sequence[str]] = None,
-) -> Generator[dict, None, None]:
-    """
-    Get generator for text documents to index.
-
-    Documents to index are those containing text passages and their embeddings.
-    Optionally filter by whether text passages have been translated and/or the
-    document content type.
-
-    :param tasks: list of tasks from the document parser
-    :param embedding_dir_as_path: directory containing embeddings .npy files.
-        These are named with IDs corresponding to the IDs in the tasks.
-    :param translated: optionally filter on whether text passages are translated
-    :param content_types: optionally filter on content types
-    :yield Generator[dict, None, None]: generator of Opensearch documents
-    """
-
-    if translated is not None:
-        tasks = [task for task in tasks if task.translated is translated]
-
-    if content_types is not None:
-        tasks = [task for task in tasks if task.document_content_type in content_types]
-
-    _LOGGER.info(
-        "Filtering unwanted text block types.",
-        extra={"props": {"BLOCKS_TO_FILTER": config.BLOCKS_TO_FILTER}},
-    )
-    tasks = filter_on_block_type(
-        inputs=tasks, remove_block_types=config.BLOCKS_TO_FILTER
-    )
-
-    for task in tasks:
-        all_metadata = get_metadata_dict(task)
-        embeddings = np.load(str(embedding_dir_as_path / f"{task.document_id}.npy"))
-
-        # Generate text block docs
-        text_blocks = task.vertically_flip_text_block_coords().get_text_blocks()
-
-        for text_block, embedding in zip(text_blocks, embeddings[1:, :]):
-            block_dict = {
-                **{
-                    "text_block_id": text_block.text_block_id,
-                    "text": text_block.to_string(),
-                    "text_embedding": embedding.tolist(),
-                },
-                **all_metadata,
-            }
-            if isinstance(text_block, PDFTextBlock):
-                block_dict = {
-                    **block_dict,
-                    **{
-                        "text_block_coords": text_block.coords,
-                        "text_block_page": text_block.page_number,
-                    }
-                }
-            yield block_dict
-
-
-
 def _get_index_tasks(
     text2embedding_output_dir: str,
     s3: bool,
@@ -190,72 +74,8 @@ def _get_index_tasks(
     return tasks, embedding_dir_as_path
 
 
-def main_opensearch(
-    text2embedding_output_dir: str,
-    s3: bool,
-    files_to_index: Optional[str] = None,
-    limit: Optional[int] = None,
-) -> None:
-    """
-    Index documents into Opensearch.
-
-    :param pdf_parser_output_dir: directory or S3 folder containing output JSON
-        files from the PDF parser.
-    :param embedding_dir: directory or S3 folder containing embeddings from the
-        text2embeddings CLI.
-    """
-    tasks, embedding_dir_as_path = _get_index_tasks(
-        text2embedding_output_dir, s3, files_to_index, limit
-    )
-
-    indices_to_populate: Sequence[Tuple[str, Generator[dict, None, None]]] = [
-        (
-            f"{config.OPENSEARCH_INDEX_PREFIX}_core",
-            get_core_document_generator(tasks, embedding_dir_as_path),
-        ),
-        (
-            f"{config.OPENSEARCH_INDEX_PREFIX}_pdfs_non_translated",
-            get_text_document_generator(
-                tasks,
-                embedding_dir_as_path,
-                translated=False,
-                content_types=[CONTENT_TYPE_PDF],
-            ),
-        ),
-        (
-            f"{config.OPENSEARCH_INDEX_PREFIX}_pdfs_translated",
-            get_text_document_generator(
-                tasks,
-                embedding_dir_as_path,
-                translated=True,
-                content_types=[CONTENT_TYPE_PDF],
-            ),
-        ),
-        (
-            f"{config.OPENSEARCH_INDEX_PREFIX}_htmls_non_translated",
-            get_text_document_generator(
-                tasks,
-                embedding_dir_as_path,
-                translated=False,
-                content_types=[CONTENT_TYPE_HTML],
-            ),
-        ),
-        (
-            f"{config.OPENSEARCH_INDEX_PREFIX}_htmls_translated",
-            get_text_document_generator(
-                tasks,
-                embedding_dir_as_path,
-                translated=True,
-                content_types=[CONTENT_TYPE_HTML],
-            ),
-        ),
-    ]
-
-    populate_opensearch(indices_to_populate)
-
-
 @click.command()
-@click.argument("text2embedding-output-dir")
+@click.argument("indexer_input_dir")
 @click.option(
     "--s3",
     is_flag=True,
@@ -283,19 +103,25 @@ def main_opensearch(
     help="Which search database type to populate.",
 )
 def run_as_cli(
-    text2embedding_output_dir: str,
+    indexer_input_dir: str,
     s3: bool,
     files_to_index: Optional[str],
     limit: Optional[int],
     index_type: str,
 ) -> None:
     if index_type.lower() == "opensearch":
-        main_opensearch(text2embedding_output_dir, s3, files_to_index, limit)
+        tasks, embedding_dir_as_path = _get_index_tasks(
+            indexer_input_dir, s3, files_to_index, limit
+        )
+        populate_opensearch(tasks=tasks, embedding_dir_as_path=embedding_dir_as_path)
+        sys.exit(0)
+    elif index_type.lower() == "vespa":
+        _LOGGER.warning("Vespa indexing still experimental")
+        tasks, embedding_dir_as_path = _get_index_tasks(
+            indexer_input_dir, s3, files_to_index, limit
+        )
+        populate_vespa(tasks=tasks, embedding_dir_as_path=embedding_dir_as_path)
         sys.exit(0)
-    if index_type.lower() == "vespa":
-        # TODO: implement main_vespa(...)
-        _LOGGER.error("Vespa indexing not yet implemented")
-        sys.exit(1)
     _LOGGER.error(f"Unknown index type: {index_type}")
     sys.exit(1)
 
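For reference, a minimal sketch of how the reworked command could be exercised in-process with click's test runner. The import path (cli.index_data) and the --index-type option name are assumptions inferred from the diff, not confirmed by it:

from click.testing import CliRunner

from cli.index_data import run_as_cli  # hypothetical module path

runner = CliRunner()
# The positional argument is the renamed indexer_input_dir; "--index-type" is
# inferred from the index_type parameter, whose option definition sits outside
# the hunks shown above.
result = runner.invoke(run_as_cli, ["./indexer_input", "--index-type", "vespa"])
print(result.exit_code)  # run_as_cli calls sys.exit(0) on success

Both branches now share the same flow: _get_index_tasks loads the parser outputs, then populate_opensearch and populate_vespa receive the same (tasks, embedding_dir_as_path) pair, which suggests the per-index document generators deleted above now live behind those two functions under src/index.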