Skip to content

Commit

Permalink
TDL-23384: Add retry for chunked encoding errors (#61)
Browse files Browse the repository at this point in the history
* add retry for chunked encoding error

* Add exponential backoff to `request_export`

* Remove backoff because this is a generator

Throwing an exception in the generator seems to just raise a
`StopIteration`

* Add retry logic to the caller of `client.request_export`

* Add a test for retrying on `ChunkedEncodingError`s

* Bump to `v1.5.1`, update changelog

---------

Co-authored-by: Leslie VanDeMark <[email protected]>
  • Loading branch information
luandy64 and leslievandemark authored Aug 2, 2023
1 parent 6403ea4 commit de02287
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 1.5.1
* Add retry logic for `ChunkedEncodingError`s [#61](https://github.com/singer-io/tap-mixpanel/pull/61)

## 1.5.0
* Adds `export_events` as optional param to filter the data for export stream based on event names [#56](https://github.com/singer-io/tap-mixpanel/pull/56)

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from setuptools import setup, find_packages

setup(name='tap-mixpanel',
version='1.5.0',
version='1.5.1',
description='Singer.io tap for extracting data from the mixpanel API',
author='[email protected]',
classifiers=['Programming Language :: Python :: 3 :: Only'],
Expand Down
4 changes: 2 additions & 2 deletions tap_mixpanel/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import jsonlines
import requests
import singer
from requests.exceptions import ConnectionError, Timeout
from requests.exceptions import ChunkedEncodingError, ConnectionError, Timeout
from requests.models import ProtocolError
from singer import metrics

Expand Down Expand Up @@ -204,7 +204,7 @@ def check_access(self):

@backoff.on_exception(
backoff.expo,
(Server5xxError, Server429Error, ReadTimeoutError, ConnectionError, Timeout, ProtocolError),
(Server5xxError, Server429Error, ReadTimeoutError, ConnectionError, Timeout, ProtocolError, ChunkedEncodingError),
max_tries=BACKOFF_MAX_TRIES_REQUEST,
factor=3,
logger=LOGGER,
Expand Down
9 changes: 9 additions & 0 deletions tap_mixpanel/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import urllib
import pytz
import requests
import backoff
import singer
from singer import Transformer, metadata, metrics, utils
from singer.utils import strptime_to_utc
Expand Down Expand Up @@ -695,6 +697,13 @@ class Export(MixPanel):
replication_method = "INCREMENTAL"
params = {}


@backoff.on_exception(
backoff.expo,
(requests.exceptions.ChunkedEncodingError,),
max_tries=5,
factor=2,
)
def get_and_transform_records(
self,
querystring,
Expand Down
39 changes: 39 additions & 0 deletions tests/unittests/test_error_handling.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import unittest
import requests

import jsonlines
from unittest import mock
from parameterized import parameterized

from tap_mixpanel import client
from tap_mixpanel import streams

# Mock response
REQUEST_TIMEOUT = 300
Expand Down Expand Up @@ -249,3 +251,40 @@ def test_check_access_handle_timeout_error(self, mock_request, mock_time):

# Verify that requests.Session.request is called 5 times
self.assertEqual(mock_request.call_count, 5)

@mock.patch("jsonlines.jsonlines.Reader.iter", side_effect=requests.exceptions.ChunkedEncodingError)
def test_ChunkedEncodingError(self, mock_jsonlines, mock_time):
"""
Check whether the request backoffs properly for `check_access` method for 5 times in case of Timeout error.
"""
mock_client = client.MixpanelClient(api_secret="mock_api_secret", api_domain="mock_api_domain", request_timeout=REQUEST_TIMEOUT)
mock_client._MixpanelClient__verified = True

fake_response = MockResponse(500)
fake_response.iter_lines = lambda : []
mock_client.perform_request = lambda *args, **kwargs: fake_response

stream = streams.Export(mock_client)

with self.assertRaises(requests.exceptions.ChunkedEncodingError) as error:
stream.get_and_transform_records(
querystring={},
project_timezone=None,
max_bookmark_value=None,
state=None,
config=None,
catalog=None,
selected_streams=None,
last_datetime=None,
endpoint_total=None,
limit=None,
total_records=None,
parent_total=None,
record_count=None,
page=None,
offset=None,
parent_record=None,
date_total=None,
)

self.assertEqual(mock_jsonlines.call_count, 5)

0 comments on commit de02287

Please sign in to comment.