-
Notifications
You must be signed in to change notification settings - Fork 5
/
pc_io.py
106 lines (82 loc) · 2.87 KB
/
pc_io.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import os
import logging
logger = logging.getLogger(__name__)
import numpy as np
import pandas as pd
import multiprocessing
import functools
from tqdm import tqdm
from pyntcloud import PyntCloud
from glob import glob
class PC:
    """A set of 3-D points together with the bounding box they should lie in.

    Attributes:
        points: point coordinates; the original author's note describes this
            as an ndarray of shape (n_points, 3).
        p_min: lower corner of the bounding box, e.g. ``[0, 0, 0]``.
        p_max: upper corner of the bounding box, e.g.
            ``[resolution, resolution, resolution]``.
        data: dict that starts empty; the class itself never reads it.
    """

    def __init__(self, points, p_min, p_max):
        """Store the points and bounds, warning if any point is out of bounds.

        Out-of-range points are only reported, never rejected or clipped, so
        construction stays best-effort.
        """
        self.points = points  # ndarray: n_points x 3
        self.p_max = p_max  # [resolution, resolution, resolution]
        self.p_min = p_min  # [0, 0, 0]
        self.data = {}

        inside = np.all(points <= p_max) and np.all(points >= p_min)
        if not inside:
            # getLogger(__name__) resolves to the same logger object the
            # module keeps in its top-level `logger` variable.
            logging.getLogger(__name__).warning(
                "PC points fall outside the bounding box: "
                "min=%s, max=%s (p_min=%s, p_max=%s)",
                points.min(), points.max(), p_min, p_max,
            )

    def __repr__(self):
        return f"<PC with {self.points.shape[0]} points (p_min: {self.p_min}, p_max: {self.p_max})>"

    def is_empty(self):
        """Return True when the cloud holds no points."""
        return self.points.shape[0] == 0

    def p_mid(self):
        """Return the centre of the bounding box, ``p_min + (p_max - p_min) / 2``.

        The bounds are converted with ``np.asarray`` first so that plain
        lists or tuples work as well as ndarrays (list - list would raise).
        """
        lower = np.asarray(self.p_min)
        upper = np.asarray(self.p_max)
        return lower + ((upper - lower) / 2.)
def df_to_pc(df, p_min, p_max):
    """Build a PC from the 'x', 'y' and 'z' columns of a DataFrame.

    Args:
        df: DataFrame containing at least the columns 'x', 'y', 'z'; any
            other columns are ignored.
        p_min: lower bound forwarded to PC.
        p_max: upper bound forwarded to PC.

    Returns:
        A PC whose points are the three selected columns as one array.
    """
    xyz = df[['x', 'y', 'z']]
    return PC(xyz.values, p_min, p_max)
def pa_to_df(points):
    """Convert a point array into a float32 DataFrame with columns x, y, z.

    Args:
        points: 2-D array; columns 0, 1 and 2 become 'x', 'y' and 'z'.
            Any further columns are ignored.

    Returns:
        DataFrame with one row per point and float32 columns 'x', 'y', 'z'.
    """
    columns = {axis: points[:, idx] for idx, axis in enumerate(('x', 'y', 'z'))}
    return pd.DataFrame(data=columns, dtype=np.float32)
def pc_to_df(pc):
    """Return the points of `pc` as an x/y/z DataFrame (via pa_to_df).

    Args:
        pc: object exposing a `points` array attribute, e.g. a PC instance.
    """
    return pa_to_df(pc.points)
def load_pc(path, p_min, p_max):
    """Read a point cloud file and wrap it in a PC.

    Args:
        path: file to read with PyntCloud.from_file (the original note
            says these are .ply files).
        p_min: lower bound forwarded to the resulting PC.
        p_max: upper bound forwarded to the resulting PC.

    Returns:
        The PC built by df_to_pc from the loaded cloud's points table.
    """
    logger.debug(f"Loading PC {path}")
    cloud = PyntCloud.from_file(path)
    # cloud.points is handed to df_to_pc, which picks out the x/y/z columns.
    wrapped = df_to_pc(cloud.points, p_min, p_max)
    logger.debug(f"Loaded PC {path}")
    return wrapped
def write_pc(path, pc):
    """Write the points of `pc` to `path`.

    The cloud is converted with pc_to_df and then stored with write_df.
    """
    write_df(path, pc_to_df(pc))
def write_df(path, df):
    """Wrap `df` in a PyntCloud and write it to `path` with `to_file`."""
    PyntCloud(df).to_file(path)
def get_shape_data(resolution):
    """Return the cubic bounding box and dense tensor shape for a resolution.

    Args:
        resolution: edge length of the cube; used for all three axes.

    Returns:
        Tuple ``(p_min, p_max, dense_tensor_shape)`` where ``p_min`` is
        ``[0, 0, 0]``, ``p_max`` is ``[resolution] * 3`` and
        ``dense_tensor_shape`` is ``[1, resolution, resolution, resolution]``
        as int64.
    """
    lower_corner = np.array([0] * 3)
    upper_corner = np.array([resolution] * 3)
    # The leading 1 is presumably a channel or batch axis; confirm with callers.
    tensor_shape = np.concatenate(([1], upper_corner)).astype(np.int64)
    return lower_corner, upper_corner, tensor_shape
def get_files(input_glob):
    """Return the paths matching `input_glob` as a sorted NumPy array.

    Args:
        input_glob: glob pattern; ``**`` matches nested directories because
            the search is recursive.

    Returns:
        ndarray of matching paths in lexicographic order.
    """
    # glob() yields matches in arbitrary, filesystem-dependent order; sorting
    # makes the file list reproducible across runs and machines.
    return np.array(sorted(glob(input_glob, recursive=True)))
def load_points_func(x, p_min, p_max):
    """Load the file at path `x` with load_pc and return only its points.

    Kept as a module-level function so load_points can bind the bounds
    with functools.partial and hand it to a multiprocessing pool.
    """
    return load_pc(x, p_min, p_max).points
def _stack_point_arrays(arrays):
    """Combine per-file point arrays into one ndarray, tolerating ragged input."""
    if len({np.shape(arr) for arr in arrays}) <= 1:
        # All arrays share one shape (or there are none): regular stacking.
        return np.array(arrays)
    # Clouds with different point counts cannot form a rectangular array;
    # NumPy >= 1.24 raises ValueError for that instead of silently building
    # an object array, so build the 1-D object array explicitly.
    stacked = np.empty(len(arrays), dtype=object)
    for idx, arr in enumerate(arrays):
        stacked[idx] = arr
    return stacked


def load_points(files, p_min, p_max, batch_size=32, processbar=True):
    """Load the points of every file in parallel using a process pool.

    Args:
        files: sequence of file paths, each passed to load_points_func.
        p_min: lower bound forwarded to load_points_func.
        p_max: upper bound forwarded to load_points_func.
        batch_size: chunk size handed to ``Pool.imap``.
        processbar: when True, wrap the loading in a tqdm progress bar.

    Returns:
        ndarray with one entry per file, in the order of `files`. If all
        clouds have the same shape this is a regular stacked array;
        otherwise it is a 1-D object array holding one points array each.
    """
    files_len = len(files)
    with multiprocessing.Pool() as pool:
        logger.info('Loading PCs into memory (parallel reading)')
        loader = functools.partial(load_points_func, p_min=p_min, p_max=p_max)
        results = pool.imap(loader, files, batch_size)
        if processbar:
            results = tqdm(results, total=files_len)
        # Drain the iterator while the pool is still alive.
        loaded = list(results)
    return _stack_point_arrays(loaded)
def save_pc(pc, filename, path='./viewing'):
    """Write an array of points to ``path/filename`` through PyntCloud.

    Args:
        pc: point data accepted by ``pd.DataFrame`` with three columns,
            which are labelled 'x', 'y', 'z'. Despite the name this looks
            like a raw points array rather than a PC instance; confirm
            against callers.
        filename: name of the output file inside `path`.
        path: output directory; created if it does not exist yet.
    """
    frame = pd.DataFrame(pc, columns=['x', 'y', 'z'])
    # The default './viewing' directory is not guaranteed to exist, and
    # writing into a missing directory fails. An empty path means the
    # current directory, which makedirs would reject.
    if path:
        os.makedirs(path, exist_ok=True)
    PyntCloud(frame).to_file(os.path.join(path, filename))