-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathscoreserver.py
executable file
·158 lines (117 loc) · 3.98 KB
/
scoreserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import StringIO, traceback, random, string, zlib, time, json, os
import sys

import chess.pgn
import chess
import pystockfish
import boto
import boto.sqs
from boto.sqs.message import Message
from boto.s3.key import Key

from djeval import *
# ---------------------------------------------------------------------------
# Runtime configuration.  Every setting below except NUM_ITEMS_PER_KEY and
# depth can be overridden through an environment variable of the same
# (upper-case) name.
# ---------------------------------------------------------------------------

# Debug mode is on whenever DEBUG is present in the environment (any value).
DEBUG = 'DEBUG' in os.environ

# Fewest / most result messages bundled into a single consolidated S3 object.
NUM_ITEMS_PER_KEY = 1000
MIN_ITEMS_PER_KEY = int(os.environ.get('MIN_ITEMS_PER_KEY', 100))

# Engine settings.  movetime stays None when MOVETIME is not set; depth is
# always left unset here.
movetime = int(os.environ['MOVETIME']) if 'MOVETIME' in os.environ else None
depth = None
hash = int(os.environ.get('HASH', 100))
threads = int(os.environ.get('THREADS', 1))

# Key prefix for consolidated results; defaults to today's date (YYYYMMDD).
resultsfolder = os.environ.get('RESULTSFOLDER', time.strftime('%Y%m%d'))
# SQS connection plus the names of the input (work) and output (results)
# queues.
conn = boto.sqs.connect_to_region("us-east-1")
in_queuename = 'numbers'
out_queuename = 'results'

# we have this many seconds to complete processing of a game, or else
# another node may pick up the work and do it as well.
#
# leave enough time to process a 300 halfply game
#
# The "/ 1000" converts the per-move budget to seconds, i.e. movetime is
# treated as milliseconds here.
if movetime is None:
    # MOVETIME was not configured.  Passing None lets the queue's own
    # default visibility timeout apply instead of crashing on None * 300.
    visibility_timeout = None
else:
    # Floor division keeps the timeout an integer number of seconds.
    visibility_timeout = (300 * movetime) // 1000

# max permitted wait time.
wait_time_seconds = 20
def queue_read(q):
    """Read one message from queue *q* using the module-wide polling settings.

    Calls ``q.read`` with the module-level ``visibility_timeout`` and
    ``wait_time_seconds`` and returns whatever that call returns.
    """
    received = q.read(visibility_timeout, wait_time_seconds)
    return received
# Announce which queues this worker reads from and writes to.
msg("Hi! Analyzing %s, writing results to %s" % (in_queuename, out_queuename))

# One engine instance, shared by every game this process analyzes.
engine = pystockfish.Engine(
    depth=depth,
    param={'Threads': threads, 'Hash': hash},
    movetime=movetime
)

# Input queue (game numbers to analyze) and output queue (analysis results).
inq = conn.get_queue(in_queuename)
outq = conn.get_queue(out_queuename)

# S3 connection and the bucket from which game PGNs are fetched.
s3conn = boto.connect_s3()
gamesbucket = s3conn.get_bucket('bc-games')
# read outputs from outqueue in batches and stuff them into S3
def consolidate_outputs():
    """Bundle queued result messages into one compressed S3 object.

    Does nothing except log and sleep 10 seconds when the output queue
    reports fewer than ``2 * MIN_ITEMS_PER_KEY`` messages, or when fewer than
    ``MIN_ITEMS_PER_KEY`` messages could actually be read.  Otherwise reads
    up to ``NUM_ITEMS_PER_KEY`` messages, concatenates their bodies into a
    single bracketed blob, zlib-compresses it, stores it in the
    ``bc-runoutputs`` bucket under ``<resultsfolder>/<50 random chars>.json.zl``
    and only then deletes the messages from the queue.
    """
    # Fetch the queue depth once so the value logged is the value checked.
    pending = outq.count()
    if pending < MIN_ITEMS_PER_KEY * 2:
        msg("Not enough outputs to consolidate. Sleeping 10 seconds.")
        time.sleep(10)
        return

    msg("There are %i outputs in queue. Consolidating." % pending)

    # read a bunch of output from the outqueue
    batch = []
    for _ in range(NUM_ITEMS_PER_KEY):
        nextmsg = outq.read()
        if nextmsg is None:
            break
        batch.append(nextmsg)

    if len(batch) < MIN_ITEMS_PER_KEY:
        # The messages read so far are not deleted, so they stay on the queue.
        msg("Not enough messages read. Sleeping 10 seconds.")
        time.sleep(10)
        return

    # make a giant blobstring out of them (single join instead of repeated
    # string concatenation).
    # NOTE(review): every body is followed by ", ", so the blob ends in
    # ", ]" which is not strict JSON.  Kept byte-for-byte because readers of
    # previously written objects may depend on it -- confirm before changing.
    blob = "[" + "".join(m.get_body() + ", " for m in batch) + "]"

    # create an S3 key to write them into
    random.seed()
    alphabet = string.ascii_uppercase + string.digits
    fifty = ''.join(random.choice(alphabet) for _ in range(50))
    keyname = '%s/%s.json.zl' % (resultsfolder, fifty)
    resultsbucket = s3conn.get_bucket('bc-runoutputs')
    k = Key(resultsbucket)
    k.key = keyname

    # DO IT
    k.set_contents_from_string(zlib.compress(blob))

    # clear the messages from the outqueue (only after the upload returned)
    for m in batch:
        m.delete()

    msg("Consolidated. Hooray!")
def do_game(game_number):
    """Analyze one game and post the result to the output queue.

    Fetches ``kaggle/<game_number>.pgn`` from the games bucket, parses it,
    runs ``do_it_backwards`` on it with the shared engine, records the
    ``movetime`` and ``hash`` settings in the result, and writes the result
    as a JSON message to ``outq``.

    Raises:
        ValueError: if the S3 key does not exist or no game could be parsed
            from its contents.
    """
    key_name = "kaggle/%s.pgn" % game_number
    msg("Retrieving %s" % key_name)

    k = gamesbucket.get_key(key_name)
    if k is None:
        # A missing key comes back as None; fail with a message that names
        # the key instead of an AttributeError on None.
        raise ValueError("No such key in games bucket: %s" % key_name)

    game_pgn_string = k.get_contents_as_string()
    game = chess.pgn.read_game(StringIO.StringIO(game_pgn_string))
    if game is None:
        # read_game yields None when the input contains no game.
        raise ValueError("No game could be parsed from %s" % key_name)

    result_struct = do_it_backwards(engine=engine, game=game, debug=DEBUG)
    # Record the engine settings alongside the analysis.
    result_struct['movetime'] = movetime
    result_struct['hash'] = hash

    result_msg = Message()
    result_msg.set_body(json.dumps(result_struct))
    outq.write(result_msg)
def do_work():
    """Run one iteration of the worker loop.

    When the input queue reports no games, consolidates queued results
    instead.  Otherwise reads one message from the input queue, analyzes the
    game it names, and deletes the message only after the analysis finished.

    Errors are logged (exception type, full traceback including the message,
    and the game number being processed) and followed by a 10 second pause;
    they are not re-raised.  KeyboardInterrupt and SystemExit are allowed to
    propagate so the worker can be stopped.
    """
    game_number = 0
    try:
        # Fetch the queue depth once so the value logged is the value tested.
        pending = inq.count()
        msg("There are %d games in queue." % pending)
        if pending == 0:
            consolidate_outputs()
            return

        game_msg = queue_read(inq)
        if game_msg is None:
            return

        game_number = game_msg.get_body()
        do_game(game_number)
        # Delete only after success so a failed game becomes visible again.
        inq.delete_message(game_msg)
    except Exception:
        msg("Unexpected error: %s" % sys.exc_info()[0])
        # print_exc prints the stack and the exception message.
        traceback.print_exc()
        msg("Game number: %s" % game_number)
        # Back off before the next attempt.
        time.sleep(10)
# Worker main loop: call do_work() over and over, one unit of work per call.
while True:
    do_work()
# The loop above has no break, so this line is not expected to run.
msg("Broke out of loop somehow!")