-
Notifications
You must be signed in to change notification settings - Fork 0
/
chess.py
196 lines (176 loc) · 8.29 KB
/
chess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
import sys
sys.path.append("../bsc-contrib/Libraries/GenC/GenCMsg")
sys.path.append("build")
import time
import threading
import json
import msgclient
from _chess import ffi, lib
def cdata_dict(cd, ty=None):
if isinstance(cd, ffi.CData):
ty = ffi.typeof(cd)
if ty.kind == 'array':
return [cdata_dict(x, ty.item) for x in cd]
elif ty.kind == 'struct':
fields = dict(ty.fields)
if 'tag' in fields:
tagName = ffi.string(ffi.cast(fields['tag'].type, cd.tag))[len(ty.cname.split("struct ")[1]) + 1:]
if 'contents' in fields:
unionFields = dict(fields['contents'].type.fields)
if tagName in unionFields:
return {'tag': tagName, 'contents': cdata_dict(getattr(cd.contents, tagName), unionFields[tagName].type)}
return tagName
else:
return {k: cdata_dict(getattr(cd, k), f.type) for k, f in ty.fields}
elif ty.kind == 'enum':
return ffi.string(ffi.cast(ty, cd))
elif ty.kind == 'primitive':
return cd
else:
raise ValueError("C type {} cannot be converted to dict representation".format(ty.cname))
whitePieces = {'King': '♔', 'Queen': '♕', 'Rook': '♖', 'Bishop': '♗', 'Knight': '♘', 'Pawn': '♙'}
blackPieces = {'King': '♚', 'Queen': '♛', 'Rook': '♜', 'Bishop': '♝', 'Knight': '♞', 'Pawn': '♟'}
def strPiece(p):
pieceName = ffi.string(ffi.cast('enum PieceKind_tag', p.kind.tag))[len('PieceKind_'):]
return (whitePieces if p.color.tag == lib.Color_White else blackPieces)[pieceName] + '\uFE0E'
def strPosition(pos):
return chr(ord('a') + pos.file) + str(8 - pos.rank)
def strMove(state, move):
if move.tag == lib.Move_Move or move.tag == lib.Move_EnPassant or move.tag == lib.Move_Promote:
if move.tag == lib.Move_Move:
directMove = move.contents.Move
elif move.tag == lib.Move_EnPassant:
directMove = move.contents.EnPassant
elif move.tag == lib.Move_Promote:
directMove = move.contents.Promote
fromPos = getattr(directMove, 'from') # Can't write directMove.from since 'from' is a keyword in Python
toPos = directMove.to
fromSquare = state.board[fromPos.rank][fromPos.file]
toSquare = state.board[toPos.rank][toPos.file]
assert fromSquare.occupied
assert fromSquare.piece.color.tag == state.turn.tag
capture = toSquare.occupied or move.tag == lib.Move_EnPassant
promo = strPiece(ffi.new("Piece *", {'color': fromSquare.piece.color, 'kind': directMove.kind})) if move.tag == lib.Move_Promote else ""
ep = " e.p." if move.tag == lib.Move_EnPassant else ""
return strPiece(fromSquare.piece) + strPosition(fromPos) + ("x" if capture else "-") + strPosition(toPos) + promo + ep
elif move.tag == lib.Move_Castle:
return "0-0" if move.contents.Castle.kingSide else "0-0-0"
def strOutcome(outcome):
if outcome == lib.Outcome_Check:
return " - Check"
elif outcome == lib.Outcome_CheckMate:
return " - Checkmate"
elif outcome == lib.Outcome_Draw:
return " - Draw"
else:
return ""
class ChessClient(msgclient.Client):
def __init__(self, serial, callback, timeout=5):
super().__init__("ChessMsgs", ffi, lib, serial)
self.callback = callback
self.event = "Initialized"
self.state = None
self.stateId = 1
self._movesAccum = []
self.moves = []
self.outcome = None
self.whiteAI = False
self.blackAI = True
self.timeout = timeout
self.depth = 1
self.bestMove = None
self.searchTimer = None
def start(self):
super().start()
self.updateState()
self.event = "Server restarted"
self.callback(self.event)
def notify(self):
update = False
while state := self.get("state"):
self.state = state
self.stateId = self.stateId % 255 + 1
self.put("command", ffi.new("Command *", {'tag': lib.Command_Query, 'contents': {'Query': {'rid': 0, 'depth': 1, 'getMoves': True}}})[0])
#update = True # No need to update when state is recieved, as there will be an update for moves/outcome
while move := self.get("moves"):
if move.tag == lib.MoveResponse_NextMove:
#print("Got move", strMove(self.state, move.contents.NextMove))
self._movesAccum.append(ffi.new("Move *", move.contents.NextMove)[0])
elif move.tag == lib.MoveResponse_NoMove:
#print("Got no more moves")
self.moves = self._movesAccum
self._movesAccum = []
update = True
while searchResult := self.get("searchResult"):
if searchResult.rid == 0:
self.outcome = searchResult.outcome.tag
if self.outcome != lib.Outcome_NoOutcome:
update = True
if ((self.outcome == lib.Outcome_NoOutcome or self.outcome == lib.Outcome_Check) and
((self.state.turn.tag == lib.Color_Black and self.blackAI) or (self.state.turn.tag == lib.Color_White and self.whiteAI))):
self.startSearch()
elif searchResult.rid == self.stateId and self.searchTimer is not None and searchResult.bestMove.tag == lib.Maybe_Move_Valid:
print("Got valid search move", strMove(self.state, searchResult.bestMove.contents.Valid), searchResult.score)
self.bestMove = ffi.new("Move *", searchResult.bestMove.contents.Valid)[0]
self.depth += 1
print("Deepening to depth", self.depth)
self.requestSearchMove()
else:
print("Did not get a valid search move")
if update:
self.callback(self.event + strOutcome(self.outcome))
def startSearch(self):
if self.searchTimer is None:
print("Getting search move for", ffi.string(ffi.cast('enum Color_tag', self.state.turn.tag)))
self.depth = 1
self.requestSearchMove()
self.searchTimer = threading.Timer(self.timeout, self.sendSearchMove)
self.searchTimer.start()
def sendSearchMove(self):
self.searchTimer = None
self.cancelSearch()
if self.bestMove is None:
print("Search move not ready")
self.startSearch() # Retry
else:
print("Sending best move")
self.put("command", ffi.new("Command *", {'tag': lib.Command_Move, 'contents': {'Move': self.bestMove}})[0])
self.updateState()
self.event = strMove(self.state, self.bestMove)
self.bestMove = None
self.outcome = None
def cancelSearch(self):
self.put("command", ffi.new("Command *", {'tag': lib.Command_CancelSearch})[0])
if self.searchTimer is not None:
self.searchTimer.cancel()
self.searchTimer = None
def updateState(self):
self.put("command", ffi.new("Command *", {'tag': lib.Command_GetState})[0])
def requestSearchMove(self):
self.put("command", ffi.new("Command *", {'tag': lib.Command_Query, 'contents': {'Query': {'rid': self.stateId, 'depth': self.depth, 'getMoves': False}}})[0])
def jsonStatus(self):
if self.state:
return json.dumps({
'state': cdata_dict(self.state),
'moves': list(map(cdata_dict, self.moves)),
'whiteAI': self.whiteAI,
'blackAI': self.blackAI,
'timeout': self.timeout,
})
def move(self, i):
if (self.state.turn.tag == lib.Color_Black and not self.blackAI) or (self.state.turn.tag == lib.Color_White and not self.whiteAI):
self.put("command", ffi.new("Command *", {'tag': lib.Command_Move, 'contents': {'Move': self.moves[i]}})[0])
self.updateState()
self.event = strMove(self.state, self.moves[i])
self.outcome = None
def reset(self):
self.cancelSearch()
self.put("command", ffi.new("Command *", {'tag': lib.Command_Reset})[0])
self.updateState()
self.event = "Game reset"
self.outcome = None
def config(self, whiteAI, blackAI):
self.whiteAI = whiteAI
self.blackAI = blackAI
self.cancelSearch()
self.updateState()