From 56821251cc4dc90107b4eb7d50fd6a46c4aeaa37 Mon Sep 17 00:00:00 2001 From: Eric Joanis Date: Thu, 31 Oct 2024 16:59:55 -0400 Subject: [PATCH 1/7] refactor: simplify merge_if_same_label to clearer merge_same_type_tokens While merge_if_same_label was more generic, we never reused it, and it was really hard to understand what it did. --- g2p/mappings/tokenizer.py | 5 ++--- g2p/mappings/utils.py | 29 ++++++++++++++--------------- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/g2p/mappings/tokenizer.py b/g2p/mappings/tokenizer.py index d799241b..688883d7 100644 --- a/g2p/mappings/tokenizer.py +++ b/g2p/mappings/tokenizer.py @@ -13,7 +13,7 @@ from g2p.log import LOGGER from g2p.mappings import Mapping from g2p.mappings.langs import LANGS_NETWORK -from g2p.mappings.utils import get_unicode_category, is_ipa, merge_if_same_label +from g2p.mappings.utils import get_unicode_category, is_ipa, merge_same_type_tokens from g2p.shared_types import BaseTokenizer @@ -57,8 +57,7 @@ def tokenize_text(self, text): and units[i + 1]["is_word"] ): unit["is_word"] = True - units = merge_if_same_label(units, "text", "is_word") - return units + return merge_same_type_tokens(units) class SpecializedTokenizer(Tokenizer): diff --git a/g2p/mappings/utils.py b/g2p/mappings/utils.py index fcd3e294..fd095b68 100644 --- a/g2p/mappings/utils.py +++ b/g2p/mappings/utils.py @@ -596,22 +596,21 @@ def ignore_aliases(self, *_args): return True -def merge_if_same_label(lst_of_dicts, text_key, label_key): - results = [] - current_item = None - for dct in lst_of_dicts: - if label_key not in dct: - dct[label_key] = None - if not current_item: - current_item = deepcopy(dct) - elif dct[label_key] == current_item[label_key]: - current_item[text_key] += dct[text_key] +def merge_same_type_tokens(tokens: list) -> list: + """Merge tokens that have the same type. + + Destroys tokens in the process. + Tokens are represented as dicts {"text": str, "is_word": bool}. + """ + if not tokens: + return + merged_tokens = [tokens[0]] + for token in tokens[1:]: + if token["is_word"] == merged_tokens[-1]["is_word"]: + merged_tokens[-1]["text"] += token["text"] else: - results.append(current_item) - current_item = deepcopy(dct) - if current_item: - results.append(current_item) - return results + merged_tokens.append(token) + return merged_tokens CATEGORIES = { From c419518203f98798f7338e43431ecad98072c157 Mon Sep 17 00:00:00 2001 From: Eric Joanis Date: Thu, 31 Oct 2024 17:03:09 -0400 Subject: [PATCH 2/7] feat: add a lexicon-based tokenizer, esp. for English --- g2p/mappings/tokenizer.py | 81 ++++++++++++++++++++++++++++++++++++- g2p/tests/test_tokenizer.py | 18 +++++++++ 2 files changed, 97 insertions(+), 2 deletions(-) diff --git a/g2p/mappings/tokenizer.py b/g2p/mappings/tokenizer.py index 688883d7..1725334d 100644 --- a/g2p/mappings/tokenizer.py +++ b/g2p/mappings/tokenizer.py @@ -13,7 +13,13 @@ from g2p.log import LOGGER from g2p.mappings import Mapping from g2p.mappings.langs import LANGS_NETWORK -from g2p.mappings.utils import get_unicode_category, is_ipa, merge_same_type_tokens +from g2p.mappings.utils import ( + MAPPING_TYPE, + find_alignment, + get_unicode_category, + is_ipa, + merge_same_type_tokens, +) from g2p.shared_types import BaseTokenizer @@ -97,6 +103,74 @@ def tokenize_aux(self, text): return self.regex.findall(text) +class LexiconTokenizer(Tokenizer): + """Lexicon-based tokenizer will consider any entry in the lexicon a token, + even if it contains punctuation characters. 
For text not in the lexicon, + falls back to the default tokenization. + """ + + def __init__(self, mapping: Mapping): + super().__init__() + self.mapping = mapping + self.lang = mapping.language_name + + def _recursive_helper(self, units: list, output_units: list): + """Emit the long prefix found in the lexicon, if any, as a token. + If None, emit the first unit as a token. + Recursively process the rest of the units. + """ + if not units: + return + if len(units) == 1: + output_units.append(units[0]) + return + for i in range(len(units), 0, -1): + candidate = "".join([u["text"] for u in units[:i]]) + if find_alignment(self.mapping.alignments, candidate.lower()): + output_units.append({"text": candidate, "is_word": True}) + return self._recursive_helper(units[i:], output_units) + # No prefix found, emit the first unit as a token + output_units.append(units[0]) + self._recursive_helper(units[1:], output_units) + + def split_non_word_units(self, units): + """Split non-word units into characters, to be able to match them in the lexicon.""" + new_units = [] + for unit in units: + if not unit["is_word"]: + new_units.extend( + [{"text": char, "is_word": False} for char in unit["text"]] + ) + else: + new_units.append(unit) + return new_units + + def merge_non_word_units(self, units): + """Merge consecutive non-word units into a single token.""" + if not units: + return units + merged_units = [units[0]] + for unit in units[1:]: + if not unit["is_word"] and not merged_units[-1]["is_word"]: + merged_units[-1]["text"] += unit["text"] + else: + merged_units.append(unit) + return merged_units + + def tokenize_text(self, text): + blocks = re.split(r"(\s+)", text) + output_units = [] + for i, block in enumerate(blocks): + if i % 2 == 1 and block: + output_units.append({"text": block, "is_word": False}) + else: + default_units = super().tokenize_text(block) + candidate_units = self.split_non_word_units(default_units) + self._recursive_helper(candidate_units, output_units) + + return self.merge_non_word_units(output_units) + + class MultiHopTokenizer(SpecializedTokenizer): def __init__(self, mappings: List[Mapping]): self.delim = "" @@ -201,7 +275,10 @@ def make_tokenizer( # noqa C901 # Build a one-hop tokenizer try: mapping = Mapping.find_mapping(in_lang=in_lang, out_lang=out_lang) - self.tokenizers[tokenizer_key] = SpecializedTokenizer(mapping) + if mapping.type == MAPPING_TYPE.lexicon: + self.tokenizers[tokenizer_key] = LexiconTokenizer(mapping) + else: + self.tokenizers[tokenizer_key] = SpecializedTokenizer(mapping) except MappingMissing: self.tokenizers[tokenizer_key] = self.tokenizers[None] LOGGER.warning( diff --git a/g2p/tests/test_tokenizer.py b/g2p/tests/test_tokenizer.py index d51f764c..812f0062 100755 --- a/g2p/tests/test_tokenizer.py +++ b/g2p/tests/test_tokenizer.py @@ -44,6 +44,24 @@ def test_tokenize_eng(self): self.assertFalse(tokens[1]["is_word"]) self.assertEqual(tokens[1]["text"], " ") + def test_lexicon_tokenizer(self): + tokenizer = tok.make_tokenizer("eng") + tests = [ + ("It's", ["It's"]), + ("'cause", ["'cause"]), + ('"\'cause"', ['"', "'cause", '"']), + ("aardvark's", ["aardvark", "'s"]), + ("'aardvark's'", ["'", "aardvark", "'s", "'"]), + ("ten a.m.", ["ten", " ", "a.m."]), + ('ten "a.m.,!"', ["ten", ' "', "a.m.", ',!"']), + ("all-out war", ["all-out", " ", "war"]), # all-out is in the lexicon + ("all-in: nonsense", ["all", "-", "in", ": ", "nonsense"]), # all-in is not + ] + for input_text, expected_tokens in tests: + with self.subTest(input_text=input_text): + tokens = 
tokenizer.tokenize_text(input_text) + self.assertEqual([x["text"] for x in tokens], expected_tokens) + def test_tokenize_win(self): """win is easy to tokenize because win -> win-ipa exists and has ' in its inventory""" input = "p'ōį̄ą" From d662622f627478906fb4669b9958bb0ce44d838f Mon Sep 17 00:00:00 2001 From: Eric Joanis Date: Thu, 31 Oct 2024 17:19:09 -0400 Subject: [PATCH 3/7] refactor: move merge_non_word_tokens and split_non_word_tokens to utils --- g2p/mappings/tokenizer.py | 61 +++++++++++++-------------------------- g2p/mappings/utils.py | 36 ++++++++++++++++++----- 2 files changed, 49 insertions(+), 48 deletions(-) diff --git a/g2p/mappings/tokenizer.py b/g2p/mappings/tokenizer.py index 1725334d..4e5f9b04 100644 --- a/g2p/mappings/tokenizer.py +++ b/g2p/mappings/tokenizer.py @@ -18,7 +18,9 @@ find_alignment, get_unicode_category, is_ipa, + merge_non_word_tokens, merge_same_type_tokens, + split_non_word_tokens, ) from g2p.shared_types import BaseTokenizer @@ -114,61 +116,38 @@ def __init__(self, mapping: Mapping): self.mapping = mapping self.lang = mapping.language_name - def _recursive_helper(self, units: list, output_units: list): - """Emit the long prefix found in the lexicon, if any, as a token. + def _recursive_helper(self, tokens: list, output_tokens: list): + """Emit the longest prefix found in the lexicon, if any, as a token. If None, emit the first unit as a token. Recursively process the rest of the units. """ - if not units: + if not tokens: return - if len(units) == 1: - output_units.append(units[0]) + if len(tokens) == 1: + output_tokens.append(tokens[0]) return - for i in range(len(units), 0, -1): - candidate = "".join([u["text"] for u in units[:i]]) + for i in range(len(tokens), 0, -1): + candidate = "".join([u["text"] for u in tokens[:i]]) if find_alignment(self.mapping.alignments, candidate.lower()): - output_units.append({"text": candidate, "is_word": True}) - return self._recursive_helper(units[i:], output_units) + output_tokens.append({"text": candidate, "is_word": True}) + return self._recursive_helper(tokens[i:], output_tokens) # No prefix found, emit the first unit as a token - output_units.append(units[0]) - self._recursive_helper(units[1:], output_units) - - def split_non_word_units(self, units): - """Split non-word units into characters, to be able to match them in the lexicon.""" - new_units = [] - for unit in units: - if not unit["is_word"]: - new_units.extend( - [{"text": char, "is_word": False} for char in unit["text"]] - ) - else: - new_units.append(unit) - return new_units - - def merge_non_word_units(self, units): - """Merge consecutive non-word units into a single token.""" - if not units: - return units - merged_units = [units[0]] - for unit in units[1:]: - if not unit["is_word"] and not merged_units[-1]["is_word"]: - merged_units[-1]["text"] += unit["text"] - else: - merged_units.append(unit) - return merged_units + output_tokens.append(tokens[0]) + self._recursive_helper(tokens[1:], output_tokens) def tokenize_text(self, text): blocks = re.split(r"(\s+)", text) - output_units = [] + output_tokens = [] for i, block in enumerate(blocks): if i % 2 == 1 and block: - output_units.append({"text": block, "is_word": False}) + output_tokens.append({"text": block, "is_word": False}) else: - default_units = super().tokenize_text(block) - candidate_units = self.split_non_word_units(default_units) - self._recursive_helper(candidate_units, output_units) + default_tokens = super().tokenize_text(block) + # Split non-word tokens into smaller parts for lexicon 
lookup + candidate_tokens = split_non_word_tokens(default_tokens) + self._recursive_helper(candidate_tokens, output_tokens) - return self.merge_non_word_units(output_units) + return merge_non_word_tokens(output_tokens) class MultiHopTokenizer(SpecializedTokenizer): diff --git a/g2p/mappings/utils.py b/g2p/mappings/utils.py index fd095b68..462c78c2 100644 --- a/g2p/mappings/utils.py +++ b/g2p/mappings/utils.py @@ -10,7 +10,6 @@ import unicodedata as ud from bisect import bisect_left from collections import defaultdict -from copy import deepcopy from enum import Enum from pathlib import Path from typing import ( @@ -597,13 +596,10 @@ def ignore_aliases(self, *_args): def merge_same_type_tokens(tokens: list) -> list: - """Merge tokens that have the same type. - - Destroys tokens in the process. - Tokens are represented as dicts {"text": str, "is_word": bool}. - """ + """Merge tokens that have the same type. Destroys tokens in the process. + Tokens are represented as dicts {"text": str, "is_word": bool}.""" if not tokens: - return + return [] merged_tokens = [tokens[0]] for token in tokens[1:]: if token["is_word"] == merged_tokens[-1]["is_word"]: @@ -613,6 +609,32 @@ def merge_same_type_tokens(tokens: list) -> list: return merged_tokens +def split_non_word_tokens(tokens: list) -> list: + """Split non-word units into characters. Destroys tokens in the process.""" + new_tokens = [] + for token in tokens: + if not token["is_word"]: + new_tokens.extend( + [{"text": char, "is_word": False} for char in token["text"]] + ) + else: + new_tokens.append(token) + return new_tokens + + +def merge_non_word_tokens(tokens: list) -> list: + """Merge consecutive non-word units into a single token. Destroys tokens in the process.""" + if not tokens: + return tokens + merged_tokens = [tokens[0]] + for token in tokens[1:]: + if not token["is_word"] and not merged_tokens[-1]["is_word"]: + merged_tokens[-1]["text"] += token["text"] + else: + merged_tokens.append(token) + return merged_tokens + + CATEGORIES = { "Cc": "other", # Other, Control "Cf": "other", # Other, Format From 0b2c83c8d0231d357222bc26fb0246fcab9f168e Mon Sep 17 00:00:00 2001 From: Eric Joanis Date: Fri, 1 Nov 2024 15:01:59 -0400 Subject: [PATCH 4/7] test: better unit testing for mappings.utils --- g2p/mappings/utils.py | 32 +++++++++++++++++++++++++------- g2p/tests/test_utils.py | 10 +++++++--- 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/g2p/mappings/utils.py b/g2p/mappings/utils.py index 462c78c2..de1d1cfe 100644 --- a/g2p/mappings/utils.py +++ b/g2p/mappings/utils.py @@ -150,7 +150,7 @@ def normalize(inp: str, norm_form: Union[str, None]): if norm_form is None or norm_form == "none": return unicode_escape(inp) if norm_form not in ["NFC", "NFD", "NFKC", "NFKD"]: - raise exceptions.InvalidNormalization(normalize) + raise exceptions.InvalidNormalization(norm_form) # Sadly mypy doesn't do narrowing to literals properly norm_form = cast(Literal["NFC", "NFD", "NFKC", "NFKD"], norm_form) normalized = ud.normalize(norm_form, unicode_escape(inp)) @@ -177,8 +177,8 @@ def compose_indices( """Compose indices1 + indices2 into direct arcs from the inputs of indices1 to the outputs of indices 2. 
- E.g., [(0,1), (1,4)] composed with [(0,0), (1,2), (1,3), (4,2)] is - [(0,2), (0,3), (1,2)] + >>> compose_indices([(0,1), (1,4)], [(0,0), (1,2), (1,3), (4,2)]) + [(0, 2), (0, 3), (1, 2)] """ # for O(1) lookup of arcs leaving indices2 indices2_as_dict = defaultdict(dict) # type: ignore @@ -238,7 +238,7 @@ def normalize_with_indices( return normalize_to_NFD_with_indices(inp, norm_form) if norm_form in ("none", None): return inp, [(i, i) for i in range(len(inp))] - raise exceptions.InvalidNormalization(normalize) + raise exceptions.InvalidNormalization(norm_form) def unicode_escape(text): @@ -597,7 +597,13 @@ def ignore_aliases(self, *_args): def merge_same_type_tokens(tokens: list) -> list: """Merge tokens that have the same type. Destroys tokens in the process. - Tokens are represented as dicts {"text": str, "is_word": bool}.""" + Tokens are represented as dicts {"text": str, "is_word": bool}. + + >>> merge_same_type_tokens([{"text": "test", "is_word": True}, {"text": "b", "is_word": True}, {"text": ":", "is_word": False}, {"text": ",", "is_word": False}]) + [{'text': 'testb', 'is_word': True}, {'text': ':,', 'is_word': False}] + >>> merge_same_type_tokens([]) + [] + """ if not tokens: return [] merged_tokens = [tokens[0]] @@ -610,7 +616,13 @@ def merge_same_type_tokens(tokens: list) -> list: def split_non_word_tokens(tokens: list) -> list: - """Split non-word units into characters. Destroys tokens in the process.""" + """Split non-word units into characters. Destroys tokens in the process. + + >>> split_non_word_tokens([{"text": "test", "is_word": True}, {"text": ":,- ", "is_word": False}, {"text": "", "is_word": False}]) + [{'text': 'test', 'is_word': True}, {'text': ':', 'is_word': False}, {'text': ',', 'is_word': False}, {'text': '-', 'is_word': False}, {'text': ' ', 'is_word': False}] + >>> split_non_word_tokens([]) + [] + """ new_tokens = [] for token in tokens: if not token["is_word"]: @@ -623,7 +635,13 @@ def split_non_word_tokens(tokens: list) -> list: def merge_non_word_tokens(tokens: list) -> list: - """Merge consecutive non-word units into a single token. Destroys tokens in the process.""" + """Merge consecutive non-word units into a single token. Destroys tokens in the process. 
+ + >>> merge_non_word_tokens([{"text": "test", "is_word": True}, {"text": ":", "is_word": False}, {"text": ",", "is_word": False}]) + [{'text': 'test', 'is_word': True}, {'text': ':,', 'is_word': False}] + >>> merge_non_word_tokens([]) + [] + """ if not tokens: return tokens merged_tokens = [tokens[0]] diff --git a/g2p/tests/test_utils.py b/g2p/tests/test_utils.py index 5d9b003a..037c6ffb 100755 --- a/g2p/tests/test_utils.py +++ b/g2p/tests/test_utils.py @@ -14,9 +14,9 @@ from pep440 import is_canonical import g2p +import g2p.exceptions from g2p import get_arpabet_langs from g2p._version import VERSION, version_tuple -from g2p.exceptions import IncorrectFileType, RecursionError from g2p.log import LOGGER from g2p.mappings import Mapping, utils from g2p.mappings.utils import RULE_ORDERING_ENUM, Rule @@ -60,7 +60,7 @@ def test_abb_expand(self): ) # shouldn't allow self-referential abbreviations expanded_plain = utils.expand_abbreviations("test", test_dict) expanded_bad_plain = utils.expand_abbreviations("test", bad_dict) - with self.assertRaises(RecursionError): + with self.assertRaises(g2p.exceptions.RecursionError): utils.expand_abbreviations("HIGH_VOWELS", bad_dict) expanded_non_recursive = utils.expand_abbreviations("HIGH_VOWELS", test_dict) expanded_recursive = utils.expand_abbreviations("VOWELS", test_dict) @@ -156,7 +156,7 @@ def test_escape_special(self): ) def test_load_abbs(self): - with self.assertRaises(IncorrectFileType): + with self.assertRaises(g2p.exceptions.IncorrectFileType): utils.load_abbreviations_from_file( os.path.join(PUBLIC_DIR, "mappings", "abbreviations.json") ) @@ -212,6 +212,10 @@ def test_generated_mapping(self): test_config_added.display_name, "test custom to test-out custom" ) + def test_bad_normalization(self): + with self.assertRaises(g2p.exceptions.InvalidNormalization): + utils.normalize_with_indices("test", "bad") + def test_normalize_to_NFD_with_indices(self): # Usefull site to get combining character code points: # http://www.alanwood.net/unicode/combining_diacritical_marks.html From 163bc3934242f4064ef57374f2205acea3301a93 Mon Sep 17 00:00:00 2001 From: Eric Joanis Date: Fri, 1 Nov 2024 15:10:21 -0400 Subject: [PATCH 5/7] refactor: import utils as a whole instead of each function --- g2p/mappings/tokenizer.py | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/g2p/mappings/tokenizer.py b/g2p/mappings/tokenizer.py index 4e5f9b04..5bc67cb1 100644 --- a/g2p/mappings/tokenizer.py +++ b/g2p/mappings/tokenizer.py @@ -11,17 +11,9 @@ from g2p.exceptions import MappingMissing from g2p.log import LOGGER -from g2p.mappings import Mapping +from g2p.mappings import Mapping, utils from g2p.mappings.langs import LANGS_NETWORK -from g2p.mappings.utils import ( - MAPPING_TYPE, - find_alignment, - get_unicode_category, - is_ipa, - merge_non_word_tokens, - merge_same_type_tokens, - split_non_word_tokens, -) +from g2p.mappings.utils import is_ipa from g2p.shared_types import BaseTokenizer @@ -50,7 +42,7 @@ def is_word_character(self, c): if self.delim and c == self.delim: return True assert len(c) <= 1 - if get_unicode_category(c) in ["letter", "number", "diacritic"]: + if utils.get_unicode_category(c) in ["letter", "number", "diacritic"]: return True return False @@ -65,7 +57,7 @@ def tokenize_text(self, text): and units[i + 1]["is_word"] ): unit["is_word"] = True - return merge_same_type_tokens(units) + return utils.merge_same_type_tokens(units) class SpecializedTokenizer(Tokenizer): @@ -128,7 +120,7 @@ def 
_recursive_helper(self, tokens: list, output_tokens: list): return for i in range(len(tokens), 0, -1): candidate = "".join([u["text"] for u in tokens[:i]]) - if find_alignment(self.mapping.alignments, candidate.lower()): + if utils.find_alignment(self.mapping.alignments, candidate.lower()): output_tokens.append({"text": candidate, "is_word": True}) return self._recursive_helper(tokens[i:], output_tokens) # No prefix found, emit the first unit as a token @@ -144,10 +136,10 @@ def tokenize_text(self, text): else: default_tokens = super().tokenize_text(block) # Split non-word tokens into smaller parts for lexicon lookup - candidate_tokens = split_non_word_tokens(default_tokens) + candidate_tokens = utils.split_non_word_tokens(default_tokens) self._recursive_helper(candidate_tokens, output_tokens) - return merge_non_word_tokens(output_tokens) + return utils.merge_non_word_tokens(output_tokens) class MultiHopTokenizer(SpecializedTokenizer): @@ -254,7 +246,7 @@ def make_tokenizer( # noqa C901 # Build a one-hop tokenizer try: mapping = Mapping.find_mapping(in_lang=in_lang, out_lang=out_lang) - if mapping.type == MAPPING_TYPE.lexicon: + if mapping.type == utils.MAPPING_TYPE.lexicon: self.tokenizers[tokenizer_key] = LexiconTokenizer(mapping) else: self.tokenizers[tokenizer_key] = SpecializedTokenizer(mapping) From 24a28e00e4c6456ffe0f4349400f84b440a32c5d Mon Sep 17 00:00:00 2001 From: Eric Joanis Date: Mon, 4 Nov 2024 11:16:14 -0500 Subject: [PATCH 6/7] perf: prevent quadratic time cost of degenerate inputs for lexicon-based tok --- g2p/mappings/utils.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/g2p/mappings/utils.py b/g2p/mappings/utils.py index de1d1cfe..2c39d127 100644 --- a/g2p/mappings/utils.py +++ b/g2p/mappings/utils.py @@ -616,19 +616,34 @@ def merge_same_type_tokens(tokens: list) -> list: def split_non_word_tokens(tokens: list) -> list: - """Split non-word units into characters. Destroys tokens in the process. + """Split non-word units into characters. Reuses the word tokens. + + Generates a maximum of 5 units per non-word token: if the input token is + more than 5 non-word characters, the output will be the first two + individually, the middle as a block, and the last two individually, because + lexicon-based tokenization does not need more granularity than that. + This prevents degenerate input like a large number of consecutive punctuation + marks from taking quadratic time in lexicon-based tokenization. 
>>> split_non_word_tokens([{"text": "test", "is_word": True}, {"text": ":,- ", "is_word": False}, {"text": "", "is_word": False}]) [{'text': 'test', 'is_word': True}, {'text': ':', 'is_word': False}, {'text': ',', 'is_word': False}, {'text': '-', 'is_word': False}, {'text': ' ', 'is_word': False}] >>> split_non_word_tokens([]) [] + >>> split_non_word_tokens([{"text": ".,.,.,.", "is_word": False}]) + [{'text': '.', 'is_word': False}, {'text': ',', 'is_word': False}, {'text': '.,.', 'is_word': False}, {'text': ',', 'is_word': False}, {'text': '.', 'is_word': False}] """ new_tokens = [] for token in tokens: if not token["is_word"]: - new_tokens.extend( - [{"text": char, "is_word": False} for char in token["text"]] - ) + text = token["text"] + if len(text) > 5: + new_tokens.append({"text": text[0], "is_word": False}) + new_tokens.append({"text": text[1], "is_word": False}) + new_tokens.append({"text": text[2:-2], "is_word": False}) + new_tokens.append({"text": text[-2], "is_word": False}) + new_tokens.append({"text": text[-1], "is_word": False}) + else: + new_tokens.extend([{"text": char, "is_word": False} for char in text]) else: new_tokens.append(token) return new_tokens From c3d73bfa8014c34c7dc004637465ca453f9e9cbf Mon Sep 17 00:00:00 2001 From: Eric Joanis Date: Tue, 12 Nov 2024 14:25:57 -0500 Subject: [PATCH 7/7] refactor: change tokens from a a custom dict to a Token class (#406) Also: - resolve ensuing typing errors - Add more typing declarations to make it all coherent - Add a __all__ to g2p/__init__.py because otherwise, mypy doesn't like that we import Token there without using it explicitly: it in indeed imported just so API users can import it, so this is logical. --- docs/package.md | 6 ++--- g2p/__init__.py | 35 +++++++++++++++++------- g2p/api_v2.py | 8 +++--- g2p/mappings/tokenizer.py | 22 +++++++-------- g2p/mappings/utils.py | 48 ++++++++++++++++----------------- g2p/shared_types.py | 54 ++++++++++++++++++++++++++++++++++++- g2p/tests/test_tokenizer.py | 52 +++++++++++++++++------------------ g2p/tests/test_utils.py | 30 +++++++++++++++++++++ g2p/transducer/__init__.py | 10 +++---- 9 files changed, 179 insertions(+), 86 deletions(-) diff --git a/docs/package.md b/docs/package.md index 90a51257..b4bf2c12 100644 --- a/docs/package.md +++ b/docs/package.md @@ -32,10 +32,10 @@ Basic usage for the language-aware tokenizer: from g2p import make_tokenizer tokenizer = make_tokenizer("dan") for token in tokenizer.tokenize_text("Åh, hvordan har du det, Åbenrå?"): - if token["is_word"]: - word = token["text"] + if token.is_word + word = token.text else: - interword_punctuation_and_spaces = token["text"] + interword_punctuation_and_spaces = token.text ``` Note that selecting the tokenizer language is important to make sure punctuation-like letters are handled correctly. 
For example `:` and `'` are punctuation in English but they will be part of the word tokens in Kanien'kéha (moh): diff --git a/g2p/__init__.py b/g2p/__init__.py index d4513d50..e60a2e13 100644 --- a/g2p/__init__.py +++ b/g2p/__init__.py @@ -16,10 +16,10 @@ from g2p import make_tokenizer tokenizer = make_tokenizer(lang) for token in tokenizer.tokenize_text(input_text): - if token["is_word"]: - word = token["text"] + if token.is_word: + word = token.text else: - interword_punctuation_and_spaces = token["text"] + interword_punctuation_and_spaces = token.text from g2p import get_arpabet_langs LANGS, LANG_NAMES = get_arpabet_langs() @@ -29,7 +29,7 @@ from typing import Dict, Optional, Tuple, Union from g2p.exceptions import InvalidLanguageCode, NoPath -from g2p.shared_types import BaseTokenizer, BaseTransducer +from g2p.shared_types import BaseTokenizer, BaseTransducer, Token if sys.version_info < (3, 7): # pragma: no cover sys.exit( @@ -47,7 +47,7 @@ def make_g2p( # noqa: C901 *, tokenize: bool = True, custom_tokenizer: Optional[BaseTokenizer] = None, -): +) -> BaseTransducer: """Make a g2p Transducer for mapping text from in_lang to out_lang via the shortest path between them. @@ -132,13 +132,13 @@ def make_g2p( # noqa: C901 return transducer -def tokenize_and_map(tokenizer, transducer, input: str): +def tokenize_and_map(tokenizer: BaseTokenizer, transducer: BaseTransducer, input: str): result = "" for token in tokenizer.tokenize_text(input): - if token["is_word"]: - result += transducer(token["text"]).output_string + if token.is_word: + result += transducer(token.text).output_string else: - result += token["text"] + result += token.text return result @@ -213,7 +213,7 @@ def get_arpabet_langs(): return _langs_cache, _lang_names_cache -def make_tokenizer(in_lang=None, out_lang=None, tok_path=None): +def make_tokenizer(in_lang=None, out_lang=None, tok_path=None) -> BaseTokenizer: """Make the tokenizer for input in language in_lang Logic used when only in_lang is provided: @@ -234,3 +234,18 @@ def make_tokenizer(in_lang=None, out_lang=None, tok_path=None): from g2p.mappings.tokenizer import make_tokenizer as _make_tokenizer return _make_tokenizer(in_lang, out_lang, tok_path) + + +# Declare what's actually part of g2p's programmatic API. +# Please don't import anything else from g2p directly. 
+__all__ = [ + "BaseTokenizer", + "BaseTransducer", + "InvalidLanguageCode", + "NoPath", + "Token", + "get_arpabet_langs", + "make_g2p", + "make_tokenizer", + "tokenize_and_map", +] diff --git a/g2p/api_v2.py b/g2p/api_v2.py index 9ed4551e..680a7f8e 100644 --- a/g2p/api_v2.py +++ b/g2p/api_v2.py @@ -300,7 +300,7 @@ def convert_one_writing_or_phonetic_system_to_another( # noqa: C901 tokenizer = g2p.make_tokenizer(in_lang) tokens = tokenizer.tokenize_text(request.text) else: - tokens = [{"text": request.text, "is_word": True}] + tokens = [g2p.Token(request.text, is_word=True)] except NoPath: raise HTTPException( status_code=400, detail=f"No path from {in_lang} to {out_lang}" @@ -314,8 +314,8 @@ def convert_one_writing_or_phonetic_system_to_another( # noqa: C901 segments: List[Segment] = [] for token in tokens: conversions: List[Conversion] = [] - if not token["is_word"]: # non-word, has no in_lang/out_lang - tg = TransductionGraph(token["text"]) + if not token.is_word: # non-word, has no in_lang/out_lang + tg = TransductionGraph(token.text) conv = Conversion(substring_alignments=tg.substring_alignments()) if request.indices: conv.alignments = tg.alignments() @@ -323,7 +323,7 @@ def convert_one_writing_or_phonetic_system_to_another( # noqa: C901 conv.output_nodes = list(tg.output_string) conversions.append(conv) else: - tg = transducer(token["text"]) + tg = transducer(token.text) if request.compose_from: composed_tiers: List[TransductionGraph] = [] for tr, tier in zip(transducer.transducers, tg.tiers): diff --git a/g2p/mappings/tokenizer.py b/g2p/mappings/tokenizer.py index 5bc67cb1..12b6dd2f 100644 --- a/g2p/mappings/tokenizer.py +++ b/g2p/mappings/tokenizer.py @@ -14,7 +14,7 @@ from g2p.mappings import Mapping, utils from g2p.mappings.langs import LANGS_NETWORK from g2p.mappings.utils import is_ipa -from g2p.shared_types import BaseTokenizer +from g2p.shared_types import BaseTokenizer, Token class Tokenizer(BaseTokenizer): @@ -46,17 +46,13 @@ def is_word_character(self, c): return True return False - def tokenize_text(self, text): + def tokenize_text(self, text: str) -> List[Token]: matches = self.tokenize_aux(text) - units = [{"text": m, "is_word": self.is_word_character(m)} for m in matches] + units = [Token(m, self.is_word_character(m)) for m in matches] if self.dot_is_letter: for i, unit in enumerate(units): - if ( - unit["text"] == "." - and i + 1 < len(units) - and units[i + 1]["is_word"] - ): - unit["is_word"] = True + if unit.text == "." 
and i + 1 < len(units) and units[i + 1].is_word: + unit.is_word = True return utils.merge_same_type_tokens(units) @@ -119,20 +115,20 @@ def _recursive_helper(self, tokens: list, output_tokens: list): output_tokens.append(tokens[0]) return for i in range(len(tokens), 0, -1): - candidate = "".join([u["text"] for u in tokens[:i]]) + candidate = "".join([u.text for u in tokens[:i]]) if utils.find_alignment(self.mapping.alignments, candidate.lower()): - output_tokens.append({"text": candidate, "is_word": True}) + output_tokens.append(Token(candidate, True)) return self._recursive_helper(tokens[i:], output_tokens) # No prefix found, emit the first unit as a token output_tokens.append(tokens[0]) self._recursive_helper(tokens[1:], output_tokens) - def tokenize_text(self, text): + def tokenize_text(self, text: str) -> List[Token]: blocks = re.split(r"(\s+)", text) output_tokens = [] for i, block in enumerate(blocks): if i % 2 == 1 and block: - output_tokens.append({"text": block, "is_word": False}) + output_tokens.append(Token(block, False)) else: default_tokens = super().tokenize_text(block) # Split non-word tokens into smaller parts for lexicon lookup diff --git a/g2p/mappings/utils.py b/g2p/mappings/utils.py index 2c39d127..7b248dc1 100644 --- a/g2p/mappings/utils.py +++ b/g2p/mappings/utils.py @@ -42,6 +42,7 @@ from g2p import exceptions from g2p.log import LOGGER from g2p.mappings import langs +from g2p.shared_types import Token GEN_DIR = os.path.join(os.path.dirname(langs.__file__), "generated") GEN_CONFIG = os.path.join(GEN_DIR, "config-g2p.yaml") @@ -595,12 +596,11 @@ def ignore_aliases(self, *_args): return True -def merge_same_type_tokens(tokens: list) -> list: +def merge_same_type_tokens(tokens: List[Token]) -> List[Token]: """Merge tokens that have the same type. Destroys tokens in the process. - Tokens are represented as dicts {"text": str, "is_word": bool}. - >>> merge_same_type_tokens([{"text": "test", "is_word": True}, {"text": "b", "is_word": True}, {"text": ":", "is_word": False}, {"text": ",", "is_word": False}]) - [{'text': 'testb', 'is_word': True}, {'text': ':,', 'is_word': False}] + >>> merge_same_type_tokens([Token("test", True), Token("b", True), Token(":", False), Token(",", False)]) + [Token(text='testb', is_word=True), Token(text=':,', is_word=False)] >>> merge_same_type_tokens([]) [] """ @@ -608,14 +608,14 @@ def merge_same_type_tokens(tokens: list) -> list: return [] merged_tokens = [tokens[0]] for token in tokens[1:]: - if token["is_word"] == merged_tokens[-1]["is_word"]: - merged_tokens[-1]["text"] += token["text"] + if token.is_word == merged_tokens[-1].is_word: + merged_tokens[-1].text += token.text else: merged_tokens.append(token) return merged_tokens -def split_non_word_tokens(tokens: list) -> list: +def split_non_word_tokens(tokens: List[Token]) -> List[Token]: """Split non-word units into characters. Reuses the word tokens. Generates a maximum of 5 units per non-word token: if the input token is @@ -625,35 +625,35 @@ def split_non_word_tokens(tokens: list) -> list: This prevents degenerate input like a large number of consecutive punctuation marks from taking quadratic time in lexicon-based tokenization. 
- >>> split_non_word_tokens([{"text": "test", "is_word": True}, {"text": ":,- ", "is_word": False}, {"text": "", "is_word": False}]) - [{'text': 'test', 'is_word': True}, {'text': ':', 'is_word': False}, {'text': ',', 'is_word': False}, {'text': '-', 'is_word': False}, {'text': ' ', 'is_word': False}] + >>> split_non_word_tokens([Token("test", True), Token(":,- ", False), Token("", False)]) + [Token(text='test', is_word=True), Token(text=':', is_word=False), Token(text=',', is_word=False), Token(text='-', is_word=False), Token(text=' ', is_word=False)] >>> split_non_word_tokens([]) [] - >>> split_non_word_tokens([{"text": ".,.,.,.", "is_word": False}]) - [{'text': '.', 'is_word': False}, {'text': ',', 'is_word': False}, {'text': '.,.', 'is_word': False}, {'text': ',', 'is_word': False}, {'text': '.', 'is_word': False}] + >>> split_non_word_tokens([Token(".,.,.,.", False)]) + [Token(text='.', is_word=False), Token(text=',', is_word=False), Token(text='.,.', is_word=False), Token(text=',', is_word=False), Token(text='.', is_word=False)] """ new_tokens = [] for token in tokens: - if not token["is_word"]: - text = token["text"] + if not token.is_word: + text = token.text if len(text) > 5: - new_tokens.append({"text": text[0], "is_word": False}) - new_tokens.append({"text": text[1], "is_word": False}) - new_tokens.append({"text": text[2:-2], "is_word": False}) - new_tokens.append({"text": text[-2], "is_word": False}) - new_tokens.append({"text": text[-1], "is_word": False}) + new_tokens.append(Token(text[0], False)) + new_tokens.append(Token(text[1], False)) + new_tokens.append(Token(text[2:-2], False)) + new_tokens.append(Token(text[-2], False)) + new_tokens.append(Token(text[-1], False)) else: - new_tokens.extend([{"text": char, "is_word": False} for char in text]) + new_tokens.extend([Token(char, False) for char in text]) else: new_tokens.append(token) return new_tokens -def merge_non_word_tokens(tokens: list) -> list: +def merge_non_word_tokens(tokens: List[Token]) -> List[Token]: """Merge consecutive non-word units into a single token. Destroys tokens in the process. - >>> merge_non_word_tokens([{"text": "test", "is_word": True}, {"text": ":", "is_word": False}, {"text": ",", "is_word": False}]) - [{'text': 'test', 'is_word': True}, {'text': ':,', 'is_word': False}] + >>> merge_non_word_tokens([Token("test", True), Token(":", False), Token(",", False)]) + [Token(text='test', is_word=True), Token(text=':,', is_word=False)] >>> merge_non_word_tokens([]) [] """ @@ -661,8 +661,8 @@ def merge_non_word_tokens(tokens: list) -> list: return tokens merged_tokens = [tokens[0]] for token in tokens[1:]: - if not token["is_word"] and not merged_tokens[-1]["is_word"]: - merged_tokens[-1]["text"] += token["text"] + if not token.is_word and not merged_tokens[-1].is_word: + merged_tokens[-1].text += token.text else: merged_tokens.append(token) return merged_tokens diff --git a/g2p/shared_types.py b/g2p/shared_types.py index 2472d0fb..1296c35f 100644 --- a/g2p/shared_types.py +++ b/g2p/shared_types.py @@ -4,6 +4,43 @@ """ from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import List + +from typing_extensions import deprecated + + +@dataclass +class Token: + """A token from the g2p tokenizer.""" + + text: str + is_word: bool + + @deprecated( + "Accessing g2p Token objects as dicts is deprecated since g2p 2.2.0. 
" + "Please use the 'text' and 'is_word' attributes instead.", + ) + def __getitem__(self, key): + """For backward compatibility only, allow access as if it were a dict.""" + if key == "text": + return self.text + if key == "is_word": + return self.is_word + raise KeyError(key) + + @deprecated( + "Accessing g2p Token objects as dicts is deprecated since g2p 2.2.0. " + "Please use the 'text' and 'is_word' attributes instead.", + ) + def __setitem__(self, key, value): + """For backward compatibility only, allow setting values as if it were a dict.""" + if key == "text": + self.text = value + elif key == "is_word": + self.is_word = value + else: + raise KeyError(key) class BaseTransducer(ABC): @@ -13,6 +50,21 @@ class BaseTransducer(ABC): def __call__(self, to_convert: str): """Transduce to_convert.""" + @property + @abstractmethod + def transducers(self): + """A list of BaseTransducer objects for each tier in the transducer.""" + + @property + @abstractmethod + def in_lang(self) -> str: + """The input language code of the transducer.""" + + @property + @abstractmethod + def out_lang(self) -> str: + """The output language code of the transducer.""" + class BaseTransductionGraph(ABC): """Base class to typecheck transduction graphs without having to import them.""" @@ -27,5 +79,5 @@ class BaseTokenizer(ABC): """Base class to typecheck tokenizers without having to import them.""" @abstractmethod - def tokenize_text(self, text): + def tokenize_text(self, text: str) -> List[Token]: """Tokenize text.""" diff --git a/g2p/tests/test_tokenizer.py b/g2p/tests/test_tokenizer.py index 812f0062..b15b8611 100755 --- a/g2p/tests/test_tokenizer.py +++ b/g2p/tests/test_tokenizer.py @@ -17,32 +17,32 @@ def test_tokenize_fra(self): tokenizer = tok.make_tokenizer("fra") tokens = tokenizer.tokenize_text(input) self.assertEqual(len(tokens), 8) - self.assertTrue(tokens[0]["is_word"]) - self.assertEqual(tokens[0]["text"], "ceci") - self.assertFalse(tokens[1]["is_word"]) - self.assertEqual(tokens[1]["text"], " ") - self.assertTrue(tokens[2]["is_word"]) - self.assertEqual(tokens[2]["text"], "était") - self.assertFalse(tokens[3]["is_word"]) - self.assertEqual(tokens[3]["text"], " '") - self.assertTrue(tokens[4]["is_word"]) - self.assertEqual(tokens[4]["text"], "un") - self.assertFalse(tokens[5]["is_word"]) - self.assertEqual(tokens[5]["text"], "' ") - self.assertTrue(tokens[6]["is_word"]) - self.assertEqual(tokens[6]["text"], "test") - self.assertFalse(tokens[7]["is_word"]) - self.assertEqual(tokens[7]["text"], ".") + self.assertTrue(tokens[0].is_word) + self.assertEqual(tokens[0].text, "ceci") + self.assertFalse(tokens[1].is_word) + self.assertEqual(tokens[1].text, " ") + self.assertTrue(tokens[2].is_word) + self.assertEqual(tokens[2].text, "était") + self.assertFalse(tokens[3].is_word) + self.assertEqual(tokens[3].text, " '") + self.assertTrue(tokens[4].is_word) + self.assertEqual(tokens[4].text, "un") + self.assertFalse(tokens[5].is_word) + self.assertEqual(tokens[5].text, "' ") + self.assertTrue(tokens[6].is_word) + self.assertEqual(tokens[6].text, "test") + self.assertFalse(tokens[7].is_word) + self.assertEqual(tokens[7].text, ".") def test_tokenize_eng(self): input = "This is éçà test." 
tokenizer = tok.make_tokenizer("eng") tokens = tokenizer.tokenize_text(input) self.assertEqual(len(tokens), 8) - self.assertTrue(tokens[0]["is_word"]) - self.assertEqual(tokens[0]["text"], "This") - self.assertFalse(tokens[1]["is_word"]) - self.assertEqual(tokens[1]["text"], " ") + self.assertTrue(tokens[0].is_word) + self.assertEqual(tokens[0].text, "This") + self.assertFalse(tokens[1].is_word) + self.assertEqual(tokens[1].text, " ") def test_lexicon_tokenizer(self): tokenizer = tok.make_tokenizer("eng") @@ -60,7 +60,7 @@ def test_lexicon_tokenizer(self): for input_text, expected_tokens in tests: with self.subTest(input_text=input_text): tokens = tokenizer.tokenize_text(input_text) - self.assertEqual([x["text"] for x in tokens], expected_tokens) + self.assertEqual([x.text for x in tokens], expected_tokens) def test_tokenize_win(self): """win is easy to tokenize because win -> win-ipa exists and has ' in its inventory""" @@ -70,8 +70,8 @@ def test_tokenize_win(self): tokenizer = tok.make_tokenizer("win") tokens = tokenizer.tokenize_text(input) self.assertEqual(len(tokens), 1) - self.assertTrue(tokens[0]["is_word"]) - self.assertEqual(tokens[0]["text"], "p'ōį̄ą") + self.assertTrue(tokens[0].is_word) + self.assertEqual(tokens[0].text, "p'ōį̄ą") def test_tokenize_tce(self): """tce is hard to tokenize correctly because we have tce -> tce-equiv -> tce-ipa, and ' is @@ -89,14 +89,14 @@ def test_tokenize_tce(self): tokenizer = tok.make_tokenizer("tce") tokens = tokenizer.tokenize_text(input) self.assertEqual(len(tokens), 1) - self.assertTrue(tokens[0]["is_word"]) - self.assertEqual(tokens[0]["text"], "ts'nj") + self.assertTrue(tokens[0].is_word) + self.assertEqual(tokens[0].text, "ts'nj") def test_tokenize_tce_equiv(self): input = "ts'e ts`e ts‘e ts’" self.assertEqual(len(tok.make_tokenizer("fra").tokenize_text(input)), 14) # tce_tokens = tok.make_tokenizer("tce").tokenize_text(input) - # LOGGER.warning([x["text"] for x in tce_tokens]) + # LOGGER.warning([x.text for x in tce_tokens]) self.assertEqual(len(tok.make_tokenizer("tce").tokenize_text(input)), 7) def test_tokenizer_identity_tce(self): diff --git a/g2p/tests/test_utils.py b/g2p/tests/test_utils.py index 037c6ffb..44c8ce2d 100755 --- a/g2p/tests/test_utils.py +++ b/g2p/tests/test_utils.py @@ -4,9 +4,11 @@ """ import doctest +import io import os import re from collections import defaultdict +from contextlib import redirect_stderr from pathlib import Path from unittest import TestCase, main @@ -327,6 +329,34 @@ def test_scm_pretend_version_is_up_to_date(self): # This is fine, it's only used in development pass + def test_token_class(self): + from g2p.shared_types import Token + + t1 = Token("test", True) + t2 = Token(":", False) + + f = io.StringIO() + with redirect_stderr(f): + # Current usage and deprecated usage + for t in t1, t2: + self.assertEqual(t.text, t["text"]) + self.assertEqual(t.is_word, t["is_word"]) + # new way to set + t1.text = "test2" + t1.is_word = False + self.assertEqual(t1.text, "test2") + self.assertEqual(t1.is_word, False) + # deprecated way to set + t1["text"] = "test3" + t1["is_word"] = True + self.assertEqual(t1.text, "test3") + self.assertEqual(t1.is_word, True) + + with self.assertRaises(KeyError): + t1["bad_key"] = "test" + with self.assertRaises(KeyError): + _ = t2["bad_key"] + if __name__ == "__main__": main() diff --git a/g2p/transducer/__init__.py b/g2p/transducer/__init__.py index e02835a9..8a167660 100644 --- a/g2p/transducer/__init__.py +++ b/g2p/transducer/__init__.py @@ -1219,11 +1219,11 @@ def 
__call__(self, to_convert: str): tg.clear_debugger() # clear the meaningless initial debugger for token in self._tokenizer.tokenize_text(to_convert): - if token["is_word"]: - word_tg = self._transducer(token["text"]) + if token.is_word: + word_tg = self._transducer(token.text) tg += word_tg else: - non_word_tg = TransductionGraph(token["text"]) + non_word_tg = TransductionGraph(token.text) tg += non_word_tg return tg @@ -1256,8 +1256,8 @@ def check(self, tg: TransductionGraph, shallow=False, display_warnings=False): # by step. I don't like this solution, but I don't see how to get around it. result = True for token in self._tokenizer.tokenize_text(tg.input_string): - if token["is_word"] and not self._transducer.check( - self._transducer(token["text"]), + if token.is_word and not self._transducer.check( + self._transducer(token.text), shallow, display_warnings=display_warnings, ):
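
As a quick end-to-end illustration of the API this series arrives at, here is a minimal usage sketch (illustrative only, not part of any commit above). It assumes g2p is installed with these seven patches applied and that the bundled eng lexicon mapping is available; the exact phonetic output will depend on the installed lexicon data.

    from g2p import make_g2p, make_tokenizer

    # Lexicon-aware tokenization for English (LexiconTokenizer, PATCH 2/7).
    tokenizer = make_tokenizer("eng")
    for token in tokenizer.tokenize_text("It's ten a.m., isn't it?"):
        # Since PATCH 7/7, tokens are Token objects with .text and .is_word
        # attributes rather than {"text": ..., "is_word": ...} dicts.
        kind = "word" if token.is_word else "non-word"
        print(f"{kind}: {token.text!r}")

    # make_g2p applies the same tokenization implicitly when converting text.
    transducer = make_g2p("eng", "eng-ipa")
    print(transducer("It's ten a.m.").output_string)

The same Token attributes (.text and .is_word) are what api_v2.py and transducer/__init__.py use after PATCH 7/7; user code that still indexes tokens as dicts keeps working through Token.__getitem__/__setitem__ but is flagged by the deprecation notice added in shared_types.py.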