From 8b7815e144520bf7393865d0f504b1930f0f1cc8 Mon Sep 17 00:00:00 2001
From: Satarupa Guha
Date: Wed, 21 Feb 2024 18:16:05 -0800
Subject: [PATCH] protect urls during chunking (#635)

---
 scripts/data_utils.py | 28 +++++++++++++++++++++++++---
 1 file changed, 25 insertions(+), 3 deletions(-)

diff --git a/scripts/data_utils.py b/scripts/data_utils.py
index 4a4c1b928b..e9a3cabbc7 100644
--- a/scripts/data_utils.py
+++ b/scripts/data_utils.py
@@ -104,10 +104,26 @@ def extract_caption(self, text):
 
         return caption
 
+    def mask_urls(self, text) -> Tuple[Dict[str, str], str]:
+
+        def find_urls(string):
+            regex = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^()\s<>]+|\(([^()\s<>]+|(\([^()\s<>]+\)))*\))+(?:\(([^()\s<>]+|(\([^()\s<>]+\)))*\)|[^()\s`!()\[\]{};:'\".,<>?«»“”‘’]))"
+            urls = re.findall(regex, string)
+            return [x[0] for x in urls]
+        url_dict = {}
+        masked_text = text
+        urls = set(find_urls(text))
+
+        for i, url in enumerate(urls):
+            masked_text = masked_text.replace(url, f"##URL{i}##")
+            url_dict[f"##URL{i}##"] = url
+        return url_dict, masked_text
+
     def split_text(self, text: str) -> List[str]:
+        url_dict, masked_text = self.mask_urls(text)
         start_tag = self._table_tags["table_open"]
         end_tag = self._table_tags["table_close"]
-        splits = text.split(start_tag)
+        splits = masked_text.split(start_tag)
 
         final_chunks = self.chunk_rest(splits[0]) # the first split is before the first table tag so it is regular text
 
@@ -128,7 +144,7 @@ def split_text(self, text: str) -> List[str]:
 
                 table_caption_prefix = ""
 
-        final_final_chunks = [chunk for chunk, chunk_size in merge_chunks_serially(final_chunks, self._chunk_size)]
+        final_final_chunks = [chunk for chunk, chunk_size in merge_chunks_serially(final_chunks, self._chunk_size, url_dict)]
 
         return final_final_chunks
 
@@ -593,11 +609,17 @@ def extract_pdf_content(file_path, form_recognizer_client, use_layout=False):
     full_text = "".join([page_text for _, _, page_text in page_map])
     return full_text
 
-def merge_chunks_serially(chunked_content_list: List[str], num_tokens: int) -> Generator[Tuple[str, int], None, None]:
+def merge_chunks_serially(chunked_content_list: List[str], num_tokens: int, url_dict: Dict[str, str]={}) -> Generator[Tuple[str, int], None, None]:
+    def unmask_urls(text, url_dict={}):
+        if "##URL" in text:
+            for key, value in url_dict.items():
+                text = text.replace(key, value)
+        return text
     # TODO: solve for token overlap
     current_chunk = ""
     total_size = 0
    for chunked_content in chunked_content_list:
+        chunked_content = unmask_urls(chunked_content, url_dict)
         chunk_size = TOKEN_ESTIMATOR.estimate_tokens(chunked_content)
         if total_size > 0:
             new_size = total_size + chunk_size
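By way of illustration, here is a minimal, self-contained sketch of the round trip this patch sets up: mask URLs, chunk the masked text, then restore the URLs while merging. Everything below is hypothetical stand-in code, not the repository's implementation: `URL_PATTERN` is a much-simplified version of the patch's regex, `naive_chunks` is a toy splitter that treats every "." as a boundary (the same class of separator that cuts unprotected URLs apart), and the example document is invented.

```python
import re
from typing import Dict, List, Tuple

# Simplified stand-in for the patch's far more thorough URL regex.
URL_PATTERN = r"(?i)\b((?:https?://|www\.)[^\s<>]+[^\s<>.,!?;:'\")\]])"

def mask_urls(text: str) -> Tuple[Dict[str, str], str]:
    """Replace each distinct URL with a ##URL<i>## placeholder."""
    url_dict: Dict[str, str] = {}
    masked_text = text
    for i, url in enumerate(set(re.findall(URL_PATTERN, text))):
        placeholder = f"##URL{i}##"
        masked_text = masked_text.replace(url, placeholder)
        url_dict[placeholder] = url
    return url_dict, masked_text

def unmask_urls(text: str, url_dict: Dict[str, str]) -> str:
    """Swap the placeholders back for the original URLs."""
    if "##URL" in text:
        for placeholder, url in url_dict.items():
            text = text.replace(placeholder, url)
    return text

def naive_chunks(text: str) -> List[str]:
    # Toy splitter: every "." ends a chunk. Unmasked URLs, which
    # contain dots, would be cut apart by a splitter like this.
    return [piece.strip() for piece in text.split(".") if piece.strip()]

doc = "Docs live at https://example.com/a/b. Ask me anything."
url_dict, masked = mask_urls(doc)
chunks = [unmask_urls(chunk, url_dict) for chunk in naive_chunks(masked)]
print(chunks)
# ['Docs live at https://example.com/a/b', 'Ask me anything']
# Without masking, the same splitter would have produced
# ['Docs live at https://example', 'com/a/b', 'Ask me anything'].
```

The design point is that the `##URL<i>##` placeholders contain none of the punctuation a splitter keys on, so chunk boundaries can no longer land inside a URL. Note also that in the patch, `merge_chunks_serially` unmasks each piece before calling `TOKEN_ESTIMATOR.estimate_tokens`, so chunk sizes are measured against the restored URLs rather than the shorter placeholder tokens.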