This repository was archived by the owner on Jun 10, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathapp.py
124 lines (105 loc) · 3.52 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
__copyright__ = "Copyright (c) 2021 Jina AI Limited. All rights reserved."
__license__ = "Apache-2.0"
import os
import itertools
import csv
import shutil
import click
import sys
from backend_config import (
text_length,
max_docs,
backend_datafile,
backend_port,
backend_workdir,
)
from executors import MyTransformer, DiskIndexer
from jina import Flow, Document
# Optional dependency: import pretty_errors when it is installed and carry on
# silently when it is not (presumably it only affects how tracebacks are
# displayed -- TODO confirm). __import__ is used so no module name is bound.
try:
    __import__("pretty_errors")
except ImportError:
    pass
def trim_string(
    input_string: str, word_count: int = text_length, sep: str = " "
) -> str:
    """
    Trim a string to a certain number of words.

    Literal ``\\n`` sequences (a backslash followed by ``n``) are treated as
    word separators. Empty tokens produced by consecutive separators are
    dropped so they do not count towards ``word_count``.

    :param input_string: string to trim
    :param word_count: how many words to trim to
    :param sep: separator between words in the input
    :return: trimmed string; the kept words are always joined by a single space
    """
    sanitized_string = input_string.replace("\\n", sep)
    # str.split with an explicit separator yields "" for adjacent separators;
    # filter those out before counting so only real words are kept.
    words = [word for word in sanitized_string.split(sep) if word]
    return " ".join(words[:word_count])
def prep_docs(input_file: str, max_docs=max_docs):
    """
    Lazily yield one Document for every row of a csv file.

    Each Document's text is the trimmed "Description" column of the row and
    its tags are the full row.

    :param input_file: Input csv filename
    :param max_docs: maximum number of rows to read from the file
    :return: generator of populated Documents
    :raises KeyError: if a row has no "Description" column
    """
    input_field = "Description"
    # newline="" is what the csv module expects: it lets the reader handle
    # line endings itself so quoted fields with embedded newlines parse
    # correctly.
    # NOTE(review): the file is read with the platform default encoding --
    # confirm whether the data file should be opened as UTF-8 explicitly.
    with open(input_file, "r", newline="") as csv_file:
        csv_reader = csv.DictReader(csv_file)
        # islice stops after max_docs rows without reading the rest of the file.
        for row in itertools.islice(csv_reader, max_docs):
            doc = Document(text=trim_string(row[input_field]))
            doc.tags = row
            yield doc
def index(num_docs: int = max_docs):
    """
    Build the index by sending the csv rows through the encoder and indexer.

    :param num_docs: maximum number of documents to read from the data file
        and index; defaults to the configured ``max_docs``
    """
    flow = (
        Flow()
        .add(uses=MyTransformer, parallel=2, name="encoder")
        .add(uses=DiskIndexer, workspace=backend_workdir, name="indexer")
    )

    with flow:
        flow.post(
            on="/index",
            inputs=prep_docs(input_file=backend_datafile, max_docs=num_docs),
            request_size=64,
            # NOTE(review): read_mode is kept from the original call; confirm
            # it is still honoured by Flow.post for generator inputs.
            read_mode="r",
        )


def query_restful():
    """
    Start the query Flow and block so it keeps serving requests.

    The Flow uses the same encoder and indexer executors as indexing, and the
    indexer reads from ``backend_workdir``.
    """
    flow = (
        Flow()
        .add(uses=MyTransformer, name="encoder")
        .add(uses=DiskIndexer, workspace=backend_workdir, name="indexer")
    )

    with flow:
        # Protocol and port are assigned after the Flow context is entered,
        # in this order, exactly as the original code did.
        flow.protocol = "http"
        flow.port_expose = backend_port
        flow.block()


@click.command()
@click.option(
    "--task",
    "-t",
    type=click.Choice(["index", "query_restful"], case_sensitive=False),
    required=True,
    help="Task to run: build the index or serve queries over HTTP.",
)
@click.option(
    "--num_docs",
    "-n",
    default=max_docs,
    help="Maximum number of documents to index.",
)
@click.option(
    "--force",
    "-f",
    is_flag=True,
    help="Delete an existing workspace before indexing.",
)
def main(task: str, num_docs: int, force: bool):
    """Index the dataset or serve the query endpoint, depending on --task."""
    # task:     "index" or "query_restful"
    # num_docs: upper bound on the number of rows indexed (index task only)
    # force:    remove an existing workspace instead of aborting (index task only)
    workspace = backend_workdir

    if task == "index":
        if os.path.exists(workspace):
            if not force:
                print(
                    "\n +----------------------------------------------------------------------------------+ "
                    "\n | 🤖🤖🤖 | "
                    f"\n | The directory {workspace} already exists. Please remove it before indexing again. | "
                    "\n | 🤖🤖🤖 | "
                    "\n +----------------------------------------------------------------------------------+"
                )
                sys.exit(1)
            shutil.rmtree(workspace)
        # Forward the CLI limit; previously --num_docs was parsed but ignored.
        index(num_docs=num_docs)
    elif task == "query_restful":
        if not os.path.exists(workspace):
            print(
                f"The directory {workspace} does not exist. Please index first via `python app.py -t index`"
            )
            sys.exit(1)
        query_restful()
if __name__ == "__main__":
main()