-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdrivemapper.py
More file actions
214 lines (182 loc) · 8.5 KB
/
drivemapper.py
File metadata and controls
214 lines (182 loc) · 8.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
#!/usr/bin/env python3
# drivemapper.py
# Crawl a drive/share and output a CSV tree map of files with path term columns.
import argparse
import csv
import os
import sys
import platform
from datetime import datetime, timezone
from collections import Counter
# True when the interpreter reports a Windows host; the Windows-only checks in this file test this flag.
IS_WINDOWS = platform.system() == "Windows"
# Characters that has_invalid_win_chars() rejects when they appear anywhere in a file or directory name.
WIN_INVALID_CHARS = set('<>:"/\\|?*')
# Mask tested against os.lstat().st_file_attributes to detect reparse points (junctions, symlinks).
FILE_ATTRIBUTE_REPARSE_POINT = 0x0400 # Windows file attribute bit
def has_invalid_win_chars(name: str) -> bool:
    """Return True if ``name`` cannot be used as a Windows file/directory name.

    A name is rejected when it ends with a space or a dot, or when it
    contains any character listed in ``WIN_INVALID_CHARS``.
    """
    if name.endswith((" ", ".")):
        return True
    # isdisjoint() iterates the string character by character.
    return not WIN_INVALID_CHARS.isdisjoint(name)
def make_printer(verbose: bool):
    """Build a warning callback.

    The returned function takes a message and, only when ``verbose`` is
    true, prints it to standard error prefixed with ``WARNING: ``.
    When ``verbose`` is false the callback does nothing.
    """
    def _warn(msg: str):
        if not verbose:
            return
        print(f"WARNING: {msg}", file=sys.stderr)

    return _warn
def info(msg: str):
    """Print ``msg`` as one line on standard error (always, regardless of verbosity)."""
    print(msg, end="\n", file=sys.stderr)
def win_extended(path: str) -> str:
    r"""Return ``path`` in Windows extended-length form so long paths can be opened.

    On non-Windows platforms the path is returned unchanged.

    On Windows:
      * paths already starting with ``\\?\`` (extended) or ``\\.\`` (device
        namespace) are returned unchanged -- prefixing them again would
        produce a path that refers to something else entirely;
      * UNC paths (``\\server\share\...``) become ``\\?\UNC\server\share\...``;
      * anything else (expected to be an absolute drive-letter path) gets a
        plain ``\\?\`` prefix.

    The caller is expected to pass an absolute, backslash-normalised path
    (e.g. the result of ``os.path.abspath``); no normalisation is done here.
    """
    if not IS_WINDOWS:
        return path
    # Already in the extended or device namespace: leave untouched.
    if path.startswith(("\\\\?\\", "\\\\.\\")):
        return path
    if path.startswith("\\\\"):  # UNC
        # A single lstrip removes every leading backslash.
        return "\\\\?\\UNC\\" + path.lstrip("\\")
    return "\\\\?\\" + path  # drive letter
def is_reparse_dir(dirpath: str, name: str) -> bool:
    """Report whether ``name`` inside ``dirpath`` is a Windows reparse point.

    Always False on non-Windows platforms, and False when the entry cannot
    be lstat'ed. The entry itself is inspected (links are not followed).
    """
    if IS_WINDOWS:
        try:
            entry_stat = os.lstat(os.path.join(dirpath, name))  # don't follow
        except OSError:
            return False
        # st_file_attributes may be absent; treat that as "no attributes".
        attributes = getattr(entry_stat, "st_file_attributes", 0)
        return (attributes & FILE_ATTRIBUTE_REPARSE_POINT) != 0
    return False
def local_iso(mtime: float) -> str:
    """Format a POSIX timestamp as an ISO-8601 string in the local time zone.

    The result carries a UTC offset and is truncated to whole seconds.
    """
    utc_moment = datetime.fromtimestamp(mtime, tz=timezone.utc)
    # astimezone() with no argument converts to the system local zone.
    local_moment = utc_moment.astimezone()
    return local_moment.isoformat(timespec="seconds")
def filetype_from_name(name: str) -> str:
    """Return the lower-cased extension of ``name`` without the dot.

    Returns ``"noext"`` when the name has no usable extension. That covers
    names with no dot, dot-files such as ``.bashrc`` (which
    ``os.path.splitext`` treats as extensionless), and names ending in a
    bare dot such as ``"file."``, whose extension would otherwise come out
    as an empty string.
    """
    ext = os.path.splitext(name)[1]
    # ext is "" or starts with "."; stripping the dot may leave nothing.
    return ext[1:].lower() or "noext"
def _walk_once(root_for_walk, follow_symlinks, on_walk_error, prune_invalid, prune_reparse, warn, stats):
    """Run one os.walk pass and yield its (dirpath, dirnames, filenames) triples.

    On Windows, sub-directories are filtered out of ``dirnames`` in place
    before the triple is yielded, so os.walk never descends into them:
      * names rejected by has_invalid_win_chars() when ``prune_invalid`` is set;
      * reparse points (per is_reparse_dir()) when ``prune_reparse`` is set.
    Each pruned directory is reported through ``warn`` and counted in
    ``stats`` under ``skip_invalid_dirs`` / ``skip_reparse_dirs``.
    On other platforms nothing is pruned.
    """
    walker = os.walk(root_for_walk, followlinks=follow_symlinks, onerror=on_walk_error)
    for dirpath, dirnames, filenames in walker:
        if IS_WINDOWS:
            survivors = []
            for entry in dirnames:
                entry_path = os.path.join(dirpath, entry)
                if prune_invalid and has_invalid_win_chars(entry):
                    warn(f"skipping directory with invalid name: {entry_path} (name={entry!r})")
                    stats["skip_invalid_dirs"] += 1
                elif prune_reparse and is_reparse_dir(dirpath, entry):
                    warn(f"skipping reparse/junction directory: {entry_path}")
                    stats["skip_reparse_dirs"] += 1
                else:
                    survivors.append(entry)
            # Slice assignment mutates the list os.walk holds, which is what prunes the descent.
            dirnames[:] = survivors
        yield dirpath, dirnames, filenames
def iter_files(root, follow_symlinks=False, skip_url=False, warn_each=False, stats: Counter | None = None):
    r"""
    Generate (full_display_path, rel_parts, name) for every file below root.

    ``rel_parts`` is the list of directory names between root and the file
    (empty for files directly in root); the same list object is shared by
    all files of one directory.

    Strategy:
    1) Walk the ordinary absolute path first.
    2) On Windows only, if that walk emitted no file at all, walk again
       using the extended form of the path (\\?\...).

    On Windows, files and directories with invalid names are skipped, and
    reparse-point directories are skipped unless follow_symlinks is set.
    With skip_url, files ending in ".url" (case-insensitive) are skipped.
    Skips and walk errors are tallied in ``stats`` (a fresh Counter is used
    when none is supplied) and reported through a warn_each-controlled printer.
    """
    if stats is None:
        stats = Counter()
    warn = make_printer(warn_each)
    root_abs = os.path.abspath(root)
    entries_emitted = 0

    unc_prefix = "\\\\?\\UNC\\"
    ext_prefix = "\\\\?\\"

    def on_walk_error(e):
        warn(f"os.walk error under {root_abs}: {e}")
        stats["walk_errors"] += 1

    def to_printable(dirpath):
        # Undo the extended-path prefix so relpath and displayed paths use the normal form.
        if not IS_WINDOWS:
            return dirpath
        if dirpath.startswith(unc_prefix):
            return "\\\\" + dirpath[len(unc_prefix):]
        if dirpath.startswith(ext_prefix):
            return dirpath[len(ext_prefix):]
        return dirpath

    def gen_from_walk(walk_root):
        nonlocal entries_emitted
        walked = _walk_once(
            walk_root,
            follow_symlinks,
            on_walk_error,
            prune_invalid=True,
            prune_reparse=IS_WINDOWS and not follow_symlinks,
            warn=warn,
            stats=stats,
        )
        for dirpath, _subdirs, filenames in walked:
            shown_dir = to_printable(dirpath)
            rel_dir = os.path.relpath(shown_dir, root_abs)
            rel_parts = rel_dir.split(os.sep) if rel_dir != os.curdir else []
            for name in filenames:
                shown_path = os.path.join(shown_dir, name)
                if IS_WINDOWS and has_invalid_win_chars(name):
                    warn(f"skipping file with invalid name: {shown_path} (name={name!r})")
                    stats["skip_invalid_files"] += 1
                    continue
                if skip_url and name.lower().endswith(".url"):
                    stats["skip_url"] += 1
                    continue
                entries_emitted += 1
                yield shown_path, rel_parts, name

    # First attempt: the ordinary path.
    yield from gen_from_walk(root_abs)

    # Fallback: nothing was emitted and we are on Windows, so retry with the extended path.
    if IS_WINDOWS and entries_emitted == 0:
        ext_root = win_extended(root_abs)
        warn(f"normal walk returned no entries; retrying with extended path: {ext_root}")
        yield from gen_from_walk(ext_root)
def max_depth_under_root(root, follow_symlinks=False, skip_url=False, warn_each=False, stats: Counter | None = None):
    """Return the largest number of directory levels between root and any file.

    Consumes iter_files() with the given options; the result is 0 when no
    file is yielded or when every file sits directly in root.
    """
    depths = (
        len(rel_parts)
        for _, rel_parts, _ in iter_files(root, follow_symlinks, skip_url, warn_each, stats)
    )
    return max(depths, default=0)
def write_csv(root, out_csv, follow_symlinks=False, encoding="utf-8", newline="", skip_url=False, warn_each=False):
    """Crawl ``root`` and write one CSV row per file to ``out_csv``.

    Columns are ``filename, filesize, filetype, datemodified`` followed by
    ``pathterm-1 .. pathterm-N``, where N is the deepest directory nesting
    found under root; shallower rows are padded with empty strings.

    The tree is walked twice: once to find N, once to write the rows.
    Only the second walk feeds the warnings and the summary counters, so
    each skipped entry is reported and counted exactly once.

    Args:
        root: directory to scan.
        out_csv: path of the CSV file to create (overwritten if present).
        follow_symlinks: passed to the walk and to os.stat.
        encoding, newline: passed to open() for the output file.
        skip_url: skip files ending in ".url".
        warn_each: print every warning to stderr instead of only the summary.

    Files that cannot be stat'ed are skipped and counted as ``stat_errors``.
    A one-line summary is always printed to stderr at the end.
    """
    # Pass 1 only measures depth. It gets a throwaway Counter and silenced
    # warnings; sharing them with pass 2 would double every count and message.
    max_terms = max_depth_under_root(root, follow_symlinks, skip_url, False, Counter())

    stats = Counter()
    header = ["filename", "filesize", "filetype", "datemodified"] + [f"pathterm-{i}" for i in range(1, max_terms + 1)]
    out_for_open = win_extended(os.path.abspath(out_csv)) if IS_WINDOWS else out_csv
    warn = make_printer(warn_each)
    files_written = 0
    with open(out_for_open, "w", encoding=encoding, newline=newline) as f:
        writer = csv.writer(f)
        writer.writerow(header)
        # Pass 2: the real crawl; this is the one that populates ``stats``.
        for full_display, rel_parts, name in iter_files(root, follow_symlinks, skip_url, warn_each, stats):
            # Use the extended form for stat so long paths resolve on Windows.
            full_for_stat = win_extended(full_display) if IS_WINDOWS else full_display
            try:
                st = os.stat(full_for_stat, follow_symlinks=follow_symlinks)
            except OSError as e:
                warn(f"could not stat {full_display}: {e}")
                stats["stat_errors"] += 1
                continue
            size = st.st_size
            when = local_iso(st.st_mtime)
            ftype = filetype_from_name(name)
            # Pad so every row has the same number of pathterm columns as the header.
            terms = rel_parts + [""] * (max_terms - len(rel_parts))
            writer.writerow([name, size, ftype, when, *terms])
            files_written += 1
    info(
        "Summary: "
        f"files_written={files_written} "
        f"skipped_invalid_files={stats.get('skip_invalid_files', 0)} "
        f"skipped_invalid_dirs={stats.get('skip_invalid_dirs', 0)} "
        f"skipped_reparse_dirs={stats.get('skip_reparse_dirs', 0)} "
        f"skipped_url={stats.get('skip_url', 0)} "
        f"stat_errors={stats.get('stat_errors', 0)} "
        f"walk_errors={stats.get('walk_errors', 0)}"
    )
def main():
    """Command-line entry point: parse arguments, validate root, write the CSV.

    Exits with status 1 when the root argument is not a directory and with
    status 130 when interrupted by the user during the crawl.
    """
    parser = argparse.ArgumentParser(
        description="Crawl a drive/share and output a CSV tree map of files."
    )
    parser.add_argument("root", help=r'Root folder to scan (e.g. C:\Data or \\server\share or /mnt/share)')
    parser.add_argument("-o", "--out", default="drivemap.csv", help="Output CSV file (default: drivemap.csv)")
    # The remaining options are plain on/off switches.
    switches = (
        ("--follow-symlinks", "Follow symbolic links / junctions (use with care)."),
        ("--skip-url", "Skip .url web shortcuts (common with SharePoint/OneDrive)."),
        ("--warn-each", "Print every warning (default is summary only)."),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action="store_true", help=help_text)
    args = parser.parse_args()

    root_dir = args.root
    out_path = args.out
    if not os.path.isdir(root_dir):
        print(f"ERROR: Root path is not a directory: {root_dir}", file=sys.stderr)
        sys.exit(1)

    try:
        write_csv(
            root_dir,
            out_path,
            follow_symlinks=args.follow_symlinks,
            skip_url=args.skip_url,
            warn_each=args.warn_each,
        )
    except KeyboardInterrupt:
        print("\nInterrupted by user.", file=sys.stderr)
        sys.exit(130)
    print(f"Done. CSV written to: {out_path}")


if __name__ == "__main__":
    main()