Lowered backoff max retries, limited number of threads created #100

Open

wants to merge 17 commits into master

50 changes: 24 additions & 26 deletions README.md
@@ -76,7 +76,7 @@ This tap:
- Bookmark: last_modified_date (date-time)
- Transformations: Fields camelCase to snake_case, Abstract/generalize custom_field_sets
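
The "Fields camelCase to snake_case" transformation noted for each stream can be sketched as below. This is a minimal illustration of the field-name conversion only, not the tap's actual transform code; as noted in the stream descriptions, the tap's transform additionally abstracts custom field sets.

```python
import re

def camel_to_snake(name):
    # "lastModifiedDate" -> "last_modified_date"
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()

record = {"encodedKey": "abc123", "lastModifiedDate": "2019-01-01T00:00:00Z"}
snake_record = {camel_to_snake(key): value for key, value in record.items()}
# {'encoded_key': 'abc123', 'last_modified_date': '2019-01-01T00:00:00Z'}
```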

[**custom_field_sets (GET v1)**](https://support.mambu.com/docs/custom-fields-api)
[**custom_field_sets (GET v2)**](https://api.mambu.com/#custom-field-sets-getall)
- Endpoint: https://instance.sandbox.mambu.com/api/customfieldsets
- Primary keys: id
- Foreign keys: None
@@ -100,8 +100,8 @@ This tap:
- Replication strategy: Full table (ALL for parent deposit_id)
- Transformations: Fields camelCase to snake_case

[**deposit_products (GET v1)**](https://support.mambu.com/docs/savings-products-api)
- Endpoint: https://instance.sandbox.mambu.com/api/savingsproducts/DSP
[**deposit_products (GET v2)**](https://api.mambu.com/#deposit-products-getall)
- Endpoint: https://instance.sandbox.mambu.com/api/depositproducts
- Primary keys: id
- Foreign keys: None
- Replication strategy: Incremental (query all, filter results)
@@ -137,7 +137,7 @@ This tap:
- Bookmark query field: lastModifiedDate
- Transformations: Fields camelCase to snake_case, Abstract/generalize custom_field_sets

[**loan_products (GET v1)**](https://support.mambu.com/docs/loan-products-api)
[**loan_products (GET v2)**](https://api.mambu.com/#loan-products-getall)
- Endpoint: https://instance.sandbox.mambu.com/api/loanproducts
- Primary keys: id
- Foreign keys: None
@@ -174,18 +174,19 @@ This tap:
- Bookmark: last_modified_date (date-time)
- Transformations: Fields camelCase to snake_case, Abstract/generalize custom_field_sets

[**gl_accounts (GET v1)**](https://support.mambu.com/docs/gl-accounts-api)
[**gl_accounts (GET v2)**](https://api.mambu.com/#gl-accounts-getall)
- Endpoint: https://instance.sandbox.mambu.com/api/glaccounts
- Primary keys: gl_code
- Replication strategy: Incremental (query filtered based on date and account type)
- Bookmark: last_modified_date (date-time)
- Transformations: Fields camelCase to snake_case, Abstract/generalize custom_field_sets

[**gl_journal_entries (POST v1)**](https://support.mambu.com/docs/en/gl-journal-entries-api#post-search)
- Endpoint: https://instance.sandbox.mambu.com/api/gljournalentries/search
[**gl_journal_entries (POST v2)**](https://api.mambu.com/#journal-entries-search)
- Endpoint: https://instance.sandbox.mambu.com/api/gljournalentries:search
- Primary keys: entry_id
- Replication strategy: Incremental (query filtered based on date)
- Bookmark: creation_date (date-time)
- Sort by: entry_id:ASC
- Transformations: Fields camelCase to snake_case, Abstract/generalize custom_field_sets
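
A minimal sketch of the v2 journal-entries search request described above, assuming Mambu's standard v2 search body (filterCriteria/sortingCriteria). The subdomain, API key, filter value, and exact API field names are placeholders, not the tap's implementation.

```python
import requests

# Placeholder values for illustration only.
SUBDOMAIN = "instance.sandbox"
APIKEY = "YOUR_APIKEY"

body = {
    # Filter on the creation date, matching the creation_date bookmark above.
    "filterCriteria": [
        {"field": "creationDate", "operator": "AFTER", "value": "2019-01-01"}
    ],
    # Sort ascending by entry id, matching "Sort by: entry_id:ASC" above
    # (the exact API field name is an assumption).
    "sortingCriteria": {"field": "entryId", "order": "ASC"},
}

response = requests.post(
    f"https://{SUBDOMAIN}.mambu.com/api/gljournalentries:search",
    headers={"apikey": APIKEY, "Accept": "application/vnd.mambu.v2+json"},
    params={"offset": 0, "limit": 500},
    json=body,
)
response.raise_for_status()
journal_entries = response.json()
```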

[**activities (GET v1)**](https://support.mambu.com/docs/activities-api)
@@ -225,26 +226,20 @@ This tap:
Clone this repository, and then install using setup.py. We recommend using a virtualenv:

```bash
virtualenv -p python3 venv
python3 -m venv venv
source venv/bin/activate
python setup.py install
OR
cd .../tap-mambu
pip install .
```
2. Dependent libraries
The following dependent libraries were installed.
For easier development (including testing packages):
```bash
pip install singer-python
pip install singer-tools
pip install target-stitch
pip install target-json

python3 -m venv venv
source venv/bin/activate
pip install -e ".[mambu-tests]"
```
- [singer-tools](https://github.com/singer-io/singer-tools)
- [target-stitch](https://github.com/singer-io/target-stitch)

3. Create your tap's `config.json` file. The `subdomain` is everything before `.mambu.com` in the Mambu instance URL. For the URL: `https://stitch.sandbox.mambu.com`, the subdomain would be `stitch.sandbox`. Lookback window applies only to `loan transactions` stream.

2. Create your tap's `config.json` file. The `subdomain` is everything before `.mambu.com` in the Mambu instance URL. For the URL: `https://stitch.sandbox.mambu.com`, the subdomain would be `stitch.sandbox`.

```json
{
@@ -253,7 +248,6 @@ This tap:
"apikey": "YOUR_APIKEY",
"subdomain": "YOUR_SUBDOMAIN",
"start_date": "2019-01-01T00:00:00Z",
"lookback_window": 30,
"user_agent": "tap-mambu <api_user_email@your_company.com>",
"page_size": "500",
"apikey_audit": "AUDIT_TRAIL_APIKEY"
@@ -295,8 +289,8 @@ This tap:
}
```

4. Run the Tap in Discovery Mode
This creates a catalog.json for selecting objects/fields to integrate:
3. Run the Tap in Discovery Mode.
This creates a catalog.json for selecting objects/fields to integrate:
```bash
tap-mambu --config config.json --discover > catalog.json
```
@@ -324,7 +318,7 @@ This tap:
}
```

5. Run the Tap in Sync Mode (with catalog) and [write out to state file](https://github.com/singer-io/getting-started/blob/master/docs/RUNNING_AND_DEVELOPING.md#running-a-singer-tap-with-a-singer-target)
4. Run the Tap in Sync Mode (with catalog) and [write out to state file](https://github.com/singer-io/getting-started/blob/master/docs/RUNNING_AND_DEVELOPING.md#running-a-singer-tap-with-a-singer-target)

For Sync mode:
```bash
@@ -342,7 +336,7 @@ This tap:
tail -1 state.json > state.json.tmp && mv state.json.tmp state.json
```

6. Test the Tap
5. Test the Tap

While developing the Mambu tap, the following utilities were run in accordance with Singer.io best practices:
Pylint to improve [code quality](https://github.com/singer-io/getting-started/blob/master/docs/BEST_PRACTICES.md#code-quality):
@@ -392,6 +386,10 @@ This tap:
| deposit_products | 4 | 1 |
+----------------------+---------+---------+
```

6. Optional, useful tools:
- [singer-tools](https://github.com/singer-io/singer-tools)
- [target-stitch](https://github.com/singer-io/target-stitch)
---

Copyright &copy; 2019 Stitch
31 changes: 30 additions & 1 deletion mambu_tests/helpers.py
@@ -1,5 +1,6 @@
import datetime
from urllib.parse import parse_qs
from requests import Response

import pytz

@@ -53,6 +54,18 @@ def __init__(self, page_size=100):
self.request = MagicMock()


def generate_full_response(bookmark_field):
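"""Build a mocked requests.Response: HTTP 200, an 'items-total' header advertising
1200 total records, and a json() payload of 400 generated records carrying the
given bookmark field."""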
response = Response()

response.status_code = 200
response.headers = {'items-total': 1200}
response.json = MagicMock()
response.json.return_value = [{"id": index, bookmark_field: f"2022-06-05T00:00:00.{index:06d}Z-07:00"}
for index in range(400)]

return response


class ClientWithDataMock(ClientMock):
def __init__(self, page_size=100, bookmark_field="creationDate",
limit_field="limit", offset_field="offset", custom_data=None):
@@ -74,7 +87,23 @@ def serve_request(self, *args, **kwargs):
split_params = parse_qs(params)
limit = int(split_params.get(self.limit_field, [None])[0])
offset = int(split_params.get(self.offset_field, [None])[0])
return self.data_to_serve[offset:limit+offset]
return self.data_to_serve[offset: (limit + offset)]


class ClientWithDataMultithreadedMock(ClientWithDataMock):
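"""ClientWithDataMock variant whose first serve_request call returns a full Response
object (so pagination details such as 'items-total' can be read from the headers);
subsequent calls fall back to the paginated list behaviour of the parent mock."""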
def __init__(self, page_size=100, bookmark_field="creationDate",
limit_field="limit", offset_field="offset", custom_data=None):
super().__init__(page_size=page_size, bookmark_field=bookmark_field,
limit_field=limit_field, offset_field=offset_field,
custom_data=custom_data)
self._bookmark_field = bookmark_field
self.first_request_call = True

def serve_request(self, *args, **kwargs):
if self.first_request_call:
self.first_request_call = False
return generate_full_response(self._bookmark_field)
return super().serve_request(*args, **kwargs)


class MultithreadedOffsetGeneratorFake(MultithreadedOffsetGenerator):
@@ -102,9 +102,17 @@ def test_fetch_batch_continuously_sleep_branch(mock_time_sleep, mock_all_fetch_b
@patch.object(MultithreadedBookmarkGenerator, 'prepare_batch',
side_effect=MultithreadedBookmarkGenerator.prepare_batch,
autospec=True)
@patch("tap_mambu.tap_generators.multithreaded_bookmark_generator.MultithreadedRequestsPool.queue_request")
@patch("tap_mambu.tap_generators.multithreaded_bookmark_generator."
"MultithreadedBookmarkGenerator._queue_first_batch")
@patch("tap_mambu.tap_generators.multithreaded_bookmark_generator."
"MultithreadedBookmarkGenerator._get_number_of_records")
@patch("tap_mambu.tap_generators.multithreaded_bookmark_generator."
"MultithreadedRequestsPool.queue_request")
def test_queue_batches(mock_queue_request,
mock_get_number_of_records,
mock_queue_first_batch,
mock_prepare_batch):

mock_client = ClientMock()
mock_endpoint_path = 'test_endpoint_path'
mock_endpoint_api_method = 'POST'
@@ -123,7 +131,9 @@ def test_queue_batches(mock_queue_request,
mock_batch_limit = 4000
mock_artificial_limit = mock_client.page_size

mock_queue_request.side_effect = [Mock() for _ in range(0, mock_batch_limit, mock_artificial_limit)]
mock_get_number_of_records.return_value = mock_batch_limit + 1

mock_queue_request.side_effect = [Mock() for _ in range(mock_artificial_limit, mock_batch_limit, mock_artificial_limit)]

generator = MultithreadedBookmarkGeneratorFake(client=mock_client)
generator.overlap_window = mock_overlap_window
@@ -141,10 +151,10 @@ def test_queue_batches(mock_queue_request,

assert len(features) == mock_batch_limit // mock_artificial_limit

mock_params['offset'] = 0
mock_params['offset'] = mock_artificial_limit
mock_params['limit'] = mock_artificial_limit + mock_overlap_window
calls = []
for offset in range(0, mock_batch_limit, mock_artificial_limit):
for _ in range(mock_artificial_limit, mock_batch_limit, mock_artificial_limit):
calls.append(call(mock_client, 'test_stream', mock_endpoint_path, mock_endpoint_api_method,
mock_endpoint_api_version, mock_endpoint_api_key_type, mock_endpoint_body, dict(mock_params)))
mock_params['offset'] += mock_artificial_limit
@@ -153,8 +163,8 @@ def test_queue_batches(mock_queue_request,
mock_queue_request.assert_has_calls(calls)

# test if the methods are called the correct amount of times
assert mock_prepare_batch.call_count == mock_batch_limit / mock_client.page_size
assert mock_queue_request.call_count == mock_batch_limit / mock_client.page_size
assert mock_prepare_batch.call_count == mock_batch_limit / mock_client.page_size - 1
assert mock_queue_request.call_count == mock_batch_limit / mock_client.page_size - 1
assert generator.offset == mock_batch_limit - mock_client.page_size


23 changes: 17 additions & 6 deletions mambu_tests/multithreading/test_multithreaded_offset_generator.py
@@ -235,8 +235,15 @@ def test_all_fetch_batch_steps_flow(mock_queue_batches, mock_collect_batches, mo
@patch.object(MultithreadedOffsetGenerator, 'prepare_batch',
side_effect=MultithreadedOffsetGenerator.prepare_batch,
autospec=True)
@patch("tap_mambu.tap_generators.multithreaded_offset_generator.MultithreadedRequestsPool.queue_request")
@patch("tap_mambu.tap_generators.multithreaded_bookmark_generator."
"MultithreadedOffsetGenerator._queue_first_batch")
@patch("tap_mambu.tap_generators.multithreaded_bookmark_generator."
"MultithreadedOffsetGenerator._get_number_of_records")
@patch("tap_mambu.tap_generators.multithreaded_offset_generator."
"MultithreadedRequestsPool.queue_request")
def test_queue_batches(mock_queue_request,
mock_get_number_of_records,
mock_queue_first_batch,
mock_prepare_batch):
mock_client = ClientMock()
mock_endpoint_path = 'test_endpoint_path'
@@ -256,7 +263,10 @@ def test_queue_batches(mock_queue_request,
mock_batch_limit = 4000
mock_artificial_limit = mock_client.page_size

mock_queue_request.side_effect = [Mock() for _ in range(0, mock_batch_limit, mock_artificial_limit)]
mock_get_number_of_records.return_value = mock_batch_limit + 1

mock_queue_request.side_effect = [Mock() for _ in
range(mock_artificial_limit, mock_batch_limit, mock_artificial_limit)]

generator = MultithreadedOffsetGeneratorFake(client=mock_client)
generator.overlap_window = mock_overlap_window
@@ -280,7 +290,7 @@ def test_queue_batches(mock_queue_request,
mock_params['offset'] = 0
mock_params['limit'] = mock_artificial_limit + mock_overlap_window
calls = []
for offset in range(0, while_upper_limit):
for _ in range(1, while_upper_limit):
calls.append(call(mock_client, 'test_stream', mock_endpoint_path, mock_endpoint_api_method,
mock_endpoint_api_version, mock_endpoint_api_key_type, mock_endpoint_body, dict(mock_params)))
mock_params['offset'] += mock_artificial_limit
@@ -289,10 +299,11 @@ def test_queue_batches(mock_queue_request,
mock_queue_request.assert_has_calls(calls)

# test if the methods are called the correct amount of times
assert mock_prepare_batch.call_count == while_upper_limit
assert mock_queue_request.call_count == while_upper_limit
assert mock_prepare_batch.call_count == while_upper_limit - 1  # -1 because the first iteration of the while loop is skipped
assert mock_queue_request.call_count == while_upper_limit - 1
# used only artificial_limit because the offset is increased only by the artificial_limit value (without overlap)
assert generator.offset == while_upper_limit * mock_artificial_limit
# -1 because the queue_first_batch is mocked
assert generator.offset == (while_upper_limit - 1) * mock_artificial_limit


@patch("tap_mambu.tap_generators.multithreaded_offset_generator.transform_json")
7 changes: 4 additions & 3 deletions mambu_tests/multithreading/test_requests_pool.py
@@ -11,7 +11,7 @@ def test_queue_request(mock_thread_pool_exec_submit):

# test if the submit params are the same as queue_request params
mock_thread_pool_exec_submit.assert_called_once_with(MultithreadedRequestsPool.run, 'client', 'stream_name',
'stream_path', 'GET', 'v2', '', {}, {})
'stream_path', 'GET', 'v2', '', {}, {}, False)


@patch("tap_mambu.helpers.client.MambuClient.request")
@@ -73,15 +73,16 @@ def test_run(mock_multithreaded_request_pool_run, mock_performance_metrics, mock
mock_multithreaded_request_pool_run.assert_any_call(client, stream_name,
endpoint_paths[idx], api_methods[idx],
endpoint_api_version, endpoint_api_key_type,
endpoint_body, endpoint_params[idx])
endpoint_body, endpoint_params[idx], False)
# test if the client request method is called with the correct params
mock_client_request.assert_any_call(method=api_methods[idx],
path=endpoint_paths[idx],
version=endpoint_api_version,
apikey_type=endpoint_api_key_type,
params=f'offset={endpoint_params[idx]["offset"]}&limit={endpoint_params[idx]["limit"]}',
endpoint=stream_name,
json=endpoint_body)
json=endpoint_body,
full_response=False)

# test if the performance metrics collector is called
mock_performance_metrics.assert_called_with(metric_name='generator')
20 changes: 14 additions & 6 deletions mambu_tests/tap_generators/__init__.py
@@ -1,9 +1,10 @@
from mock import MagicMock

from tap_mambu.tap_generators.multithreaded_offset_generator import MultithreadedOffsetGenerator
from tap_mambu.tap_generators.child_generator import ChildGenerator
from ..constants import config_json
from tap_mambu.helpers.generator_processor_pairs import get_generator_processor_for_stream
from ..helpers import ClientWithDataMock, ClientMock
from ..helpers import ClientWithDataMock, ClientMock, ClientWithDataMultithreadedMock

MULTITHREADED_STREAMS_WITHOUT_PAGINATION_DETAILS_ON = ['activities', 'interest_accrual_breakdown']


def setup_generator_base_test(stream_name, client_mock=None, with_data=False, custom_data=None,
@@ -15,9 +16,16 @@ def setup_generator_base_test(stream_name, client_mock=None, with_data=False, cu
if client_mock is None:
client = ClientMock(int(config_json.get("page_size", 200)))
if with_data:
client = ClientWithDataMock(int(config_json.get("page_size", 200)), custom_data=custom_data,
offset_field=offset_field, limit_field=limit_field,
bookmark_field=bookmark_field)
if issubclass(generator_class, MultithreadedOffsetGenerator) and \
stream_name not in MULTITHREADED_STREAMS_WITHOUT_PAGINATION_DETAILS_ON:
client = ClientWithDataMultithreadedMock(int(config_json.get("page_size", 200)),
custom_data=custom_data,
offset_field=offset_field, limit_field=limit_field,
bookmark_field=bookmark_field)
else:
client = ClientWithDataMock(int(config_json.get("page_size", 200)), custom_data=custom_data,
offset_field=offset_field, limit_field=limit_field,
bookmark_field=bookmark_field)

generator = generator_class(stream_name=stream_name,
client=client,
1 change: 0 additions & 1 deletion mambu_tests/tap_generators/test_activities_generator.py
@@ -1,4 +1,3 @@
import mock
from singer import utils
from . import setup_generator_base_test
