# executors.py
import os
from typing import Dict, Optional, Tuple

import numpy as np
import torch
from jina import Document, DocumentArray, Executor, requests
from transformers import AutoModel, AutoTokenizer

from backend_config import backend_model, backend_top_k

# READ THIS: This is almost copied 100% from the chatbot example. I don't understand what's
# _actually_ happening in these Executors, especially the Transformer.
# When Jina Hub 2.0 is released we can remove this file and simply get Executors directly from the Hub.
# The Executors below are deliberately simple and not built for power use (e.g. the indexer keeps
# everything in memory, so everything has to be re-indexed on every run).
# A minimal usage sketch sits at the bottom of this file.


class MyTransformer(Executor):
    """Transformer executor class"""

    def __init__(
        self,
        pretrained_model_name_or_path: str = backend_model,
        base_tokenizer_model: Optional[str] = None,
        pooling_strategy: str = "mean",
        layer_index: int = -1,
        max_length: Optional[int] = None,
        acceleration: Optional[str] = None,
        embedding_fn_name: str = "__call__",
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.pretrained_model_name_or_path = pretrained_model_name_or_path
        self.base_tokenizer_model = (
            base_tokenizer_model or pretrained_model_name_or_path
        )
        self.pooling_strategy = pooling_strategy
        self.layer_index = layer_index
        self.max_length = max_length
        self.acceleration = acceleration
        self.embedding_fn_name = embedding_fn_name
        self.tokenizer = AutoTokenizer.from_pretrained(self.base_tokenizer_model)
        self.model = AutoModel.from_pretrained(
            self.pretrained_model_name_or_path, output_hidden_states=True
        )
        self.model.to(torch.device("cpu"))

    def _compute_embedding(self, hidden_states: "torch.Tensor", input_tokens: Dict):
        # Mask out padding positions, then mean-pool the chosen hidden layer over tokens.
        # (Only mean pooling is actually applied below; the fill value just neutralises
        # padded positions for the selected strategy.)
        fill_vals = {"cls": 0.0, "mean": 0.0, "max": -np.inf, "min": np.inf}
        fill_val = torch.tensor(
            fill_vals[self.pooling_strategy], device=torch.device("cpu")
        )
        layer = hidden_states[self.layer_index]
        attn_mask = input_tokens["attention_mask"].unsqueeze(-1).expand_as(layer)
        layer = torch.where(attn_mask.bool(), layer, fill_val)
        embeddings = layer.sum(dim=1) / attn_mask.sum(dim=1)
        return embeddings.cpu().numpy()

    @requests
    def encode(self, docs: "DocumentArray", *args, **kwargs):
        with torch.no_grad():
            # Some models ship without a pad token; add one so batched padding works.
            if not self.tokenizer.pad_token:
                self.tokenizer.add_special_tokens({"pad_token": "[PAD]"})
                self.model.resize_token_embeddings(len(self.tokenizer.vocab))
            input_tokens = self.tokenizer(
                docs.get_attributes("content"),
                max_length=self.max_length,
                padding="longest",
                truncation=True,
                return_tensors="pt",
            )
            input_tokens = {
                k: v.to(torch.device("cpu")) for k, v in input_tokens.items()
            }
            outputs = getattr(self.model, self.embedding_fn_name)(**input_tokens)
            if isinstance(outputs, torch.Tensor):
                return outputs.cpu().numpy()
            hidden_states = outputs.hidden_states
            embeds = self._compute_embedding(hidden_states, input_tokens)
            for doc, embed in zip(docs, embeds):
                doc.embedding = embed
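
# In short, MyTransformer tokenizes each Document's content, runs the transformer,
# mean-pools the chosen hidden layer over non-padding tokens, and stores the result
# in doc.embedding. A rough standalone sketch (assumes backend_model can be downloaded):
#
#     encoder = MyTransformer()
#     docs = DocumentArray([Document(content="hello world")])
#     encoder.encode(docs)
#     docs[0].embedding.shape   # -> (hidden_size,)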


def _get_ones(x, y):
    return np.ones((x, y))


def _ext_A(A):
    # Extend A to [1, A, A**2] so that a single matrix product with _ext_B(B) yields
    # the pairwise squared Euclidean distances ||a||^2 - 2*a.b + ||b||^2.
    nA, dim = A.shape
    A_ext = _get_ones(nA, dim * 3)
    A_ext[:, dim : 2 * dim] = A
    A_ext[:, 2 * dim :] = A ** 2
    return A_ext


def _ext_B(B):
    # Counterpart of _ext_A: stack [B**2, -2*B, 1] row-wise (transposed).
    nB, dim = B.shape
    B_ext = _get_ones(dim * 3, nB)
    B_ext[:dim] = (B ** 2).T
    B_ext[dim : 2 * dim] = -2.0 * B.T
    del B
    return B_ext


def _euclidean(A_ext, B_ext):
    sqdist = A_ext.dot(B_ext).clip(min=0)
    return np.sqrt(sqdist)


def _norm(A):
    return A / np.linalg.norm(A, ord=2, axis=1, keepdims=True)


def _cosine(A_norm_ext, B_norm_ext):
    # For L2-normalized rows, squared Euclidean distance equals 2 - 2*cos, so halving
    # it gives the cosine distance (1 - cosine similarity).
    return A_norm_ext.dot(B_norm_ext).clip(min=0) / 2
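
# A rough sanity-check sketch for the helpers above (illustrative values): orthogonal
# unit vectors land at cosine distance 1, identical ones at 0.
#
#     a = _norm(np.array([[1.0, 0.0]]))
#     b = _norm(np.array([[1.0, 0.0], [0.0, 1.0]]))
#     _cosine(_ext_A(a), _ext_B(b))   # -> approximately [[0.0, 1.0]]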


class DiskIndexer(Executor):
    """Simple indexer that keeps Documents in memory and persists them to disk on close."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.top_k = backend_top_k
        if os.path.exists(self.save_path):
            self._docs = DocumentArray.load(self.save_path)
        else:
            self._docs = DocumentArray()

    @property
    def save_path(self):
        if not os.path.exists(self.workspace):
            os.makedirs(self.workspace)
        return os.path.join(self.workspace, "apps.json")

    def close(self):
        self._docs.save(self.save_path)

    @requests(on="/index")
    def index(self, docs: "DocumentArray", **kwargs):
        self._docs.extend(docs)
        return docs

    @requests(on="/search")
    def search(self, docs: "DocumentArray", **kwargs):
        # Compute cosine distances between the query embeddings and everything indexed
        # so far, then attach the top_k closest Documents as matches.
        a = np.stack(docs.get_attributes("embedding"))
        b = np.stack(self._docs.get_attributes("embedding"))
        q_emb = _ext_A(_norm(a))
        d_emb = _ext_B(_norm(b))
        dists = _cosine(q_emb, d_emb)
        idx, dist = self._get_sorted_top_k(dists, self.top_k)
        for _q, _ids, _dists in zip(docs, idx, dist):
            for _id, _dist in zip(_ids, _dists):
                d = Document(self._docs[int(_id)], copy=True)
                # d.score.value = 1 - _dist
                _q.matches.append(d)
        return docs

    @staticmethod
    def _get_sorted_top_k(
        dist: "np.ndarray", top_k: int
    ) -> Tuple["np.ndarray", "np.ndarray"]:
        # Return the indices and values of the top_k smallest distances per row, sorted
        # ascending; argpartition avoids a full sort when top_k is smaller than the row.
        if top_k >= dist.shape[1]:
            idx = dist.argsort(axis=1)[:, :top_k]
            dist = np.take_along_axis(dist, idx, axis=1)
        else:
            idx_ps = dist.argpartition(kth=top_k, axis=1)[:, :top_k]
            dist = np.take_along_axis(dist, idx_ps, axis=1)
            idx_fs = dist.argsort(axis=1)
            idx = np.take_along_axis(idx_ps, idx_fs, axis=1)
            dist = np.take_along_axis(dist, idx_fs, axis=1)
        return idx, dist
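

# A minimal usage sketch: wiring both Executors into a Jina 2.x Flow. The toy Documents,
# the query text, and the workspace meta below are illustrative assumptions rather than
# values taken from the backend configuration.
if __name__ == "__main__":
    from jina import Flow

    flow = (
        Flow()
        .add(uses=MyTransformer)
        # the workspace meta is assumed here so that DiskIndexer.save_path resolves
        .add(uses=DiskIndexer, uses_metas={"workspace": "workspace"})
    )
    with flow:
        flow.post(
            on="/index",
            inputs=DocumentArray(
                [Document(content="hello world"), Document(content="goodbye world")]
            ),
        )
        results = flow.post(
            on="/search",
            inputs=DocumentArray([Document(content="hi there")]),
            return_results=True,
        )
        # Each response holds the query Documents with their top_k matches attached.
        for doc in results[0].docs:
            for match in doc.matches:
                print(match.content)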