Skip to content

Commit

Permalink
optimize __fetch_adb_docs
Browse files Browse the repository at this point in the history
  • Loading branch information
aMahanna committed Jan 19, 2024
1 parent 3984f4b commit d11acee
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions adbcug_adapter/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ def arangodb_to_cugraph(
logger.debug(f"Preparing '{v_col}' vertices")

# 1. Fetch ArangoDB vertices
v_col_cursor, v_col_size = self.__fetch_adb_docs(v_col, **adb_export_kwargs)
v_col_cursor, v_col_size = self.__fetch_adb_docs(
v_col, is_edge=False, edge_attr=None, **adb_export_kwargs
)

# 2. Process ArangoDB vertices
self.__process_adb_cursor(
Expand All @@ -161,7 +163,9 @@ def arangodb_to_cugraph(
logger.debug(f"Preparing '{e_col}' edges")

# 1. Fetch ArangoDB edges
e_col_cursor, e_col_size = self.__fetch_adb_docs(e_col, **adb_export_kwargs)
e_col_cursor, e_col_size = self.__fetch_adb_docs(
e_col, is_edge=True, edge_attr=edge_attr, **adb_export_kwargs
)

# 2. Process ArangoDB edges
self.__process_adb_cursor(
Expand Down Expand Up @@ -434,25 +438,36 @@ def cugraph_to_arangodb(
def __fetch_adb_docs(
self,
col: str,
is_edge: bool,
edge_attr: Optional[str],
**adb_export_kwargs: Any,
) -> Tuple[Cursor, int]:
"""ArangoDB -> cuGraph: Fetches ArangoDB documents within a collection.
:param col: The ArangoDB collection.
:type col: str
:param is_edge: True if **col** is an edge collection.
:type is_edge: bool
:param edge_attr: The weight attribute name of your ArangoDB edges.
:type edge_attr: str | None
:param adb_export_kwargs: Keyword arguments to specify AQL query options when
fetching documents from the ArangoDB instance.
:type adb_export_kwargs: Any
:return: The document cursor along with the total collection size.
:rtype: Tuple[arango.cursor.Cursor, int]
"""
aql_return_value = "{ _id: doc._id"
aql_return_value += ", _from: doc._from, _to: doc._to" if is_edge else ""
aql_return_value += f", {edge_attr}: doc.{edge_attr}" if edge_attr else ""
aql_return_value += "}"

col_size: int = self.__db.collection(col).count()

with get_export_spinner_progress(f"ADB Export: '{col}' ({col_size})") as p:
p.add_task(col)

cursor: Cursor = self.__db.aql.execute(
"FOR doc IN @@col RETURN doc",
f"FOR doc IN @@col RETURN {aql_return_value}",
bind_vars={"@col": col},
**{**adb_export_kwargs, **{"stream": True}},
)
Expand Down

0 comments on commit d11acee

Please sign in to comment.