Skip to content

Commit

Permalink
Managed kafka code samples (GoogleCloudPlatform#11994)
Browse files Browse the repository at this point in the history
* samples(managedkafka): Add snippets for all API methods

* address checklist

* Add requirements.txt. Address stylistic/organizational comments. Refactor get/list to return the values they are retrieving.

* address updating consumer group

* Remove blank space
  • Loading branch information
luongadam authored Jul 15, 2024
1 parent ebc9766 commit b97d17e
Show file tree
Hide file tree
Showing 19 changed files with 1,226 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
/composer/**/* @GoogleCloudPlatform/cloud-dpes-composer @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers
/pubsub/**/* @GoogleCloudPlatform/api-pubsub-and-pubsublite @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers
/pubsublite/**/* @GoogleCloudPlatform/api-pubsub-and-pubsublite @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers
/managedkafka/**/* @GoogleCloudPlatform/api-pubsub-and-pubsublite @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers
/cloud_tasks/**/* @GoogleCloudPlatform/torus-dpe @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers

# For practicing
Expand Down
154 changes: 154 additions & 0 deletions managedkafka/snippets/clusters/clusters_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from unittest import mock
from unittest.mock import MagicMock

import create_cluster
import delete_cluster
import get_cluster
from google.api_core.operation import Operation
from google.cloud import managedkafka_v1
import list_clusters
import pytest
import update_cluster

PROJECT_ID = "test-project-id"
REGION = "us-central1"
CLUSTER_ID = "test-cluster-id"


@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.create_cluster")
def test_create_cluster(
mock_method: MagicMock,
capsys: pytest.CaptureFixture[str],
):
cpu = 3
memory_bytes = 3221225472
subnet = "test-subnet"
operation = mock.MagicMock(spec=Operation)
cluster = managedkafka_v1.Cluster()
cluster.name = managedkafka_v1.ManagedKafkaClient.cluster_path(
PROJECT_ID, REGION, CLUSTER_ID
)
operation.result = mock.MagicMock(return_value=cluster)
mock_method.return_value = operation

create_cluster.create_cluster(
project_id=PROJECT_ID,
region=REGION,
cluster_id=CLUSTER_ID,
subnet=subnet,
cpu=cpu,
memory_bytes=memory_bytes,
)

out, _ = capsys.readouterr()
assert "Created cluster" in out
assert CLUSTER_ID in out
mock_method.assert_called_once()


@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.get_cluster")
def test_get_cluster(
mock_method: MagicMock,
capsys: pytest.CaptureFixture[str],
):
cluster = managedkafka_v1.Cluster()
cluster.name = managedkafka_v1.ManagedKafkaClient.cluster_path(
PROJECT_ID, REGION, CLUSTER_ID
)
mock_method.return_value = cluster

get_cluster.get_cluster(
project_id=PROJECT_ID,
region=REGION,
cluster_id=CLUSTER_ID,
)

out, _ = capsys.readouterr()
assert "Got cluster" in out
assert CLUSTER_ID in out
mock_method.assert_called_once()


@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.update_cluster")
def test_update_cluster(
mock_method: MagicMock,
capsys: pytest.CaptureFixture[str],
):
new_memory_bytes = 3221225475
operation = mock.MagicMock(spec=Operation)
cluster = managedkafka_v1.Cluster()
cluster.name = managedkafka_v1.ManagedKafkaClient.cluster_path(
PROJECT_ID, REGION, CLUSTER_ID
)
cluster.capacity_config.memory_bytes = new_memory_bytes
operation.result = mock.MagicMock(return_value=cluster)
mock_method.return_value = operation

update_cluster.update_cluster(
project_id=PROJECT_ID,
region=REGION,
cluster_id=CLUSTER_ID,
memory_bytes=new_memory_bytes,
)

out, _ = capsys.readouterr()
assert "Updated cluster" in out
assert CLUSTER_ID in out
assert str(new_memory_bytes) in out
mock_method.assert_called_once()


@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.list_clusters")
def test_list_clusters(
mock_method: MagicMock,
capsys: pytest.CaptureFixture[str],
):
cluster = managedkafka_v1.Cluster()
cluster.name = managedkafka_v1.ManagedKafkaClient.cluster_path(
PROJECT_ID, REGION, CLUSTER_ID
)
response = [cluster]
mock_method.return_value = response

list_clusters.list_clusters(
project_id=PROJECT_ID,
region=REGION,
)

out, _ = capsys.readouterr()
assert "Got cluster" in out
assert CLUSTER_ID in out
mock_method.assert_called_once()


@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.delete_cluster")
def test_delete_cluster(
mock_method: MagicMock,
capsys: pytest.CaptureFixture[str],
):
operation = mock.MagicMock(spec=Operation)
mock_method.return_value = operation

delete_cluster.delete_cluster(
project_id=PROJECT_ID,
region=REGION,
cluster_id=CLUSTER_ID,
)

out, _ = capsys.readouterr()
assert "Deleted cluster" in out
mock_method.assert_called_once()
71 changes: 71 additions & 0 deletions managedkafka/snippets/clusters/create_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START managedkafka_create_cluster]
from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1


def create_cluster(
project_id: str,
region: str,
cluster_id: str,
subnet: str,
cpu: int,
memory_bytes: int,
) -> None:
"""
Create a Kafka cluster.
Args:
project_id: Google Cloud project ID.
region: Cloud region.
cluster_id: ID of the Kafka cluster.
subnet: VPC subnet from which the cluster is accessible. The expected format is projects/{project_id}/regions{region}/subnetworks/{subnetwork}.
cpu: Number of vCPUs to provision for the cluster.
memory_bytes: The memory to provision for the cluster in bytes.
Raises:
This method will raise the exception if the operation errors or
the timeout before the operation completes is reached.
"""

client = managedkafka_v1.ManagedKafkaClient()

cluster = managedkafka_v1.Cluster()
cluster.name = client.cluster_path(project_id, region, cluster_id)
cluster.capacity_config.vcpu_count = cpu
cluster.capacity_config.memory_bytes = memory_bytes
cluster.gcp_config.access_config.network_configs.subnet = subnet
cluster.rebalance_config.mode = (
managedkafka_v1.RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP
)

request = managedkafka_v1.CreateClusterRequest(
parent=client.common_location_path(project_id, region),
cluster_id=cluster_id,
cluster=cluster,
)

try:
# The duration of this operation can vary considerably, typically taking 10-40 minutes.
# We can set a timeout of 3000s (50 minutes).
operation = client.create_cluster(request=request, timeout=3000)
response = operation.result()
print("Created cluster:", response)
except GoogleAPICallError:
print(operation.operation.error)


# [END managedkafka_create_cluster]
52 changes: 52 additions & 0 deletions managedkafka/snippets/clusters/delete_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START managedkafka_delete_cluster]
from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1


def delete_cluster(
project_id: str,
region: str,
cluster_id: str,
) -> None:
"""
Delete a Kafka cluster.
Args:
project_id: Google Cloud project ID.
region: Cloud region.
cluster_id: ID of the Kafka cluster.
Raises:
This method will raise the exception if the operation errors or
the timeout before the operation completes is reached.
"""

client = managedkafka_v1.ManagedKafkaClient()

request = managedkafka_v1.DeleteClusterRequest(
name=client.cluster_path(project_id, region, cluster_id),
)

try:
operation = client.delete_cluster(request=request)
operation.result()
print("Deleted cluster")
except GoogleAPICallError:
print(operation.operation.error)


# [END managedkafka_delete_cluster]
46 changes: 46 additions & 0 deletions managedkafka/snippets/clusters/get_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START managedkafka_get_cluster]
from google.cloud import managedkafka_v1


def get_cluster(
project_id: str,
region: str,
cluster_id: str,
) -> managedkafka_v1.Cluster:
"""
Get a Kafka cluster.
Args:
project_id: Google Cloud project ID.
region: Cloud region.
cluster_id: ID of the Kafka cluster.
"""

client = managedkafka_v1.ManagedKafkaClient()

cluster_path = client.cluster_path(project_id, region, cluster_id)
request = managedkafka_v1.GetClusterRequest(
name=cluster_path,
)

cluster = client.get_cluster(request=request)
print("Got cluster:", cluster)

return cluster


# [END managedkafka_get_cluster]
46 changes: 46 additions & 0 deletions managedkafka/snippets/clusters/list_clusters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START managedkafka_list_clusters]
from typing import List

from google.cloud import managedkafka_v1


def list_clusters(
project_id: str,
region: str,
) -> List[str]:
"""
List Kafka clusters in a given project ID and region.
Args:
project_id: Google Cloud project ID.
region: Cloud region.
"""

client = managedkafka_v1.ManagedKafkaClient()

request = managedkafka_v1.ListClustersRequest(
parent=client.common_location_path(project_id, region),
)

response = client.list_clusters(request=request)
for cluster in response:
print("Got cluster:", cluster)

return [cluster.name for cluster in response]


# [END managedkafka_list_clusters]
Loading

0 comments on commit b97d17e

Please sign in to comment.