Skip to content
This repository was archived by the owner on Oct 30, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 86 additions & 88 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,82 @@
import labelbox
from labelbox import Client, Dataset
from labelbox.schema.data_row_metadata import DataRowMetadata, DataRowMetadataField, DataRowMetadataKind, DeleteDataRowMetadata
from processing.metadata import *
import os

# Add your API key below
labelbox_api_key = os.environ.get("labelbox_api_key", '')
try:
client = Client(api_key=labelbox_api_key)
except:
print(f'Could not connect to Labelbox. Please provide the Cloud Function with a Labelbox API KEY as an Environment Variable with the name "labelbox_api_key"')


def stream_data_rows(event, context):
    """
    Creates a data row when a new asset is uploaded to a GCS bucket.
    - Row Data URL is pulled from the event payload
    - Global Key is determined by the GCS Object Name; it will use the unique GCS Object ID if the first choice is taken
    - Dataset is determined by the GCS Bucket Name; if there isn't one with that name, this script creates one
    Environment Variables:
        labelbox_api_key            : Required (str) : Labelbox API Key
        labelbox_integration_name   : Optional (str) : Labelbox Delegated Access Integration Name
    Args:
        event   : Required (dict) : Event payload.
        context : Required (google.cloud.functions.Context) : Metadata for the event.
    Returns:
        True on success
    """
    # Get environment variables
    labelbox_api_key = os.environ.get("labelbox_api_key", '')
    # Connect to Labelbox
    # NOTE(review): a bare `except` that only prints leaves `client` undefined and the
    # function fails later with a NameError anyway — consider re-raising here.
    try:
        client = Client(api_key=labelbox_api_key)
    except:
        print(f'Could not connect to Labelbox. Please provide this function runtime with a Labelbox API KEY as an Environment Variable with the name "labelbox_api_key"')
    # Get event details
    gcs_bucket_name = event['bucket']
    gcs_object_id = event['id']
    gcs_object_name = event['name']
    # Get or create a Labelbox dataset with the same name as the GCS bucket
    datasets = client.get_datasets(where=Dataset.name == gcs_bucket_name)
    lb_dataset = next(datasets, None)
    if not lb_dataset:
        # If creating a new dataset, we need to set the Delegated Access Integration.
        # Fall back to the string 'DEFAULT' when no matching integration name is found.
        lb_integration = os.environ.get("labelbox_integration_name", 'DEFAULT')
        for integration in client.get_organization().get_iam_integrations():
            if integration.name == lb_integration:
                lb_integration = integration
        lb_dataset = client.create_dataset(name=gcs_bucket_name, iam_integration=lb_integration)
        # `lb_integration` is either the env-var string or a resolved integration object
        da_name = lb_integration if type(lb_integration) == str else lb_integration.name
        print(f'Created Labelbox Dataset {lb_dataset.name} with ID {lb_dataset.uid} and Delegated Access Integration Name {da_name}')
    url = f"gs://{gcs_bucket_name}/{gcs_object_name}"
    print(f'Row Data URL {url}')
    # Create a Labelbox Data Row using the GCS Object Name as the Labelbox Global Key value.
    # If that Global Key value is unavailable, use the GCS-unique Object ID value instead.
    try:
        lb_data_row = lb_dataset.create_data_row(row_data=url, global_key=gcs_object_name)
        print(f'Created Data Row with ID {lb_data_row.uid} and Global Key {gcs_object_name}')
    except:
        print(f'Data Row with Global Key "{gcs_object_name}" already exists. Creating a Data Row with GCS Object ID "{gcs_object_id}"')
        lb_data_row = lb_dataset.create_data_row(row_data=url, global_key=gcs_object_id)
        print(f'Created Data Row with ID {lb_data_row.uid} and Global Key {gcs_object_id}')
    return True

def update_metadata(event, context):
""" Updates a data row's metadata with the current metadata in GCS. Meant to be triggered whenever there's a change to metadata in GCS.
"""
Updates labelbox metadata when object metadata is updated in GCS. If an object has no metadata, this script deletes all Labelbox metadata.
- Uses Global Keys to find Data Rows. First will use GCS Object ID, defers to GCS Object Name
- This script can be customized to not delete Labelbox metadata for specific metadata schema IDs. See environment variable 'keep_metadata_name'
Environment Variables:
labelbox_api_key : Required (str) : Labelbox API Key
keep_metadata_name : Optional (list) : List of metadata field names to **not** delete metadata from
Args:
event (dict) : Event payload.
context (google.cloud.functions.Context) : Metadata for the event.
event : Required (dict) : Event payload.
context : Required (google.cloud.functions.Context) : Metadata for the event.
"""
# Get environment variables
labelbox_api_key = os.environ.get("labelbox_api_key", '')
keep_metadata_name = os.environ.get("keep_metadata_name", [])
# Connect to Labelbox
try:
client = Client(api_key=labelbox_api_key)
except:
print(f'Could not connect to Labelbox. Please provide this function runtime with a Labelbox API KEY as an Environment Variable with the name "labelbox_api_key"')
# Get event details
gcs_object_id = event['id']
gcs_object_name = event['name']

# Grab Labelbox Data Row ID given the global_key
# Grab Labelbox Data Row ID given the Global Key
try:
lb_data_row_id = client.get_data_row_ids_for_global_keys([gcs_object_id])['results'][0]
lb_global_key = gcs_object_id
Expand All @@ -64,91 +87,66 @@ def update_metadata(event, context):
except:
print(f'No data row with Global Key {gcs_object_name} or {gcs_object_id} exist')
quit()

# If no 'metadata', then that means metadata was deleted
# If there's no event['metadata'] then that means metadata must have been deleted
try:
gcs_metadata = event['metadata']
gcs_metadata_names = list(gcs_metadata.keys())
lb_metadata_list = []
except:
gcs_metadata = []

mdo = client.get_data_row_metadata_ontology()

# If there's gcs_metadata, create a `lb_metadata_list` list of Labelbox Metadata Fields to upload
gcs_metadata = False
gcs_metadata_names = []
# Sync Metadata fields between Labelbox and GCS
lb_mdo = __sync_metadata_fields_as_strings(client, gcs_metadata_names)
metadata_schema_to_name_key = __get_metadata_schema_to_name_key(lb_mdo)
metadata_name_key_to_schema = {v: k for k, v in metadata_schema_to_name_key.items()}
# If there's metadata in GCS, update it in Labelbox
if gcs_metadata:
lb_metadata_list = []
lb_metadata_names = [field['name'] for field in mdo._get_ontology()]
# Ensure all your metadata fields in this object are in Labelbox - if not, we'll create "string" metadata given the field name
for gcs_metadata_field in gcs_metadata.keys():
if gcs_metadata_field not in lb_metadata_names:
mdo.create_schema(name=gcs_metadata_field, kind=DataRowMetadataKind.string)
mdo = client.get_data_row_metadata_ontology()
lb_metadata_names = [field['name'] for field in mdo._get_ontology()]
print(f'GCS Metadata: {gcs_metadata}')
# Create a `mdo_index` dictionary where {key=metadata_field_name : value=metadata_schema_id}
metadata_dict = mdo.reserved_by_name
metadata_dict.update(mdo.custom_by_name)
mdo_index = {}
for mdo_name in metadata_dict:
if type(metadata_dict[mdo_name]) == dict:
mdo_index.update({str(mdo_name) : {"schema_id" : list(metadata_dict[mdo_name].values())[0].parent}})
for enum_option in metadata_dict[mdo_name]:
mdo_index.update({str(enum_option) : {"schema_id" : metadata_dict[mdo_name][enum_option].uid, "parent_schema_id" : metadata_dict[mdo_name][enum_option].parent}})
else:
mdo_index.update({str(mdo_name):{"schema_id" : metadata_dict[mdo_name].uid}})
# Create a `lb_metadata_list` list of Labelbox Metadata Fields to upload
for gcs_metadata_field in gcs_metadata.keys():
lb_metadata_list.append(
DataRowMetadataField(
schema_id=mdo_index[gcs_metadata_field]['schema_id'],
value=gcs_metadata[gcs_metadata_field]
lb_metadata_list = [DataRowMetadataField(schema_id=metadata_name_key_to_schema[gcs_metadata_name],value=gcs_metadata[gcs_metadata_name]) for gcs_metadata_name in gcs_metadata]
lb_mdo.bulk_upsert(
[
DataRowMetadata(
data_row_id=lb_data_row_id,
fields=lb_metadata_list
)
)
# Update Labelbox Metadata
mdo = client.get_data_row_metadata_ontology()
mdo.bulk_upsert([
DataRowMetadata(
data_row_id = lb_data_row_id,
fields = lb_metadata_list
)
])
]
)
print(f'Updated Data Row with ID {lb_data_row_id} and Global Key {lb_global_key}')
else:
# If there's no metadata it means metadata was deleted
# Can be customized to not delete specific metadata schema IDs
delete_metadata_schema_ids = [field.schema_id for field in mdo.bulk_export([lb_data_row_id])[0].fields]
mdo.bulk_delete([
DeleteDataRowMetadata(
data_row_id = lb_data_row_id,
fields = delete_metadata_schema_ids
)
])
print(f'Deleted all Metadata for Data Row with ID {lb_data_row_id} and Global Key {lb_global_key}')

# If there isn't metadata in GCS, then we can assume metadata was deleted in GCS, delete it in Labelbox
else:
delete_metadata_names = [field.name for field in lb_mdo.bulk_export([lb_data_row_id])[0].fields]
for keep_metadata_name in keep_metadata_names:
delete_metadata_names.remove(keep_metadata_name)
__delete_data_row_metadata(lb_mdo, [lb_data_row_id], delete_metadata_names, metadata_name_key_to_schema)
print(f'Deleted Metadata for Data Row with ID {lb_data_row_id} and Global Key {lb_global_key}')
return True

def delete_data_rows(event, context):
    """ Delete a data row from Labelbox
    - Uses Global Keys to find Data Rows. First will use GCS Object ID, defers to GCS Object Name
    Environment Variables:
        labelbox_api_key : Required (str) : Labelbox API Key
    Args:
        event   : Required (dict) : Event payload.
        context : Required (google.cloud.functions.Context) : Metadata for the event.
    Returns:
        True on success
    """
    # Get environment variables
    labelbox_api_key = os.environ.get("labelbox_api_key", '')
    # Connect to Labelbox
    # NOTE(review): on failure `client` stays undefined and the lookup below raises
    # NameError — consider re-raising instead of only printing.
    try:
        client = Client(api_key=labelbox_api_key)
    except:
        print(f'Could not connect to Labelbox. Please provide this function runtime with a Labelbox API KEY as an Environment Variable with the name "labelbox_api_key"')
    # Get event details
    gcs_object_id = event['id']
    gcs_object_name = event['name']
    # Grab Labelbox Data Row ID given the Global Key
    try:
        # gcs_object_id is used as the global key when a data row with the object name as the global key already exists
        print(f'Checking if data row with global_key {gcs_object_id} exists')
        lb_data_row_id = client.get_data_row_ids_for_global_keys([gcs_object_id])['results'][0]
        lb_global_key = gcs_object_id
    except:
        # If the gcs_object_id was not used as the global key, the gcs_object_name was
        lb_data_row_id = client.get_data_row_ids_for_global_keys([gcs_object_name])['results'][0]
        lb_global_key = gcs_object_name
    # Delete Data Row from Labelbox
    print(f'Deleting Data Row with ID {lb_data_row_id} and Global Key {lb_global_key}')
    lb_data_row = client.get_data_row(lb_data_row_id)
    lb_data_row.delete()
    return True
67 changes: 67 additions & 0 deletions processing/metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from labelbox.schema.data_row_metadata import DataRowMetadata, DataRowMetadataKind, DataRowMetadataOntology, DeleteDataRowMetadata

def __sync_metadata_fields_as_strings(client, metadata_names=None):
    """ Ensures Labelbox's metadata ontology has a field for every given name - fields are always synced as string metadata
    - The bookkeeping field 'lb_integration_source' is always included in the sync
    NOTE(review): leading double-underscore module-level names are skipped by
    `from processing.metadata import *` unless `__all__` is declared — confirm callers
    import these helpers explicitly.
    Args:
        client         : Required (labelbox.Client) : Labelbox client object
        metadata_names : Optional (list) : List of metadata field names to sync with Labelbox
    Returns:
        Up-to-date Labelbox metadata ontology object
    """
    # Copy the input (and avoid a mutable default argument) so the caller's list is
    # never mutated and the default cannot accumulate entries across calls.
    names_to_sync = list(metadata_names) if metadata_names else []
    names_to_sync.append('lb_integration_source')
    lb_mdo = client.get_data_row_metadata_ontology()
    lb_metadata_names = {field['name'] for field in lb_mdo._get_ontology()}
    # Deduplicate (preserving order) the names that do not exist in Labelbox yet
    missing_names = list(dict.fromkeys(name for name in names_to_sync if name not in lb_metadata_names))
    for metadata_name in missing_names:
        lb_mdo.create_schema(name=metadata_name, kind=DataRowMetadataKind.string)
    # Re-fetch once (not once per created field) so the returned ontology is current
    if missing_names:
        lb_mdo = client.get_data_row_metadata_ontology()
    return lb_mdo

def __get_metadata_schema_to_name_key(
    lb_mdo: "DataRowMetadataOntology"
):
    """ Creates a dictionary where {key=metadata_schema_id: value=metadata_name_key}
    - name_key is the field name for scalar metadata fields; for enum options it is "parent_name///child_name"
    Args:
        lb_mdo : Required (labelbox.schema.data_row_metadata.DataRowMetadataOntology) - Labelbox metadata ontology
    Returns:
        Dictionary where {key=metadata_schema_id: value=metadata_name_key}
    """
    # Copy before merging so the ontology's own `reserved_by_name` dict is never
    # mutated (the original `.update(...)` leaked custom fields into it).
    lb_metadata_dict = dict(lb_mdo.reserved_by_name)
    lb_metadata_dict.update(lb_mdo.custom_by_name)
    metadata_schema_to_name_key = {}
    for field_name, field in lb_metadata_dict.items():
        if isinstance(field, dict):
            # Enum field: {option_name: option_schema}; every option carries the same parent schema id
            any_option = next(iter(field.values()))
            metadata_schema_to_name_key[any_option.parent] = str(field_name)
            for option_name, option in field.items():
                metadata_schema_to_name_key[option.uid] = f"{str(field_name)}///{str(option_name)}"
        else:
            metadata_schema_to_name_key[field.uid] = str(field_name)
    return metadata_schema_to_name_key

def __delete_data_row_metadata(
    lb_mdo: "DataRowMetadataOntology",
    data_row_ids:list, metadata_field_names:list, metadata_name_key_to_schema:dict
):
    """ Deletes metadata values from a given set of data rows given a list of metadata field names
    - The bookkeeping field 'lb_integration_source' is always kept
    Args:
        lb_mdo                      : Required (labelbox.schema.data_row_metadata.DataRowMetadataOntology) - Labelbox metadata ontology
        data_row_ids                : Required (list) : List of data row IDs to delete metadata from
        metadata_field_names        : Required (list) : List of metadata field names to delete for each data row
        metadata_name_key_to_schema : Required (dict) : Dictionary where {key=metadata_name_key: value=metadata_schema_id}
    Returns:
        True
    """
    # Filter into a NEW list so the caller's `metadata_field_names` is never mutated
    # (the original `.remove(...)` changed the caller's list as a side effect).
    names_to_delete = [name for name in metadata_field_names if name != 'lb_integration_source']
    schemas_to_delete = [metadata_name_key_to_schema[name] for name in names_to_delete]
    for data_row_id in data_row_ids:
        lb_mdo.bulk_delete([
            DeleteDataRowMetadata(
                data_row_id = data_row_id,
                fields = schemas_to_delete
            )
        ])
    return True