Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(interactive): Refactor EdgeColumn Implementation #4391

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,11 @@ RTAny edge_to_rt_any(const results::Edge& edge) {
std::get<2>(edge_triplet_tuple)};
if (properties.size() == 0) {
return RTAny::from_edge(
std::tuple{label_triplet, src_vid, dst_vid, Any(), Direction::kOut});
{label_triplet, src_vid, dst_vid, Any(), Direction::kOut});
} else if (properties.size() == 1) {
LOG(FATAL) << "Not implemented.";
return RTAny::from_edge(std::tuple{label_triplet, src_vid, dst_vid,
property_to_any(properties[0]),
Direction::kOut});
return RTAny::from_edge({label_triplet, src_vid, dst_vid,
property_to_any(properties[0]), Direction::kOut});
} else {
std::vector<Any> props;
for (auto& prop : properties) {
Expand All @@ -157,7 +156,7 @@ RTAny edge_to_rt_any(const results::Edge& edge) {
Any any;
any.set_record(props);
return RTAny::from_edge(
std::tuple{label_triplet, src_vid, dst_vid, any, Direction::kOut});
{label_triplet, src_vid, dst_vid, any, Direction::kOut});
}
} // namespace runtime

Expand Down
36 changes: 17 additions & 19 deletions flex/engines/graph_db/runtime/common/accessors.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ class VertexPropertyVertexAccessor : public IAccessor {

class EdgeIdPathAccessor : public IAccessor {
public:
using elem_t = std::tuple<LabelTriplet, vid_t, vid_t, Any, Direction>;
using elem_t = EdgeRecord;
EdgeIdPathAccessor(const Context& ctx, int tag)
: edge_col_(*std::dynamic_pointer_cast<IEdgeColumn>(ctx.get(tag))) {}

Expand Down Expand Up @@ -407,14 +407,12 @@ class EdgePropertyPathAccessor : public IAccessor {

RTAny eval_path(size_t idx) const override {
const auto& e = col_.get_edge(idx);
return RTAny(std::get<3>(e));
return RTAny(e.prop_);
}

elem_t typed_eval_path(size_t idx) const {
const auto& e = col_.get_edge(idx);
elem_t ret;
ConvertAny<T>::to(std::get<3>(e), ret);
return ret;
return e.prop_.as<elem_t>();
}

bool is_optional() const override { return col_.is_optional(); }
Expand Down Expand Up @@ -464,30 +462,30 @@ class MultiPropsEdgePropertyPathAccessor : public IAccessor {

RTAny eval_path(size_t idx) const override {
const auto& e = col_.get_edge(idx);
auto val = std::get<3>(e);
auto id = get_index(std::get<0>(e));
if (std::get<3>(e).type != PropertyType::RecordView()) {
auto val = e.prop_;
auto id = get_index(e.label_triplet_);
if (e.prop_.type.type_enum_ != RTAnyType::RTAnyTypeImpl::kRecordView) {
CHECK(id == 0);
return RTAny(val);
} else {
auto rv = val.AsRecordView();
auto rv = val.as<RecordView>();
CHECK(id != std::numeric_limits<size_t>::max());
return RTAny(rv[id]);
}
}

elem_t typed_eval_path(size_t idx) const {
const auto& e = col_.get_edge(idx);
auto val = std::get<3>(e);
auto id = get_index(std::get<0>(e));
if (std::get<3>(e).type != PropertyType::RecordView()) {
auto val = e.prop_;
auto id = get_index(e.label_triplet_);
if (e.prop_.type.type_enum_ != RTAnyType::RTAnyTypeImpl::kRecordView) {
CHECK(id == 0);
elem_t ret;
ConvertAny<T>::to(val, ret);
return ret;

} else {
auto rv = val.AsRecordView();
auto rv = val.as<RecordView>();
CHECK(id != std::numeric_limits<size_t>::max());
auto tmp = rv[id];
elem_t ret;
Expand Down Expand Up @@ -530,12 +528,12 @@ class EdgeLabelPathAccessor : public IAccessor {

RTAny eval_path(size_t idx) const override {
const auto& e = col_.get_edge(idx);
return RTAny(static_cast<int32_t>(std::get<0>(e).edge_label));
return RTAny(static_cast<int32_t>(e.label_triplet_.edge_label));
}

elem_t typed_eval_path(size_t idx) const {
const auto& e = col_.get_edge(idx);
return static_cast<int32_t>(std::get<0>(e).edge_label);
return static_cast<int32_t>(e.label_triplet_.edge_label);
}

std::shared_ptr<IContextColumnBuilder> builder() const override {
Expand Down Expand Up @@ -632,10 +630,10 @@ class EdgeGlobalIdPathAccessor : public IAccessor {

elem_t typed_eval_path(size_t idx) const {
const auto& e = edge_col_.get_edge(idx);
auto label_id = generate_edge_label_id(std::get<0>(e).src_label,
std::get<0>(e).dst_label,
std::get<0>(e).edge_label);
return encode_unique_edge_id(label_id, std::get<1>(e), std::get<2>(e));
auto label_id = generate_edge_label_id(e.label_triplet_.src_label,
e.label_triplet_.dst_label,
e.label_triplet_.edge_label);
return encode_unique_edge_id(label_id, e.src_, e.dst_);
}

RTAny eval_path(size_t idx) const override {
Expand Down
100 changes: 88 additions & 12 deletions flex/engines/graph_db/runtime/common/columns/edge_columns.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ std::shared_ptr<IContextColumn> SDSLEdgeColumn::dup() const {
builder.reserve(edges_.size());
for (size_t i = 0; i < edges_.size(); ++i) {
auto e = get_edge(i);
builder.push_back_opt(std::get<1>(e), std::get<2>(e), std::get<3>(e));
builder.push_back_opt(e.src_, e.dst_, e.prop_);
}
return builder.finish();
}
Expand Down Expand Up @@ -58,7 +58,41 @@ std::shared_ptr<IContextColumn> SDSLEdgeColumn::shuffle(
size_t off = offsets[idx];
const auto& e = edges_[off];
builder.push_back_endpoints(e.first, e.second);
ret_props.set_any(idx, prop_col_->get(off));
ret_props.set_any(idx, prop_col_.get(), off);
}
}

return builder.finish();
}

std::shared_ptr<IContextColumn> SDSLEdgeColumn::optional_shuffle(
const std::vector<size_t>& offsets) const {
CHECK(prop_type_ != PropertyType::kRecordView);
OptionalSDSLEdgeColumnBuilder builder(dir_, label_, prop_type_);
size_t new_row_num = offsets.size();
builder.reserve(new_row_num);

if (prop_type_ == PropertyType::kEmpty) {
for (auto off : offsets) {
if (off == std::numeric_limits<size_t>::max()) {
builder.push_back_null();
} else {
const auto& e = edges_[off];
builder.push_back_endpoints(e.first, e.second);
}
}
} else {
auto& ret_props = *builder.prop_col_;
ret_props.resize(new_row_num);
for (size_t idx = 0; idx < new_row_num; ++idx) {
size_t off = offsets[idx];
if (off == std::numeric_limits<size_t>::max()) {
builder.push_back_null();
} else {
const auto& e = edges_[off];
builder.push_back_endpoints(e.first, e.second);
ret_props.set_any(idx, prop_col_.get(), off);
}
}
}

Expand All @@ -70,7 +104,7 @@ std::shared_ptr<IContextColumn> SDSLEdgeColumnBuilder::finish() {
std::make_shared<SDSLEdgeColumn>(dir_, label_, prop_type_, sub_types_);
ret->edges_.swap(edges_);
// shrink to fit
prop_col_->resize(edges_.size());
prop_col_->resize(ret->edges_.size());
ret->prop_col_ = prop_col_;
return ret;
}
Expand All @@ -80,8 +114,7 @@ std::shared_ptr<IContextColumn> BDSLEdgeColumn::dup() const {
builder.reserve(size());
for (size_t i = 0; i < size(); ++i) {
auto e = get_edge(i);
builder.push_back_opt(std::get<1>(e), std::get<2>(e), std::get<3>(e),
std::get<4>(e));
builder.push_back_opt(e.src_, e.dst_, e.prop_, e.dir_);
}
return builder.finish();
}
Expand All @@ -98,14 +131,38 @@ std::shared_ptr<IContextColumn> BDSLEdgeColumn::shuffle(
size_t off = offsets[idx];
const auto& e = edges_[off];
builder.push_back_endpoints(std::get<0>(e), std::get<1>(e), std::get<2>(e));
ret_props.set_any(idx, prop_col_->get(off));
ret_props.set_any(idx, prop_col_.get(), off);
}

return builder.finish();
}

std::shared_ptr<IContextColumn> BDSLEdgeColumn::optional_shuffle(
const std::vector<size_t>& offsets) const {
OptionalBDSLEdgeColumnBuilder builder(label_, prop_type_);
size_t new_row_num = offsets.size();
builder.reserve(new_row_num);

auto& ret_props = *builder.prop_col_;
ret_props.resize(new_row_num);
for (size_t idx = 0; idx < new_row_num; ++idx) {
size_t off = offsets[idx];
if (off == std::numeric_limits<size_t>::max()) {
builder.push_back_null();
} else {
const auto& e = edges_[off];
builder.push_back_endpoints(std::get<0>(e), std::get<1>(e),
std::get<2>(e));
ret_props.set_any(idx, prop_col_.get(), off);
}
}

return builder.finish();
}

std::shared_ptr<IContextColumn> BDSLEdgeColumnBuilder::finish() {
auto ret = std::make_shared<BDSLEdgeColumn>(label_, prop_type_);
prop_col_->resize(edges_.size());
ret->edges_.swap(edges_);
ret->prop_col_ = prop_col_;
return ret;
Expand Down Expand Up @@ -189,8 +246,7 @@ std::shared_ptr<IContextColumn> OptionalBDSLEdgeColumn::shuffle(
for (size_t idx = 0; idx < new_row_num; ++idx) {
size_t off = offsets[idx];
const auto& e = get_edge(off);
builder.push_back_opt(std::get<1>(e), std::get<2>(e), std::get<3>(e),
std::get<4>(e));
builder.push_back_opt(e.src_, e.dst_, e.prop_, e.dir_);
}
return builder.finish();
}
Expand All @@ -199,8 +255,24 @@ std::shared_ptr<IContextColumn> OptionalBDSLEdgeColumn::dup() const {
builder.reserve(edges_.size());
for (size_t i = 0; i < edges_.size(); ++i) {
auto e = get_edge(i);
builder.push_back_opt(std::get<1>(e), std::get<2>(e), std::get<3>(e),
std::get<4>(e));
builder.push_back_opt(e.src_, e.dst_, e.prop_, e.dir_);
}
return builder.finish();
}

std::shared_ptr<IContextColumn> OptionalBDSLEdgeColumn::optional_shuffle(
const std::vector<size_t>& offsets) const {
OptionalBDSLEdgeColumnBuilder builder(label_, prop_type_);
size_t new_row_num = offsets.size();
builder.reserve(new_row_num);
for (size_t idx = 0; idx < new_row_num; ++idx) {
size_t off = offsets[idx];
if (off == std::numeric_limits<size_t>::max()) {
builder.push_back_null();
continue;
}
const auto& e = get_edge(off);
builder.push_back_opt(e.src_, e.dst_, e.prop_, e.dir_);
}
return builder.finish();
}
Expand All @@ -209,6 +281,8 @@ std::shared_ptr<IContextColumn> OptionalBDSLEdgeColumnBuilder::finish() {
auto ret = std::make_shared<OptionalBDSLEdgeColumn>(label_, prop_type_);
ret->edges_.swap(edges_);
ret->prop_col_ = prop_col_;
// shrink to fit
ret->prop_col_->resize(ret->edges_.size());
return ret;
}

Expand All @@ -220,7 +294,7 @@ std::shared_ptr<IContextColumn> OptionalSDSLEdgeColumn::shuffle(
for (size_t idx = 0; idx < new_row_num; ++idx) {
size_t off = offsets[idx];
const auto& e = get_edge(off);
builder.push_back_opt(std::get<1>(e), std::get<2>(e), std::get<3>(e));
builder.push_back_opt(e.src_, e.dst_, e.prop_);
}
return builder.finish();
}
Expand All @@ -230,7 +304,7 @@ std::shared_ptr<IContextColumn> OptionalSDSLEdgeColumn::dup() const {
builder.reserve(edges_.size());
for (size_t i = 0; i < edges_.size(); ++i) {
auto e = get_edge(i);
builder.push_back_opt(std::get<1>(e), std::get<2>(e), std::get<3>(e));
builder.push_back_opt(e.src_, e.dst_, e.prop_);
}
return builder.finish();
}
Expand All @@ -239,6 +313,8 @@ std::shared_ptr<IContextColumn> OptionalSDSLEdgeColumnBuilder::finish() {
auto ret = std::make_shared<OptionalSDSLEdgeColumn>(dir_, label_, prop_type_);
ret->edges_.swap(edges_);
ret->prop_col_ = prop_col_;
// shrink to fit
ret->prop_col_->resize(ret->edges_.size());
return ret;
}

Expand Down
Loading
Loading