-
Notifications
You must be signed in to change notification settings - Fork 0
/
kiss.py
executable file
·170 lines (145 loc) · 4.22 KB
/
kiss.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#!/usr/bin/env python3
import socket
import sys
import time
import subprocess
from fcntl import fcntl, F_GETFL, F_SETFL
from os import O_NONBLOCK, read
# config
host = '127.0.0.1'
port = 7342 # FLDIGI KISS default port
FEND = 0xC0
FESC = 0xDB
TFEND = 0xDC
TFESC = 0xDD
def kiss_encode(payload):
msg = b''
for x in payload:
# handle escape bytes
if x == FEND:
msg += bytes([FESC, TFEND])
elif x == FESC:
msg += bytes([FESC, TFESC])
else:
msg += bytes([x])
return payload
def kiss_decode(payload):
msg = b''
frame_escape = False
for x in payload:
# handle escape bytes
if frame_escape:
if x == TFESC:
msg += bytes([FESC])
elif x == TFEND:
msg += bytes([FEND])
# everything else is an error
frame_escape = False
elif x == FESC:
frame_escape = True
else:
msg += bytes([x])
return msg
def kiss_data_frame(payload):
msg = b''
msg += bytes([FEND, 0x07]) # 0 = Port 0, 7 = FLDIGI RAW
msg += payload
msg += bytes([FEND])
return kiss_encode(msg)
def enable_raw_mode():
msg = b''
msg += bytes([FEND, 0x06]) # 0 = Port 0, 6 = FLDIGI H/W
msg += "KISSRAW:ON".encode('ascii')
msg += bytes([FEND])
return kiss_encode(msg)
def send_frame(socket, frame):
totalsent = 0
while totalsent < len(frame):
sent = socket.send(frame[totalsent:])
if sent == 0:
raise RuntimeError("connection unexpectedly closed")
totalsent += sent
# MFSK32 framing
STX = 0x02
EOT = 0x04
found_STX = False
message = b''
def handle_message(msg, sock):
global frotz
cmd = msg.decode('ascii').strip() + "\n"
print(msg)
frotz.stdin.write(cmd.encode('ascii'))
frotz.stdin.flush()
time.sleep(2)
resp = get_game_response(frotz)
print(resp)
send_frame(sock, kiss_data_frame(resp.encode('ascii')))
# receive and process a decoded KISS frame (all framing information is removed; frame type is the first byte)
def receive_frame_handler(frame, sock):
global found_STX
global message
if len(frame) == 0:
return
if frame[0] != 0x07:
# not a data/raw frame
return
data = frame[1:]
# TODO refactor for the extremely unlikely case that we get STX and EOT simultaneously
if STX in data:
if found_STX:
print("Warning, duplicate STX without EOT, restarting frame")
else:
found_STX = True
idx = list(data).index(STX)
message = data[idx+1:]
elif EOT in data:
if found_STX:
found_STX = False
else:
print("Warning, EOT without STX, discarding")
idx = list(data).index(EOT)
message += data[0:idx]
handle_message(message, sock)
else:
if found_STX:
message += data
# ignore data that's received outside of frame
def get_game_response(frotz):
msg = ""
while True:
data = frotz.stdout.read()
if data is not None:
msg += data.decode('ascii')
if msg.endswith(">"):
return msg
# entry point
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
send_frame(s, enable_raw_mode())
#send_frame(s, kiss_data_frame("TESTING 1 TESTING 2 TESTING 3 de VE3TUX".encode('ascii')))
frotz = subprocess.Popen(['/home/mtrberzi/games/frotz/dfrotz', '-h', '1000', '/home/mtrberzi/games/zork1.z5'], stdout=subprocess.PIPE, stdin=subprocess.PIPE, shell=False)
flags = fcntl(frotz.stdout, F_GETFL)
fcntl(frotz.stdout, F_SETFL, flags | O_NONBLOCK)
opening = get_game_response(frotz)
print(opening, end='')
send_frame(s, kiss_data_frame(opening.encode('ascii')))
try:
buf = b''
while True:
chunk = s.recv(1024)
if chunk == b'':
raise RuntimeError("connection unexpectedly closed")
buf += chunk
while FEND in buf:
idx = list(buf).index(FEND)
frame = buf[0:idx]
receive_frame_handler(kiss_decode(frame), s)
buf = buf[idx+1:]
except:
s.close()
frotz.kill()
raise
s.close()
frotz.stdin.close()
frotz.stdout.close()
frotz.kill()