Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Improve chardict.py to add n-gram counts #9

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ json:
@mkdir -p json
@echo "Creating JSON dicts..."
@bin/chardict.py
@echo "Merging JSON dicts..."
@echo "... de_modern"
@bin/merge.py txt/deu_*.json > json/de_modern.json
@echo "... en_modern"
Expand Down
154 changes: 97 additions & 57 deletions bin/chardict.py
Original file line number Diff line number Diff line change
@@ -1,78 +1,118 @@
#!/usr/bin/env python3
"""Turn corpus texts into dictionaries of symbols, bigrams and trigrams."""
"""Turn corpus texts into dictionaries of n-grams."""

import json
from os import listdir, path
from pathlib import Path
from sys import argv

IGNORED_CHARS = "1234567890 \t\r\n\ufeff"


def parse_corpus(file_path):
    """Count symbols, bigrams and trigrams in a text file."""

    # frequency tables, keyed by the lowercased symbol / bigram / trigram
    symbols = {}
    bigrams = {}
    trigrams = {}
    char_count = 0  # total number of counted (non-ignored) characters
    prev_symbol = None  # symbol just before the current one; None after an ignored char
    prev_prev_symbol = None  # symbol before prev_symbol

    # get a dictionary of all symbols (letters, punctuation marks...)
    file = open(file_path, "r", encoding="utf-8")
    for char in file.read():
        symbol = char.lower()  # counting is case-insensitive
        # NOTE: membership is tested on the raw char while counting uses the
        # lowercased symbol — equivalent here since IGNORED_CHARS has no letters
        if char not in IGNORED_CHARS:
            char_count += 1
            if symbol not in symbols:
                symbols[symbol] = 0
            symbols[symbol] += 1
            # a bigram needs an unbroken previous symbol
            if prev_symbol is not None:
                bigram = prev_symbol + symbol
                if bigram not in bigrams:
                    bigrams[bigram] = 0
                bigrams[bigram] += 1
                # a trigram additionally needs the symbol before that one
                if prev_prev_symbol is not None:
                    trigram = prev_prev_symbol + bigram
                    if trigram not in trigrams:
                        trigrams[trigram] = 0
                    trigrams[trigram] += 1
            prev_prev_symbol = prev_symbol
            prev_symbol = symbol
        else:
            # an ignored char breaks the n-gram chain
            prev_symbol = None
    file.close()
NGRAM_MAX_LENGTH = 4 # trigrams
IGNORED_CHARS = "1234567890 \t\r\n\ufeff↵"
APP_NAME = "kalamine"
APP_AUTHOR = "1dk"


def parse_corpus(txt: str) -> tuple[dict, dict]:
    """Count n-grams in a string.

    Returns a (ngrams, ngrams_count) tuple:
    ngrams[1] = symbols
    ngrams[2] = bigrams
    ngrams[3] = trigrams
    etc., up to (but excluding) NGRAM_MAX_LENGTH.
    Each table is shaped as { "aa": count }, and ngrams_count[i] holds the
    total number of n-grams of length i seen in the corpus.
    """

    ngrams = {}
    ngrams_count = {}  # ngrams_count counts the total number of ngrams[i] in corpus.

    txt = txt.lower()  # we want to be case **in**sensitive

    # one empty table and a zero total per n-gram length
    for ngram in range(1, NGRAM_MAX_LENGTH):
        ngrams[ngram] = {}
        ngrams_count[ngram] = 0

def get_ngram(txt: str, ngram_start: int, ngram_length: int) -> str:
    """Return the n-gram of ngram_length starting at ngram_start in txt.

    Returns "" when the requested length is not positive, when the n-gram
    would run past the end of txt, or when it contains an ignored char.
    """
    if ngram_length <= 0:
        return ""
    # bug fix: the original used `>=`, which wrongly rejected the n-gram
    # ending exactly at the end of txt (the last symbol was never counted)
    if ngram_start + ngram_length > len(txt):
        return ""
    if txt[ngram_start] in IGNORED_CHARS:
        return ""

    ngram = txt[ngram_start : ngram_start + ngram_length]

    for char in ngram[1:]:  # 1st char already tested
        if char in IGNORED_CHARS:
            return ""

    return ngram

# get all n-grams: every length at every starting position
for ngram_start in range(len(txt)):
    # NOTE(review): length 0 is always rejected by get_ngram, so the first
    # inner iteration is a no-op; lengths actually collected are
    # 1 .. NGRAM_MAX_LENGTH - 1, matching the keys initialized above
    for ngram_length in range(NGRAM_MAX_LENGTH):
        _ngram = get_ngram(txt, ngram_start, ngram_length)

        if not _ngram:  # _ngram is ""
            continue

        if _ngram not in ngrams[ngram_length]:
            ngrams[ngram_length][_ngram] = 0

        ngrams[ngram_length][_ngram] += 1
        ngrams_count[ngram_length] += 1
Comment on lines +49 to +61
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works and the logic is simple (I like that), but I’m not a fan of how it’s implemented. If that’s okay with you:

  • please address the rest of this review
  • rebase on the latest main branch (this PR is based on a previous version)
  • and I’ll add a commit to your PR to suggest another implementation of the same logic.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eager to see what changes you are thinking about.
I was pretty sure this was simple enough to be accepted, but maybe it’s the performance that you would like to improve?


# sort the dictionary by symbol frequency (requires CPython 3.6+)
def sort_by_frequency(table, precision=3):
def sort_by_frequency(table: dict, char_count: int, precision: int = 3) -> dict:
    """Turn raw counts into percentage frequencies, highest first.

    Entries whose frequency rounds to zero at the given precision are dropped.
    """
    ordered = sorted(table.items(), key=lambda item: item[1], reverse=True)
    result = {}
    for key, count in ordered:
        percentage = round(100 * count / char_count, precision)
        if percentage > 0:
            result[key] = percentage
    return result

results = {}
results["corpus"] = file_path
results["symbols"] = sort_by_frequency(symbols)
results["bigrams"] = sort_by_frequency(bigrams, 4)
results["trigrams"] = sort_by_frequency(trigrams)
return results
# convert raw counts into sorted percentage frequencies, 4-digit precision
for ngram in range(1, NGRAM_MAX_LENGTH):
    ngrams[ngram] = sort_by_frequency(ngrams[ngram], ngrams_count[ngram], 4)

# NOTE(review): returns a (freq, count) tuple although parse_corpus is
# annotated `-> dict`; the annotation looks stale — confirm
return ngrams, ngrams_count


def read_corpus(file: Path, name: str = "", encoding: str = "utf-8") -> dict:
    """Read a .txt corpus file and provide a dictionary of n-grams.

    file:     path of the corpus text file.
    name:     corpus name; defaults to the file stem.
    encoding: text encoding used to read the file.

    Returns {"name": ..., "freq": ..., "count": ...} where freq/count come
    from parse_corpus().
    Raises FileNotFoundError when *file* is not an existing regular file;
    read errors propagate to the caller.
    """
    # bug fix: the original tested `file.is_file` without calling it — a bound
    # method is always truthy, so the check could never fire, and its broken
    # try/except then crashed on an undefined `corpus_txt`
    if not file.is_file():
        raise FileNotFoundError(f"Error, this is not a file: {file}")
    if not name:
        name = file.stem
    with file.open("r", encoding=encoding) as f:
        # "↵" joins the lines (which keep their trailing "\n");
        # both characters are in IGNORED_CHARS so they break n-gram chains
        corpus_txt = "↵".join(f.readlines())

    ngrams_freq, ngrams_count = parse_corpus(corpus_txt)
    return {
        "name": name,
        "freq": ngrams_freq,
        "count": ngrams_count,
    }


if __name__ == "__main__":
    # NOTE: this span of the diff interleaved removed and added lines; this is
    # the coherent new behavior (Path-based, via read_corpus)
    if len(argv) == 2:  # convert one file given on the command line
        file = Path(argv[1])
        data = read_corpus(file)
        output_file_path = file.parent / f"{file.stem}.json"
        with open(output_file_path, "w", encoding="utf-8") as outfile:
            json.dump(data, outfile, indent=4, ensure_ascii=False)
        print(json.dumps(data, indent=4, ensure_ascii=False))

    else:  # converts all *.txt files in the sibling `txt` directory
        txt_dir = Path(__file__).resolve().parent.parent / "txt"
        for file in sorted(txt_dir.glob("*.txt")):
            if file.is_file():
                print(f"... {file.stem}")
                data = read_corpus(file)
                output_file_path = txt_dir / f"{file.stem}.json"
                with open(output_file_path, "w", encoding="utf-8") as outfile:
                    json.dump(data, outfile, indent=4, ensure_ascii=False)
120 changes: 89 additions & 31 deletions bin/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,101 @@

import json
from sys import argv
from pathlib import Path


def merge(filenames, filecount):
merged = {
"symbols": {},
"bigrams": {},
"trigrams": {},
}
# sort the merged dictionary by symbol frequency (requires CPython 3.6+)
def _sort_ngram_by_frequency(table, precision=3):
sorted_dict = {}
for key, count in sorted(table.items(), key=lambda x: -x[1]):
freq = round(count, precision)
if freq > 0:
sorted_dict[key] = freq
return sorted_dict

# merge dictionaries

def sort_by_frequency(corpus: dict, precision=3):
    """Sort every n-gram table of *corpus* by descending frequency, in place.

    corpus["freq"] maps n-gram lengths (JSON-stringified ints: "1", "2", ...)
    to {ngram: frequency} tables. Returns the same corpus dict.
    """
    # idiom fix: iterate the keys directly instead of reconstructing them via
    # range(1, len(corpus["freq"].keys()) + 1) and str() — equivalent for the
    # "1".."n" keys produced by chardict.py, and robust to gaps
    for n in corpus["freq"]:
        corpus["freq"][n] = _sort_ngram_by_frequency(corpus["freq"][n], precision)
    return corpus

def read_corpora(filenames: list[Path]) -> list[dict]:
    """Open a collection of corpus JSON files and load them into dicts.

    filenames: paths of .json corpora produced by chardict.py.

    Corpora whose n-gram length differs from the first loaded corpus are
    dropped with a warning. Returns the list of usable corpus dicts, or []
    when fewer than two remain (merging needs at least two).
    """
    corpora_dict = {}
    for filename in filenames:
        try:
            with open(filename) as f:
                corpus = json.load(f)
            corpora_dict[corpus["name"]] = corpus
        # narrowed from a bare `except:`, which also swallowed SystemExit etc.
        except (OSError, ValueError, KeyError):
            print(
                f"Warning: cannot open the `{filename.stem}` corpus; skipping this file"
            )

    if len(corpora_dict) < 2:
        print("Error: at least 2 corpuses are needed to merge, aborting")
        return []

    # removing corpora that do not have the same ngram length as the first one
    reference_length = len(next(iter(corpora_dict.values()))["freq"])
    # bug fix: the original popped from corpora_dict while iterating it, which
    # raises "dictionary changed size during iteration"; iterate a snapshot
    for key in list(corpora_dict):
        corpus = corpora_dict[key]
        if len(corpus["freq"]) != reference_length:
            _name = corpus["name"]
            corpora_dict.pop(_name)
            print(f"Warning: removing {_name} from corpora because ngram length is different")

    if len(corpora_dict) >= 2:
        return list(corpora_dict.values())

    print("Error: at least 2 corpuses are needed to merge, aborting")
    return []

def mix(corpora: list[dict], name: str = "mixed", ratio: list[float] = []) -> dict:
    """Merge corpora of same n-gram length, optionally with a given ratio.

    corpora: corpus dicts as returned by read_corpora()
             ({"name": ..., "freq": {...}, "count": {...}}).
    name:    name given to the merged corpus.
    ratio:   one weight per corpus, adding up to 1; defaults to equal weights.
             (The [] default is only read and rebound, never mutated.)

    Returns a new corpus dict; the inputs are left untouched (the original
    implementation mutated corpora[0] through a shallow copy).
    Raises ValueError when the provided ratio does not add up to 1.
    """
    if ratio == []:
        # merge with same weight by default
        ratio = [1 / len(corpora)] * len(corpora)
    elif round(sum(ratio), 1) != 1:
        # bug fix: the original printed this message but merged anyway
        raise ValueError("Error: provided merge ratio do not add-up to 1; aborting merge")

    # weighted frequencies of the 1st corpus, built fresh (no input mutation)
    merged_freq = {
        n: {gram: freq * ratio[0] for gram, freq in table.items()}
        for n, table in corpora[0]["freq"].items()
    }
    merged_count = dict(corpora[0]["count"])

    # fold in the other corpora
    # bug fix: the original indexed ratio with enumerate(corpora[1:]) starting
    # at 0, so the 2nd corpus reused ratio[0]; weights are now aligned (the
    # stray debug print of each corpus is also gone)
    for index, corpus in enumerate(corpora[1:], start=1):
        weight = ratio[index]
        for n, table in corpus["freq"].items():
            merged_count[n] += corpus["count"][n]
            # assumes all corpora share n-gram lengths (ensured by read_corpora)
            bucket = merged_freq[n]
            for gram, freq in table.items():
                bucket[gram] = bucket.get(gram, 0) + freq * weight

    return {"name": name, "freq": merged_freq, "count": merged_count}



if __name__ == "__main__":
    argl = len(argv) - 1  # number of files to merge
    if argl >= 2:
        files = [Path(f) for f in argv[1:]]
        corpora = read_corpora(files)
        # guard: read_corpora returns [] when fewer than 2 corpora are usable;
        # the original crashed on an empty list inside mix()
        if corpora:
            corpus = mix(corpora, name="mixed")
            # bug fix: the original wrote f"{corpus["name"]}.json" — reusing
            # double quotes inside an f-string is a syntax error before
            # Python 3.12; the unused `dir` variable (shadowing a builtin)
            # is also gone
            with open(f"{corpus['name']}.json", "w", encoding="utf-8") as outfile:
                json.dump(corpus, outfile, indent=4, ensure_ascii=False)
            print(json.dumps(corpus, indent=4, ensure_ascii=False))