Skip to content

Commit

Permalink
Refactor for log streaming query params (#7480)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiec-msft committed Apr 9, 2024
1 parent f0f14b3 commit 5a1a733
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 29 deletions.
23 changes: 23 additions & 0 deletions src/spring/azext_spring/log_stream/log_stream_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,35 @@
from collections import defaultdict
from knack.log import get_logger
from knack.util import CLIError
from six.moves.urllib import parse

from .writer import DefaultWriter


logger = get_logger(__name__)


class LogStreamBaseQueryOptions:
def __init__(self, follow, lines, since, limit):
self.follow = follow
self.lines = lines
self.since = since
self.limit = limit


def attach_logs_query_options(url, queryOptions: LogStreamBaseQueryOptions):
params = {}
params["tailLines"] = queryOptions.lines
params["limitBytes"] = queryOptions.limit
if queryOptions.since:
params["sinceSeconds"] = queryOptions.since
if queryOptions.follow:
params["follow"] = True

url += "?{}".format(parse.urlencode(params)) if params else ""
return url


# pylint: disable=bare-except, too-many-statements
def iter_lines(response, limit=2 ** 20, chunk_size=None):
'''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
from azure.cli.core.commands.client_factory import get_subscription_id
from knack.log import get_logger
from knack.util import CLIError
from six.moves.urllib import parse
from threading import Thread
from time import sleep

from .managed_component import (Acs, Flux, Scg, ScgOperator,
ManagedComponentInstance, supported_components, get_component)

from ..log_stream.writer import (DefaultWriter, PrefixWriter)
from ..log_stream.log_stream_operations import log_stream_from_url
from ..log_stream.log_stream_operations import (attach_logs_query_options, log_stream_from_url,
LogStreamBaseQueryOptions)
from ..log_stream.log_stream_validators import validate_thread_number
from .._utils import (get_bearer_auth, get_hostname)

Expand All @@ -31,21 +31,13 @@ def __init__(self, component, instance):
self.instance = instance


class QueryOptions:
def __init__(self, follow, lines, since, limit):
self.follow = follow
self.lines = lines
self.since = since
self.limit = limit


def managed_component_logs(cmd, client, resource_group, service,
name=None, all_instances=None, instance=None,
follow=None, max_log_requests=5, lines=50, since=None, limit=2048):
auth = get_bearer_auth(cmd.cli_ctx)
exceptions = []
threads = None
queryOptions = QueryOptions(follow=follow, lines=lines, since=since, limit=limit)
queryOptions = LogStreamBaseQueryOptions(follow=follow, lines=lines, since=since, limit=limit)
if not name and instance:
threads = _get_log_threads_without_component(cmd, client, resource_group, service,
instance, auth, exceptions, queryOptions)
Expand Down Expand Up @@ -89,7 +81,7 @@ def _get_component(component):


def _get_log_stream_urls(cmd, client, resource_group, service, component_name,
all_instances, instance, queryOptions: QueryOptions):
all_instances, instance, queryOptions: LogStreamBaseQueryOptions):
component_api_name = _get_component(component_name).get_api_name()
hostname = get_hostname(cmd.cli_ctx, client, resource_group, service)
url_dict = {}
Expand Down Expand Up @@ -128,10 +120,10 @@ def _get_log_stream_urls(cmd, client, resource_group, service, component_name,
return url_dict


def _get_stream_url(hostname, component_name, instance_name, queryOptions: QueryOptions):
def _get_stream_url(hostname, component_name, instance_name, queryOptions: LogStreamBaseQueryOptions):
url_template = "https://{}/api/logstream/managedComponents/{}/instances/{}"
url = url_template.format(hostname, component_name, instance_name)
url = _attach_logs_query_options(url, queryOptions)
url = attach_logs_query_options(url, queryOptions)
return url


Expand Down Expand Up @@ -174,28 +166,16 @@ def _sequential_start_threads(threads: [Thread]):
# so that ctrl+c can stop the command


def _get_log_threads_without_component(cmd, client, resource_group, service, instance_name, auth, exceptions, queryOptions: QueryOptions):
def _get_log_threads_without_component(cmd, client, resource_group, service, instance_name, auth, exceptions,
queryOptions: LogStreamBaseQueryOptions):
hostname = get_hostname(cmd.cli_ctx, client, resource_group, service)
url_template = "https://{}/api/logstream/managedComponentInstances/{}"
url = url_template.format(hostname, instance_name)
url = _attach_logs_query_options(url, queryOptions)
url = attach_logs_query_options(url, queryOptions)

return [Thread(target=log_stream_from_url, args=(url, auth, None, exceptions, _get_default_writer()))]


def _attach_logs_query_options(url, queryOptions: QueryOptions):
params = {}
params["tailLines"] = queryOptions.lines
params["limitBytes"] = queryOptions.limit
if queryOptions.since:
params["sinceSeconds"] = queryOptions.since
if queryOptions.follow:
params["follow"] = True

url += "?{}".format(parse.urlencode(params)) if params else ""
return url


def _get_prefix_writer(prefix):
"""
Define this method, so that we can mock this method in scenario test to test output
Expand Down

0 comments on commit 5a1a733

Please sign in to comment.