Commit fd46bc1

Author: Alberto Paro (committed)

Initial bulker implementation

1 parent 872cfe9 · commit fd46bc1

File tree: 5 files changed, +128 −65 lines changed


performance/performance.py (+22 −17)

@@ -1,32 +1,37 @@
-from pyes import ES
+import sys
+
+#sys.path.insert(0, "../")
+
+#from pyes import ES
+from brainaetic.echidnasearch.es import ES
 from datetime import datetime
 import shelve
 #conn = ES('127.0.0.1:9200')
-conn = ES('192.168.1.51:9200')
+conn = ES('192.168.2.50:9200')
 try:
     conn.delete_index("test-index")
 except:
     pass

 dataset = shelve.open("samples.shelve")

-mapping = { u'description': {'boost': 1.0,
-                             'index': 'analyzed',
-                             'store': 'yes',
-                             'type': u'string',
-                             "term_vector" : "with_positions_offsets"
-                             },
-            u'name': {'boost': 1.0,
-                      'index': 'analyzed',
-                      'store': 'yes',
-                      'type': u'string',
-                      "term_vector" : "with_positions_offsets"
-                      },
-            u'age': {'store': 'yes',
+mapping = {u'description': {'boost': 1.0,
+                            'index': 'analyzed',
+                            'store': 'yes',
+                            'type': u'string',
+                            "term_vector": "with_positions_offsets"
+                            },
+           u'name': {'boost': 1.0,
+                     'index': 'analyzed',
+                     'store': 'yes',
+                     'type': u'string',
+                     "term_vector": "with_positions_offsets"
+                     },
+           u'age': {'store': 'yes',
                     'type': u'integer'},
-        }
+           }
 conn.create_index("test-index")
-conn.put_mapping("test-type", {'properties':mapping}, ["test-index"])
+conn.put_mapping("test-type", {'properties': mapping}, ["test-index"])

 start = datetime.now()
 for k, userdata in dataset.items():
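The benchmark's indexing loop continues past the diff context. A minimal sketch of how such a loop typically drives the bulk path (the index(..., bulk=True) and force_bulk() calls changed in pyes/es.py below); the loop body is an assumption, not part of the commit:

    # hypothetical continuation of the benchmark (not in the diff context):
    # queue each sample through the bulk machinery, then flush the remainder
    for k, userdata in dataset.items():
        conn.index(userdata, "test-index", "test-type", id=k, bulk=True)
    conn.force_bulk()
    print "indexed %d docs in %s" % (len(dataset), datetime.now() - start)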

pyes/contrib/__init__.py (+4)

@@ -0,0 +1,4 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+__author__ = 'alberto'

pyes/contrib/mappings.py (+36)

@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from pyes.es import ES
+from pyes import mappings
+
+def mappings_to_code(obj, doc_count=0):
+    result = []
+    odict = obj.as_dict()
+    if isinstance(obj, (mappings.DocumentObjectField, mappings.ObjectField, mappings.NestedObject)):
+        properties = odict.pop("properties", [])
+        doc_count += 1
+        kwargs = ["name=%r" % obj.name,
+                  "type=%r" % odict.pop("type")] +\
+                 ["%s=%r" % (k, odict[k]) for k in sorted(odict.keys())]
+        result.append(
+            "doc%d=" % doc_count + str(type(obj)).split(".")[-1].strip("'>") + "(" + ', '.join(kwargs) + ")")
+        for k in sorted(obj.properties.keys()):
+            result.extend(mappings_to_code(obj.properties[k], doc_count))
+    else:
+        kwargs = ["name=%r" % obj.name,
+                  "type=%r" % odict.pop("type"),
+                  "store=%r" % obj.store,
+                  "index=%r" % odict.pop("index")] +\
+                 ["%s=%r" % (k, odict[k]) for k in sorted(odict.keys())]
+        result.append("doc%d.add_property(" % doc_count +\
+                      str(type(obj)).split(".")[-1].strip("'>") + "(" +\
+                      ', '.join(kwargs) + "))")
+
+    return result
+
+if __name__ == '__main__':
+    es = ES("192.168.1.1:9200")
+    res = mappings_to_code(es.mappings.get_doctype("twitter", "twitter"))
+    print "\n".join(res)
+
+
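mappings_to_code walks a mapping tree and emits pyes statements that would recreate it in code. For a hypothetical one-field twitter mapping (field names and classes illustrative, not taken from a real cluster), the output would look roughly like:

    doc1=DocumentObjectField(name=u'twitter', type=u'object')
    doc1.add_property(StringField(name=u'message', type=u'string', store='yes', index='analyzed'))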

pyes/es.py (+64 −47)
@@ -105,7 +105,7 @@ def save(self, bulk=False, id=None, parent=None, force=False):
         if force:
             version = None
         res = conn.index(self,
-            meta.index, meta.type, id, parent=parent, bulk=bulk, version=version, force_insert=force)
+                         meta.index, meta.type, id, parent=parent, bulk=bulk, version=version, force_insert=force)
         if not bulk:
             self._meta.id = res._id
             self._meta.version = res._version
@@ -229,6 +229,38 @@ def dict_to_object(self, d):
         return DotDict(d)


+class Bulker(object):
+    def __init__(self, conn, bulk_size=400, raise_on_bulk_item_failure=False):
+        self.conn = conn
+        self.bulk_size = bulk_size
+        # protects bulk_data
+        self.bulk_lock = threading.RLock()
+        with self.bulk_lock:
+            self.bulk_data = []
+        self.raise_on_bulk_item_failure = raise_on_bulk_item_failure
+
+    def add_to_bulk_queue(self, content):
+        with self.bulk_lock:
+            self.bulk_data.append(content)
+
+    def flush_bulker(self, forced=False):
+        with self.bulk_lock:
+            if forced or len(self.bulk_data) >= self.bulk_size:
+                batch = self.bulk_data
+                self.bulk_data = []
+            else:
+                return None
+
+        if len(batch) > 0:
+            bulk_result = self.conn._send_request("POST",
+                                                  "/_bulk",
+                                                  "\n".join(batch) + "\n")
+
+            if self.raise_on_bulk_item_failure:
+                _raise_exception_if_bulk_item_failed(bulk_result)
+
+            return bulk_result
+
 class ES(object):
     """
     ES connection object.
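Extracting the queue into Bulker makes the bulk strategy pluggable via the new bulker_class argument added to ES.__init__ below. A minimal sketch of a custom bulker under the committed wiring (the subclass and logging are illustrative; note the class as committed names its flush method flush_bulker):

    import logging

    class LoggingBulker(Bulker):
        # hypothetical subclass: report queue depth before delegating
        def flush_bulker(self, forced=False):
            with self.bulk_lock:
                logging.info("flushing %d pending bulk ops", len(self.bulk_data))
            return Bulker.flush_bulker(self, forced=forced)

    es = ES("localhost:9200", bulker_class=LoggingBulker)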
@@ -237,7 +269,7 @@ class ES(object):
     encoder = ESJsonEncoder
     decoder = ESJsonDecoder

-    def __init__(self, server="localhost:9200", timeout=5.0, bulk_size=400,
+    def __init__(self, server="localhost:9200", timeout=30.0, bulk_size=400,
                  encoder=None, decoder=None,
                  max_retries=3,
                  default_indices=['_all'],
@@ -246,7 +278,8 @@ def __init__(self, server="localhost:9200", timeout=5.0, bulk_size=400,
                  model=ElasticSearchModel,
                  basic_auth=None,
                  raise_on_bulk_item_failure=False,
-                 document_object_field=None):
+                 document_object_field=None,
+                 bulker_class=Bulker):
         """
         Init a es object.
         Servers can be defined in different forms:
@@ -286,6 +319,7 @@ def __init__(self, server="localhost:9200", timeout=5.0, bulk_size=400,
         self.connection = None
         self._mappings = None
         self.document_object_field = document_object_field
+        self.bulker_class = bulker_class

         if model is None:
             model = lambda connection, model: model
@@ -303,11 +337,7 @@ def __init__(self, server="localhost:9200", timeout=5.0, bulk_size=400,

         #used in bulk
         self.bulk_size = bulk_size #size of the bulk
-        # protects bulk_data
-        self.bulk_lock = threading.RLock()
-        with self.bulk_lock:
-            self.bulk_data = []
-        self.raise_on_bulk_item_failure = raise_on_bulk_item_failure
+        self.bulker = bulker_class(self, bulk_size=bulk_size, raise_on_bulk_item_failure=raise_on_bulk_item_failure)

         if encoder:
             self.encoder = encoder
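With this hunk the per-connection queue, lock, and failure flag move off ES and onto the bulker instance; a quick sanity check of the new attribute layout, using only names introduced by the commit:

    es = ES("localhost:9200", bulk_size=1000)
    assert es.bulker.bulk_size == 1000    # forwarded by ES.__init__
    assert es.bulker.bulk_data == []      # the queue now lives on the bulker
    assert es.bulker.raise_on_bulk_item_failure is False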
@@ -333,16 +363,16 @@ def __del__(self):
         Destructor
         """
         # Don't bother getting the lock
-        if len(self.bulk_data) > 0:
+        if self.bulker:
             # It's not safe to rely on the destructor to flush the queue:
             # the Python documentation explicitly states "It is not guaranteed
             # that __del__() methods are called for objects that still exist "
             # when the interpreter exits."
             logger.error("pyes object %s is being destroyed, but bulk "
                          "operations have not been flushed. Call force_bulk()!",
-                self)
+                         self)
             # Do our best to save the client anyway...
-            self.force_bulk()
+            self.bulker.force_bulk()

     def _check_servers(self):
         """Check the servers variable and convert in a valid tuple form"""
@@ -405,12 +435,17 @@ def _init_connection(self):
         if _type in ["http", "https"]:
             self.connection = http_connect(
                 [(_type, host, port) for _type, host, port in self.servers if _type in ["http", "https"]],
-                timeout=self.timeout, basic_auth=self.basic_auth,
-                max_retries=self.max_retries)
+                timeout=self.timeout
+                ,
+                basic_auth=self.basic_auth
+                ,
+                max_retries=self.max_retries)
             return
         elif _type == "thrift":
             self.connection = thrift_connect([(host, port) for _type, host, port in self.servers if _type == "thrift"],
-                timeout=self.timeout, max_retries=self.max_retries)
+                timeout=self.timeout
+                ,
+                max_retries=self.max_retries)

     def _discovery(self):
         """
@@ -444,7 +479,7 @@ def _send_request(self, method, path, body=None, params=None, headers=None, raw=
         else:
             body = ""
         request = RestRequest(method=Method._NAMES_TO_VALUES[method.upper()],
-            uri=path, parameters=params, headers=headers, body=body)
+                              uri=path, parameters=params, headers=headers, body=body)
         if self.dump_curl is not None:
             self._dump_curl_request(request)

@@ -536,8 +571,8 @@ def _set_default_indices(self, default_indices):
     @property
     def mappings(self):
         if self._mappings is None:
-            self._mappings = Mapper(self.get_mapping(["_all"]), connection=self,
-                document_object_field=self.document_object_field)
+            self._mappings = Mapper(self.get_mapping(indices=["_all"]), connection=self,
+                                    document_object_field=self.document_object_field)
         return self._mappings

     #---- Admin commands
@@ -799,7 +834,7 @@ def optimize(self, indices=None,
                                  only_expunge_deletes=only_expunge_deletes,
                                  refresh=refresh,
                                  flush=flush,
-            )
+                                  )
         if max_num_segments is not None:
             params['max_num_segments'] = max_num_segments
         result = self._send_request('POST', path, params=params)
@@ -974,17 +1009,14 @@ def cluster_stats(self, nodes=None):
         path = self._make_path(parts)
         return self._send_request('GET', path)

-    def _add_to_bulk_queue(self, content):
-        with self.bulk_lock:
-            self.bulk_data.append(content)

     def index_raw_bulk(self, header, document):
         """
         Function helper for fast inserting

         header and document must be string "\n" ended
         """
-        self._add_to_bulk_queue(u"%s%s" % (header, document))
+        self.bulker.add_to_bulk_queue(u"%s%s" % (header, document))
         return self.flush_bulk()

     def index(self, doc, index, doc_type, id=None, parent=None,
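index_raw_bulk now routes through the bulker's queue; per the docstring, both arguments must already be newline-terminated strings. A sketch of a call (index, type, and id values are illustrative):

    import json

    header = json.dumps({"index": {"_index": "test-index",
                                   "_type": "test-type", "_id": "1"}}) + "\n"
    document = json.dumps({"name": "Joe Tester", "age": 25}) + "\n"
    es.index_raw_bulk(header, document)   # queued, flushed once bulk_size is reached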
@@ -1018,7 +1050,7 @@ def index(self, doc, index, doc_type, id=None, parent=None,
             if isinstance(doc, dict):
                 doc = json.dumps(doc, cls=self.encoder)
             command = "%s\n%s" % (json.dumps(cmd, cls=self.encoder), doc)
-            self._add_to_bulk_queue(command)
+            self.bulker.add_to_bulk_queue(command)
             return self.flush_bulk()

         if force_insert:
@@ -1063,22 +1095,7 @@ def flush_bulk(self, forced=False):
         """
         Send pending operations if forced or if the bulk threshold is exceeded.
         """
-        with self.bulk_lock:
-            if forced or len(self.bulk_data) >= self.bulk_size:
-                batch = self.bulk_data
-                self.bulk_data = []
-            else:
-                return None
-
-        if len(batch) > 0:
-            bulk_result = self._send_request("POST",
-                                             "/_bulk",
-                                             "\n".join(batch) + "\n")
-
-            if self.raise_on_bulk_item_failure:
-                _raise_exception_if_bulk_item_failed(bulk_result)
-
-            return bulk_result
+        self.bulker.flush_bulk(forced)

     def force_bulk(self):
         """
@@ -1139,7 +1156,7 @@ def update(self, extra_doc, index, doc_type, id, querystring_args=None,
                 new_doc = current_doc
             try:
                 return self.index(new_doc, index, doc_type, id,
-                    version=current_doc._meta.version, querystring_args=querystring_args)
+                                  version=current_doc._meta.version, querystring_args=querystring_args)
             except VersionConflictEngineException:
                 if attempt <= 0:
                     raise
@@ -1154,7 +1171,7 @@ def delete(self, index, doc_type, id, bulk=False, querystring_args=None):
         if bulk:
             cmd = {"delete": {"_index": index, "_type": doc_type,
                               "_id": id}}
-            self._add_to_bulk_queue(json.dumps(cmd, cls=self.encoder))
+            self.bulker.add_to_bulk_queue(json.dumps(cmd, cls=self.encoder))
             return self.flush_bulk()

         path = self._make_path([index, doc_type, id])
@@ -1258,8 +1275,8 @@ def mget(self, ids, index=None, doc_type=None, routing=None, **get_params):
         if routing:
             get_params["routing"] = routing
         results = self._send_request('GET', "/_mget",
-            body={'docs': body},
-            params=get_params)
+                                     body={'docs': body},
+                                     params=get_params)
         if 'docs' in results:
             model = self.model
             return [model(self, item) for item in results['docs']]
@@ -1369,7 +1386,7 @@ def count(self, query=None, indices=None, doc_types=None, **query_params):
         if doc_types is None:
             doc_types = []
         if query is None:
-            from ..query import MatchAllQuery
+            from .query import MatchAllQuery

             query = MatchAllQuery()
         if hasattr(query, 'to_query_json'):
@@ -1542,7 +1559,7 @@ def _do_search(self, auto_increment=False):
                 self.query.size = self.chuck_size

             self._results = self.connection.search_raw(self.query, indices=self.indices,
-                doc_types=self.doc_types, **self.query_params)
+                                                        doc_types=self.doc_types, **self.query_params)
             if 'search_type' in self.query_params and self.query_params['search_type'] == "scan":
                 self.scroller_parameters['search_type'] = self.query_params['search_type']
                 del self.query_params['search_type']
@@ -1561,7 +1578,7 @@ def _do_search(self, auto_increment=False):
         else:
             try:
                 self._results = self.connection.search_scroll(self.scroller_id,
-                    self.scroller_parameters.get("scroll", "10m"))
+                                                              self.scroller_parameters.get("scroll", "10m"))
                 self.scroller_id = self._results['_scroll_id']
             except ReduceSearchPhaseException:
                 #bad hack, should be not hits on the last iteration
@@ -1670,7 +1687,7 @@ def get_start_end(val):
         query['size'] = end - start

         results = self.connection.search_raw(query, indices=self.indices,
-            doc_types=self.doc_types, **self.query_params)
+                                             doc_types=self.doc_types, **self.query_params)

         hits = results['hits']['hits']
         if not isinstance(val, slice):

pyes/mappings.py (+2 −1)

@@ -484,7 +484,8 @@ def _process(self, data):
         for indexname, indexdata in data.items():
             self.indices[indexname] = {}
             for docname, docdata in indexdata.items():
-                o = get_field(docname, docdata)
+                o = get_field(docname, docdata, "document",
+                              document_object_field=self.document_object_field)
                 o.connection = self.connection
                 o.index_name = indexname
                 self.indices[indexname][docname] = o
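This threads document_object_field from the Mapper down into get_field, so a custom document class configured on the connection survives mapping reloads. A sketch of the intended use (the subclass name is illustrative):

    from pyes.mappings import DocumentObjectField

    class MyDocField(DocumentObjectField):
        pass  # hypothetical custom document field

    es = ES("localhost:9200", document_object_field=MyDocField)
    doc = es.mappings.get_doctype("test-index", "test-type")  # built via MyDocField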
