From 9cf8e5721d98d909e361e4ee7b84c000e5400f7f Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Fri, 1 Oct 2021 08:59:25 -0600 Subject: [PATCH] [WIP] Azure SDK Instrumentation (Storage + Queue) (#1316) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add skeleton for azure instrumentation * Finish encoding the azureblob operation name table * Fix docstring * Bunch of fixes for azure instrumentation * Add some tests * Add handler for azurequeue * Add Azure queue tests * Implement table storage handler * Remove (rotated) key 😅 * Add .env to .gitignore * Add first azuretable test Plus a bunch of refactoring to make it pass since it's a different library * Add more table tests * Fix default case if we don't recognize the service * Add handler for azure file share * Add Azure fileshare tests * CHANGELOG --- .gitignore | 1 + CHANGELOG.asciidoc | 3 +- elasticapm/instrumentation/packages/azure.py | 438 +++++++++++++++++++ elasticapm/instrumentation/register.py | 1 + tests/instrumentation/azure_tests.py | 270 ++++++++++++ 5 files changed, 712 insertions(+), 1 deletion(-) create mode 100644 elasticapm/instrumentation/packages/azure.py create mode 100644 tests/instrumentation/azure_tests.py diff --git a/.gitignore b/.gitignore index 20f3df943..e217e9f45 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,4 @@ tests/python-agent-junit.xml *.code-workspace .pytest_cache/ .python-version +.env diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 7bcdcf514..ff5ede99f 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -37,12 +37,13 @@ endif::[] [float] ===== Features +* Add instrumentation for Azure Storage (blob/table/fileshare) and Azure Queue {pull}1316[#1316] [float] ===== Bug fixes * Improve span coverage for `asyncpg` {pull}1328[#1328] -* aiohttp: Correctly pass custom client to tracing middleware {pull}1345[#1345] +* aiohttp: Correctly pass custom client to tracing middleware {pull}1345[#1345] * Fixed an issue with httpx instrumentation {pull}1337[#1337] diff --git a/elasticapm/instrumentation/packages/azure.py b/elasticapm/instrumentation/packages/azure.py new file mode 100644 index 000000000..20bc8508d --- /dev/null +++ b/elasticapm/instrumentation/packages/azure.py @@ -0,0 +1,438 @@ +# BSD 3-Clause License +# +# Copyright (c) 2019, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import json +from collections import namedtuple + +from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule +from elasticapm.traces import capture_span +from elasticapm.utils.compat import urlparse +from elasticapm.utils.logging import get_logger + +logger = get_logger("elasticapm.instrument") + +HandlerInfo = namedtuple("HandlerInfo", ("signature", "span_type", "span_subtype", "span_action", "context")) + + +class AzureInstrumentation(AbstractInstrumentedModule): + name = "azure" + + instrument_list = [ + ("azure.core.pipeline._base", "Pipeline.run"), + ("azure.cosmosdb.table.common._http.httpclient", "_HTTPClient.perform_request"), + ] + + def call(self, module, method, wrapped, instance, args, kwargs): + if len(args) == 1: + request = args[0] + else: + request = kwargs["request"] + + if hasattr(request, "url"): # Azure Storage HttpRequest + parsed_url = urlparse.urlparse(request.url) + hostname = parsed_url.hostname + port = parsed_url.port + path = parsed_url.path + query_params = urlparse.parse_qs(parsed_url.query) + else: # CosmosDB HTTPRequest + hostname = request.host + port = hostname.split(":")[1] if ":" in hostname else 80 + path = request.path + query_params = request.query + + # Detect the service + service = None + if ".blob.core." in hostname: + service = "azureblob" + service_type = "storage" + elif ".queue.core." in hostname: + service = "azurequeue" + service_type = "messaging" + elif ".table.core." in hostname: + service = "azuretable" + service_type = "storage" + elif ".file.core." in hostname: + service = "azurefile" + service_type = "storage" + + # Do not create a span if we don't recognize the service + if not service: + return wrapped(*args, **kwargs) + + context = { + "destination": { + "address": hostname, + "port": port, + } + } + + handler_info = handlers[service](request, hostname, path, query_params, service, service_type, context) + + with capture_span( + handler_info.signature, + span_type=handler_info.span_type, + leaf=True, + span_subtype=handler_info.span_subtype, + span_action=handler_info.span_action, + extra=handler_info.context, + ): + return wrapped(*args, **kwargs) + + +def handle_azureblob(request, hostname, path, query_params, service, service_type, context): + """ + Returns the HandlerInfo for Azure Blob Storage operations + """ + account_name = hostname.split(".")[0] + context["destination"]["service"] = { + "name": service, + "resource": "{}/{}".format(service, account_name), + "type": service_type, + } + method = request.method + headers = request.headers + blob = path[1:] + + operation_name = "Unknown" + if method.lower() == "delete": + operation_name = "Delete" + elif method.lower() == "get": + operation_name = "Download" + if "container" in query_params.get("restype", []): + operation_name = "GetProperties" + if "acl" in query_params.get("comp", []): + operation_name = "GetAcl" + elif "list" in query_params.get("comp", []): + operation_name = "ListBlobs" + elif "metadata" in query_params.get("comp", []): + operation_name = "GetMetadata" + elif "list" in query_params.get("comp", []): + operation_name = "ListContainers" + elif "tags" in query_params.get("comp", []): + operation_name = "GetTags" + if query_params.get("where"): + operation_name = "FindTags" + elif "blocklist" in query_params.get("comp", []): + operation_name = "GetBlockList" + elif "pagelist" in query_params.get("comp", []): + operation_name = "GetPageRanges" + elif "stats" in query_params.get("comp", []): + operation_name = "Stats" + elif "blobs" in query_params.get("comp", []): + operation_name = "FilterBlobs" + elif method.lower() == "head": + operation_name = "GetProperties" + if "container" in query_params.get("restype", []) and query_params.get("comp") == "metadata": + operation_name = "GetMetadata" + elif "container" in query_params.get("restype", []) and query_params.get("comp") == "acl": + operation_name = "GetAcl" + elif method.lower() == "post": + if "batch" in query_params.get("comp", []): + operation_name = "Batch" + elif "query" in query_params.get("comp", []): + operation_name = "Query" + elif "userdelegationkey" in query_params.get("comp", []): + operation_name = "GetUserDelegationKey" + elif method.lower() == "put": + operation_name = "Create" + if "x-ms-copy-source" in headers: + operation_name = "Copy" + # These are repetitive and unnecessary, but included in case the table at + # https://github.com/elastic/apm/blob/master/specs/agents/tracing-instrumentation-azure.md + # changes in the future + if "block" in query_params.get("comp", []): + operation_name = "Copy" + elif "page" in query_params.get("comp", []): + operation_name = "Copy" + elif "incrementalcopy" in query_params.get("comp", []): + operation_name = "Copy" + elif "appendblock" in query_params.get("comp", []): + operation_name = "Copy" + elif "x-ms-blob-type" in headers: + operation_name = "Upload" + elif "x-ms-page-write" in headers and query_params.get("comp") == "page": + operation_name = "Clear" + elif "copy" in query_params.get("comp", []): + operation_name = "Abort" + elif "block" in query_params.get("comp", []): + operation_name = "Upload" + elif "blocklist" in query_params.get("comp", []): + operation_name = "Upload" + elif "page" in query_params.get("comp", []): + operation_name = "Upload" + elif "appendblock" in query_params.get("comp", []): + operation_name = "Upload" + elif "metadata" in query_params.get("comp", []): + operation_name = "SetMetadata" + elif "container" in query_params.get("restype", []) and query_params.get("comp") == "acl": + operation_name = "SetAcl" + elif "properties" in query_params.get("comp", []): + operation_name = "SetProperties" + elif "lease" in query_params.get("comp", []): + operation_name = "Lease" + elif "snapshot" in query_params.get("comp", []): + operation_name = "Snapshot" + elif "undelete" in query_params.get("comp", []): + operation_name = "Undelete" + elif "tags" in query_params.get("comp", []): + operation_name = "SetTags" + elif "tier" in query_params.get("comp", []): + operation_name = "SetTier" + elif "expiry" in query_params.get("comp", []): + operation_name = "SetExpiry" + elif "seal" in query_params.get("comp", []): + operation_name = "Seal" + elif "rename" in query_params.get("comp", []): + operation_name = "Rename" + + signature = "AzureBlob {} {}".format(operation_name, blob) + + return HandlerInfo(signature, service_type, service, operation_name, context) + + +def handle_azurequeue(request, hostname, path, query_params, service, service_type, context): + """ + Returns the HandlerInfo for Azure Queue operations + """ + account_name = hostname.split(".")[0] + method = request.method + resource_name = path.split("/")[1] if "/" in path else account_name # /queuename/messages + context["destination"]["service"] = { + "name": service, + "resource": "{}/{}".format(service, resource_name), + "type": service_type, + } + + operation_name = "UNKNOWN" + preposition = "to " + if method.lower() == "delete": + operation_name = "DELETE" + preposition = "" + if path.endswith("/messages") and "popreceipt" not in query_params: + operation_name = "CLEAR" + elif query_params.get("popreceipt", []): + # Redundant, but included in case the table at + # https://github.com/elastic/apm/blob/master/specs/agents/tracing-instrumentation-azure.md + # changes in the future + operation_name = "DELETE" + preposition = "from " + elif method.lower() == "get": + operation_name = "RECEIVE" + preposition = "from " + if "list" in query_params.get("comp", []): + operation_name = "LISTQUEUES" + elif "properties" in query_params.get("comp", []): + operation_name = "GETPROPERTIES" + elif "stats" in query_params.get("comp", []): + operation_name = "STATS" + elif "metadata" in query_params.get("comp", []): + operation_name = "GETMETADATA" + elif "acl" in query_params.get("comp", []): + operation_name = "GETACL" + elif "true" in query_params.get("peekonly", []): + operation_name = "PEEK" + elif method.lower() == "head": + operation_name = "RECEIVE" + preposition = "from " + if "metadata" in query_params.get("comp", []): + operation_name = "GETMETADATA" + elif "acl" in query_params.get("comp", []): + operation_name = "GETACL" + elif method.lower() == "options": + operation_name = "PREFLIGHT" + preposition = "from " + elif method.lower() == "post": + operation_name = "SEND" + preposition = "to " + elif method.lower() == "put": + operation_name = "CREATE" + preposition = "" + if "metadata" in query_params.get("comp", []): + operation_name = "SETMETADATA" + preposition = "for " + elif "acl" in query_params.get("comp", []): + operation_name = "SETACL" + preposition = "for " + elif "properties" in query_params.get("comp", []): + operation_name = "SETPROPERTIES" + preposition = "for " + elif query_params.get("popreceipt", []): + operation_name = "UPDATE" + preposition = "" + + # If `preposition` is included, it should have a trailing space + signature = "AzureQueue {} {}{}".format(operation_name, preposition, resource_name) + + return HandlerInfo(signature, service_type, service, operation_name.lower(), context) + + +def handle_azuretable(request, hostname, path, query_params, service, service_type, context): + """ + Returns the HandlerInfo for Azure Table Storage operations + """ + account_name = hostname.split(".")[0] + method = request.method + body = request.body + try: + body = json.loads(body) + except json.decoder.JSONDecodeError: # str not bytes + body = {} + # /tablename(PartitionKey='',RowKey='') + resource_name = path.split("/", 1)[1] if "/" in path else path + context["destination"]["service"] = { + "name": service, + "resource": "{}/{}".format(service, account_name), + "type": service_type, + } + + operation_name = "Unknown" + if method.lower() == "put": + operation_name = "Update" + if "properties" in query_params.get("comp", []): + operation_name = "SetProperties" + elif "acl" in query_params.get("comp", []): + operation_name = "SetAcl" + elif method.lower() == "post": + if resource_name == "Tables": + resource_name = body.get("TableName", resource_name) + operation_name = "Create" + else: # / + operation_name = "Insert" + elif method.lower() == "get": + operation_name = "Query" # for both /Tables and /table() + if "properties" in query_params.get("comp", []): + operation_name = "GetProperties" + elif "stats" in query_params.get("comp", []): + operation_name = "Stats" + elif "acl" in query_params.get("comp", []): + operation_name = "GetAcl" + elif method.lower() == "delete": + operation_name = "Delete" + if "Tables" in resource_name and "'" in resource_name: + resource_name = resource_name.split("'")[1] # /Tables('') + elif method.lower() == "options": + operation_name = "Preflight" + elif method.lower() == "head" and "acl" in query_params.get("comp", []): + operation_name = "GetAcl" + elif method.lower() == "merge": + operation_name = "Merge" + + signature = "AzureTable {} {}".format(operation_name, resource_name) + + return HandlerInfo(signature, service_type, service, operation_name, context) + + +def handle_azurefile(request, hostname, path, query_params, service, service_type, context): + """ + Returns the HandlerInfo for Azure File Share Storage operations + """ + account_name = hostname.split(".")[0] + method = request.method + resource_name = path.split("/", 1)[1] if "/" in path else account_name + headers = request.headers + context["destination"]["service"] = { + "name": service, + "resource": "{}/{}".format(service, account_name), + "type": service_type, + } + + operation_name = "Unknown" + if method.lower() == "get": + operation_name = "Download" + if "list" in query_params.get("comp", []): + operation_name = "List" + elif "properties" in query_params.get("comp", []): + operation_name = "GetProperties" + elif "share" in query_params.get("restype", []): + operation_name = "GetProperties" + elif "metadata" in query_params.get("comp", []): + operation_name = "GetMetadata" + elif "acl" in query_params.get("comp", []): + operation_name = "GetAcl" + elif "stats" in query_params.get("comp", []): + operation_name = "Stats" + elif "filepermission" in query_params.get("comp", []): + operation_name = "GetPermission" + elif "listhandles" in query_params.get("comp", []): + operation_name = "ListHandles" + elif "rangelist" in query_params.get("comp", []): + operation_name = "ListRanges" + elif method.lower() == "put": + operation_name = "Create" + if "properties" in query_params.get("comp", []): + operation_name = "SetProperties" + if "share" in query_params.get("restype", []): + operation_name = "SetProperties" + elif "snapshot" in query_params.get("comp", []): + operation_name = "Snapshot" + elif "metadata" in query_params.get("comp", []): + operation_name = "SetMetadata" + elif "undelete" in query_params.get("comp", []): + operation_name = "Undelete" + elif "acl" in query_params.get("comp", []): + operation_name = "SetAcl" + elif "filepermission" in query_params.get("comp", []): + operation_name = "SetPermission" + elif "directory" in query_params.get("restype", []): + operation_name = "Create" + elif "forceclosehandles" in query_params.get("comp", []): + operation_name = "CloseHandles" + elif "range" in query_params.get("comp", []): + operation_name = "Upload" + elif "x-ms-copy-source" in headers: + operation_name = "Copy" + elif "x-ms-copy-action" in headers and headers["x-ms-copy-action"] == "abort": + operation_name = "Abort" + elif "lease" in query_params.get("comp", []): + operation_name = "Lease" + elif method.lower() == "options": + operation_name = "Preflight" + elif method.lower() == "head": + operation_name = "GetProperties" + if "share" in query_params.get("restype", []): + operation_name = "GetProperties" + elif "metadata" in query_params.get("comp", []): + operation_name = "GetMetadata" + elif "acl" in query_params.get("comp", []): + operation_name = "GetAcl" + elif method.lower() == "delete": + operation_name = "Delete" + + signature = "AzureFile {} {}".format(operation_name, resource_name) + + return HandlerInfo(signature, service_type, service, operation_name, context) + + +handlers = { + "azureblob": handle_azureblob, + "azurequeue": handle_azurequeue, + "azuretable": handle_azuretable, + "azurefile": handle_azurefile, +} diff --git a/elasticapm/instrumentation/register.py b/elasticapm/instrumentation/register.py index f6d1344e7..2786b9667 100644 --- a/elasticapm/instrumentation/register.py +++ b/elasticapm/instrumentation/register.py @@ -65,6 +65,7 @@ "elasticapm.instrumentation.packages.graphql.GraphQLBackendInstrumentation", "elasticapm.instrumentation.packages.httpcore.HTTPCoreInstrumentation", "elasticapm.instrumentation.packages.httplib2.Httplib2Instrumentation", + "elasticapm.instrumentation.packages.azure.AzureInstrumentation", } if sys.version_info >= (3, 7): diff --git a/tests/instrumentation/azure_tests.py b/tests/instrumentation/azure_tests.py new file mode 100644 index 000000000..aeaab03c0 --- /dev/null +++ b/tests/instrumentation/azure_tests.py @@ -0,0 +1,270 @@ +# BSD 3-Clause License +# +# Copyright (c) 2019, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +import os +import uuid + +import pytest + +from elasticapm.conf import constants + +azureblob = pytest.importorskip("azure.storage.blob") +azurequeue = pytest.importorskip("azure.storage.queue") +azuretable = pytest.importorskip("azure.cosmosdb.table") +azurefile = pytest.importorskip("azure.storage.fileshare") +pytestmark = [pytest.mark.azurestorage] + +from azure.cosmosdb.table.tableservice import TableService +from azure.storage.blob import BlobServiceClient +from azure.storage.fileshare import ShareClient +from azure.storage.queue import QueueClient + +CONNECTION_STRING = os.getenv("AZURE_STORAGE_CONNECTION_STRING") + +if not CONNECTION_STRING: + pytestmark.append( + pytest.mark.skip("Skipping azure storage tests, no AZURE_STORAGE_CONNECTION_STRING environment variable set") + ) + + +@pytest.fixture() +def container_client(blob_service_client): + container_name = "apm-agent-python-ci-" + str(uuid.uuid4()) + container_client = blob_service_client.create_container(container_name) + + yield container_client + + blob_service_client.delete_container(container_name) + + +@pytest.fixture() +def blob_service_client(): + blob_service_client = BlobServiceClient.from_connection_string(CONNECTION_STRING) + return blob_service_client + + +@pytest.fixture() +def queue_client(): + queue_name = "apm-agent-python-ci-" + str(uuid.uuid4()) + queue_client = QueueClient.from_connection_string(CONNECTION_STRING, queue_name) + queue_client.create_queue() + + yield queue_client + + queue_client.delete_queue() + + +@pytest.fixture() +def table_service(): + table_name = "apmagentpythonci" + str(uuid.uuid4().hex) + table_service = TableService(connection_string=CONNECTION_STRING) + table_service.create_table(table_name) + table_service.table_name = table_name + + yield table_service + + table_service.delete_table(table_name) + + +@pytest.fixture() +def share_client(): + share_name = "apmagentpythonci" + str(uuid.uuid4().hex) + share_client = ShareClient.from_connection_string(conn_str=CONNECTION_STRING, share_name=share_name) + share_client.create_share() + + yield share_client + + share_client.delete_share() + + +def test_blob_list_blobs(instrument, elasticapm_client, container_client): + elasticapm_client.begin_transaction("transaction.test") + list(container_client.list_blobs()) + elasticapm_client.end_transaction("MyView") + span = elasticapm_client.events[constants.SPAN][0] + + assert span["name"] == "AzureBlob ListBlobs {}".format(container_client.container_name) + assert span["type"] == "storage" + assert span["subtype"] == "azureblob" + assert span["action"] == "ListBlobs" + + +def test_blob_create_container(instrument, elasticapm_client, blob_service_client): + elasticapm_client.begin_transaction("transaction.test") + container_name = str(uuid.uuid4()) + container_client = blob_service_client.create_container(container_name) + blob_service_client.delete_container(container_name) + elasticapm_client.end_transaction("MyView") + span = elasticapm_client.events[constants.SPAN][0] + + assert span["name"] == "AzureBlob Create {}".format(container_name) + assert span["type"] == "storage" + assert span["subtype"] == "azureblob" + assert span["action"] == "Create" + + +def test_blob_upload(instrument, elasticapm_client, container_client, blob_service_client): + elasticapm_client.begin_transaction("transaction.test") + # Upload this file to the container + blob_client = blob_service_client.get_blob_client(container=container_client.container_name, blob=__file__) + with open(__file__, "rb") as data: + blob_client.upload_blob(data) + elasticapm_client.end_transaction("MyView") + span = elasticapm_client.events[constants.SPAN][0] + + assert span["name"] == "AzureBlob Upload {}/{}".format(container_client.container_name, __file__) + assert span["type"] == "storage" + assert span["subtype"] == "azureblob" + assert span["action"] == "Upload" + + +def test_queue(instrument, elasticapm_client, queue_client): + elasticapm_client.begin_transaction("transaction.test") + # Send a message + queue_client.send_message("Test message") + list(queue_client.peek_messages()) + messages = queue_client.receive_messages() + for msg_batch in messages.by_page(): + for msg in msg_batch: + queue_client.delete_message(msg) + elasticapm_client.end_transaction("MyView") + + span = elasticapm_client.events[constants.SPAN][0] + assert span["name"] == "AzureQueue SEND to {}".format(queue_client.queue_name) + assert span["type"] == "messaging" + assert span["subtype"] == "azurequeue" + assert span["action"] == "send" + + span = elasticapm_client.events[constants.SPAN][1] + assert span["name"] == "AzureQueue PEEK from {}".format(queue_client.queue_name) + assert span["type"] == "messaging" + assert span["subtype"] == "azurequeue" + assert span["action"] == "peek" + + span = elasticapm_client.events[constants.SPAN][2] + assert span["name"] == "AzureQueue RECEIVE from {}".format(queue_client.queue_name) + assert span["type"] == "messaging" + assert span["subtype"] == "azurequeue" + assert span["action"] == "receive" + + span = elasticapm_client.events[constants.SPAN][3] + assert span["name"] == "AzureQueue DELETE from {}".format(queue_client.queue_name) + assert span["type"] == "messaging" + assert span["subtype"] == "azurequeue" + assert span["action"] == "delete" + + +def test_table_create(instrument, elasticapm_client): + table_name = "apmagentpythonci" + str(uuid.uuid4().hex) + table_service = TableService(connection_string=CONNECTION_STRING) + + elasticapm_client.begin_transaction("transaction.test") + table_service.create_table(table_name) + table_service.delete_table(table_name) + elasticapm_client.end_transaction("MyView") + + span = elasticapm_client.events[constants.SPAN][0] + + assert span["name"] == "AzureTable Create {}".format(table_name) + assert span["type"] == "storage" + assert span["subtype"] == "azuretable" + assert span["action"] == "Create" + + +def test_table(instrument, elasticapm_client, table_service): + table_name = table_service.table_name + elasticapm_client.begin_transaction("transaction.test") + task = {"PartitionKey": "tasksSeattle", "RowKey": "001", "description": "Take out the trash", "priority": 200} + table_service.insert_entity(table_name, task) + task = {"PartitionKey": "tasksSeattle", "RowKey": "001", "description": "Take out the garbage", "priority": 250} + table_service.update_entity(table_name, task) + task = table_service.get_entity(table_name, "tasksSeattle", "001") + table_service.delete_entity(table_name, "tasksSeattle", "001") + elasticapm_client.end_transaction("MyView") + + span = elasticapm_client.events[constants.SPAN][0] + assert span["name"] == "AzureTable Insert {}".format(table_name) + assert span["type"] == "storage" + assert span["subtype"] == "azuretable" + assert span["action"] == "Insert" + + span = elasticapm_client.events[constants.SPAN][1] + assert span["name"] == "AzureTable Update {}(PartitionKey='tasksSeattle',RowKey='001')".format(table_name) + assert span["type"] == "storage" + assert span["subtype"] == "azuretable" + assert span["action"] == "Update" + + span = elasticapm_client.events[constants.SPAN][2] + assert span["name"] == "AzureTable Query {}(PartitionKey='tasksSeattle',RowKey='001')".format(table_name) + assert span["type"] == "storage" + assert span["subtype"] == "azuretable" + assert span["action"] == "Query" + + span = elasticapm_client.events[constants.SPAN][3] + assert span["name"] == "AzureTable Delete {}(PartitionKey='tasksSeattle',RowKey='001')".format(table_name) + assert span["type"] == "storage" + assert span["subtype"] == "azuretable" + assert span["action"] == "Delete" + + +def test_fileshare(instrument, elasticapm_client, share_client): + elasticapm_client.begin_transaction("transaction.test") + # Upload this file to the share + file_client = share_client.get_file_client("testfile.txt") + with open(__file__, "rb") as data: + file_client.upload_file(data) + file_client.download_file() + file_client.delete_file() + elasticapm_client.end_transaction("MyView") + + span = elasticapm_client.events[constants.SPAN][0] + assert span["name"] == "AzureFile Create {}/testfile.txt".format(share_client.share_name) + assert span["type"] == "storage" + assert span["subtype"] == "azurefile" + assert span["action"] == "Create" + + span = elasticapm_client.events[constants.SPAN][1] + assert span["name"] == "AzureFile Upload {}/testfile.txt".format(share_client.share_name) + assert span["type"] == "storage" + assert span["subtype"] == "azurefile" + assert span["action"] == "Upload" + + span = elasticapm_client.events[constants.SPAN][2] + assert span["name"] == "AzureFile Download {}/testfile.txt".format(share_client.share_name) + assert span["type"] == "storage" + assert span["subtype"] == "azurefile" + assert span["action"] == "Download" + + span = elasticapm_client.events[constants.SPAN][3] + assert span["name"] == "AzureFile Delete {}/testfile.txt".format(share_client.share_name) + assert span["type"] == "storage" + assert span["subtype"] == "azurefile" + assert span["action"] == "Delete"