-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsearch.py
76 lines (63 loc) · 2.33 KB
/
search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from pprint import pprint
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ConnectionError
from decouple import config
from managers.home_manager import HomeManager
from schemas.response.home_response import HomeResponseSchema
mapping = {
"mappings": {
"properties": {
"pin": {"properties": {"location": {"type": "geo_point"}}},
}
}
}
class Search:
_connected: bool = True
def __init__(self) -> None:
try:
# Cloud confguration
# self.es = Elasticsearch(
# api_key=config("ELASTIC_API_KEY"), cloud_id=config("ELASTIC_CLOUD_ID")
# )
# Container config without pass and keys
self.es = Elasticsearch(config("ELASTIC_HOST"))
client_info = self.es.info()
print("Connected to Elasticsearch!")
pprint(client_info.body)
except ConnectionError:
self._connected = False
print("Can't connect")
def create_index(self):
self.es.indices.delete(index="real_estate_homes", ignore_unavailable=True)
self.es.indices.create(index="real_estate_homes", body=mapping)
def insert_document(self, document):
return self.es.index(index="real_estate_homes", body=document)
def insert_documents(self, documents):
operations = []
for document in documents:
operations.append({"index": {"_index": "real_estate_homes"}})
operations.append(document)
return self.es.bulk(operations=operations)
def reindex_homes(self):
if not self._connected:
return
self.create_index()
homes = HomeManager.select_all_homes()
for home in homes:
if home.latitude and home.longitude:
home.pin = {
"location": {
"lat": float(home.latitude),
"lon": float(home.longitude),
}
}
else:
home.pin = None
resp_schema = HomeResponseSchema()
resp = resp_schema.dump(homes, many=True)
return self.insert_documents(resp)
def search(self, **query_args):
if self._connected:
return self.es.search(index="real_estate_homes", **query_args)
return None
es = Search()