Skip to content

Commit 5e1f12f

Browse files
committed
feat: post process sv cross edges
1 parent 0058d65 commit 5e1f12f

File tree

3 files changed

+58
-6
lines changed

3 files changed

+58
-6
lines changed

pychunkedgraph/graph/attributes.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,21 +106,21 @@ class Connectivity:
106106

107107
L2CrossChunkEdge = _AttributeArray(
108108
pattern=b"l2_cross_edge_%d",
109-
family_id="3",
109+
family_id="4",
110110
serializer=serializers.NumPyArray(
111111
dtype=basetypes.NODE_ID, shape=(-1, 2), compression_level=22
112112
),
113113
)
114114

115115
FakeEdges = _Attribute(
116116
key=b"fake_edges",
117-
family_id="3",
117+
family_id="4",
118118
serializer=serializers.NumPyArray(dtype=basetypes.NODE_ID, shape=(-1, 2)),
119119
)
120120

121121
CrossChunkEdge = _AttributeArray(
122122
pattern=b"atomic_cross_edges_%d",
123-
family_id="4",
123+
family_id="3",
124124
serializer=serializers.NumPyArray(
125125
dtype=basetypes.NODE_ID, shape=(-1, 2), compression_level=22
126126
),

pychunkedgraph/graph/client/bigtable/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -636,9 +636,9 @@ def _create_column_families(self):
636636
f.create()
637637
f = self._table.column_family("2")
638638
f.create()
639-
f = self._table.column_family("3")
639+
f = self._table.column_family("3", gc_rule=MaxAgeGCRule(datetime.timedelta(days=1)))
640640
f.create()
641-
f = self._table.column_family("4", gc_rule=MaxAgeGCRule(datetime.timedelta(days=1)))
641+
f = self._table.column_family("4")
642642
f.create()
643643

644644
def _get_ids_range(self, key: bytes, size: int) -> typing.Tuple:

pychunkedgraph/ingest/create/atomic_layer.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,13 @@ def _get_remapping(chunk_edges_d: dict):
101101

102102

103103
def _process_component(
104-
cg, chunk_edges_d, parent_id, node_ids, sparse_indices, remapping, time_stamp,
104+
cg,
105+
chunk_edges_d,
106+
parent_id,
107+
node_ids,
108+
sparse_indices,
109+
remapping,
110+
time_stamp,
105111
):
106112
nodes = []
107113
chunk_out_edges = [] # out = between + cross
@@ -145,3 +151,49 @@ def _get_outgoing_edges(node_id, chunk_edges_d, sparse_indices, remapping):
145151
# edges that this node is part of
146152
chunk_out_edges = np.concatenate([chunk_out_edges, edges[row_ids]])
147153
return chunk_out_edges
154+
155+
156+
def postprocess_atomic_chunk(
157+
cg: ChunkedGraph,
158+
chunk_coord: np.ndarray,
159+
time_stamp: Optional[datetime.datetime] = None,
160+
):
161+
time_stamp = get_valid_timestamp(time_stamp)
162+
163+
chunk_id = cg.get_chunk_id(
164+
layer=2, x=chunk_coord[0], y=chunk_coord[1], z=chunk_coord[2]
165+
)
166+
167+
properties = [
168+
attributes.Connectivity.CrossChunkEdge[l] for l in range(2, cg.meta.layer_count)
169+
]
170+
171+
chunk_rr = cg.range_read_chunk(
172+
chunk_id, properties=properties, time_stamp=time_stamp
173+
)
174+
175+
result = {}
176+
for l2id, raw_cx_edges in chunk_rr.items():
177+
try:
178+
cx_edges = {
179+
prop.index: val[0].value.copy() for prop, val in raw_cx_edges.items()
180+
}
181+
result[l2id] = cx_edges
182+
except KeyError:
183+
continue
184+
185+
nodes = []
186+
val_dicts = []
187+
for l2id, cx_edges in result.items():
188+
val_dict = {}
189+
for layer, edges in cx_edges.items():
190+
l2_edges = np.zeros_like(edges)
191+
l2_edges[:, 0] = l2id
192+
l2_edges[:, 1] = cg.get_parents(edges[:, 1])
193+
col = attributes.Connectivity.L2CrossChunkEdge[layer]
194+
val_dict[col] = np.unique(l2_edges, axis=0)
195+
val_dicts.append(val_dict)
196+
197+
r_key = serializers.serialize_uint64(l2id)
198+
nodes.append(cg.client.mutate_row(r_key, val_dict, time_stamp=time_stamp))
199+
cg.client.write(nodes)

0 commit comments

Comments
 (0)