Skip to content

Commit

Permalink
Optimize __fetch_adb_docs (#33)
Browse files Browse the repository at this point in the history
* optimize `__fetch_adb_docs`

* cleanup: `aql_return_value`
  • Loading branch information
aMahanna committed Jan 19, 2024
1 parent 3984f4b commit 2ad3cbc
Showing 1 changed file with 27 additions and 5 deletions.
32 changes: 27 additions & 5 deletions adbcug_adapter/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ def arangodb_to_cugraph(
logger.debug(f"Preparing '{v_col}' vertices")

# 1. Fetch ArangoDB vertices
v_col_cursor, v_col_size = self.__fetch_adb_docs(v_col, **adb_export_kwargs)
v_col_cursor, v_col_size = self.__fetch_adb_docs(
v_col, is_edge=False, edge_attr=None, **adb_export_kwargs
)

# 2. Process ArangoDB vertices
self.__process_adb_cursor(
Expand All @@ -161,7 +163,9 @@ def arangodb_to_cugraph(
logger.debug(f"Preparing '{e_col}' edges")

# 1. Fetch ArangoDB edges
e_col_cursor, e_col_size = self.__fetch_adb_docs(e_col, **adb_export_kwargs)
e_col_cursor, e_col_size = self.__fetch_adb_docs(
e_col, is_edge=True, edge_attr=edge_attr, **adb_export_kwargs
)

# 2. Process ArangoDB edges
self.__process_adb_cursor(
Expand Down Expand Up @@ -216,7 +220,9 @@ def arangodb_collections_to_cugraph(
"edgeCollections": {col: set() for col in e_cols},
}

return self.arangodb_to_cugraph(name, metagraph, edge_attr, **adb_export_kwargs)
return self.arangodb_to_cugraph(
name, metagraph, edge_attr, default_edge_attr_value, **adb_export_kwargs
)

def arangodb_graph_to_cugraph(
self,
Expand Down Expand Up @@ -248,7 +254,12 @@ def arangodb_graph_to_cugraph(
e_cols: Set[str] = {ed["edge_collection"] for ed in edge_definitions}

return self.arangodb_collections_to_cugraph(
name, v_cols, e_cols, edge_attr, **adb_export_kwargs
name,
v_cols,
e_cols,
edge_attr,
default_edge_attr_value,
**adb_export_kwargs,
)

###############################
Expand Down Expand Up @@ -434,25 +445,36 @@ def cugraph_to_arangodb(
def __fetch_adb_docs(
self,
col: str,
is_edge: bool,
edge_attr: Optional[str],
**adb_export_kwargs: Any,
) -> Tuple[Cursor, int]:
"""ArangoDB -> cuGraph: Fetches ArangoDB documents within a collection.
:param col: The ArangoDB collection.
:type col: str
:param is_edge: True if **col** is an edge collection.
:type is_edge: bool
:param edge_attr: The weight attribute name of your ArangoDB edges.
:type edge_attr: str | None
:param adb_export_kwargs: Keyword arguments to specify AQL query options when
fetching documents from the ArangoDB instance.
:type adb_export_kwargs: Any
:return: The document cursor along with the total collection size.
:rtype: Tuple[arango.cursor.Cursor, int]
"""
keys = ["_id"]
keys += ["_from", "_to"] if is_edge else []
keys += [edge_attr] if is_edge and edge_attr else []
aql_return_value = f"KEEP(doc, {keys})"

col_size: int = self.__db.collection(col).count()

with get_export_spinner_progress(f"ADB Export: '{col}' ({col_size})") as p:
p.add_task(col)

cursor: Cursor = self.__db.aql.execute(
"FOR doc IN @@col RETURN doc",
f"FOR doc IN @@col RETURN {aql_return_value}",
bind_vars={"@col": col},
**{**adb_export_kwargs, **{"stream": True}},
)
Expand Down

0 comments on commit 2ad3cbc

Please sign in to comment.