diff --git a/python/gigl/common/data/load_torch_tensors.py b/python/gigl/common/data/load_torch_tensors.py index 2200f8b76..1e3ad26c7 100644 --- a/python/gigl/common/data/load_torch_tensors.py +++ b/python/gigl/common/data/load_torch_tensors.py @@ -28,10 +28,11 @@ _FEATURE_FMT = "{entity}_features" _NODE_KEY = "node" _EDGE_KEY = "edge" -_POSITIVE_LABEL_KEY = "positive_label" -_NEGATIVE_LABEL_KEY = "negative_label" +_POSITIVE_SUPERVISION_EDGES_KEY = "positive_supervision_edges" +_NEGATIVE_SUPERVISION_EDGES_KEY = "negative_supervision_edges" +# TODO (mkolodner-sc): Change positive/negative label name to positive/negative supervision edges @dataclass(frozen=True) class SerializedGraphMetadata: """ @@ -256,7 +257,7 @@ def load_torch_tensors_from_tf_record( "tf_record_dataloader": tf_record_dataloader, "output_dict": edge_output_dict, "error_dict": error_dict, - "entity_type": _POSITIVE_LABEL_KEY, + "entity_type": _POSITIVE_SUPERVISION_EDGES_KEY, "serialized_tf_record_info": serialized_graph_metadata.positive_label_entity_info, "rank": rank, }, @@ -271,7 +272,7 @@ def load_torch_tensors_from_tf_record( "tf_record_dataloader": tf_record_dataloader, "output_dict": edge_output_dict, "error_dict": error_dict, - "entity_type": _NEGATIVE_LABEL_KEY, + "entity_type": _NEGATIVE_SUPERVISION_EDGES_KEY, "serialized_tf_record_info": serialized_graph_metadata.negative_label_entity_info, "rank": rank, }, @@ -328,12 +329,12 @@ def load_torch_tensors_from_tf_record( edge_index = edge_output_dict[_ID_FMT.format(entity=_EDGE_KEY)] edge_features = edge_output_dict.get(_FEATURE_FMT.format(entity=_EDGE_KEY), None) - positive_labels = edge_output_dict.get( - _ID_FMT.format(entity=_POSITIVE_LABEL_KEY), None + positive_supervision_edges = edge_output_dict.get( + _ID_FMT.format(entity=_POSITIVE_SUPERVISION_EDGES_KEY), None ) - negative_labels = edge_output_dict.get( - _ID_FMT.format(entity=_NEGATIVE_LABEL_KEY), None + negative_supervision_edges = edge_output_dict.get( + 
_ID_FMT.format(entity=_NEGATIVE_SUPERVISION_EDGES_KEY), None ) if rpc_is_initialized(): @@ -351,6 +352,6 @@ def load_torch_tensors_from_tf_record( node_features=node_features, edge_index=edge_index, edge_features=edge_features, - positive_label=positive_labels, - negative_label=negative_labels, + positive_supervision_edges=positive_supervision_edges, + negative_supervision_edges=negative_supervision_edges, ) diff --git a/python/gigl/distributed/dataset_factory.py b/python/gigl/distributed/dataset_factory.py index a387d9d9f..5a265bffa 100644 --- a/python/gigl/distributed/dataset_factory.py +++ b/python/gigl/distributed/dataset_factory.py @@ -202,7 +202,7 @@ def _load_and_build_partitioned_dataset( set up beforehand, this function will throw an error. Args: serialized_graph_metadata (SerializedGraphMetadata): Serialized Graph Metadata contains serialized information for loading TFRecords across node and edge types - should_load_tensors_in_parallel (bool): Whether tensors should be loaded from serialized information in parallel or in sequence across the [node, edge, pos_label, neg_label] entity types. + should_load_tensors_in_parallel (bool): Whether tensors should be loaded from serialized information in parallel or in sequence across the [node, edge, positive_supervision_edges, negative_supervision_edges] entity types. edge_dir (Literal["in", "out"]): Edge direction of the provided graph partitioner_class (Optional[Type[DistPartitioner]]): Partitioner class to partition the graph inputs. If provided, this must be a DistPartitioner or subclass of it. If not provided, will initialize use the DistPartitioner class. 
@@ -238,8 +238,8 @@ def _load_and_build_partitioned_dataset( # TODO (mkolodner-sc): Move this code block (from here up to start of partitioning) to transductive splitter once that is ready if _ssl_positive_label_percentage is not None: if ( - loaded_graph_tensors.positive_label is not None - or loaded_graph_tensors.negative_label is not None + loaded_graph_tensors.positive_supervision_edges is not None + or loaded_graph_tensors.negative_supervision_edges is not None ): raise ValueError( "Cannot have loaded positive and negative labels when attempting to select self-supervised positive edges from edge index." @@ -269,7 +269,7 @@ def _load_and_build_partitioned_dataset( f"Found an unknown edge index type: {type(loaded_graph_tensors.edge_index)} when attempting to select positive labels" ) - loaded_graph_tensors.positive_label = positive_label_edges + loaded_graph_tensors.positive_supervision_edges = positive_label_edges if ( isinstance(splitter, NodeAnchorLinkSplitter) @@ -304,13 +304,15 @@ def _load_and_build_partitioned_dataset( partitioner.register_edge_features( edge_features=loaded_graph_tensors.edge_features ) - if loaded_graph_tensors.positive_label is not None: - partitioner.register_labels( - label_edge_index=loaded_graph_tensors.positive_label, is_positive=True + if loaded_graph_tensors.positive_supervision_edges is not None: + partitioner.register_supervision_edges( + supervision_edge_index=loaded_graph_tensors.positive_supervision_edges, + is_positive=True, ) - if loaded_graph_tensors.negative_label is not None: - partitioner.register_labels( - label_edge_index=loaded_graph_tensors.negative_label, is_positive=False + if loaded_graph_tensors.negative_supervision_edges is not None: + partitioner.register_supervision_edges( + supervision_edge_index=loaded_graph_tensors.negative_supervision_edges, + is_positive=False, ) # We call del so that the reference count of these registered fields is 1, @@ -321,8 +323,8 @@ def _load_and_build_partitioned_dataset( 
loaded_graph_tensors.node_features, loaded_graph_tensors.edge_index, loaded_graph_tensors.edge_features, - loaded_graph_tensors.positive_label, - loaded_graph_tensors.negative_label, + loaded_graph_tensors.positive_supervision_edges, + loaded_graph_tensors.negative_supervision_edges, ) del loaded_graph_tensors @@ -399,7 +401,8 @@ def _build_dataset_process( node_rank (int): Rank of the node (machine) on which this process is running node_world_size (int): World size (total #) of the nodes participating in hosting the dataset sample_edge_direction (Literal["in", "out"]): Whether edges in the graph are directed inward or outward - should_load_tensors_in_parallel (bool): Whether tensors should be loaded from serialized information in parallel or in sequence across the [node, edge, pos_label, neg_label] entity types. + should_load_tensors_in_parallel (bool): Whether tensors should be loaded from serialized information in parallel or + in sequence across the [node, edge, positive_supervision_edges, negative_supervision_edges] entity types. partitioner_class (Optional[Type[DistPartitioner]]): Partitioner class to partition the graph inputs. If provided, this must be a DistPartitioner or subclass of it. If not provided, will initialize use the DistPartitioner class. node_tf_dataset_options (TFDatasetOptions): Options provided to a tf.data.Dataset to tune how serialized node data is read. @@ -498,7 +501,8 @@ def build_dataset( you need not initialized a process_group, one will be initialized. sample_edge_direction (Union[Literal["in", "out"], str]): Whether edges in the graph are directed inward or outward. Note that this is listed as a possible string to satisfy type check, but in practice must be a Literal["in", "out"]. - should_load_tensors_in_parallel (bool): Whether tensors should be loaded from serialized information in parallel or in sequence across the [node, edge, pos_label, neg_label] entity types. 
+ should_load_tensors_in_parallel (bool): Whether tensors should be loaded from serialized information in parallel or in sequence across the [node, edge, + positive_supervision_edges, negative_supervision_edges] entity types. partitioner_class (Optional[Type[DistPartitioner]]): Partitioner class to partition the graph inputs. If provided, this must be a DistPartitioner or subclass of it. If not provided, will initialize use the DistPartitioner class. node_tf_dataset_options (TFDatasetOptions): Options provided to a tf.data.Dataset to tune how serialized node data is read. diff --git a/python/gigl/distributed/dist_link_prediction_dataset.py b/python/gigl/distributed/dist_link_prediction_dataset.py index 24bab46e1..5a6d83255 100644 --- a/python/gigl/distributed/dist_link_prediction_dataset.py +++ b/python/gigl/distributed/dist_link_prediction_dataset.py @@ -690,8 +690,12 @@ def build( gc.collect() # Initializing Positive and Negative Edge Labels - self._positive_edge_label = partition_output.partitioned_positive_labels - self._negative_edge_label = partition_output.partitioned_negative_labels + self._positive_edge_label = ( + partition_output.partitioned_positive_supervision_edges + ) + self._negative_edge_label = ( + partition_output.partitioned_negative_supervision_edges + ) # TODO (mkolodner-sc): Enable custom params for init_graph, init_node_features, and init_edge_features diff --git a/python/gigl/distributed/dist_partitioner.py b/python/gigl/distributed/dist_partitioner.py index d1eb4b7ac..3dbae4805 100644 --- a/python/gigl/distributed/dist_partitioner.py +++ b/python/gigl/distributed/dist_partitioner.py @@ -83,7 +83,7 @@ class DistPartitioner: This class is based on GLT's DistRandomPartitioner class (https://github.com/alibaba/graphlearn-for-pytorch/blob/main/graphlearn_torch/python/distributed/dist_random_partitioner.py) and has been optimized for better flexibility and memory management. 
We assume that init_rpc() and init_worker_group have been called to initialize the rpc and context, respectively, prior to this class. This class aims to partition homogeneous and heterogeneous input data, such as nodes, - node features, edges, edge features, and any supervision labels across multiple machines. This class also produces partition books for edges and + node features, edges, edge features, and any supervision edges across multiple machines. This class also produces partition books for edges and nodes, which are 1-d tensors that indicate which rank each node id and edge id are stored on. For example, the node partition book [0, 0, 1, 2] @@ -103,25 +103,27 @@ class DistPartitioner: partitioner.register_nodes(node_ids) del node_ids # Del reference to node_ids outside of DistPartitioner to allow memory cleanup within the class partitioner.partition_nodes() - # We may optionally want to call gc.collect() to ensure that any lingering memory is cleaned up, which may happen in cases where only a subset of inputs are partitioned (i.e no feats or labels) + # We may optionally want to call gc.collect() to ensure that any lingering memory is cleaned up, + # which may happen in cases where only a subset of inputs are partitioned (i.e no feats or supervision edges) gc.collect() ``` Option 2: User wants to partition all parts of a graph together and in sequence ``` - partitioner = DistPartitioner(node_ids, edge_index, node_features, edge_features, pos_labels, neg_labels) + partitioner = DistPartitioner(node_ids, edge_index, node_features, edge_features, positive_supervision_edges, negative_supervision_edges) # Register is called in the __init__ functions and doesn't need to be called at all outside the class. 
del ( node_ids, edge_index, node_features, edge_features, - pos_labels, - neg_labels + positive_supervision_edges, + negative_supervision_edges ) # Del reference to tensors outside of DistPartitioner to allow memory cleanup within the class partitioner.partition() - # We may optionally want to call gc.collect() to ensure that any lingering memory is cleaned up, which may happen in cases where only a subset of inputs are partitioned (i.e no feats or labels) + # We may optionally want to call gc.collect() to ensure that any lingering memory is cleaned up, + # which may happen in cases where only a subset of inputs are partitioned (i.e no feats or supervision edges) gc.collect() ``` @@ -153,10 +155,10 @@ def __init__( edge_features: Optional[ Union[torch.Tensor, dict[EdgeType, torch.Tensor]] ] = None, - positive_labels: Optional[ + positive_supervision_edges: Optional[ Union[torch.Tensor, dict[EdgeType, torch.Tensor]] ] = None, - negative_labels: Optional[ + negative_supervision_edges: Optional[ Union[torch.Tensor, dict[EdgeType, torch.Tensor]] ] = None, ): @@ -171,8 +173,10 @@ def __init__( node_features (Optional[Union[torch.Tensor, dict[NodeType, torch.Tensor]]]): Optionally registered node feats from input. Tensors should be of shope [num_nodes_on_current_rank, node_feat_dim] edge_index (Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]]): Optionally registered edge indexes from input. Tensors should be of shape [2, num_edges_on_current_rank] edge_features (Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]]): Optionally registered edge features from input. Tensors should be of shape [num_edges_on_current_rank, edge_feat_dim] - positive_labels (Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]]): Optionally registered positive labels from input. Tensors should be of shape [2, num_pos_labels_on_current_rank] - negative_labels (Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]]): Optionally registered negative labels from input. 
Tensors should be of shape [2, num_neg_labels_on_current_rank] + positive_supervision_edges (Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]]): Optionally registered positive supervision edges from input. + Tensors should be of shape [2, num_positive_supervision_edges_on_current_rank] + negative_supervision_edges (Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]]): Optionally registered negative supervision edges from input. + Tensors should be of shape [2, num_negative_supervision_edges_on_current_rank] """ self._world_size: int @@ -198,10 +202,14 @@ def __init__( self._edge_feat: Optional[dict[EdgeType, torch.Tensor]] = None self._edge_feat_dim: Optional[dict[EdgeType, int]] = None - # TODO (mkolodner-sc): Deprecate the need for explicitly storing labels are part of this class, leveraging + # TODO (mkolodner-sc): Deprecate the need for explicitly storing supervision edges as part of this class, leveraging # heterogeneous support instead - self._positive_label_edge_index: Optional[dict[EdgeType, torch.Tensor]] = None - self._negative_label_edge_index: Optional[dict[EdgeType, torch.Tensor]] = None + self._positive_supervision_edge_index: Optional[ + dict[EdgeType, torch.Tensor] + ] = None + self._negative_supervision_edge_index: Optional[ + dict[EdgeType, torch.Tensor] + ] = None if node_ids is not None: self.register_node_ids(node_ids=node_ids) @@ -215,11 +223,15 @@ def __init__( if edge_features is not None: self.register_edge_features(edge_features=edge_features) - if positive_labels is not None: - self.register_labels(label_edge_index=positive_labels, is_positive=True) + if positive_supervision_edges is not None: + self.register_supervision_edges( + supervision_edge_index=positive_supervision_edges, is_positive=True + ) - if negative_labels is not None: - self.register_labels(label_edge_index=negative_labels, is_positive=False) + if negative_supervision_edges is not None: + self.register_supervision_edges( +
supervision_edge_index=negative_supervision_edges, is_positive=False + ) def _assert_data_type_consistency( self, @@ -541,43 +553,45 @@ def register_edge_features( for edge_type in input_edge_features: self._edge_feat_dim[edge_type] = input_edge_features[edge_type].shape[1] - def register_labels( + def register_supervision_edges( self, - label_edge_index: Union[torch.Tensor, dict[EdgeType, torch.Tensor]], + supervision_edge_index: Union[torch.Tensor, dict[EdgeType, torch.Tensor]], is_positive: bool, ) -> None: """ - Registers the positive or negative label to the partitioner. Note that for the homogeneous case, - all edge types of the graph must be present in the label edge index dictionary. + Registers the positive or negative supervision edges to the partitioner. Note that for the homogeneous case, + all edge types of the graph must be present in the supervision edge index dictionary. - For optimal memory management, it is recommended that the reference to the label tensor be deleted + For optimal memory management, it is recommended that the reference to the supervision edges tensor be deleted after calling this function using del , as maintaining both original and intermediate tensors can cause OOM concerns. We do not need to perform `all_gather` calls here since register_edge_index is responsible for determining total number of edges across all ranks and inferring edge ids. Args: - label_edge_index (Union[torch.Tensor, dict[EdgeType, torch.Tensor]]): Input positive or negative labels which is either a torch.Tensor if homogeneous or a Dict if heterogeneous - is_positive (bool): Whether positive labels are currently being registered. If False, labels will be registered as negative + supervision_edge_index (Union[torch.Tensor, dict[EdgeType, torch.Tensor]]): Input positive or negative supervision edges which is either a torch.Tensor if homogeneous or a Dict if heterogeneous + is_positive (bool): Whether positive supervision edges are currently being registered. 
If False, supervision edges will be registered as negative """ self._assert_and_get_rpc_setup() - input_label_edge_index = self._convert_edge_entity_to_heterogeneous_format( - input_edge_entity=label_edge_index + input_supervision_edge_index = ( + self._convert_edge_entity_to_heterogeneous_format( + input_edge_entity=supervision_edge_index + ) ) assert ( - input_label_edge_index - ), "Label edge index is an empty dictionary. Please provide label edge indices to register." + input_supervision_edge_index + ), "Supervision edge index is an empty dictionary. Please provide supervision edge indices to register." if is_positive: - logger.info("Registering Positive Labels ...") - self._positive_label_edge_index = convert_to_tensor( - input_label_edge_index, dtype=torch.int64 + logger.info("Registering Positive Supervision Edges ...") + self._positive_supervision_edge_index = convert_to_tensor( + input_supervision_edge_index, dtype=torch.int64 ) else: - logger.info("Registering Negative Labels ...") - self._negative_label_edge_index = convert_to_tensor( - input_label_edge_index, dtype=torch.int64 + logger.info("Registering Negative Supervision Edges ...") + self._negative_supervision_edge_index = convert_to_tensor( + input_supervision_edge_index, dtype=torch.int64 ) def _partition_single_chunk_data( @@ -1034,22 +1048,23 @@ def _edge_feature_pfn(edge_feature_ids, _): return current_graph_part, current_feat_part, edge_partition_book - def _partition_label_edge_index( + def _partition_supervision_edges( self, node_partition_book: dict[NodeType, PartitionBook], is_positive: bool, edge_type: EdgeType, ) -> torch.Tensor: """ - Partitions labels according to the node partition book. + Partitions supervision edges according to the node partition book. Args: node_partition_book (dict[NodeType, PartitionBook]): The partition book of nodes - is_positive (bool): Whether positive labels are currently being registered. If False, negative labels will be partitioned. 
+ is_positive (bool): Whether positive supervision edges are currently being partitioned. + If False, negative supervision edges will be partitioned. edge_type (EdgeType): Edge type of input data, must be specified if heterogeneous Returns: - torch.Tensor: Edge index tensor of positive or negative labels, depending on is_positive flag + torch.Tensor: Edge index tensor of positive or negative supervision edges, depending on is_positive flag """ start_time = time.time() @@ -1061,16 +1076,16 @@ def _partition_label_edge_index( target_node_partition_book = node_partition_book[edge_type.src_node_type] if is_positive: assert ( - self._positive_label_edge_index is not None - ), "Must register positive labels prior to partitioning them" - label_edge_index = self._positive_label_edge_index[edge_type] + self._positive_supervision_edge_index is not None + ), "Must register positive supervision edges prior to partitioning them" + supervision_edge_index = self._positive_supervision_edge_index[edge_type] else: assert ( - self._negative_label_edge_index is not None - ), "Must register negative labels prior to partitioning them" - label_edge_index = self._negative_label_edge_index[edge_type] + self._negative_supervision_edge_index is not None + ), "Must register negative supervision edges prior to partitioning them" + supervision_edge_index = self._negative_supervision_edge_index[edge_type] - def _label_pfn(source_node_ids, _): + def _supervision_edge_pfn(source_node_ids, _): return target_node_partition_book[source_node_ids] # partitioned_chunks is a list of tuples. Each tuple is the the partitioned @@ -1079,45 +1094,45 @@ def _label_pfn(source_node_ids, _): # node IDs. partitioned_chunks, _ = self._partition_by_chunk( input_data=( - label_edge_index[0], - label_edge_index[1], + supervision_edge_index[0], + supervision_edge_index[1], ), # 'partition_fn' takes 'val_indices' as input, uses it as keys for partition, # and returns the partition index.
- rank_indices=label_edge_index[0], - partition_function=_label_pfn, - total_val_size=label_edge_index[0].size(0), + rank_indices=supervision_edge_index[0], + partition_function=_supervision_edge_pfn, + total_val_size=supervision_edge_index[0].size(0), generate_pb=False, ) - del label_edge_index + del supervision_edge_index if is_positive: # This assert is added to pass mypy type check, in practice we will not see this fail assert ( - self._positive_label_edge_index is not None - ), "Must register positive labels prior to partitioning them" + self._positive_supervision_edge_index is not None + ), "Must register positive supervision edges prior to partitioning them" - del self._positive_label_edge_index[edge_type] - if len(self._positive_label_edge_index) == 0: - self._positive_label_edge_index = None + del self._positive_supervision_edge_index[edge_type] + if len(self._positive_supervision_edge_index) == 0: + self._positive_supervision_edge_index = None else: # This assert is added to pass mypy type check, in practice we will not see this fail assert ( - self._negative_label_edge_index is not None - ), "Must register negative labels prior to partitioning them" + self._negative_supervision_edge_index is not None + ), "Must register negative supervision edges prior to partitioning them" - del self._negative_label_edge_index[edge_type] - if len(self._negative_label_edge_index) == 0: - self._negative_label_edge_index = None + del self._negative_supervision_edge_index[edge_type] + if len(self._negative_supervision_edge_index) == 0: + self._negative_supervision_edge_index = None gc.collect() # Combine the partitioned source and destination node IDs into a single 2D tensor if len(partitioned_chunks) == 0: - partitioned_label_edge_index = torch.empty((2, 0)) + partitioned_supervision_edge_index = torch.empty((2, 0)) else: - partitioned_label_edge_index = torch.stack( + partitioned_supervision_edge_index = torch.stack( [ torch.cat([src_ids for src_ids, _ in 
partitioned_chunks]), torch.cat([dst_ids for _, dst_ids in partitioned_chunks]), @@ -1130,10 +1145,10 @@ def _label_pfn(source_node_ids, _): gc.collect() logger.info( - f"Edge label partitioning for edge type {edge_type} finished, took {time.time() - start_time:.3f}s" + f"Edge supervision edge partitioning for edge type {edge_type} finished, took {time.time() - start_time:.3f}s" ) - return partitioned_label_edge_index + return partitioned_supervision_edge_index def partition_node(self) -> Union[PartitionBook, dict[NodeType, PartitionBook]]: """ @@ -1346,41 +1361,46 @@ def partition_edge_index_and_edge_features( edge_partition_book, ) - def partition_labels( + def partition_supervision_edges( self, node_partition_book: Union[PartitionBook, dict[NodeType, PartitionBook]], is_positive: bool, ) -> Union[torch.Tensor, dict[EdgeType, torch.Tensor]]: """ - Partitions positive or negative labels of a graph. If heterogeneous, partitions labels for all edge type. + Partitions positive or negative supervision edges of a graph. If heterogeneous, partitions supervision edges for all edge type. Must call `partition_node` first to get the node partition book as input. - Note that labels are always partitioned by the source node, since the edges are always assumed to be anchor_node -> to -> supervision node. + Note that supervision edges are always partitioned by the source node, since the edges are always assumed to be anchor_node -> to -> supervision node. Args: node_partition_book (Union[PartitionBook, dict[NodeType, PartitionBook]]): The computed Node Partition Book - is_positive (bool): Whether positive labels are currently being registered. If False, negative labels will be partitioned. + is_positive (bool): Whether positive supervision edges are currently being registered. If False, negative supervision edges will be partitioned. 
Returns: - Union[torch.Tensor, dict[EdgeType, torch.Tensor]]: Returns the edge indices for partitioned positive or negative label, dependent on the is_positive flag + Union[torch.Tensor, dict[EdgeType, torch.Tensor]]: Returns the edge indices for partitioned positive or + negative supervision edges, dependent on the is_positive flag """ self._assert_and_get_rpc_setup() if is_positive: assert ( - self._positive_label_edge_index is not None - ), "Must register positive labels prior to partitioning them" + self._positive_supervision_edge_index is not None + ), "Must register positive supervision edges prior to partitioning them" - edge_label_types = sorted(self._positive_label_edge_index.keys()) + supervision_edge_types = sorted( + self._positive_supervision_edge_index.keys() + ) - logger.info("Partitioning Positive Labels ...") + logger.info("Partitioning Positive Supervision Edges ...") else: assert ( - self._negative_label_edge_index is not None - ), "Must register negative labels partitioning them" + self._negative_supervision_edge_index is not None + ), "Must register negative supervision edges partitioning them" - edge_label_types = sorted(self._negative_label_edge_index.keys()) + supervision_edge_types = sorted( + self._negative_supervision_edge_index.keys() + ) - logger.info("Partitioning Negative Labels ...") + logger.info("Partitioning Negative Supervision Edges ...") start_time = time.time() @@ -1396,9 +1416,11 @@ def partition_labels( is_subset=False, ) - partitioned_label_edge_index: dict[EdgeType, torch.Tensor] = {} - for edge_type in edge_label_types: - partitioned_label_edge_index[edge_type] = self._partition_label_edge_index( + partitioned_supervision_edge_index: dict[EdgeType, torch.Tensor] = {} + for edge_type in supervision_edge_types: + partitioned_supervision_edge_index[ + edge_type + ] = self._partition_supervision_edges( node_partition_book=transformed_node_partition_book, is_positive=is_positive, edge_type=edge_type, @@ -1415,9 +1437,9 @@ def 
partition_labels( ) if self._is_input_homogeneous: - return partitioned_label_edge_index[DEFAULT_HOMOGENEOUS_EDGE_TYPE] + return partitioned_supervision_edge_index[DEFAULT_HOMOGENEOUS_EDGE_TYPE] else: - return partitioned_label_edge_index + return partitioned_supervision_edge_index def partition( self, @@ -1434,7 +1456,7 @@ def partition( start_time = time.time() # Node partition should happen at the very beginning, as edge partition - # and label partition depends on node partition book. + # and supervision edge partition depends on node partition book. node_partition_book = self.partition_node() # Partition edge and clean up input edge data. @@ -1453,15 +1475,15 @@ def partition( else: partitioned_node_features = None - if self._positive_label_edge_index is not None: - partitioned_positive_edge_index = self.partition_labels( + if self._positive_supervision_edge_index is not None: + partitioned_positive_edge_index = self.partition_supervision_edges( node_partition_book=node_partition_book, is_positive=True ) else: partitioned_positive_edge_index = None - if self._negative_label_edge_index is not None: - partitioned_negative_edge_index = self.partition_labels( + if self._negative_supervision_edge_index is not None: + partitioned_negative_edge_index = self.partition_supervision_edges( node_partition_book=node_partition_book, is_positive=False ) else: @@ -1477,6 +1499,6 @@ def partition( partitioned_edge_index=partitioned_edge_index, partitioned_node_features=partitioned_node_features, partitioned_edge_features=partitioned_edge_features, - partitioned_positive_labels=partitioned_positive_edge_index, - partitioned_negative_labels=partitioned_negative_edge_index, + partitioned_positive_supervision_edges=partitioned_positive_edge_index, + partitioned_negative_supervision_edges=partitioned_negative_edge_index, ) diff --git a/python/gigl/types/graph.py b/python/gigl/types/graph.py index 26a504675..abaf614ff 100644 --- a/python/gigl/types/graph.py +++ 
b/python/gigl/types/graph.py @@ -81,12 +81,12 @@ class PartitionOutput: ] # Positive edge indices on current rank, May be None if positive edge labels are not partitioned - partitioned_positive_labels: Optional[ + partitioned_positive_supervision_edges: Optional[ Union[torch.Tensor, dict[EdgeType, torch.Tensor]] ] # Negative edge indices on current rank, May be None if negative edge labels are not partitioned - partitioned_negative_labels: Optional[ + partitioned_negative_supervision_edges: Optional[ Union[torch.Tensor, dict[EdgeType, torch.Tensor]] ] @@ -145,10 +145,14 @@ class LoadedGraphTensors: edge_index: Union[torch.Tensor, dict[EdgeType, torch.Tensor]] # Unpartitioned Edge Features edge_features: Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]] - # Unpartitioned Positive Edge Label - positive_label: Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]] - # Unpartitioned Negative Edge Label - negative_label: Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]] + # Unpartitioned Positive Supervision Edges + positive_supervision_edges: Optional[ + Union[torch.Tensor, dict[EdgeType, torch.Tensor]] + ] + # Unpartitioned Negative Supervision Edges + negative_supervision_edges: Optional[ + Union[torch.Tensor, dict[EdgeType, torch.Tensor]] + ] def treat_labels_as_edges(self, edge_dir: Literal["in", "out"]) -> None: """ @@ -157,13 +161,13 @@ def treat_labels_as_edges(self, edge_dir: Literal["in", "out"]) -> None: outwards in form (`anchor_node_type`, `relation`, `supervision_node_type`), and all edges in the edge index must be in the same direction. This function requires the following conditions and will throw if they are not met: - 1. The positive_label is not None + 1. The positive_supervision_edges is not None Args: edge_dir: The edge direction of the graph. """ - if self.positive_label is None: + if self.positive_supervision_edges is None: raise ValueError( "Cannot treat labels as edges when positive label is None." 
) @@ -178,83 +182,91 @@ def treat_labels_as_edges(self, edge_dir: Literal["in", "out"]) -> None: else: main_edge_type = None - if isinstance(self.positive_label, torch.Tensor): + if isinstance(self.positive_supervision_edges, torch.Tensor): if main_edge_type is None: raise ValueError( "Detected multiple edge types in provided edge_index, but no edge types specified for provided positive label." ) - positive_label_edge_type = message_passing_to_positive_label(main_edge_type) + positive_supervision_edge_edge_type = ( + message_passing_to_positive_supervision_edges(main_edge_type) + ) labeled_edge_type, edge_index = _get_label_edges( - labeled_edge_index=self.positive_label, + labeled_edge_index=self.positive_supervision_edges, edge_dir=edge_dir, - labeled_edge_type=positive_label_edge_type, + labeled_edge_type=positive_supervision_edge_edge_type, ) edge_index_with_labels[labeled_edge_type] = edge_index logger.info( - f"Treating homogeneous positive labels as edge type {positive_label_edge_type}." + f"Treating homogeneous positive labels as edge type {positive_supervision_edge_edge_type}." 
) - elif isinstance(self.positive_label, dict): + elif isinstance(self.positive_supervision_edges, dict): for ( - positive_label_type, - positive_label_tensor, - ) in self.positive_label.items(): - positive_label_edge_type = message_passing_to_positive_label( - positive_label_type + positive_supervision_edge_type, + positive_supervision_edge_tensor, + ) in self.positive_supervision_edges.items(): + positive_supervision_edge_edge_type = ( + message_passing_to_positive_supervision_edges( + positive_supervision_edge_type + ) ) labeled_edge_type, edge_index = _get_label_edges( - labeled_edge_index=positive_label_tensor, + labeled_edge_index=positive_supervision_edge_tensor, edge_dir=edge_dir, - labeled_edge_type=positive_label_edge_type, + labeled_edge_type=positive_supervision_edge_edge_type, ) edge_index_with_labels[labeled_edge_type] = edge_index logger.info( - f"Treating heterogeneous positive labels {positive_label_type} as edge type {positive_label_edge_type}." + f"Treating heterogeneous positive labels {positive_supervision_edge_type} as edge type {positive_supervision_edge_edge_type}." ) - if isinstance(self.negative_label, torch.Tensor): + if isinstance(self.negative_supervision_edges, torch.Tensor): if main_edge_type is None: raise ValueError( "Detected multiple edge types in provided edge_index, but no edge types specified for provided negative label." ) - negative_label_edge_type = message_passing_to_negative_label(main_edge_type) + negative_supervision_edge_edge_type = ( + message_passing_to_negative_supervision_edges(main_edge_type) + ) labeled_edge_type, edge_index = _get_label_edges( - labeled_edge_index=self.negative_label, + labeled_edge_index=self.negative_supervision_edges, edge_dir=edge_dir, - labeled_edge_type=negative_label_edge_type, + labeled_edge_type=negative_supervision_edge_edge_type, ) edge_index_with_labels[labeled_edge_type] = edge_index logger.info( - f"Treating homogeneous negative labels as edge type {negative_label_edge_type}." 
+ f"Treating homogeneous negative labels as edge type {negative_supervision_edge_edge_type}." ) - elif isinstance(self.negative_label, dict): + elif isinstance(self.negative_supervision_edges, dict): for ( - negative_label_type, - negative_label_tensor, - ) in self.negative_label.items(): - negative_label_edge_type = message_passing_to_negative_label( - negative_label_type + negative_supervision_edge_type, + negative_supervision_edge_tensor, + ) in self.negative_supervision_edges.items(): + negative_supervision_edge_edge_type = ( + message_passing_to_negative_supervision_edges( + negative_supervision_edge_type + ) ) labeled_edge_type, edge_index = _get_label_edges( - labeled_edge_index=negative_label_tensor, + labeled_edge_index=negative_supervision_edge_tensor, edge_dir=edge_dir, - labeled_edge_type=negative_label_edge_type, + labeled_edge_type=negative_supervision_edge_edge_type, ) edge_index_with_labels[labeled_edge_type] = edge_index logger.info( - f"Treating heterogeneous negative labels {negative_label_type} as edge type {negative_label_edge_type}." + f"Treating heterogeneous negative labels {negative_supervision_edge_type} as edge type {negative_supervision_edge_edge_type}." ) self.node_ids = to_heterogeneous_node(self.node_ids) self.node_features = to_heterogeneous_node(self.node_features) self.edge_index = edge_index_with_labels self.edge_features = to_heterogeneous_edge(self.edge_features) - self.positive_label = None - self.negative_label = None + self.positive_supervision_edges = None + self.negative_supervision_edges = None gc.collect() -def message_passing_to_positive_label( +def message_passing_to_positive_supervision_edges( message_passing_edge_type: _EdgeType, ) -> _EdgeType: """Convert a message passing edge type to a positive label edge type. 
@@ -278,7 +290,7 @@ def message_passing_to_positive_label( return edge_type -def message_passing_to_negative_label( +def message_passing_to_negative_supervision_edges( message_passing_edge_type: _EdgeType, ) -> _EdgeType: """Convert a message passing edge type to a negative label edge type. @@ -330,18 +342,24 @@ def select_label_edge_types( Returns: tuple[EdgeType, Optional[EdgeType]]: A tuple containing the positive label edge type and optionally the negative label edge type. """ - positive_label_type = None - negative_label_type = None + positive_supervision_edge_type = None + negative_supervision_edge_type = None for edge_type in edge_entities: - if message_passing_to_positive_label(message_passing_edge_type) == edge_type: - positive_label_type = edge_type - if message_passing_to_negative_label(message_passing_edge_type) == edge_type: - negative_label_type = edge_type - if positive_label_type is None: + if ( + message_passing_to_positive_supervision_edges(message_passing_edge_type) + == edge_type + ): + positive_supervision_edge_type = edge_type + if ( + message_passing_to_negative_supervision_edges(message_passing_edge_type) + == edge_type + ): + negative_supervision_edge_type = edge_type + if positive_supervision_edge_type is None: raise ValueError( - f"Could not find positive label edge type for message passing edge type {message_passing_to_positive_label(message_passing_edge_type)} from edge entities {edge_entities}." + f"Could not find positive label edge type for message passing edge type {message_passing_to_positive_supervision_edges(message_passing_edge_type)} from edge entities {edge_entities}." ) - return positive_label_type, negative_label_type + return positive_supervision_edge_type, negative_supervision_edge_type # Entities that represent a graph, somehow. 
diff --git a/python/gigl/utils/data_splitters.py b/python/gigl/utils/data_splitters.py index a7272cd4d..62b5b076f 100644 --- a/python/gigl/utils/data_splitters.py +++ b/python/gigl/utils/data_splitters.py @@ -23,8 +23,8 @@ from gigl.types.graph import ( DEFAULT_HOMOGENEOUS_EDGE_TYPE, DEFAULT_HOMOGENEOUS_NODE_TYPE, - message_passing_to_negative_label, - message_passing_to_positive_label, + message_passing_to_negative_supervision_edges, + message_passing_to_positive_supervision_edges, reverse_edge_type, ) @@ -236,10 +236,10 @@ def __init__( self._labeled_edge_types: Sequence[EdgeType] if should_convert_labels_to_edges: labeled_edge_types = [ - message_passing_to_positive_label(supervision_edge_type) + message_passing_to_positive_supervision_edges(supervision_edge_type) for supervision_edge_type in supervision_edge_types ] + [ - message_passing_to_negative_label(supervision_edge_type) + message_passing_to_negative_supervision_edges(supervision_edge_type) for supervision_edge_type in supervision_edge_types ] # If the edge direction is "in", we must reverse the labeled edge type, since separately provided labels are expected to be initially outgoing, and all edges @@ -744,6 +744,7 @@ def _check_node_ids(node_ids: torch.Tensor): raise ValueError("Expected non-empty node_ids tensor.") +# TODO (mkolodner-sc): Change positive/negative label name to positive/negative supervision edges def select_ssl_positive_label_edges( edge_index: torch.Tensor, positive_label_percentage: float ) -> torch.Tensor: diff --git a/python/tests/test_assets/distributed/constants.py b/python/tests/test_assets/distributed/constants.py index d1f534a23..636a9b02f 100644 --- a/python/tests/test_assets/distributed/constants.py +++ b/python/tests/test_assets/distributed/constants.py @@ -162,10 +162,10 @@ class TestGraphData: edge_features: dict[EdgeType, torch.Tensor] # Positive edge label tensor for mocked data - positive_labels: dict[EdgeType, torch.Tensor] + positive_supervision_edges: dict[EdgeType, 
torch.Tensor] # Input negative edge label tensor to partitioner for mocked data - negative_labels: dict[EdgeType, torch.Tensor] + negative_supervision_edges: dict[EdgeType, torch.Tensor] RANK_TO_MOCKED_GRAPH: Final[dict[int, TestGraphData]] = { @@ -185,11 +185,11 @@ class TestGraphData: edge_features={ USER_TO_USER_EDGE_TYPE: MOCKED_U2U_EDGE_FEATURES_ON_RANK_ZERO, }, - positive_labels={ + positive_supervision_edges={ USER_TO_USER_EDGE_TYPE: MOCKED_U2U_POS_EDGE_INDEX_ON_RANK_ZERO, USER_TO_ITEM_EDGE_TYPE: MOCKED_U2I_POS_EDGE_INDEX_ON_RANK_ZERO, }, - negative_labels={ + negative_supervision_edges={ USER_TO_USER_EDGE_TYPE: MOCKED_U2U_NEG_EDGE_INDEX_ON_RANK_ZERO, USER_TO_ITEM_EDGE_TYPE: MOCKED_U2I_NEG_EDGE_INDEX_ON_RANK_ZERO, }, @@ -210,11 +210,11 @@ class TestGraphData: edge_features={ USER_TO_USER_EDGE_TYPE: MOCKED_U2U_EDGE_FEATURES_ON_RANK_ONE, }, - positive_labels={ + positive_supervision_edges={ USER_TO_USER_EDGE_TYPE: MOCKED_U2U_POS_EDGE_INDEX_ON_RANK_ONE, USER_TO_ITEM_EDGE_TYPE: MOCKED_U2I_POS_EDGE_INDEX_ON_RANK_ONE, }, - negative_labels={ + negative_supervision_edges={ USER_TO_USER_EDGE_TYPE: MOCKED_U2U_NEG_EDGE_INDEX_ON_RANK_ONE, USER_TO_ITEM_EDGE_TYPE: MOCKED_U2I_NEG_EDGE_INDEX_ON_RANK_ONE, }, @@ -275,7 +275,7 @@ class TestGraphData: dim=0, ), }, - positive_labels={ + positive_supervision_edges={ USER_TO_USER_EDGE_TYPE: torch.cat( ( MOCKED_U2U_POS_EDGE_INDEX_ON_RANK_ZERO, @@ -291,7 +291,7 @@ class TestGraphData: dim=1, ), }, - negative_labels={ + negative_supervision_edges={ USER_TO_USER_EDGE_TYPE: torch.cat( ( MOCKED_U2U_NEG_EDGE_INDEX_ON_RANK_ZERO, diff --git a/python/tests/test_assets/distributed/run_distributed_dataset.py b/python/tests/test_assets/distributed/run_distributed_dataset.py index c150a5469..1bae70999 100644 --- a/python/tests/test_assets/distributed/run_distributed_dataset.py +++ b/python/tests/test_assets/distributed/run_distributed_dataset.py @@ -62,7 +62,8 @@ def run_distributed_dataset( world_size (int): World size of the current process 
mocked_dataset_info (MockedDatasetInfo): Mocked Dataset Metadata for current run - should_load_tensors_in_parallel (bool): Whether tensors should be loaded from serialized information in parallel or in sequence across the [node, edge, pos_label, neg_label] entity types. + should_load_tensors_in_parallel (bool): Whether tensors should be loaded from serialized information in parallel or + in sequence across the [node, edge, positive_supervision_edges, negative_supervision_edges] entity types. output_dict (Optional[MutableMapping[int, DistLinkPredictionDataset]]): Dict initialized by mp.Manager().dict() in which outputs will be written to partitioner_class (Optional[Type[DistPartitioner]]): Optional partitioner class to pass into `build_dataset` splitter (Optional[Union[NodeAnchorLinkSplitter, NodeSplitter]]): Provided splitter for testing diff --git a/python/tests/test_assets/distributed/run_distributed_partitioner.py b/python/tests/test_assets/distributed/run_distributed_partitioner.py index cbfa68f2c..aa75b5959 100644 --- a/python/tests/test_assets/distributed/run_distributed_partitioner.py +++ b/python/tests/test_assets/distributed/run_distributed_partitioner.py @@ -51,23 +51,27 @@ def run_distributed_partitioner( edge_index: Union[torch.Tensor, dict[EdgeType, torch.Tensor]] node_features: Union[torch.Tensor, dict[NodeType, torch.Tensor]] edge_features: Union[torch.Tensor, dict[EdgeType, torch.Tensor]] - positive_labels: Union[torch.Tensor, dict[EdgeType, torch.Tensor]] - negative_labels: Union[torch.Tensor, dict[EdgeType, torch.Tensor]] + positive_supervision_edges: Union[torch.Tensor, dict[EdgeType, torch.Tensor]] + negative_supervision_edges: Union[torch.Tensor, dict[EdgeType, torch.Tensor]] if not is_heterogeneous: node_ids = input_graph.node_ids[USER_NODE_TYPE] edge_index = input_graph.edge_index[USER_TO_USER_EDGE_TYPE] node_features = input_graph.node_features[USER_NODE_TYPE] edge_features = input_graph.edge_features[USER_TO_USER_EDGE_TYPE] - 
positive_labels = input_graph.positive_labels[USER_TO_USER_EDGE_TYPE] - negative_labels = input_graph.negative_labels[USER_TO_USER_EDGE_TYPE] + positive_supervision_edges = input_graph.positive_supervision_edges[ + USER_TO_USER_EDGE_TYPE + ] + negative_supervision_edges = input_graph.negative_supervision_edges[ + USER_TO_USER_EDGE_TYPE + ] else: node_ids = input_graph.node_ids edge_index = input_graph.edge_index node_features = input_graph.node_features edge_features = input_graph.edge_features - positive_labels = input_graph.positive_labels - negative_labels = input_graph.negative_labels + positive_supervision_edges = input_graph.positive_supervision_edges + negative_supervision_edges = input_graph.negative_supervision_edges partition_output: PartitionOutput init_worker_group(world_size=MOCKED_NUM_PARTITIONS, rank=rank) @@ -101,20 +105,24 @@ def run_distributed_partitioner( node_partition_book=output_node_partition_book ) - dist_partitioner.register_labels( - label_edge_index=positive_labels, is_positive=True + dist_partitioner.register_supervision_edges( + supervision_edge_index=positive_supervision_edges, is_positive=True ) - del positive_labels - output_positive_labels = dist_partitioner.partition_labels( - node_partition_book=output_node_partition_book, is_positive=True + del positive_supervision_edges + output_positive_supervision_edges = ( + dist_partitioner.partition_supervision_edges( + node_partition_book=output_node_partition_book, is_positive=True + ) ) - dist_partitioner.register_labels( - label_edge_index=negative_labels, is_positive=False + dist_partitioner.register_supervision_edges( + supervision_edge_index=negative_supervision_edges, is_positive=False ) - del negative_labels - output_negative_labels = dist_partitioner.partition_labels( - node_partition_book=output_node_partition_book, is_positive=False + del negative_supervision_edges + output_negative_supervision_edges = ( + dist_partitioner.partition_supervision_edges( + 
node_partition_book=output_node_partition_book, is_positive=False + ) ) partition_output = PartitionOutput( @@ -123,8 +131,8 @@ def run_distributed_partitioner( partitioned_edge_index=output_edge_index, partitioned_node_features=output_node_features, partitioned_edge_features=output_edge_features, - partitioned_positive_labels=output_positive_labels, - partitioned_negative_labels=output_negative_labels, + partitioned_positive_supervision_edges=output_positive_supervision_edges, + partitioned_negative_supervision_edges=output_negative_supervision_edges, ) elif input_data_strategy == InputDataStrategy.REGISTER_MINIMAL_ENTITIES_SEPARATELY: dist_partitioner = partitioner_class( @@ -151,8 +159,8 @@ def run_distributed_partitioner( partitioned_edge_index=output_graph, partitioned_node_features=None, partitioned_edge_features=None, - partitioned_positive_labels=None, - partitioned_negative_labels=None, + partitioned_positive_supervision_edges=None, + partitioned_negative_supervision_edges=None, ) else: @@ -162,8 +170,8 @@ def run_distributed_partitioner( node_features=node_features, edge_index=edge_index, edge_features=edge_features, - positive_labels=positive_labels, - negative_labels=negative_labels, + positive_supervision_edges=positive_supervision_edges, + negative_supervision_edges=negative_supervision_edges, ) # We call del to mimic the real use case for handling these input tensors del ( @@ -171,8 +179,8 @@ def run_distributed_partitioner( node_features, edge_index, edge_features, - positive_labels, - negative_labels, + positive_supervision_edges, + negative_supervision_edges, ) partition_output = dist_partitioner.partition() diff --git a/python/tests/unit/distributed/distributed_dataset_test.py b/python/tests/unit/distributed/distributed_dataset_test.py index 8824d4fb4..f4d3b358c 100644 --- a/python/tests/unit/distributed/distributed_dataset_test.py +++ b/python/tests/unit/distributed/distributed_dataset_test.py @@ -546,8 +546,8 @@ def 
test_building_homogeneous_dataset_preserves_node_features_and_labels(self): feats=torch.zeros(10, 2), ids=torch.arange(10) ), partitioned_edge_features=None, - partitioned_positive_labels=None, - partitioned_negative_labels=None, + partitioned_positive_supervision_edges=None, + partitioned_negative_supervision_edges=None, partitioned_node_labels=FeaturePartitionData( feats=torch.arange(10).unsqueeze(1), ids=torch.arange(10) ), @@ -587,8 +587,8 @@ def test_building_heterogeneous_dataset_preserves_node_features_and_labels(self) ), }, partitioned_edge_features=None, - partitioned_positive_labels=None, - partitioned_negative_labels=None, + partitioned_positive_supervision_edges=None, + partitioned_negative_supervision_edges=None, partitioned_node_labels={ _USER: FeaturePartitionData( feats=torch.arange(10).unsqueeze(1), ids=torch.arange(10) diff --git a/python/tests/unit/distributed/distributed_neighborloader_test.py b/python/tests/unit/distributed/distributed_neighborloader_test.py index eed655705..a92c7ce7c 100644 --- a/python/tests/unit/distributed/distributed_neighborloader_test.py +++ b/python/tests/unit/distributed/distributed_neighborloader_test.py @@ -34,8 +34,8 @@ FeaturePartitionData, GraphPartitionData, PartitionOutput, - message_passing_to_negative_label, - message_passing_to_positive_label, + message_passing_to_negative_supervision_edges, + message_passing_to_positive_supervision_edges, to_heterogeneous_node, to_homogeneous, ) @@ -49,8 +49,12 @@ get_process_group_init_method, ) -_POSITIVE_EDGE_TYPE = message_passing_to_positive_label(DEFAULT_HOMOGENEOUS_EDGE_TYPE) -_NEGATIVE_EDGE_TYPE = message_passing_to_negative_label(DEFAULT_HOMOGENEOUS_EDGE_TYPE) +_POSITIVE_EDGE_TYPE = message_passing_to_positive_supervision_edges( + DEFAULT_HOMOGENEOUS_EDGE_TYPE +) +_NEGATIVE_EDGE_TYPE = message_passing_to_negative_supervision_edges( + DEFAULT_HOMOGENEOUS_EDGE_TYPE +) _USER = NodeType("user") _STORY = NodeType("story") @@ -708,8 +712,8 @@ def test_ablp_dataloader( }, 
partitioned_edge_features=None, partitioned_node_features=None, - partitioned_negative_labels=None, - partitioned_positive_labels=None, + partitioned_negative_supervision_edges=None, + partitioned_positive_supervision_edges=None, ) dataset = DistLinkPredictionDataset(rank=0, world_size=1, edge_dir="out") dataset.build(partition_output=partition_output) @@ -946,8 +950,8 @@ def test_distributed_neighbor_loader_with_node_labels_homogeneous(self): feats=torch.zeros(10, 2), ids=torch.arange(10) ), partitioned_edge_features=None, - partitioned_positive_labels=None, - partitioned_negative_labels=None, + partitioned_positive_supervision_edges=None, + partitioned_negative_supervision_edges=None, partitioned_node_labels=FeaturePartitionData( feats=torch.arange(10).unsqueeze(1), ids=torch.arange(10) ), @@ -991,8 +995,8 @@ def test_distributed_neighbor_loader_with_node_labels_heterogeneous(self): ), }, partitioned_edge_features=None, - partitioned_positive_labels=None, - partitioned_negative_labels=None, + partitioned_positive_supervision_edges=None, + partitioned_negative_supervision_edges=None, partitioned_node_labels={ _USER: FeaturePartitionData( feats=torch.arange(5).unsqueeze(1), ids=torch.arange(5) diff --git a/python/tests/unit/distributed/distributed_partitioner_test.py b/python/tests/unit/distributed/distributed_partitioner_test.py index 18358259a..6bb0aa668 100644 --- a/python/tests/unit/distributed/distributed_partitioner_test.py +++ b/python/tests/unit/distributed/distributed_partitioner_test.py @@ -678,9 +678,13 @@ def test_partitioning_correctness( unified_output_edge_feat: dict[EdgeType, list[torch.Tensor]] = defaultdict(list) - unified_output_pos_label: dict[EdgeType, list[torch.Tensor]] = defaultdict(list) + unified_positive_supervision_edges: dict[ + EdgeType, list[torch.Tensor] + ] = defaultdict(list) - unified_output_neg_label: dict[EdgeType, list[torch.Tensor]] = defaultdict(list) + unified_negative_supervision_edges: dict[ + EdgeType, list[torch.Tensor] 
+ ] = defaultdict(list) is_range_based_partition = partitioner_class is DistRangePartitioner @@ -722,8 +726,12 @@ def test_partitioning_correctness( ): self.assertIsNone(partition_output.partitioned_edge_features) self.assertIsNone(partition_output.partitioned_node_features) - self.assertIsNone(partition_output.partitioned_positive_labels) - self.assertIsNone(partition_output.partitioned_negative_labels) + self.assertIsNone( + partition_output.partitioned_positive_supervision_edges + ) + self.assertIsNone( + partition_output.partitioned_negative_supervision_edges + ) else: assert ( partition_output.partitioned_node_features is not None @@ -733,11 +741,11 @@ def test_partitioning_correctness( ), f"Must partition edge features for strategy {input_data_strategy.value}" assert ( - partition_output.partitioned_positive_labels is not None + partition_output.partitioned_positive_supervision_edges is not None ), f"Must partition positive labels for strategy {input_data_strategy.value}" assert ( - partition_output.partitioned_negative_labels is not None + partition_output.partitioned_negative_supervision_edges is not None ), f"Must partition negative labels for strategy {input_data_strategy.value}" self._assert_node_feature_outputs( @@ -789,22 +797,26 @@ def test_partitioning_correctness( is_heterogeneous=is_heterogeneous, output_node_partition_book=partition_output.node_partition_book, should_assign_edges_by_src_node=True, - output_labeled_edge_index=partition_output.partitioned_positive_labels, + output_labeled_edge_index=partition_output.partitioned_positive_supervision_edges, expected_edge_types=MOCKED_HETEROGENEOUS_EDGE_TYPES, expected_pb_dtype=expected_pb_dtype, ) if isinstance( - partition_output.partitioned_positive_labels, abc.Mapping + partition_output.partitioned_positive_supervision_edges, abc.Mapping ): for ( edge_type, pos_edge_label, - ) in partition_output.partitioned_positive_labels.items(): - unified_output_pos_label[edge_type].append(pos_edge_label) + ) 
in ( + partition_output.partitioned_positive_supervision_edges.items() + ): + unified_positive_supervision_edges[edge_type].append( + pos_edge_label + ) else: - unified_output_pos_label[USER_TO_USER_EDGE_TYPE].append( - partition_output.partitioned_positive_labels + unified_positive_supervision_edges[USER_TO_USER_EDGE_TYPE].append( + partition_output.partitioned_positive_supervision_edges ) self._assert_label_outputs( @@ -812,22 +824,26 @@ def test_partitioning_correctness( is_heterogeneous=is_heterogeneous, output_node_partition_book=partition_output.node_partition_book, should_assign_edges_by_src_node=True, - output_labeled_edge_index=partition_output.partitioned_negative_labels, + output_labeled_edge_index=partition_output.partitioned_negative_supervision_edges, expected_edge_types=MOCKED_HETEROGENEOUS_EDGE_TYPES, expected_pb_dtype=expected_pb_dtype, ) if isinstance( - partition_output.partitioned_negative_labels, abc.Mapping + partition_output.partitioned_negative_supervision_edges, abc.Mapping ): for ( edge_type, neg_edge_labels, - ) in partition_output.partitioned_negative_labels.items(): - unified_output_neg_label[edge_type].append(neg_edge_labels) + ) in ( + partition_output.partitioned_negative_supervision_edges.items() + ): + unified_negative_supervision_edges[edge_type].append( + neg_edge_labels + ) else: - unified_output_neg_label[USER_TO_USER_EDGE_TYPE].append( - partition_output.partitioned_negative_labels + unified_negative_supervision_edges[USER_TO_USER_EDGE_TYPE].append( + partition_output.partitioned_negative_supervision_edges ) ## Checking that the union of edge indices across all ranks equals to the full set from the input @@ -874,28 +890,40 @@ def test_partitioning_correctness( tensor_a=expected_edge_feat, tensor_b=partitioned_edge_feat, dim=0 ) - for edge_type in unified_output_pos_label: + for edge_type in unified_positive_supervision_edges: # First, we get the expected pos edge label from the mocked input for this edge type - 
expected_positive_labels = MOCKED_UNIFIED_GRAPH.positive_labels[edge_type] + expected_positive_supervision_edges = ( + MOCKED_UNIFIED_GRAPH.positive_supervision_edges[edge_type] + ) # We combine the output pos labels across all the ranks - output_pos_label = torch.cat(unified_output_pos_label[edge_type], dim=1) + output_positive_supervision_edges = torch.cat( + unified_positive_supervision_edges[edge_type], dim=1 + ) # Finally, we check that the expected tensor and output tensor have the same rows, which is achieved by setting the shuffle dimension to 1 assert_tensor_equality( - tensor_a=expected_positive_labels, tensor_b=output_pos_label, dim=1 + tensor_a=expected_positive_supervision_edges, + tensor_b=output_positive_supervision_edges, + dim=1, ) - for edge_type in unified_output_neg_label: + for edge_type in unified_negative_supervision_edges: # First, we get the expected neg edge label from the mocked input for this edge type - expected_negative_labels = MOCKED_UNIFIED_GRAPH.negative_labels[edge_type] + expected_negative_supervision_edges = ( + MOCKED_UNIFIED_GRAPH.negative_supervision_edges[edge_type] + ) # We combine the output neg labels across all the ranks - output_neg_label = torch.cat(unified_output_neg_label[edge_type], dim=1) + output_negative_supervision_edges = torch.cat( + unified_negative_supervision_edges[edge_type], dim=1 + ) # Finally, we check that the expected tensor and output tensor have the same rows, which is achieved by setting the shuffle dimension to 1 assert_tensor_equality( - tensor_a=expected_negative_labels, tensor_b=output_neg_label, dim=1 + tensor_a=expected_negative_supervision_edges, + tensor_b=output_negative_supervision_edges, + dim=1, ) def test_partitioning_failure(self) -> None: @@ -952,16 +980,20 @@ def test_partitioning_failure(self) -> None: empty_edge_index = torch.empty((2, 0)) empty_node_features = torch.empty((0, 5)) empty_edge_features = torch.empty((0, 10)) - empty_pos_label = torch.empty((2, 0)) - empty_neg_label 
= torch.empty((2, 0)) + empty_positive_supervision_edges = torch.empty((2, 0)) + empty_negative_supervision_edges = torch.empty((2, 0)) # Test partitioning with empty node_ids, empty node_feats, empty edge_feats, and empty edge index partitioner.register_node_ids(node_ids=empty_node_ids) partitioner.register_edge_index(edge_index=empty_edge_index) partitioner.register_node_features(node_features=empty_node_features) partitioner.register_edge_features(edge_features=empty_edge_features) - partitioner.register_labels(label_edge_index=empty_pos_label, is_positive=True) - partitioner.register_labels(label_edge_index=empty_neg_label, is_positive=False) + partitioner.register_supervision_edges( + supervision_edge_index=empty_positive_supervision_edges, is_positive=True + ) + partitioner.register_supervision_edges( + supervision_edge_index=empty_negative_supervision_edges, is_positive=False + ) partitioned_output = partitioner.partition() assert not isinstance( diff --git a/python/tests/unit/distributed/utils/neighborloader_test.py b/python/tests/unit/distributed/utils/neighborloader_test.py index 9ba1cca10..1b0c701e4 100644 --- a/python/tests/unit/distributed/utils/neighborloader_test.py +++ b/python/tests/unit/distributed/utils/neighborloader_test.py @@ -13,13 +13,13 @@ shard_nodes_by_process, strip_label_edges, ) -from gigl.types.graph import FeatureInfo, message_passing_to_positive_label +from gigl.types.graph import FeatureInfo, message_passing_to_positive_supervision_edges from tests.test_assets.distributed.utils import assert_tensor_equality _U2U_EDGE_TYPE = ("user", "to", "user") _U2I_EDGE_TYPE = ("user", "to", "item") _I2U_EDGE_TYPE = ("item", "to", "user") -_LABELED_EDGE_TYPE = message_passing_to_positive_label(_U2I_EDGE_TYPE) +_LABELED_EDGE_TYPE = message_passing_to_positive_supervision_edges(_U2I_EDGE_TYPE) class LoaderUtilsTest(unittest.TestCase): diff --git a/python/tests/unit/types_tests/graph_test.py b/python/tests/unit/types_tests/graph_test.py index 
6618908a8..05bdbd73d 100644 --- a/python/tests/unit/types_tests/graph_test.py +++ b/python/tests/unit/types_tests/graph_test.py @@ -10,8 +10,8 @@ DEFAULT_HOMOGENEOUS_NODE_TYPE, LoadedGraphTensors, is_label_edge_type, - message_passing_to_negative_label, - message_passing_to_positive_label, + message_passing_to_negative_supervision_edges, + message_passing_to_positive_supervision_edges, select_label_edge_types, to_heterogeneous_edge, to_heterogeneous_node, @@ -90,14 +90,14 @@ def test_from_heterogeneous_invalid(self, _, input_value): node_features=torch.tensor([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]), edge_index=torch.tensor([[0, 1], [1, 2]]), edge_features=torch.tensor([[0.1, 0.2], [0.3, 0.4]]), - positive_label=torch.tensor([[0], [2]]), - negative_label=torch.tensor([[1], [0]]), + positive_supervision_edges=torch.tensor([[0], [2]]), + negative_supervision_edges=torch.tensor([[1], [0]]), expected_edge_index={ DEFAULT_HOMOGENEOUS_EDGE_TYPE: torch.tensor([[0, 1], [1, 2]]), - message_passing_to_positive_label( + message_passing_to_positive_supervision_edges( DEFAULT_HOMOGENEOUS_EDGE_TYPE ): torch.tensor([[2], [0]]), - message_passing_to_negative_label( + message_passing_to_negative_supervision_edges( DEFAULT_HOMOGENEOUS_EDGE_TYPE ): torch.tensor([[0], [1]]), }, @@ -109,14 +109,14 @@ def test_from_heterogeneous_invalid(self, _, input_value): node_features=torch.tensor([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]), edge_index=torch.tensor([[0, 1], [1, 2]]), edge_features=torch.tensor([[0.1, 0.2], [0.3, 0.4]]), - positive_label=torch.tensor([[0], [2]]), - negative_label=torch.tensor([[1], [0]]), + positive_supervision_edges=torch.tensor([[0], [2]]), + negative_supervision_edges=torch.tensor([[1], [0]]), expected_edge_index={ DEFAULT_HOMOGENEOUS_EDGE_TYPE: torch.tensor([[0, 1], [1, 2]]), - message_passing_to_positive_label( + message_passing_to_positive_supervision_edges( DEFAULT_HOMOGENEOUS_EDGE_TYPE ): torch.tensor([[0], [2]]), - message_passing_to_negative_label( + 
message_passing_to_negative_supervision_edges( DEFAULT_HOMOGENEOUS_EDGE_TYPE ): torch.tensor([[1], [0]]), }, @@ -148,12 +148,12 @@ def test_from_heterogeneous_invalid(self, _, input_value): NodeType("bar"), Relation("to"), NodeType("foo") ): torch.tensor([[0.5, 0.6], [0.7, 0.8]]), }, - positive_label={ + positive_supervision_edges={ EdgeType( NodeType("foo"), Relation("labels"), NodeType("bar") ): torch.tensor([[0], [2]]) }, - negative_label={ + negative_supervision_edges={ EdgeType( NodeType("bar"), Relation("labels"), NodeType("foo") ): torch.tensor([[1], [0]]) @@ -165,10 +165,10 @@ def test_from_heterogeneous_invalid(self, _, input_value): EdgeType( NodeType("bar"), Relation("to"), NodeType("foo") ): torch.tensor([[2, 3], [0, 1]]), - message_passing_to_positive_label( + message_passing_to_positive_supervision_edges( EdgeType(NodeType("foo"), Relation("labels"), NodeType("bar")) ): torch.tensor([[0], [2]]), - message_passing_to_negative_label( + message_passing_to_negative_supervision_edges( EdgeType(NodeType("bar"), Relation("labels"), NodeType("foo")) ): torch.tensor([[1], [0]]), }, @@ -200,12 +200,12 @@ def test_from_heterogeneous_invalid(self, _, input_value): NodeType("bar"), Relation("to"), NodeType("foo") ): torch.tensor([[0.5, 0.6], [0.7, 0.8]]), }, - positive_label={ + positive_supervision_edges={ EdgeType( NodeType("foo"), Relation("labels"), NodeType("bar") ): torch.tensor([[0], [2]]) }, - negative_label={ + negative_supervision_edges={ EdgeType( NodeType("bar"), Relation("labels"), NodeType("foo") ): torch.tensor([[1], [0]]) @@ -217,10 +217,10 @@ def test_from_heterogeneous_invalid(self, _, input_value): EdgeType( NodeType("bar"), Relation("to"), NodeType("foo") ): torch.tensor([[2, 3], [0, 1]]), - message_passing_to_positive_label( + message_passing_to_positive_supervision_edges( EdgeType(NodeType("bar"), Relation("labels"), NodeType("foo")) ): torch.tensor([[2], [0]]), - message_passing_to_negative_label( + 
message_passing_to_negative_supervision_edges( EdgeType(NodeType("foo"), Relation("labels"), NodeType("bar")) ): torch.tensor([[0], [1]]), }, @@ -252,12 +252,12 @@ def test_from_heterogeneous_invalid(self, _, input_value): NodeType("bar"), Relation("to"), NodeType("foo") ): torch.tensor([[0.5, 0.6], [0.7, 0.8]]), }, - positive_label={ + positive_supervision_edges={ EdgeType( NodeType("foo"), Relation("labels"), NodeType("bar") ): torch.tensor([[0], [2]]) }, - negative_label=None, + negative_supervision_edges=None, expected_edge_index={ EdgeType( NodeType("foo"), Relation("to"), NodeType("bar") @@ -265,7 +265,7 @@ def test_from_heterogeneous_invalid(self, _, input_value): EdgeType( NodeType("bar"), Relation("to"), NodeType("foo") ): torch.tensor([[2, 3], [0, 1]]), - message_passing_to_positive_label( + message_passing_to_positive_supervision_edges( EdgeType(NodeType("foo"), Relation("labels"), NodeType("bar")) ): torch.tensor([[0], [2]]), }, @@ -280,8 +280,8 @@ def test_treat_labels_as_edges_success( node_features: Union[torch.Tensor, dict[NodeType, torch.Tensor]], edge_index: Union[torch.Tensor, dict[EdgeType, torch.Tensor]], edge_features: Union[torch.Tensor, dict[EdgeType, torch.Tensor]], - positive_label: Union[torch.Tensor, dict[EdgeType, torch.Tensor]], - negative_label: Union[torch.Tensor, dict[EdgeType, torch.Tensor]], + positive_supervision_edges: Union[torch.Tensor, dict[EdgeType, torch.Tensor]], + negative_supervision_edges: Union[torch.Tensor, dict[EdgeType, torch.Tensor]], expected_edge_index: dict[EdgeType, torch.Tensor], edge_dir: Literal["in", "out"], ): @@ -290,12 +290,12 @@ def test_treat_labels_as_edges_success( node_features=node_features, edge_index=edge_index, edge_features=edge_features, - positive_label=positive_label, - negative_label=negative_label, + positive_supervision_edges=positive_supervision_edges, + negative_supervision_edges=negative_supervision_edges, ) graph_tensors.treat_labels_as_edges(edge_dir=edge_dir) - 
self.assertIsNone(graph_tensors.positive_label) - self.assertIsNone(graph_tensors.negative_label) + self.assertIsNone(graph_tensors.positive_supervision_edges) + self.assertIsNone(graph_tensors.negative_supervision_edges) assert isinstance(graph_tensors.edge_index, dict) self.assertEqual(graph_tensors.edge_index.keys(), expected_edge_index.keys()) for edge_type, expected_tensor in expected_edge_index.items(): @@ -307,8 +307,8 @@ def test_select_label_edge_types(self): message_passing_edge_type = DEFAULT_HOMOGENEOUS_EDGE_TYPE edge_types = [ message_passing_edge_type, - message_passing_to_positive_label(message_passing_edge_type), - message_passing_to_negative_label(message_passing_edge_type), + message_passing_to_positive_supervision_edges(message_passing_edge_type), + message_passing_to_negative_supervision_edges(message_passing_edge_type), EdgeType(NodeType("foo"), Relation("bar"), NodeType("baz")), EdgeType( DEFAULT_HOMOGENEOUS_NODE_TYPE, @@ -319,8 +319,12 @@ def test_select_label_edge_types(self): self.assertEqual( ( - message_passing_to_positive_label(message_passing_edge_type), - message_passing_to_negative_label(message_passing_edge_type), + message_passing_to_positive_supervision_edges( + message_passing_edge_type + ), + message_passing_to_negative_supervision_edges( + message_passing_edge_type + ), ), select_label_edge_types(message_passing_edge_type, edge_types), ) @@ -329,16 +333,20 @@ def test_select_label_edge_types_pyg(self): message_passing_edge_type = ("node", "to", "node") edge_types = [ message_passing_edge_type, - message_passing_to_positive_label(message_passing_edge_type), - message_passing_to_negative_label(message_passing_edge_type), + message_passing_to_positive_supervision_edges(message_passing_edge_type), + message_passing_to_negative_supervision_edges(message_passing_edge_type), ("other", "to", "node"), ("other", "to", "other"), ] self.assertEqual( ( - message_passing_to_positive_label(message_passing_edge_type), - 
message_passing_to_negative_label(message_passing_edge_type), + message_passing_to_positive_supervision_edges( + message_passing_edge_type + ), + message_passing_to_negative_supervision_edges( + message_passing_edge_type + ), ), select_label_edge_types(message_passing_edge_type, edge_types), )