forked from astrelsky/libhijacker
-
Notifications
You must be signed in to change notification settings - Fork 0
/
send_elf.py
173 lines (146 loc) · 4.96 KB
/
send_elf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#!/usr/bin/env python3
import argparse
import asyncio
import re
from contextlib import asynccontextmanager
from pathlib import Path
import aiofiles
SEM = asyncio.Semaphore(60)
LOGGER_PORT = 9020
ELF_PORT = 9027
class LineBuffer(bytearray):
# hack to deal with StreamReader not allowing a regex pattern
SEP = re.compile(b'(?:\r\n)|\r|\n')
def find(self, _, offset):
match = self.SEP.search(self, offset)
return match.start() if match else -1
class DummyLogger:
async def __aenter__(self):
await asyncio.sleep(0)
return self
async def __aexit__(self, *args):
await asyncio.sleep(0)
async def write(self, _):
await asyncio.sleep(0)
@asynccontextmanager
async def get_logger(log: Path | None):
if log is None:
async with DummyLogger() as fp:
yield fp
else:
async with aiofiles.open(log, 'w+', encoding='utf-8') as fp:
yield fp
@asynccontextmanager
async def open_connection(host: str, port: int):
while True:
try:
reader, writer = await asyncio.open_connection(host, port)
except OSError:
await asyncio.sleep(1)
continue
try:
yield reader, writer
break
finally:
await writer.drain()
writer.close()
async def send_elf(host: str, elf: Path):
async with open_connection(host, ELF_PORT) as (_, writer):
data = elf.read_bytes()
writer.write(len(data).to_bytes(8, byteorder='little'))
writer.write(data)
writer.write_eof()
await writer.drain()
async def log_task(host: str, reader: asyncio.StreamReader, elf: Path, log: Path | None, silent = False):
sent_elf = False
async with get_logger(log) as fp:
line = b''
while True:
if reader.at_eof():
if '\r' in line:
await fp.write(line.replace('\r', '\n'))
break
try:
line = await reader.readline()
except OSError:
await asyncio.sleep(1)
continue
line = line.decode('latin-1')
if not sent_elf and line.startswith('waiting for connection'):
if not silent:
if '\r' not in line:
await fp.write(line)
print(line, end='')
await send_elf(host, elf)
sent_elf = True
if silent:
continue
if '\r' not in line:
await fp.write(line)
print(line, end='')
async def logger_client(host: str, elf: Path, spawner: Path, log: Path | None, silent: bool):
# waiting for connection
async with SEM:
async with open_connection(host, LOGGER_PORT) as (reader, writer):
reader._buffer = LineBuffer(reader._buffer)
writer.write(spawner.read_bytes())
writer.write_eof()
await writer.drain()
await log_task(host, reader, elf, log, silent)
async def run_loggers(*args):
# klogger code was left incase it's helpful for someone in the future
logger = asyncio.create_task(logger_client(*args))
#klogger = asyncio.create_task(klog_client(host))
tasks = (logger, ) #klogger)
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
if not logger.done():
await asyncio.wait_for(logger, timeout=None)
#if not klogger.done():
# klogger.cancel()
def main():
parser = argparse.ArgumentParser(
description='Helper script for sending the spawner, elf to load into spawned process and for logging',
epilog='Text at the bottom of help'
)
parser.add_argument('ip', help='PS5 ip address')
parser.add_argument('elf', help='Path to the elf to load into the spawned process')
parser.add_argument(
'--spawner',
default='bin/spawner.elf',
help='Path to the elf used to spawn the new process. (default: bin/spawner.elf)'
)
parser.add_argument(
'--log',
default='log.txt',
help='Path to output log file if enabled. (default: log.txt)'
)
parser.add_argument(
'--nolog',
default=False,
action='store_true',
help='Switch to disable logging to an output file. (default False)'
)
parser.add_argument(
'--silent',
default=False,
action='store_true',
help='Switch to disable all logging. (default: False)'
)
args = parser.parse_args()
try:
elf = Path(args.elf)
spawner = Path(args.spawner)
log = Path(args.log)
if args.nolog:
log = None
if not spawner.exists():
print(f'{spawner} does not exist')
return
if not elf.exists():
print(f'{elf} does not exist')
return
asyncio.run(run_loggers(args.ip, elf, spawner, log, args.silent))
except KeyboardInterrupt:
pass
if __name__ == '__main__':
main()