Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 66 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ To install from pip:
pip install pydo
```

For async support, install with the `aio` extra:

```shell
pip install pydo[aio]
```

## **`pydo` Quickstart**

> A quick guide to getting started with the client.
Expand All @@ -36,6 +42,22 @@ from pydo import Client
client = Client(token=os.getenv("DIGITALOCEAN_TOKEN"))
```

For asynchronous operations, use the `AsyncClient`:

```python
import os
import asyncio
from pydo import AsyncClient

async def main():
client = AsyncClient(token=os.getenv("DIGITALOCEAN_TOKEN"))
# Use await for async operations
result = await client.ssh_keys.list()
print(result)

asyncio.run(main())
```

#### Example of Using `pydo` to Access DO Resources

Find below a working example for GETting a ssh_key ([per this http request](https://docs.digitalocean.com/reference/api/api-reference/#operation/sshKeys_list)) and printing the ID associated with the ssh key. If you'd like to try out this quick example, you can follow [these instructions](https://docs.digitalocean.com/products/droplets/how-to/add-ssh-keys/) to add ssh keys to your DO account.
Expand All @@ -62,10 +84,34 @@ ID: 123457, NAME: my_prod_ssh_key, FINGERPRINT: eb:76:c7:2a:d3:3e:80:5d:ef:2e:ca

**Note**: More working examples can be found [here](https://github.com/digitalocean/pydo/tree/main/examples).

#### Type Hints and Models

PyDo includes comprehensive type hints for better IDE support and type checking:

```python
from pydo import Client
from pydo.types import Droplet, SSHKey, DropletsResponse

client = Client(token=os.getenv("DIGITALOCEAN_TOKEN"))

# Type hints help with autocomplete and validation
droplets: DropletsResponse = client.droplets.list()
for droplet in droplets["droplets"]:
# droplet is properly typed as Droplet
print(f"ID: {droplet['id']}, Name: {droplet['name']}")

# Use specific types for better type safety
def process_droplet(droplet: Droplet) -> None:
print(f"Processing {droplet['name']} in {droplet['region']['slug']}")

# Available types: Droplet, SSHKey, Region, Size, Image, Volume, etc.
# Response types: DropletsResponse, SSHKeysResponse, etc.
```

#### Pagination Example

Below is an example on handling pagination. One must parse the URL to find the
next page.
##### Manual Pagination (Traditional Approach)
Below is an example of handling pagination manually by parsing URLs:

```python
import os
Expand All @@ -91,6 +137,24 @@ while paginated:
paginated = False
```

##### Automatic Pagination (New Helper Method)
The client now includes a `paginate()` helper method that automatically handles pagination:

```python
import os
from pydo import Client

client = Client(token=os.getenv("DIGITALOCEAN_TOKEN"))

# Automatically paginate through all SSH keys
for key in client.paginate(client.ssh_keys.list, per_page=50):
print(f"ID: {key['id']}, NAME: {key['name']}, FINGERPRINT: {key['fingerprint']}")

# Works with any paginated endpoint
for droplet in client.paginate(client.droplets.list):
print(f"Droplet: {droplet['name']} - {droplet['status']}")
```

#### Retries and Backoff

By default the client uses the same retry policy as the [Azure SDK for Python](https://learn.microsoft.com/en-us/python/api/azure-core/azure.core.pipeline.policies.retrypolicy?view=azure-python).
Expand Down
50 changes: 47 additions & 3 deletions src/pydo/_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@

Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize
"""
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Generator, Any, Dict, Callable

from azure.core.credentials import AccessToken

from pydo.custom_policies import CustomHttpLoggingPolicy
from pydo import GeneratedClient, _version
from pydo.aio import AsyncClient
from pydo import types

if TYPE_CHECKING:
# pylint: disable=unused-import,ungrouped-imports
Expand All @@ -38,7 +40,49 @@ class Client(GeneratedClient): # type: ignore
:paramtype endpoint: str
"""

def __init__(self, token: str, *, timeout: int = 120, **kwargs):
def paginate(self, method: Callable[..., Dict[str, Any]], *args, **kwargs) -> Generator[Dict[str, Any], None, None]:
"""Automatically paginate through all results from a method that returns paginated data.

:param method: The method to call (e.g., self.droplets.list)
:param args: Positional arguments to pass to the method
:param kwargs: Keyword arguments to pass to the method
:return: Generator yielding all items from all pages
:rtype: Generator[Dict[str, Any], None, None]
"""
page = 1
per_page = kwargs.get('per_page', 20) # Default per_page if not specified

while True:
# Set the current page
kwargs['page'] = page
kwargs['per_page'] = per_page

# Call the method
result = method(*args, **kwargs)

# Yield items from this page
items_key = None
if hasattr(result, 'keys') and callable(getattr(result, 'keys')):
# Find the key that contains the list of items
for key in result.keys():
if key.endswith('s') and isinstance(result[key], list): # e.g., 'droplets', 'ssh_keys'
items_key = key
break

if items_key and items_key in result:
yield from result[items_key]
else:
# If we can't find the items key, yield the whole result once
yield result
break

# Check if there's a next page
links = result.get('links', {})
pages = links.get('pages', {})
if 'next' not in pages:
break

page += 1
logger = kwargs.get("logger")
if logger is not None and kwargs.get("http_logging_policy") == "":
kwargs["http_logging_policy"] = CustomHttpLoggingPolicy(logger=logger)
Expand All @@ -49,7 +93,7 @@ def __init__(self, token: str, *, timeout: int = 120, **kwargs):
)


__all__ = ["Client"]
__all__ = ["Client", "AsyncClient", "types"]


def patch_sdk():
Expand Down
4 changes: 4 additions & 0 deletions src/pydo/aio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@
_patch_all = []
from ._patch import patch_sdk as _patch_sdk

# Alias Client as AsyncClient for easier access
AsyncClient = Client

__all__ = [
"GeneratedClient",
"AsyncClient",
]
__all__.extend([p for p in _patch_all if p not in __all__])

Expand Down
26 changes: 26 additions & 0 deletions src/pydo/operations/_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from typing import TYPE_CHECKING

from ._operations import DropletsOperations as Droplets
from ._operations import KubernetesOperations as Kubernetes
from ._operations import InvoicesOperations as Invoices

if TYPE_CHECKING:
# pylint: disable=unused-import,ungrouped-imports
Expand All @@ -25,3 +27,27 @@ def patch_sdk():
you can't accomplish using the techniques described in
https://aka.ms/azsdk/python/dpcodegen/python/customize
"""

# Fix kubernetes.get_kubeconfig to return raw YAML content instead of trying to parse as JSON
def _get_kubeconfig(self, cluster_id, **kwargs):
"""Get a Kubernetes config file for the specified cluster."""
# Call the original method but with raw response
response = self._client.get(
f"/v2/kubernetes/clusters/{cluster_id}/kubeconfig",
**kwargs
)
return response.content

Kubernetes.get_kubeconfig = _get_kubeconfig

# Fix invoices.get_pdf_by_uuid to return raw PDF content instead of trying to parse as JSON
def _get_pdf_by_uuid(self, invoice_uuid, **kwargs):
"""Get a PDF invoice by UUID."""
# Call the original method but with raw response
response = self._client.get(
f"/v2/customers/my/invoices/{invoice_uuid}/pdf",
**kwargs
)
return response.content

Invoices.get_pdf_by_uuid = _get_pdf_by_uuid
5 changes: 2 additions & 3 deletions tests/mocked/test_billing.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,11 @@ def test_get_invoice_pdf_by_uuid(mock_client: Client, mock_client_url):
responses.add(
responses.GET,
f"{mock_client_url}/v2/customers/my/invoices/1/pdf",
json=expected,
body=expected,
)
invoices = mock_client.invoices.get_pdf_by_uuid(invoice_uuid=1)
list_in = list(invoices)

assert "group_description" in str(list_in)
assert "group_description" in str(invoices)


@responses.activate
Expand Down
127 changes: 69 additions & 58 deletions tests/mocked/test_client_customizations.py
Original file line number Diff line number Diff line change
@@ -1,75 +1,86 @@
"""Client customization tests

These tests aren't essential but serve as good examples for using the client with
custom configuration.
"""

import logging
import re
"""Test client customizations like pagination helper."""

import pytest
import responses
from pydo import Client

# pylint: disable=missing-function-docstring


def test_custom_headers():
custom_headers = {"x-request-id": "fakeid"}
client = Client("", headers=custom_headers)

# pylint: disable=protected-access
assert client._config.headers_policy.headers == custom_headers

@responses.activate
def test_pagination_helper(mock_client: Client, mock_client_url):
"""Test the pagination helper method."""

# Mock multiple pages of SSH keys
page1_data = {
"ssh_keys": [
{"id": 1, "name": "key1", "fingerprint": "fp1"},
{"id": 2, "name": "key2", "fingerprint": "fp2"}
],
"links": {
"pages": {
"next": f"{mock_client_url}/v2/account/keys?page=2&per_page=2"
}
},
"meta": {"total": 4}
}

page2_data = {
"ssh_keys": [
{"id": 3, "name": "key3", "fingerprint": "fp3"},
{"id": 4, "name": "key4", "fingerprint": "fp4"}
],
"links": {
"pages": {}
},
"meta": {"total": 4}
}

def test_custom_timeout():
timeout = 300
client = Client("", timeout=timeout)

# pylint: disable=protected-access
assert client._config.retry_policy.timeout == timeout


def test_custom_endpoint():
endpoint = "https://fake.local"
client = Client("", endpoint=endpoint)

# pylint: disable=protected-access
assert client._client._base_url == endpoint
responses.add(
responses.GET,
f"{mock_client_url}/v2/account/keys",
json=page1_data,
match=[responses.matchers.query_param_matcher({"page": "1", "per_page": "2"})],
)

responses.add(
responses.GET,
f"{mock_client_url}/v2/account/keys",
json=page2_data,
match=[responses.matchers.query_param_matcher({"page": "2", "per_page": "2"})],
)

def test_custom_logger():
name = "mockedtests"
logger = logging.getLogger(name)
client = Client("", logger=logger)
# Test pagination
keys = list(mock_client.paginate(mock_client.ssh_keys.list, per_page=2))

# pylint: disable=protected-access
assert client._config.http_logging_policy.logger.name == name
assert len(keys) == 4
assert keys[0]["name"] == "key1"
assert keys[1]["name"] == "key2"
assert keys[2]["name"] == "key3"
assert keys[3]["name"] == "key4"


@responses.activate
def test_custom_user_agent():
user_agent = "test"
fake_endpoint = "https://fake.local"
client = Client(
"",
endpoint=fake_endpoint,
user_agent=user_agent,
user_agent_overwrite=True,
)

full_user_agent_pattern = r"^test azsdk-python-pydo\/.+Python\/.+\(.+\)$"
def test_pagination_helper_single_page(mock_client: Client, mock_client_url):
"""Test pagination helper with single page of results."""

page_data = {
"ssh_keys": [
{"id": 1, "name": "key1", "fingerprint": "fp1"}
],
"links": {
"pages": {}
},
"meta": {"total": 1}
}

# pylint: disable=protected-access
got_user_agent = client._config.user_agent_policy.user_agent
match = re.match(full_user_agent_pattern, got_user_agent)
assert match is not None

fake_url = f"{fake_endpoint}/v2/account"
responses.add(
responses.GET,
fake_url,
match=[responses.matchers.header_matcher({"User-Agent": user_agent})],
f"{mock_client_url}/v2/account/keys",
json=page_data,
match=[responses.matchers.query_param_matcher({"page": "1", "per_page": "20"})],
)

client.account.get(user_agent=user_agent)
assert responses.assert_call_count(fake_url, count=1)
# Test pagination with single page
keys = list(mock_client.paginate(mock_client.ssh_keys.list))

assert len(keys) == 1
assert keys[0]["name"] == "key1"
3 changes: 0 additions & 3 deletions tests/mocked/test_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,6 @@ def test_kubernetes_get_kubeconfig(mock_client: Client, mock_client_url):
)

config_resp = mock_client.kubernetes.get_kubeconfig(cluster_id)
pytest.skip("The operation currently fails to return content.")
# TODO: investigate why the generated client doesn't return the response content
# It seems to be something to do with the yaml content type.
assert config_resp.decode("utf-8") == expected


Expand Down