Skip to content

Commit

Permalink
colour added, surface scan promt added
Browse files Browse the repository at this point in the history
  • Loading branch information
rootifera committed Jul 22, 2024
1 parent 41a5064 commit bc2ab3c
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 30 deletions.
116 changes: 86 additions & 30 deletions disk_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ def time_operation(operation, fd, sector, size):

def write_sector(fd, sector, size):
os.lseek(fd.fileno(), sector * size, os.SEEK_SET)
os.write(fd.fileno(), b'\0' * size) # Writing zero bytes to the sector
os.write(fd.fileno(), b'\0' * size)


def read_sector(fd, sector, size):
os.lseek(fd.fileno(), sector * size, os.SEEK_SET)
os.read(fd.fileno(), size)


def scan_disk(disk_path, sector_size, update_queue, lock):
# Get disk size in sectors
def scan_disk(disk_path, sector_size, update_queue, lock, stop_event):
try:
result = subprocess.run(['blockdev', '--getsz', disk_path], capture_output=True, text=True, check=True)
total_sectors = int(result.stdout.strip())
Expand All @@ -39,6 +38,11 @@ def scan_disk(disk_path, sector_size, update_queue, lock):
try:
with open(disk_path, 'rb+', buffering=0) as fd:
for sector in range(total_sectors):
if stop_event.is_set():
with lock:
update_queue[disk_path]['stopped'] = True
break

write_time = time_operation(write_sector, fd, sector, sector_size)
read_time = time_operation(read_sector, fd, sector, sector_size)

Expand Down Expand Up @@ -84,41 +88,85 @@ def scan_disk(disk_path, sector_size, update_queue, lock):
update_queue[disk_path]['error'] = f"Failed to open disk: {e}"


def update_ui(stdscr, update_queue, lock, disks):
while True:
stdscr.clear()
height, width = stdscr.getmaxyx()
def update_ui(stdscr, update_queue, lock, disk_map, stop_events):
curses.curs_set(0)
stdscr.nodelay(True)
curses.echo()

cols = width // 30
rows = (len(disks) + cols - 1)
curses.start_color()
curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_CYAN, curses.COLOR_BLACK)
curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK)
curses.init_pair(4, curses.COLOR_YELLOW, curses.COLOR_BLACK)
curses.init_pair(5, curses.COLOR_BLUE, curses.COLOR_BLACK)
curses.init_pair(6, curses.COLOR_MAGENTA, curses.COLOR_BLACK)
curses.init_pair(7, curses.COLOR_RED, curses.COLOR_BLACK)
curses.init_pair(8, curses.COLOR_RED, curses.COLOR_BLACK)

for idx, disk in enumerate(disks):
col = idx % cols
row = idx // cols
height, width = stdscr.getmaxyx()
max_width_per_disk = 30
max_disks_per_row = width // max_width_per_disk
input_buffer = ""

x = col * 30
while True:
stdscr.clear()
num_disks = len(disk_map)
num_rows = (num_disks + max_disks_per_row - 1) // max_disks_per_row
max_rows_for_display = (height - 1) // 10
num_rows = min(num_rows, max_rows_for_display)

for idx, (disk_num, disk) in enumerate(disk_map.items()):
row = idx // max_disks_per_row
col = idx % max_disks_per_row
y = row * 10
x = col * max_width_per_disk

if y + 8 >= height:
stdscr.addstr(height - 1, 0, "Not all disks are displayed.")
break

if 'error' in update_queue[disk]:
stdscr.addstr(y, x, f"{disk}: {update_queue[disk]['error']}")
stdscr.addstr(y, x, f"Disk {disk_num}: {update_queue[disk]['error']}", curses.color_pair(7))
continue
stdscr.addstr(y, x, f"Disk: {disk}")
if 'stopped' in update_queue[disk]:
stdscr.addstr(y, x, f"Disk {disk_num}: Stopped", curses.color_pair(5))
continue

stdscr.addstr(y, x, f"Disk {disk_num}: {disk}")
with lock:
stats = update_queue[disk]
stdscr.addstr(y + 1, x, f"<5ms = {stats['<5ms']}")
stdscr.addstr(y + 2, x, f"<10ms = {stats['<10ms']}")
stdscr.addstr(y + 3, x, f"<20ms = {stats['<20ms']}")
stdscr.addstr(y + 4, x, f"<50ms = {stats['<50ms']}")
stdscr.addstr(y + 5, x, f"<150ms = {stats['<150ms']}")
stdscr.addstr(y + 6, x, f"<500ms = {stats['<500ms']}")
stdscr.addstr(y + 7, x, f">500ms = {stats['>500ms']}")
stdscr.addstr(y + 8, x, f"BAD = {stats['bad']}")

stdscr.addstr(y + 1, x, f"<5ms = {stats['<5ms']}", curses.color_pair(1))
stdscr.addstr(y + 2, x, f"<10ms = {stats['<10ms']}", curses.color_pair(2))
stdscr.addstr(y + 3, x, f"<20ms = {stats['<20ms']}", curses.color_pair(3))
stdscr.addstr(y + 4, x, f"<50ms = {stats['<50ms']}", curses.color_pair(4))
stdscr.addstr(y + 5, x, f"<150ms = {stats['<150ms']}", curses.color_pair(5))
stdscr.addstr(y + 6, x, f"<500ms = {stats['<500ms']}", curses.color_pair(6))
stdscr.addstr(y + 7, x, f">500ms = {stats['>500ms']}", curses.color_pair(7))
stdscr.addstr(y + 8, x, f"BAD = {stats['bad']}", curses.color_pair(8) | curses.A_BOLD)

prompt_str = "Press 'q' to quit. Enter disk number to stop: "
stdscr.addstr(height - 1, 0, prompt_str)
stdscr.addstr(height - 1, len(prompt_str), input_buffer)
stdscr.refresh()

key = stdscr.getch()

if key == ord('q'):
break

if key == curses.KEY_BACKSPACE or key == 127:
input_buffer = input_buffer[:-1]

elif key == curses.KEY_ENTER or key == 10:
if input_buffer.isdigit():
disk_num = int(input_buffer)
if disk_num in stop_events:
stop_events[disk_num].set()
input_buffer = ""

elif key != -1 and chr(key).isdigit():
input_buffer += chr(key)

time.sleep(1)


Expand All @@ -127,13 +175,21 @@ def scan_disks(disks):
update_queue = defaultdict(lambda: defaultdict(int))
lock = threading.Lock()

disk_map = {i: disk for i, disk in enumerate(disks)}
stop_events = {i: threading.Event() for i in disk_map}

threads = []
for disk in disks:
t = threading.Thread(target=scan_disk, args=(disk, sector_size, update_queue, lock))
for i, disk in disk_map.items():
t = threading.Thread(target=scan_disk, args=(disk, sector_size, update_queue, lock, stop_events[i]))
t.start()
threads.append(t)

curses.wrapper(update_ui, update_queue, lock, disks)

for t in threads:
t.join()
try:
curses.wrapper(update_ui, update_queue, lock, disk_map, stop_events)
except KeyboardInterrupt:
pass
finally:
for event in stop_events.values():
event.set()
for t in threads:
t.join()
19 changes: 19 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import signal
import sys

import disk_scanner
import diskforge
from colorama import Fore, init

Expand All @@ -12,6 +13,17 @@ def signal_handler(sig, frame):
sys.exit(0)


def ask_user(prompt):
while True:
response = input(prompt).strip().lower()
if response in ['y', 'yes']:
return True
elif response in ['n', 'no']:
return False
else:
print("Please enter 'yes' or 'no'.")


if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)

Expand All @@ -32,3 +44,10 @@ def signal_handler(sig, frame):
print(f"{Fore.BLUE}====================================")
diskforge.set_labels(disks)
print(f"{Fore.BLUE}====================================")

if ask_user("Would you like to surface scan the disks? [If you need to remove disks please do it now] (yes/no): "):
disks = diskforge.identify_disks()
disk_scanner.scan_disks(disks)
else:
print(f"{Fore.GREEN}Exiting without surface scan.")
sys.exit(0)

0 comments on commit bc2ab3c

Please sign in to comment.