From 0c2dee08597ae197d9eeabef67429f081bf113bf Mon Sep 17 00:00:00 2001 From: Morgan Funtowicz Date: Tue, 27 Aug 2024 11:27:11 +0200 Subject: [PATCH 1/3] (feat) add opensearch-py dependency for perf-tracking extra --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 2e8c9489a8..06b16675a8 100644 --- a/setup.py +++ b/setup.py @@ -91,6 +91,7 @@ "quality": QUALITY_REQUIRE, "benchmark": BENCHMARK_REQUIRE, "doc-build": ["accelerate"], + "perf-tracking": ["opensearch-py >= 2.7"] } setup( From 4db50e852bd1f304537ca4c59a8c70c01fb568fc Mon Sep 17 00:00:00 2001 From: Morgan Funtowicz Date: Tue, 27 Aug 2024 15:31:47 +0200 Subject: [PATCH 2/3] (feat) add performance tracking stack --- optimum/tools/__init__.py | 0 optimum/tools/records.py | 156 ++++++++++++++++++++++++++++++++++++++ setup.py | 2 +- 3 files changed, 157 insertions(+), 1 deletion(-) create mode 100644 optimum/tools/__init__.py create mode 100644 optimum/tools/records.py diff --git a/optimum/tools/__init__.py b/optimum/tools/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/optimum/tools/records.py b/optimum/tools/records.py new file mode 100644 index 0000000000..5a61bb849d --- /dev/null +++ b/optimum/tools/records.py @@ -0,0 +1,156 @@ +import re +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Dict, Protocol, Optional +from urllib.parse import urlparse + +from opensearchpy import OpenSearch + + +PERFORMANCE_RECORD_LATENCY_MS = "latency" +PERFORMANCE_RECORD_THROUGHPUT_SAMPLE_PER_SEC = "throughput" + + +@dataclass +class PerformanceRecord: + metric: str + kind: str + value: Any + + when: datetime = field(default_factory=lambda: datetime.now()) + meta: Dict[str, Any] = field(default_factory=dict) + + @staticmethod + def latency(metric: str, value_ms: float, meta: Optional[Dict[str, Any]] = None, when: Optional[datetime] = None): + r""" + Create a PerformanceRecord tracking latency information + + Args: + `metric` (`str`): + Metric identifier + `value_ms` (`float`): + The recorded latency, in millisecond, for the underlying metric record + `meta` (`Optional[Dict[str, Any]]`, defaults to `{}`) + Information relative to the recorded metric to store alongside the metric readout + `when` (`Optional[datetime]`, defaults to `datetime.now()`) + Indicates when the underlying metric was recorded + Returns: + The performance record for the target metric representing latency + """ + return PerformanceRecord( + metric=metric, kind=PERFORMANCE_RECORD_LATENCY_MS, value=value_ms, when=when, meta=meta + ) + + @staticmethod + def throughput(metric: str, value_sample_per_sec: float, meta: Optional[Dict[str, Any]] = None, when: Optional[datetime] = None): + r""" + Create a PerformanceRecord tracking throughput information + + Args: + `metric` (`str`): + Metric identifier + `value_sample_per_sec` (`float`): + The recorded throughput, in samples per second, for the underlying metric record + `meta` (`Optional[Dict[str, Any]]`, defaults to `{}`) + Information relative to the recorded metric to store alongside the metric readout + `when` (`Optional[datetime]`, defaults to `datetime.now()`) + Indicates when the underlying metric was recorded + Returns: + The performance record for the target metric representing throughput + """ + return PerformanceRecord( + metric=metric, + kind=PERFORMANCE_RECORD_THROUGHPUT_SAMPLE_PER_SEC, + value=value_sample_per_sec, + when=when, + meta=meta + ) + + def as_document(self) -> Dict[str, Any]: + r""" + Convert the actual `PerformanceRecord` to a dictionary based representation compatible with document storage + Returns: + Dictionary of strings keys with the information stored in this record + """ + parcel = { "date": self.when.timestamp(), "metric": self.metric, "kind": self.kind, "value": self.value } + return parcel | self.meta + + +class PerformanceTrackerStore(Protocol): + + @staticmethod + def from_uri(uri: str) -> "PerformanceTrackerStore": + pass + + def push(self, collection: str, record: "PerformanceRecord"): + pass + + + +class OpenSearchPerformanceTrackerStore(PerformanceTrackerStore): + # Extract region and service from AWS url (ex: us-east-1.es.amazonaws.com) + AWS_URL_RE = re.compile(r"([a-z]+-[a-z]+-[0-9])\.(.*)?\.amazonaws.com") + + def __init__(self, url: str, auth): + uri = urlparse(url) + self._client = OpenSearch( + [{"host": uri.hostname, "port": uri.port or 443}], + http_auth = auth, + http_compress = True, + use_ssl = True + ) + + # Sanity check + self._client.info() + + @staticmethod + def from_uri(uri: str) -> "PerformanceTrackerStore": + if not (_uri := urlparse(uri)).scheme.startswith("es"): + raise ValueError(f"Invalid URI {uri}: should start with os:// or os+aws://") + + if _uri.scheme == "es+aws": + from boto3 import Session as AwsSession + from botocore.credentials import Credentials as AwsCredentials + from opensearchpy import AWSV4SignerAuth, Urllib3AWSV4SignerAuth + + # Create AWS session from the (eventual) creds + if not _uri.username and not _uri.password: + session = AwsSession() + creds = session.get_credentials() + else: + creds = AwsCredentials(_uri.username, _uri.password) + + # Parse the url to extract region and service + if len(match := re.findall(OpenSearchPerformanceTrackerStore.AWS_URL_RE, _uri.netloc)) != 1: + raise ValueError(f"Failed to parse AWS es service URL {uri}") + + region, service = match[0] + auth = Urllib3AWSV4SignerAuth(creds, region, service) + else: + auth = (_uri.username, _uri.password) + + return OpenSearchPerformanceTrackerStore(uri, auth) + + def _ensure_collection_exists(self, collection: str): + if not self._client.indices.exists(collection): + self._client.indices.create(collection) + + def push(self, collection: str, record: "PerformanceRecord"): + self._ensure_collection_exists(collection) + self._client.index(collection, record.as_document()) + + +class AutoPerformanceTracker: + + @staticmethod + def from_uri(uri: str) -> "PerformanceTrackerStore": + if uri.startswith("es://") or uri.startswith("es+aws://"): + return OpenSearchPerformanceTrackerStore.from_uri(uri) + + raise ValueError( + f"Unable to determine the service associated with URI: {uri}. " + "Valid schemas are es:// or es+aws://" + ) + + + diff --git a/setup.py b/setup.py index 06b16675a8..266b994ff3 100644 --- a/setup.py +++ b/setup.py @@ -91,7 +91,7 @@ "quality": QUALITY_REQUIRE, "benchmark": BENCHMARK_REQUIRE, "doc-build": ["accelerate"], - "perf-tracking": ["opensearch-py >= 2.7"] + "perf-tracking": ["boto3 >= 1.35", "opensearch-py >= 2.7"] } setup( From 2732d539db63ef1d5fcc60ec26e1973d980924c7 Mon Sep 17 00:00:00 2001 From: Morgan Funtowicz Date: Tue, 27 Aug 2024 15:39:41 +0200 Subject: [PATCH 3/3] (doc) add some more documentation --- optimum/tools/records.py | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/optimum/tools/records.py b/optimum/tools/records.py index 5a61bb849d..ccab7b62db 100644 --- a/optimum/tools/records.py +++ b/optimum/tools/records.py @@ -77,17 +77,48 @@ def as_document(self) -> Dict[str, Any]: class PerformanceTrackerStore(Protocol): + r""" + Base interface defining a performance tracker tool + """ @staticmethod def from_uri(uri: str) -> "PerformanceTrackerStore": + r""" + Create the `PerformanceTrackerStore` from the provided URI information + + Args: + `uri` (`str`): + URI specifying over which protocol and where will be stored the record(s) + + Returns: + Instance of a `PerformanceTrackerStore` which information are inferred from the specified URI + """ pass def push(self, collection: str, record: "PerformanceRecord"): + r""" + Attempt to append the provided record to the underlying tracker putting under the specified collection + + Args: + `collection` (`str`): + Name of the bucket the specified record should be pushed + `record` (`PerformanceRecord`): + The materialized record to push + """ pass class OpenSearchPerformanceTrackerStore(PerformanceTrackerStore): + r""" + Amazon Web Services (AWS) OpenSearch based PerformanceTrackerStore + + Supported URIs are as follows: + - os://: + - os+aws://: + - os+aws://: - will use the stored aws credentials on the system + """ + # Extract region and service from AWS url (ex: us-east-1.es.amazonaws.com) AWS_URL_RE = re.compile(r"([a-z]+-[a-z]+-[0-9])\.(.*)?\.amazonaws.com") @@ -111,7 +142,7 @@ def from_uri(uri: str) -> "PerformanceTrackerStore": if _uri.scheme == "es+aws": from boto3 import Session as AwsSession from botocore.credentials import Credentials as AwsCredentials - from opensearchpy import AWSV4SignerAuth, Urllib3AWSV4SignerAuth + from opensearchpy import Urllib3AWSV4SignerAuth # Create AWS session from the (eventual) creds if not _uri.username and not _uri.password: