Skip to content

Commit

Permalink
Pclouds 3653 azure log forwarder gzip compression when sending to dt (#…
Browse files Browse the repository at this point in the history
…94)

added gzip
  • Loading branch information
szymon-nagel-dt authored Feb 7, 2024
1 parent f9fbd3b commit 1847204
Showing 1 changed file with 19 additions and 6 deletions.
25 changes: 19 additions & 6 deletions logs_ingest/dynatrace_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import gzip
import json
import os
import ssl
Expand All @@ -28,6 +29,7 @@
from . import logging

should_verify_ssl_certificate = os.environ.get("REQUIRE_VALID_CERTIFICATE", "True") in ["True", "true"]

number_of_concurrent_send_calls = get_int_environment_value("NUMBER_OF_CONCURRENT_SEND_CALLS", 2)
ssl_context = ssl.create_default_context()
if not should_verify_ssl_certificate:
Expand Down Expand Up @@ -79,19 +81,29 @@ async def process_batch(batch: LogBatch):
async def _send_logs(session, dynatrace_token, encoded_body_bytes, log_ingest_url, self_monitoring):
self_monitoring.all_requests += 1
is_request_successful = False
headers = {
"Authorization": f"Api-Token {dynatrace_token}",
"Content-Type": "application/json; charset=utf-8",
"Content-Encoding": "gzip"
}

encoded_body_bytes = gzip.compress(encoded_body_bytes, compresslevel=6)
compressed_size_kb = len(encoded_body_bytes) / 1024.0

logging.info(f'Log ingest payload size compressed: {compressed_size_kb} kB')

status, reason, response = await _perform_http_request(
session,
method="POST",
url=log_ingest_url,
headers=headers,
encoded_body_bytes=encoded_body_bytes,
headers={
"Authorization": f"Api-Token {dynatrace_token}",
"Content-Type": "application/json; charset=utf-8"
}
)
if status > 299:
logging.error(f'Log ingest error: {status}, reason: {reason}, url: {log_ingest_url}, body: "{response}"',
"log-ingest-error")
logging.error(
f'Log ingest error: {status}, reason: {reason}, url: {log_ingest_url}, body: "{response}"',
"log-ingest-error",
)
if status == 400:
self_monitoring.dynatrace_connectivities.append(DynatraceConnectivity.InvalidInput)
elif status == 401:
Expand All @@ -113,6 +125,7 @@ async def _send_logs(session, dynatrace_token, encoded_body_bytes, log_ingest_ur
return is_request_successful



async def _perform_http_request(session, method, url, encoded_body_bytes, headers) -> Tuple[int, str, str]:
timeout = aiohttp.ClientTimeout(total=10)
async with session.request(method, url, headers=headers, data=encoded_body_bytes, ssl=ssl_context, timeout=timeout) as response:
Expand Down

0 comments on commit 1847204

Please sign in to comment.