diff --git a/main.py b/main.py index 5815cd1..08fadd4 100644 --- a/main.py +++ b/main.py @@ -1,59 +1,82 @@ import labelbox from labelbox import Client, Dataset from labelbox.schema.data_row_metadata import DataRowMetadata, DataRowMetadataField, DataRowMetadataKind, DeleteDataRowMetadata +from processing.metadata import * import os -# Add your API key below -labelbox_api_key = os.environ.get("labelbox_api_key", '') -try: - client = Client(api_key=labelbox_api_key) -except: - print(f'Could not connect to Labelbox. Please provide the Cloud Function with a Labelbox API KEY as an Environment Variable with the name "labelbox_api_key"') - - def stream_data_rows(event, context): - """ Uploads an asset to Catalog when a new asset is uploaded to GCP bucket. Creates a dataset if a dataset with this bucket name doesn't exist yet + """ + Creates a data row when a new asset is uploaded to a GCS bucket. + - Row Data URL is pulled from the event payload + - Global Key is determined by the GCS Object Name; it will use the unique GCS Object ID if the first choice is taken + - Dataset is determined by the GCS Bucket Name; if there isn't one with that name, this script creates one + Environment Variables: + labelbox_api_key : Required (str) : Labelbox API Key + labelbox_integration_name : Optional (str) : Labelbox Delegated Access Integration Name Args: - event (dict) : Event payload. - context (google.cloud.functions.Context) : Metadata for the event. - """ + event : Required (dict) : Event payload. + context : Required (google.cloud.functions.Context) : Metadata for the event. + """ + # Get environment variables + labelbox_api_key = os.environ.get("labelbox_api_key", '') + # Connect to Labelbox + try: + client = Client(api_key=labelbox_api_key) + except: + print(f'Could not connect to Labelbox. 
Please provide this function runtime with a Labelbox API KEY as an Environment Variable with the name "labelbox_api_key"') + # Get event details gcs_bucket_name = event['bucket'] gcs_object_id = event['id'] gcs_object_name = event['name'] - + url = f"gs://{gcs_bucket_name}/{gcs_object_name}" + print(f'Row Data URL {url}') + # Get or create a Labelbox dataset with the same name as the GCS bucket datasets = client.get_datasets(where=Dataset.name == gcs_bucket_name) lb_dataset = next(datasets, None) if not lb_dataset: + # If creating a new dataset, we need to set the Delegated Access Integration lb_integration = os.environ.get("labelbox_integration_name", 'DEFAULT') for integration in client.get_organization().get_iam_integrations(): if integration.name == lb_integration: lb_integration = integration lb_dataset = client.create_dataset(name=gcs_bucket_name, iam_integration=lb_integration) - print(f'Created Labelbox Dataset {lb_dataset.name} with ID {lb_dataset.uid}') - - url = f"gs://{gcs_bucket_name}/{gcs_object_name}" - print(f'Row Data URL {url}') - + da_name = lb_integration if type(lb_integration) == str else lb_integration.name + print(f'Created Labelbox Dataset {lb_dataset.name} with ID {lb_dataset.uid} and Delegated Access Integration Name {da_name}') + # Create a Labelbox Data Row using the GCS Object Name as the Labelbox Global Key value. + # If that Global Key value is unavailable, use the GCS-unique Object ID value instead. try: lb_data_row = lb_dataset.create_data_row(row_data=url, global_key=gcs_object_name) print(f'Created Data Row with ID {lb_data_row.uid} and Global Key {gcs_object_name}') except: print(f'Data Row with Global Key "{gcs_object_name}" already exists. 
Creating a Data Row with GCS Object ID "{gcs_object_id}"') lb_data_row = lb_dataset.create_data_row(row_data=url, global_key=gcs_object_id) - print(f'Created Data Row with ID {lb_data_row.uid} and Global Key {gcs_object_id}') - + print(f'Created Data Row with ID {lb_data_row.uid} and Global Key {gcs_object_id}') return True def update_metadata(event, context): - """ Updates a data row's metadata with the current metadata in GCS. Meant to be triggered whenever there's a change to metadata in GCS. + """ + Updates labelbox metadata when object metadata is updated in GCS. If an object has no metadata, this script deletes all Labelbox metadata. + - Uses Global Keys to find Data Rows. First will use GCS Object ID, defers to GCS Object Name + - This script can be customized to not delete Labelbox metadata for specific metadata schema IDs. See environment variable 'keep_metadata_name' + Environment Variables: + labelbox_api_key : Required (str) : Labelbox API Key + keep_metadata_name : Optional (list) : List of metadata field names to **not** delete metadata from Args: - event (dict) : Event payload. - context (google.cloud.functions.Context) : Metadata for the event. + event : Required (dict) : Event payload. + context : Required (google.cloud.functions.Context) : Metadata for the event. """ + # Get environment variables + labelbox_api_key = os.environ.get("labelbox_api_key", '') + keep_metadata_names = os.environ.get("keep_metadata_name", []) + # Connect to Labelbox + try: + client = Client(api_key=labelbox_api_key) + except: + print(f'Could not connect to Labelbox. 
Please provide this function runtime with a Labelbox API KEY as an Environment Variable with the name "labelbox_api_key"') + # Get event details gcs_object_id = event['id'] gcs_object_name = event['name'] - - # Grab Labelbox Data Row ID given the global_key + # Grab Labelbox Data Row ID given the Global Key try: lb_data_row_id = client.get_data_row_ids_for_global_keys([gcs_object_id])['results'][0] lb_global_key = gcs_object_id @@ -64,91 +87,66 @@ def update_metadata(event, context): except: print(f'No data row with Global Key {gcs_object_name} or {gcs_object_id} exist') quit() - - # If no 'metadata', then that means metadata was deleted + # If there's no event['metadata'] then that means metadata must have been deleted try: gcs_metadata = event['metadata'] + gcs_metadata_names = list(gcs_metadata.keys()) + lb_metadata_list = [] except: - gcs_metadata = [] - - mdo = client.get_data_row_metadata_ontology() - - # If there's gcs_metadata, create a `lb_metadata_list` list of Labelbox Metadata Fields to upload + gcs_metadata = False + gcs_metadata_names = [] + # Sync Metadata fields between Labelbox and GCS + lb_mdo = __sync_metadata_fields_as_strings(client, gcs_metadata_names) + metadata_schema_to_name_key = __get_metadata_schema_to_name_key(lb_mdo) + metadata_name_key_to_schema = {v: k for k, v in metadata_schema_to_name_key.items()} + # If there's metadata in GCS, update it in Labelbox if gcs_metadata: - lb_metadata_list = [] - lb_metadata_names = [field['name'] for field in mdo._get_ontology()] - # Ensure all your metadata fields in this object are in Labelbox - if not, we'll create "string" metadata given the field name - for gcs_metadata_field in gcs_metadata.keys(): - if gcs_metadata_field not in lb_metadata_names: - mdo.create_schema(name=gcs_metadata_field, kind=DataRowMetadataKind.string) - mdo = client.get_data_row_metadata_ontology() - lb_metadata_names = [field['name'] for field in mdo._get_ontology()] - print(f'GCS Metadata: {gcs_metadata}') - # Create a 
`mdo_index` dictionary where {key=metadata_field_name : value=metadata_schema_id} - metadata_dict = mdo.reserved_by_name - metadata_dict.update(mdo.custom_by_name) - mdo_index = {} - for mdo_name in metadata_dict: - if type(metadata_dict[mdo_name]) == dict: - mdo_index.update({str(mdo_name) : {"schema_id" : list(metadata_dict[mdo_name].values())[0].parent}}) - for enum_option in metadata_dict[mdo_name]: - mdo_index.update({str(enum_option) : {"schema_id" : metadata_dict[mdo_name][enum_option].uid, "parent_schema_id" : metadata_dict[mdo_name][enum_option].parent}}) - else: - mdo_index.update({str(mdo_name):{"schema_id" : metadata_dict[mdo_name].uid}}) - # Create a `lb_metadata_list` list of Labelbox Metadata Fields to upload - for gcs_metadata_field in gcs_metadata.keys(): - lb_metadata_list.append( - DataRowMetadataField( - schema_id=mdo_index[gcs_metadata_field]['schema_id'], - value=gcs_metadata[gcs_metadata_field] + lb_metadata_list = [DataRowMetadataField(schema_id=metadata_name_key_to_schema[gcs_metadata_name],value=gcs_metadata[gcs_metadata_name]) for gcs_metadata_name in gcs_metadata] + lb_mdo.bulk_upsert( + [ + DataRowMetadata( + data_row_id=lb_data_row_id, + fields=lb_metadata_list ) - ) - # Update Labelbox Metadata - mdo = client.get_data_row_metadata_ontology() - mdo.bulk_upsert([ - DataRowMetadata( - data_row_id = lb_data_row_id, - fields = lb_metadata_list - ) - ]) + ] + ) print(f'Updated Data Row with ID {lb_data_row_id} and Global Key {lb_global_key}') - else: - # If there's no metadata it means metadata was deleted - # Can be customized to not delete specific metadata schema IDs - delete_metadata_schema_ids = [field.schema_id for field in mdo.bulk_export([lb_data_row_id])[0].fields] - mdo.bulk_delete([ - DeleteDataRowMetadata( - data_row_id = lb_data_row_id, - fields = delete_metadata_schema_ids - ) - ]) - print(f'Deleted all Metadata for Data Row with ID {lb_data_row_id} and Global Key {lb_global_key}') - + # If there isn't metadata in GCS, then we 
can assume metadata was deleted in GCS, delete it in Labelbox + else: + delete_metadata_names = [field.name for field in lb_mdo.bulk_export([lb_data_row_id])[0].fields] + for keep_metadata_name in keep_metadata_names: + delete_metadata_names.remove(keep_metadata_name) + __delete_data_row_metadata(lb_mdo, [lb_data_row_id], delete_metadata_names, metadata_name_key_to_schema) + print(f'Deleted Metadata for Data Row with ID {lb_data_row_id} and Global Key {lb_global_key}') return True def delete_data_rows(event, context): - """ Deletes data rows from Labelbox. First checks to see if the global key for gcs_object_id exists before using the gcs_object_name + """ Delete a data row from Labelbox + - Uses Global Keys to find Data Rows. First will use GCS Object ID, defers to GCS Object Name Args: event (dict) : Event payload. context (google.cloud.functions.Context) : Metadata for the event. """ + # Get environment variables + labelbox_api_key = os.environ.get("labelbox_api_key", '') + # Connect to Labelbox + try: + client = Client(api_key=labelbox_api_key) + except: + print(f'Could not connect to Labelbox. 
Please provide this function runtime with a Labelbox API KEY as an Environment Variable with the name "labelbox_api_key"') + # Get event details gcs_object_id = event['id'] - gcs_object_name = event['name'] - - # Get your Labelbox Data Row ID + gcs_object_name = event['name'] + # Grab Labelbox Data Row ID given the Global Key try: - # gcs_object_id is used as the global key when a data row with the object name as the global key already exists print(f'Checking if data row with global_key {gcs_object_id} exists') lb_data_row_id = client.get_data_row_ids_for_global_keys([gcs_object_id])['results'][0] lb_global_key = gcs_object_id except: - # If the gcs_object_id was not used as the global key, the gcs_object_name was lb_data_row_id = client.get_data_row_ids_for_global_keys([gcs_object_name])['results'][0] lb_global_key = gcs_object_name - - # Delete the Labelbox Data Row + # Delete Data Row from Labelbox print(f'Deleting Data Row with ID {lb_data_row_id} and Global Key {lb_global_key}') lb_data_row = client.get_data_row(lb_data_row_id) lb_data_row.delete() - return True diff --git a/processing/metadata.py b/processing/metadata.py new file mode 100644 index 0000000..598edc8 --- /dev/null +++ b/processing/metadata.py @@ -0,0 +1,67 @@ +from labelbox.schema.data_row_metadata import DataRowMetadata, DataRowMetadataKind, DataRowMetadataOntology, DeleteDataRowMetadata + +def __sync_metadata_fields_as_strings(client, metadata_names=[]): + """ Ensures Labelbox's Metadata Ontology has all necessary metadata fields given a list of names - this script will always sync metadata fields as strings + Args: + client : Required (labelbox.Client) : Labelbox client object + metadata_names : Required (list) : List of metadata field names to sync with Labelbox + Returns: + Updated Labelbox metadata ontology object + """ + lb_mdo = client.get_data_row_metadata_ontology() + lb_metadata_names = [field['name'] for field in lb_mdo._get_ontology()] + metadata_names = metadata_names + ['lb_integration_source'] + 
# Iterate over metadata_names; if a name is not an existing metadata field, then create it in Labelbox + for metadata_name in metadata_names: + if metadata_name not in lb_metadata_names: + lb_mdo.create_schema(name=metadata_name, kind=DataRowMetadataKind.string) + lb_mdo = client.get_data_row_metadata_ontology() + lb_metadata_names = [field['name'] for field in lb_mdo._get_ontology()] + return lb_mdo + +def __get_metadata_schema_to_name_key( + lb_mdo:DataRowMetadataOntology + ): + """ Creates a dictionary where {key=metadata_schema_id: value=metadata_name_key} + - name_key is name for all metadata fields, and for enum options, it is "parent_name///child_name" + Args: + lb_mdo : Required (labelbox.schema.data_row_metadata.DataRowMetadataOntology) - Labelbox metadata ontology + Returns: + Dictionary where {key=metadata_schema_id: value=metadata_name_key} + """ + lb_metadata_dict = lb_mdo.reserved_by_name + lb_metadata_dict.update(lb_mdo.custom_by_name) + metadata_schema_to_name_key = {} + for metadata_field_name in lb_metadata_dict: + if type(lb_metadata_dict[metadata_field_name]) == dict: + metadata_schema_to_name_key[lb_metadata_dict[metadata_field_name][next(iter(lb_metadata_dict[metadata_field_name]))].parent] = str(metadata_field_name) + for enum_option in lb_metadata_dict[metadata_field_name]: + metadata_schema_to_name_key[lb_metadata_dict[metadata_field_name][enum_option].uid] = f"{str(metadata_field_name)}///{str(enum_option)}" + else: + metadata_schema_to_name_key[lb_metadata_dict[metadata_field_name].uid] = str(metadata_field_name) + return metadata_schema_to_name_key + +def __delete_data_row_metadata( + lb_mdo:DataRowMetadataOntology, + data_row_ids:list, metadata_field_names:list, metadata_name_key_to_schema:dict + ): + """ Deletes metadata values from a given set of data rows given a list of metadata field names + Args: + lb_mdo : Required (labelbox.schema.data_row_metadata.DataRowMetadataOntology) - Labelbox metadata ontology + 
data_row_ids : Required (list) : List of data row IDs to delete metadata from + metadata_field_names : Required (list) : List of metadata schemas to delete for each data row + metadata_name_key_to_schema : Required (dict) : Dictionary where {key=metadata_schema_id: value=metadata_name_key} + Returns: + True + """ + if 'lb_integration_source' in metadata_field_names: + metadata_field_names.remove('lb_integration_source') + schemas_to_delete = [metadata_name_key_to_schema[name] for name in metadata_field_names] + for data_row_id in data_row_ids: + lb_mdo.bulk_delete([ + DeleteDataRowMetadata( + data_row_id = data_row_id, + fields = schemas_to_delete + ) + ]) + return True \ No newline at end of file