From 6506a0b86b8c1843053369785201c09287ce2e86 Mon Sep 17 00:00:00 2001 From: Siddharth Sudheer Date: Thu, 28 Mar 2019 17:07:59 -0400 Subject: [PATCH 1/6] =?UTF-8?q?=E2=80=A2=20Added=20ability=20to=20use=20ta?= =?UTF-8?q?p=20for=20private=20Shopify=20app=20and=20upgraded=20version=20?= =?UTF-8?q?to=201.1.9.=20=E2=80=A2=20Added=20async=20capability=20for=20or?= =?UTF-8?q?ders=20and=20upgraded=20version=20to=201.2.0.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 6 ++ README.md | 16 +++- install_local.sh | 42 ++++++++++ sample_config.json | 7 +- setup.py | 4 +- tap_shopify/__init__.py | 21 ++++- tap_shopify/context.py | 0 tap_shopify/schemas/abandoned_checkouts.json | 0 tap_shopify/schemas/collects.json | 0 tap_shopify/schemas/custom_collections.json | 0 tap_shopify/schemas/customers.json | 0 tap_shopify/schemas/definitions.json | 0 tap_shopify/schemas/metafields.json | 0 tap_shopify/schemas/order_refunds.json | 0 tap_shopify/schemas/orders.json | 0 tap_shopify/schemas/products.json | 0 tap_shopify/schemas/transactions.json | 0 tap_shopify/streams/__init__.py | 0 tap_shopify/streams/abandoned_checkouts.py | 0 tap_shopify/streams/base.py | 40 ++++++++- tap_shopify/streams/collects.py | 0 tap_shopify/streams/custom_collections.py | 0 tap_shopify/streams/customers.py | 0 tap_shopify/streams/metafields.py | 0 tap_shopify/streams/order_refunds.py | 2 + tap_shopify/streams/orders.py | 86 +++++++++++++++++++- tap_shopify/streams/products.py | 0 tap_shopify/streams/transactions.py | 0 28 files changed, 210 insertions(+), 14 deletions(-) create mode 100755 install_local.sh mode change 100644 => 100755 tap_shopify/__init__.py mode change 100644 => 100755 tap_shopify/context.py mode change 100644 => 100755 tap_shopify/schemas/abandoned_checkouts.json mode change 100644 => 100755 tap_shopify/schemas/collects.json mode change 100644 => 100755 tap_shopify/schemas/custom_collections.json mode change 100644 => 100755 
tap_shopify/schemas/customers.json mode change 100644 => 100755 tap_shopify/schemas/definitions.json mode change 100644 => 100755 tap_shopify/schemas/metafields.json mode change 100644 => 100755 tap_shopify/schemas/order_refunds.json mode change 100644 => 100755 tap_shopify/schemas/orders.json mode change 100644 => 100755 tap_shopify/schemas/products.json mode change 100644 => 100755 tap_shopify/schemas/transactions.json mode change 100644 => 100755 tap_shopify/streams/__init__.py mode change 100644 => 100755 tap_shopify/streams/abandoned_checkouts.py mode change 100644 => 100755 tap_shopify/streams/base.py mode change 100644 => 100755 tap_shopify/streams/collects.py mode change 100644 => 100755 tap_shopify/streams/custom_collections.py mode change 100644 => 100755 tap_shopify/streams/customers.py mode change 100644 => 100755 tap_shopify/streams/metafields.py mode change 100644 => 100755 tap_shopify/streams/order_refunds.py mode change 100644 => 100755 tap_shopify/streams/orders.py mode change 100644 => 100755 tap_shopify/streams/products.py mode change 100644 => 100755 tap_shopify/streams/transactions.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e65e1bc..58d86bef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## 1.2.0 + * Updated Tap to get orders asynchronously, using the asyncio and aiohttp libraries. Async mode can be enabled in the tap-config (`use_async`). + +## 1.1.9 + * Updated Tap to support Private Shopify Apps, which can be configured in the tap-config (`is_private_app`, `api_password`). 
+ ## 1.1.8 * Uses patternProperties to match extra fields on transactions receipts [#33](https://github.com/singer-io/tap-shopify/pull/33) diff --git a/README.md b/README.md index 46a9223d..e2fab21c 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,10 @@ # tap-shopify +## Source Repository +* Git Repo: https://github.com/singer-io/tap-shopify +* To Sync Fork: https://help.github.com/articles/syncing-a-fork/ + +## Information This is a [Singer](https://singer.io) tap that produces JSON-formatted data following the [Singer spec](https://github.com/singer-io/getting-started/blob/master/SPEC.md). @@ -21,7 +26,6 @@ This tap: - When Metafields are selected, this tap will sync the Shopify store's top-level Metafields and any additional Metafields for selected tables that also have them (ie: Orders, Products, Customers) ## Quick Start - 1. Install pip install tap-shopify @@ -32,9 +36,13 @@ This tap: ```json { - "start_date": "2010-01-01", - "api_key": "", - "shop": "test_shop" + "start_date": "2019-01-01T00:00:00Z", + "end_date": "2019-01-31T00:00:00Z", + "shop": "test_shop", + "is_private_app": true, + "api_key": "<>", + "api_password": "<>", + "use_async": true } ``` diff --git a/install_local.sh b/install_local.sh new file mode 100755 index 00000000..74554f04 --- /dev/null +++ b/install_local.sh @@ -0,0 +1,42 @@ +#! 
/bin/bash +get_value_from_file() { + key=$1 + file=$2 + + val=( $(cat $file | grep -Eo "${key}=.{1,50}" | sed "s/^${key}=\"\(.*\)\",$/\1/") ) + echo $val +} + + + +GREEN='\033[0;32m' +NC='\033[0m' # No Color +YELLOW='\033[0;33m' +RED='\033[0;31m' + + + +lib_version=( $(get_value_from_file 'version' setup.py) ) +lib_name=( $(get_value_from_file 'name' setup.py) ) + + + +python3 setup.py sdist bdist_wheel +pip uninstall --y $lib_name +if pip install ./dist/${lib_name}-${lib_version}.tar.gz ; then + rm -rf ./build + rm -rf ./dist + rm -rf ./*.egg-info + echo + echo -e "${GREEN}##########################################################${NC}" + echo -e "${GREEN} Successfully Installed: ${YELLOW}${lib_name}-${lib_version}${NC}" + echo -e "${GREEN}##########################################################${NC}" + echo +else + echo + echo -e "${RED}##########################################################${NC}" + echo -e "${RED} Failed to Install: ${YELLOW}${lib_name}-${lib_version}${NC}" + echo -e "${RED}##########################################################${NC}" + echo +fi + diff --git a/sample_config.json b/sample_config.json index 6eaca4de..58f13d02 100644 --- a/sample_config.json +++ b/sample_config.json @@ -1,4 +1,9 @@ { + "start_date": "2019-01-01T00:00:00Z", + "end_date": "2019-01-31T00:00:00Z", + "shop": "someshop", + "is_private_app": true, "api_key": "foobarcharnockbat", - "start_date": "2017-01-01T00:00:00Z" + "api_password": "batnockcharbarfoo", + "use_async": true } diff --git a/setup.py b/setup.py index 37ce2805..97f749b4 100755 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ setup( name="tap-shopify", - version="1.1.8", + version="1.2.0", description="Singer.io tap for extracting Shopify data", author="Stitch", url="http://github.com/singer-io/tap-shopify", @@ -18,6 +18,8 @@ 'pylint', 'ipdb', 'requests==2.20.0', + 'aiohttp==3.5.4', + 'asyncio==3.4.3' ] }, entry_points=""" diff --git a/tap_shopify/__init__.py b/tap_shopify/__init__.py old mode 100644 new 
mode 100755 index 0a6f4506..7b9367f8 --- a/tap_shopify/__init__.py +++ b/tap_shopify/__init__.py @@ -14,14 +14,23 @@ from tap_shopify.context import Context import tap_shopify.streams # Load stream objects into Context -REQUIRED_CONFIG_KEYS = ["shop", "api_key"] +REQUIRED_CONFIG_KEYS = ["start_date", "shop", "api_key"] + LOGGER = singer.get_logger() def initialize_shopify_client(): api_key = Context.config['api_key'] shop = Context.config['shop'] - session = shopify.Session(shop, api_key) - shopify.ShopifyResource.activate_session(session) + + if Context.config.get('is_private_app', False): + api_password = Context.config['api_password'] + shop_url = "https://{}:{}@{}.myshopify.com/admin".format(api_key, api_password, shop) + shopify.ShopifyResource.set_site(shop_url) + shopify.Shop.current() + else: + session = shopify.Session(shop, api_key) + shopify.ShopifyResource.activate_session(session) + def get_abs_path(path): return os.path.join(os.path.dirname(os.path.realpath(__file__)), path) @@ -163,6 +172,12 @@ def main(): # Parse command line arguments args = utils.parse_args(REQUIRED_CONFIG_KEYS) + # Check if Private App and has required Password in Config + if Context.config.get('is_private_app', False) and 'api_password' not in Context.config: + error_message = "Config is missing required key 'api_password'. " + error_message += "If the 'is_private_key' is set to True, 'api_password' is a required Config key." 
+ raise Exception(error_message) + # If discover flag was passed, run discovery mode and dump output to stdout if args.discover: catalog = discover() diff --git a/tap_shopify/context.py b/tap_shopify/context.py old mode 100644 new mode 100755 diff --git a/tap_shopify/schemas/abandoned_checkouts.json b/tap_shopify/schemas/abandoned_checkouts.json old mode 100644 new mode 100755 diff --git a/tap_shopify/schemas/collects.json b/tap_shopify/schemas/collects.json old mode 100644 new mode 100755 diff --git a/tap_shopify/schemas/custom_collections.json b/tap_shopify/schemas/custom_collections.json old mode 100644 new mode 100755 diff --git a/tap_shopify/schemas/customers.json b/tap_shopify/schemas/customers.json old mode 100644 new mode 100755 diff --git a/tap_shopify/schemas/definitions.json b/tap_shopify/schemas/definitions.json old mode 100644 new mode 100755 diff --git a/tap_shopify/schemas/metafields.json b/tap_shopify/schemas/metafields.json old mode 100644 new mode 100755 diff --git a/tap_shopify/schemas/order_refunds.json b/tap_shopify/schemas/order_refunds.json old mode 100644 new mode 100755 diff --git a/tap_shopify/schemas/orders.json b/tap_shopify/schemas/orders.json old mode 100644 new mode 100755 diff --git a/tap_shopify/schemas/products.json b/tap_shopify/schemas/products.json old mode 100644 new mode 100755 diff --git a/tap_shopify/schemas/transactions.json b/tap_shopify/schemas/transactions.json old mode 100644 new mode 100755 diff --git a/tap_shopify/streams/__init__.py b/tap_shopify/streams/__init__.py old mode 100644 new mode 100755 diff --git a/tap_shopify/streams/abandoned_checkouts.py b/tap_shopify/streams/abandoned_checkouts.py old mode 100644 new mode 100755 diff --git a/tap_shopify/streams/base.py b/tap_shopify/streams/base.py old mode 100644 new mode 100755 index e4ba2433..0547617a --- a/tap_shopify/streams/base.py +++ b/tap_shopify/streams/base.py @@ -82,6 +82,9 @@ class Stream(): key_properties = ['id'] # Controls which SDK object we use to 
call the API by default. replication_object = None + replication_object_async = None + endpoint = None + result_key = None def get_bookmark(self): bookmark = (singer.get_bookmark(Context.state, @@ -120,8 +123,8 @@ def call_api(self, query_params): def get_objects(self): updated_at_min = self.get_bookmark() - - stop_time = singer.utils.now().replace(microsecond=0) + end_date = Context.config.get("end_date", None) + stop_time = utils.strptime_with_tz(end_date) if end_date is not None else singer.utils.now().replace(microsecond=0) date_window_size = float(Context.config.get("date_window_size", DATE_WINDOW_SIZE)) results_per_page = int(Context.config.get("results_per_page", RESULTS_PER_PAGE)) @@ -182,11 +185,40 @@ def get_objects(self): updated_at_min = updated_at_max + def get_objects_async(self): + updated_at_min = self.get_bookmark() + end_date = Context.config.get("end_date", None) + updated_at_max = utils.strptime_with_tz(end_date) if end_date is not None else singer.utils.now().replace(microsecond=0) + results_per_page = int(Context.config.get("results_per_page", RESULTS_PER_PAGE)) + + DT_FMT = '%Y-%m-%dT%H:%M:%S' + query_params = { + "updated_at_min": utils.strftime(updated_at_min, format_str=DT_FMT), + "updated_at_max": utils.strftime(updated_at_max, format_str=DT_FMT), + "status": "any" + } + + objects = self.replication_object_async.find( + endpoint = self.endpoint, + retry_limit = MAX_RETRIES, + results_per_page = results_per_page, + **query_params + ) + + for obj in objects: + yield obj + + self.update_bookmark(utils.strftime(updated_at_max)) + def sync(self): """Yield's processed SDK object dicts to the caller. This is the default implementation. Get's all of self's objects and calls to_dict on them with no further processing. 
""" - for obj in self.get_objects(): - yield obj.to_dict() + if self.name in ['orders'] and Context.config.get("use_async", False): + for obj in self.get_objects_async(): + yield obj + else: + for obj in self.get_objects(): + yield obj.to_dict() diff --git a/tap_shopify/streams/collects.py b/tap_shopify/streams/collects.py old mode 100644 new mode 100755 diff --git a/tap_shopify/streams/custom_collections.py b/tap_shopify/streams/custom_collections.py old mode 100644 new mode 100755 diff --git a/tap_shopify/streams/customers.py b/tap_shopify/streams/customers.py old mode 100644 new mode 100755 diff --git a/tap_shopify/streams/metafields.py b/tap_shopify/streams/metafields.py old mode 100644 new mode 100755 diff --git a/tap_shopify/streams/order_refunds.py b/tap_shopify/streams/order_refunds.py old mode 100644 new mode 100755 index 6a30e151..cf85d2cf --- a/tap_shopify/streams/order_refunds.py +++ b/tap_shopify/streams/order_refunds.py @@ -23,6 +23,8 @@ def get_refunds(self, parent_object, since_id): def get_objects(self): selected_parent = Context.stream_objects['orders']() selected_parent.name = "refund_orders" + selected_parent.endpoint = "" + selected_parent.result_key = "refunds" # Page through all `orders`, bookmarking at `refund_orders` for parent_object in selected_parent.get_objects(): diff --git a/tap_shopify/streams/orders.py b/tap_shopify/streams/orders.py old mode 100644 new mode 100755 index bb360204..2f727ecb --- a/tap_shopify/streams/orders.py +++ b/tap_shopify/streams/orders.py @@ -1,10 +1,94 @@ import shopify - from tap_shopify.context import Context from tap_shopify.streams.base import Stream +import singer +from singer import utils +import asyncio +import aiohttp +import math +from urllib.parse import urlencode + + +LOGGER = singer.get_logger() + +class OrderAsync(shopify.Order): + def __init__(self, endpoint, retry_limit, results_per_page, params): + super() + self.retry_limit = retry_limit + self.results_per_page = results_per_page + self.params 
= params + self.params['limit'] = str(self.results_per_page) + self.base_url = shopify.ShopifyResource.get_site() + self.current_shop = shopify.Shop.current() + self.bucket_size = 80 if self.current_shop.plan_name == "shopify_plus" else 40 + self.shop_display_url = "https://{}".format(self.current_shop.myshopify_domain) + self.endpoint = endpoint + + async def _get_async(self, url, headers=None, params=None, retry_attempt=0): + async with aiohttp.ClientSession() as session: + async with session.get(url=url, headers=headers, params=params) as response: + if response.status == 429: + sleep_time_str = math.floor(float(response.headers.get('Retry-After'))) + LOGGER.info("Received 429 -- sleeping for {} seconds".format(sleep_time_str)) + await asyncio.sleep(sleep_time_str) + return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) + if response.status in range(500, 599): + return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) + else: + resp = await response.json() + if 'page' in params: + resp['page'] = params['page'] + return resp + + async def _request_count(self): + endpoint = '{}/count.json'.format(self.endpoint) + url = self.base_url + endpoint + resp = await self._get_async(url, params=self.params) + LOGGER.info("GET {}{}?{}".format(self.shop_display_url, endpoint, urlencode(self.params))) + return resp['count'] + + async def _request(self, job): + endpoint = '{}.json'.format(self.endpoint) + url = self.base_url + endpoint + resp = await self._get_async(url, params=job['params']) + LOGGER.info("GET {}{}?{}".format(self.shop_display_url, endpoint, urlencode(job['params']))) + return resp + + async def _runner(self): + result_set_size = await self._request_count() + num_pages = math.ceil(result_set_size/self.results_per_page) + jobs = [{'params': { **self.params, 'page': p+1 }} for p in range(num_pages)] + chunked_jobs = utils.chunk(jobs, self.bucket_size) + + all_jobs_results_dict = {} + for chunk_of_jobs in 
chunked_jobs: + futures = [self._request(i) for i in chunk_of_jobs] + resp_dict = {} + for i, future in enumerate(asyncio.as_completed(futures)): + result = await future + orders = result['orders'] if 'orders' in result else [] + resp_dict[result['page']] = orders + all_jobs_results_dict = {**all_jobs_results_dict, **resp_dict} + ordered_results = [] + for k,v in sorted(all_jobs_results_dict.items()): + if len(v) > 0: + ordered_results += v + return ordered_results + + def Run(self): + result = asyncio.run(self._runner()) + return result + + @classmethod + def find(cls, endpoint, retry_limit=5, results_per_page=250, **kwargs): + return OrderAsync(endpoint, retry_limit, results_per_page, params=kwargs).Run() + class Orders(Stream): name = 'orders' replication_object = shopify.Order + replication_object_async = OrderAsync + endpoint = "/orders" + result_key = "orders" Context.stream_objects['orders'] = Orders diff --git a/tap_shopify/streams/products.py b/tap_shopify/streams/products.py old mode 100644 new mode 100755 diff --git a/tap_shopify/streams/transactions.py b/tap_shopify/streams/transactions.py old mode 100644 new mode 100755 From a9d0468c4a9af82ee8b7a3bae00c159260985013 Mon Sep 17 00:00:00 2001 From: Siddharth Sudheer Date: Thu, 28 Mar 2019 18:21:10 -0400 Subject: [PATCH 2/6] added async thought process to readme --- README.md | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/README.md b/README.md index e2fab21c..27a6edd0 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,32 @@ This tap: tap-shopify -c config.json --catalog catalog-file.json +## Performance + +### Shopify Constraints +Shopify API's throttle is designed to allow your app to make unlimited requests at a steady rate over time while also having the capacity to make infrequent bursts. The throttle operates using a [leaky bucket](https://en.wikipedia.org/wiki/Leaky_bucket) algorithm. 
The bucket size and leak rate properties determine the API's burst behavior and call rate. + | | Bucket Size | Leak Rate | Max. Results/Call | +| --------------- |------------:| ----------:| ------------------:| +| Shopify Regular | 40 | 2/second | 250 | +| Shopify Plus | 80 | 4/second | 250 | + +### Async Logic +1. I found that using Shopify's SDK is slower by almost a factor of 10 compared to calling the REST endpoints. + * So, we scrap that, and just make calls directly to the REST endpoints. +2. Check if Shop is Regular or Plus to determine Bucket Size. +3. Check the total number of orders for the date range you are pulling data for. +4. Num. Pages = ceil(Total Number of Orders / 250), i.e. a partially filled last page still counts as a page. +5. Each call retrieves one page, and you are allowed to make a burst of 40 calls (or 80 calls on Shopify Plus). + * Num. orders you can retrieve by making all 40 calls (or 80 calls) asynchronously = 10,000 (or 20,000) + +6. Say, total number of orders = 100,000. + * Then, Num. Pages = 100,000/250 = 400 + * We cannot make 400 calls asynchronously, so we chunk them into 40 calls each, which gives us a list A that contains nested lists of 40 calls. + * We iterate through list A and, using `asyncio` and `aiohttp`, make 40 asynchronous calls and retrieve the results, then move on to the next 40 calls. + * Then, finally return all the results by page order. + + --- Copyright © 2019 Stitch From c6287207bdbf4589adf855afdfadcb4feff22d30 Mon Sep 17 00:00:00 2001 From: Siddharth Sudheer Date: Thu, 28 Mar 2019 19:41:52 -0400 Subject: [PATCH 3/6] Changed OrderAsync to RunAsync, and enabled async for couple other streams. 
--- .gitignore | 1 + tap_shopify/streams/abandoned_checkouts.py | 5 +- tap_shopify/streams/base.py | 83 ++++++++++++++++++++- tap_shopify/streams/customers.py | 6 +- tap_shopify/streams/order_refunds.py | 20 +++-- tap_shopify/streams/orders.py | 85 +--------------------- tap_shopify/streams/products.py | 6 +- 7 files changed, 112 insertions(+), 94 deletions(-) diff --git a/.gitignore b/.gitignore index a6551241..64781e3f 100644 --- a/.gitignore +++ b/.gitignore @@ -102,3 +102,4 @@ rsa-key tags singer-check-tap-data state.json +publish_to_artifactory.sh \ No newline at end of file diff --git a/tap_shopify/streams/abandoned_checkouts.py b/tap_shopify/streams/abandoned_checkouts.py index e3887f37..1acc69f6 100755 --- a/tap_shopify/streams/abandoned_checkouts.py +++ b/tap_shopify/streams/abandoned_checkouts.py @@ -1,9 +1,12 @@ import shopify from tap_shopify.context import Context -from tap_shopify.streams.base import Stream +from tap_shopify.streams.base import Stream, RunAsync class AbandonedCheckouts(Stream): name = 'abandoned_checkouts' replication_object = shopify.Checkout + replication_object_async = RunAsync + endpoint = "/checkouts" + result_key = "checkouts" Context.stream_objects['abandoned_checkouts'] = AbandonedCheckouts diff --git a/tap_shopify/streams/base.py b/tap_shopify/streams/base.py index 0547617a..71213e5b 100755 --- a/tap_shopify/streams/base.py +++ b/tap_shopify/streams/base.py @@ -9,6 +9,10 @@ import singer from singer import utils from tap_shopify.context import Context +import asyncio +import aiohttp +import shopify +from urllib.parse import urlencode LOGGER = singer.get_logger() @@ -21,6 +25,9 @@ # We will retry a 500 error a maximum of 5 times before giving up MAX_RETRIES = 5 +# Streams that can run Async +ASYNC_AVAILABLE_STREAMS = ['orders', 'products', 'customers', 'abandoned_checkouts'] + def is_not_status_code_fn(status_code): def gen_fn(exc): if getattr(exc, 'code', None) and exc.code not in status_code: @@ -200,6 +207,7 @@ def 
get_objects_async(self): objects = self.replication_object_async.find( endpoint = self.endpoint, + result_key = self.result_key, retry_limit = MAX_RETRIES, results_per_page = results_per_page, **query_params @@ -216,9 +224,82 @@ def sync(self): This is the default implementation. Get's all of self's objects and calls to_dict on them with no further processing. """ - if self.name in ['orders'] and Context.config.get("use_async", False): + if Context.config.get("use_async", False) and self.name in ASYNC_AVAILABLE_STREAMS: for obj in self.get_objects_async(): yield obj else: for obj in self.get_objects(): yield obj.to_dict() + + +class RunAsync(): + def __init__(self, endpoint, result_key, retry_limit, results_per_page, params): + super() + self.retry_limit = retry_limit + self.results_per_page = results_per_page + self.params = params + self.params['limit'] = str(self.results_per_page) + self.base_url = shopify.ShopifyResource.get_site() + self.current_shop = shopify.Shop.current() + self.bucket_size = 80 if self.current_shop.plan_name == "shopify_plus" else 40 + self.shop_display_url = "https://{}".format(self.current_shop.myshopify_domain) + self.endpoint = endpoint + self.result_key = result_key + + async def _get_async(self, url, headers=None, params=None, retry_attempt=0): + async with aiohttp.ClientSession() as session: + async with session.get(url=url, headers=headers, params=params) as response: + if response.status == 429: + sleep_time_str = math.floor(float(response.headers.get('Retry-After'))) + LOGGER.info("Received 429 -- sleeping for {} seconds".format(sleep_time_str)) + await asyncio.sleep(sleep_time_str) + return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) + if response.status in range(500, 599): + return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) + else: + resp = await response.json() + if 'page' in params: + resp['page'] = params['page'] + return resp + + async def _request_count(self): 
+ endpoint = '{}/count.json'.format(self.endpoint) + url = self.base_url + endpoint + resp = await self._get_async(url, params=self.params) + LOGGER.info("GET {}{}?{}".format(self.shop_display_url, endpoint, urlencode(self.params))) + return resp['count'] + + async def _request(self, job): + endpoint = '{}.json'.format(self.endpoint) + url = self.base_url + endpoint + resp = await self._get_async(url, params=job['params']) + LOGGER.info("GET {}{}?{}".format(self.shop_display_url, endpoint, urlencode(job['params']))) + return resp + + async def _runner(self): + result_set_size = await self._request_count() + num_pages = math.ceil(result_set_size/self.results_per_page) + jobs = [{'params': { **self.params, 'page': p+1 }} for p in range(num_pages)] + chunked_jobs = utils.chunk(jobs, self.bucket_size) + + all_jobs_results_dict = {} + for chunk_of_jobs in chunked_jobs: + futures = [self._request(i) for i in chunk_of_jobs] + resp_dict = {} + for i, future in enumerate(asyncio.as_completed(futures)): + result = await future + resp_dict[result['page']] = result[self.result_key] if self.result_key in result else [] + all_jobs_results_dict = {**all_jobs_results_dict, **resp_dict} + ordered_results = [] + for k,v in sorted(all_jobs_results_dict.items()): + if len(v) > 0: + ordered_results += v + return ordered_results + + def Run(self): + result = asyncio.run(self._runner()) + return result + + @classmethod + def find(cls, endpoint, result_key, retry_limit=5, results_per_page=250, **kwargs): + return RunAsync(endpoint, result_key, retry_limit, results_per_page, params=kwargs).Run() \ No newline at end of file diff --git a/tap_shopify/streams/customers.py b/tap_shopify/streams/customers.py index 8afab28c..54171a95 100755 --- a/tap_shopify/streams/customers.py +++ b/tap_shopify/streams/customers.py @@ -1,11 +1,13 @@ import shopify - -from tap_shopify.streams.base import Stream +from tap_shopify.streams.base import Stream, RunAsync from tap_shopify.context import Context class 
Customers(Stream): name = 'customers' replication_object = shopify.Customer + replication_object_async = RunAsync + endpoint = "/customers" + result_key = "customers" Context.stream_objects['customers'] = Customers diff --git a/tap_shopify/streams/order_refunds.py b/tap_shopify/streams/order_refunds.py index cf85d2cf..3bfb2621 100755 --- a/tap_shopify/streams/order_refunds.py +++ b/tap_shopify/streams/order_refunds.py @@ -23,8 +23,6 @@ def get_refunds(self, parent_object, since_id): def get_objects(self): selected_parent = Context.stream_objects['orders']() selected_parent.name = "refund_orders" - selected_parent.endpoint = "" - selected_parent.result_key = "refunds" # Page through all `orders`, bookmarking at `refund_orders` for parent_object in selected_parent.get_objects(): @@ -43,9 +41,21 @@ def get_objects(self): refunds[-1].id, max([o.id for o in refunds]))) since_id = refunds[-1].id + def get_objects_async(self): + selected_parent = Context.stream_objects['orders']() + selected_parent.name = "refund_orders" + + for parent_object in selected_parent.get_objects_async(): + for refund in parent_object['refunds']: + yield refund + def sync(self): - for refund in self.get_objects(): - refund_dict = refund.to_dict() - yield refund_dict + if Context.config.get("use_async", False): + for obj in self.get_objects_async(): + yield obj + else: + for refund in self.get_objects(): + refund_dict = refund.to_dict() + yield refund_dict Context.stream_objects['order_refunds'] = OrderRefunds diff --git a/tap_shopify/streams/orders.py b/tap_shopify/streams/orders.py index 2f727ecb..b59da156 100755 --- a/tap_shopify/streams/orders.py +++ b/tap_shopify/streams/orders.py @@ -1,93 +1,12 @@ import shopify from tap_shopify.context import Context -from tap_shopify.streams.base import Stream -import singer -from singer import utils -import asyncio -import aiohttp -import math -from urllib.parse import urlencode - - -LOGGER = singer.get_logger() - -class OrderAsync(shopify.Order): - def 
__init__(self, endpoint, retry_limit, results_per_page, params): - super() - self.retry_limit = retry_limit - self.results_per_page = results_per_page - self.params = params - self.params['limit'] = str(self.results_per_page) - self.base_url = shopify.ShopifyResource.get_site() - self.current_shop = shopify.Shop.current() - self.bucket_size = 80 if self.current_shop.plan_name == "shopify_plus" else 40 - self.shop_display_url = "https://{}".format(self.current_shop.myshopify_domain) - self.endpoint = endpoint - - async def _get_async(self, url, headers=None, params=None, retry_attempt=0): - async with aiohttp.ClientSession() as session: - async with session.get(url=url, headers=headers, params=params) as response: - if response.status == 429: - sleep_time_str = math.floor(float(response.headers.get('Retry-After'))) - LOGGER.info("Received 429 -- sleeping for {} seconds".format(sleep_time_str)) - await asyncio.sleep(sleep_time_str) - return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) - if response.status in range(500, 599): - return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) - else: - resp = await response.json() - if 'page' in params: - resp['page'] = params['page'] - return resp - - async def _request_count(self): - endpoint = '{}/count.json'.format(self.endpoint) - url = self.base_url + endpoint - resp = await self._get_async(url, params=self.params) - LOGGER.info("GET {}{}?{}".format(self.shop_display_url, endpoint, urlencode(self.params))) - return resp['count'] - - async def _request(self, job): - endpoint = '{}.json'.format(self.endpoint) - url = self.base_url + endpoint - resp = await self._get_async(url, params=job['params']) - LOGGER.info("GET {}{}?{}".format(self.shop_display_url, endpoint, urlencode(job['params']))) - return resp - - async def _runner(self): - result_set_size = await self._request_count() - num_pages = math.ceil(result_set_size/self.results_per_page) - jobs = [{'params': { 
**self.params, 'page': p+1 }} for p in range(num_pages)] - chunked_jobs = utils.chunk(jobs, self.bucket_size) - - all_jobs_results_dict = {} - for chunk_of_jobs in chunked_jobs: - futures = [self._request(i) for i in chunk_of_jobs] - resp_dict = {} - for i, future in enumerate(asyncio.as_completed(futures)): - result = await future - orders = result['orders'] if 'orders' in result else [] - resp_dict[result['page']] = orders - all_jobs_results_dict = {**all_jobs_results_dict, **resp_dict} - ordered_results = [] - for k,v in sorted(all_jobs_results_dict.items()): - if len(v) > 0: - ordered_results += v - return ordered_results - - def Run(self): - result = asyncio.run(self._runner()) - return result - - @classmethod - def find(cls, endpoint, retry_limit=5, results_per_page=250, **kwargs): - return OrderAsync(endpoint, retry_limit, results_per_page, params=kwargs).Run() +from tap_shopify.streams.base import Stream, RunAsync class Orders(Stream): name = 'orders' replication_object = shopify.Order - replication_object_async = OrderAsync + replication_object_async = RunAsync endpoint = "/orders" result_key = "orders" diff --git a/tap_shopify/streams/products.py b/tap_shopify/streams/products.py index 6556f514..8670ff32 100755 --- a/tap_shopify/streams/products.py +++ b/tap_shopify/streams/products.py @@ -1,11 +1,13 @@ import shopify - -from tap_shopify.streams.base import Stream +from tap_shopify.streams.base import Stream, RunAsync from tap_shopify.context import Context class Products(Stream): name = 'products' replication_object = shopify.Product + replication_object_async = RunAsync + endpoint = "/products" + result_key = "products" Context.stream_objects['products'] = Products From 4afee06957117ec060e5cb3c69c13f110ca23c35 Mon Sep 17 00:00:00 2001 From: Siddharth Sudheer Date: Fri, 29 Mar 2019 16:30:52 -0400 Subject: [PATCH 4/6] when stream run in async mode, write batches of results to target rather than yielding results. 
Removed async for abandoned checkouts as you cannot paginate for this. --- tap_shopify/__init__.py | 38 ++++++--- tap_shopify/context.py | 1 + tap_shopify/streams/abandoned_checkouts.py | 3 - tap_shopify/streams/base.py | 95 ++++++++++++---------- tap_shopify/streams/customers.py | 2 +- tap_shopify/streams/order_refunds.py | 3 + tap_shopify/streams/orders.py | 2 +- tap_shopify/streams/products.py | 2 +- 8 files changed, 82 insertions(+), 64 deletions(-) diff --git a/tap_shopify/__init__.py b/tap_shopify/__init__.py index 7b9367f8..4a5ca2fa 100755 --- a/tap_shopify/__init__.py +++ b/tap_shopify/__init__.py @@ -125,6 +125,7 @@ def sync(): stream["key_properties"], bookmark_properties=stream["replication_key"]) Context.counts[stream["tap_stream_id"]] = 0 + Context.durations[stream["tap_stream_id"]] = None # If there is a currently syncing stream bookmark, shuffle the # stream order so it gets sync'd first @@ -134,8 +135,10 @@ def sync(): # Loop over streams in catalog for catalog_entry in Context.catalog['streams']: + stream_start_time = time.time() stream_id = catalog_entry['tap_stream_id'] stream = Context.stream_objects[stream_id]() + stream.schema = catalog_entry['schema'] if not Context.is_selected(stream_id): LOGGER.info('Skipping stream: %s', stream_id) @@ -147,24 +150,33 @@ def sync(): Context.state['bookmarks'] = {} Context.state['bookmarks']['currently_sync_stream'] = stream_id - with Transformer() as transformer: - for rec in stream.sync(): - extraction_time = singer.utils.now() - record_schema = catalog_entry['schema'] - record_metadata = metadata.to_map(catalog_entry['metadata']) - rec = transformer.transform(rec, record_schema, record_metadata) - singer.write_record(stream_id, - rec, - time_extracted=extraction_time) - Context.counts[stream_id] += 1 + if Context.config.get("use_async", False) and stream.async_available: + Context.counts[stream_id] = stream.sync_async() + else: + with Transformer() as transformer: + for rec in stream.sync(): + 
extraction_time = singer.utils.now() + record_metadata = metadata.to_map(catalog_entry['metadata']) + rec = transformer.transform(rec, stream.schema, record_metadata) + singer.write_record(stream_id, + rec, + time_extracted=extraction_time) + Context.counts[stream_id] += 1 Context.state['bookmarks'].pop('currently_sync_stream') singer.write_state(Context.state) + stream_job_duration = time.strftime("%H:%M:%S", time.gmtime(time.time() - stream_start_time)) + Context.durations[stream_id] = stream_job_duration - LOGGER.info('----------------------') + div = "-"*50 + info_msg = "\n{d}".format(d=div) + info_msg += "\nShop: {}".format(Context.config['shop']) + info_msg += "\n{d}\n".format(d=div) for stream_id, stream_count in Context.counts.items(): - LOGGER.info('%s: %d', stream_id, stream_count) - LOGGER.info('----------------------') + info_msg += "\n{}: {}".format(stream_id, stream_count) + info_msg += "\nDuration: {}".format(Context.durations[stream_id]) + info_msg += "\n{d}\n".format(d=div) + LOGGER.info(info_msg) @utils.handle_top_exception(LOGGER) def main(): diff --git a/tap_shopify/context.py b/tap_shopify/context.py index cb72a35e..e503e587 100755 --- a/tap_shopify/context.py +++ b/tap_shopify/context.py @@ -7,6 +7,7 @@ class Context(): stream_map = {} stream_objects = {} counts = {} + durations = {} @classmethod def get_catalog_entry(cls, stream_name): diff --git a/tap_shopify/streams/abandoned_checkouts.py b/tap_shopify/streams/abandoned_checkouts.py index 1acc69f6..9dd7798d 100755 --- a/tap_shopify/streams/abandoned_checkouts.py +++ b/tap_shopify/streams/abandoned_checkouts.py @@ -5,8 +5,5 @@ class AbandonedCheckouts(Stream): name = 'abandoned_checkouts' replication_object = shopify.Checkout - replication_object_async = RunAsync - endpoint = "/checkouts" - result_key = "checkouts" Context.stream_objects['abandoned_checkouts'] = AbandonedCheckouts diff --git a/tap_shopify/streams/base.py b/tap_shopify/streams/base.py index 71213e5b..623aa2d7 100755 --- 
a/tap_shopify/streams/base.py +++ b/tap_shopify/streams/base.py @@ -13,6 +13,7 @@ import aiohttp import shopify from urllib.parse import urlencode +from singer import Transformer LOGGER = singer.get_logger() @@ -25,9 +26,6 @@ # We will retry a 500 error a maximum of 5 times before giving up MAX_RETRIES = 5 -# Streams that can run Async -ASYNC_AVAILABLE_STREAMS = ['orders', 'products', 'customers', 'abandoned_checkouts'] - def is_not_status_code_fn(status_code): def gen_fn(exc): if getattr(exc, 'code', None) and exc.code not in status_code: @@ -92,6 +90,8 @@ class Stream(): replication_object_async = None endpoint = None result_key = None + schema = None + async_available = False def get_bookmark(self): bookmark = (singer.get_bookmark(Context.state, @@ -192,7 +192,23 @@ def get_objects(self): updated_at_min = updated_at_max - def get_objects_async(self): + + def sync(self): + """Yield's processed SDK object dicts to the caller. + + This is the default implementation. Get's all of self's objects + and calls to_dict on them with no further processing. + """ + for obj in self.get_objects(): + yield obj.to_dict() + + + def sync_async(self): + """ + Gets objects for endpoint, and writes singer records. + Returns the total number of records received and + emitted to target. 
+ """ updated_at_min = self.get_bookmark() end_date = Context.config.get("end_date", None) updated_at_max = utils.strptime_with_tz(end_date) if end_date is not None else singer.utils.now().replace(microsecond=0) @@ -205,46 +221,37 @@ def get_objects_async(self): "status": "any" } - objects = self.replication_object_async.find( + recs_count = RunAsync.sync( + schema = self.schema, + stream_id = self.name, endpoint = self.endpoint, result_key = self.result_key, - retry_limit = MAX_RETRIES, - results_per_page = results_per_page, - **query_params + params = query_params, + retry_limit = MAX_RETRIES, + results_per_page = results_per_page ) - for obj in objects: - yield obj - self.update_bookmark(utils.strftime(updated_at_max)) - def sync(self): - """Yield's processed SDK object dicts to the caller. - - This is the default implementation. Get's all of self's objects - and calls to_dict on them with no further processing. - """ - if Context.config.get("use_async", False) and self.name in ASYNC_AVAILABLE_STREAMS: - for obj in self.get_objects_async(): - yield obj - else: - for obj in self.get_objects(): - yield obj.to_dict() + return recs_count class RunAsync(): - def __init__(self, endpoint, result_key, retry_limit, results_per_page, params): - super() + def __init__(self, schema, stream_id, endpoint, result_key, params, retry_limit, results_per_page): + self.schema = schema + self.stream_id = stream_id + self.endpoint = endpoint + self.result_key = result_key + self.params = params self.retry_limit = retry_limit self.results_per_page = results_per_page - self.params = params + self.params['limit'] = str(self.results_per_page) self.base_url = shopify.ShopifyResource.get_site() self.current_shop = shopify.Shop.current() self.bucket_size = 80 if self.current_shop.plan_name == "shopify_plus" else 40 self.shop_display_url = "https://{}".format(self.current_shop.myshopify_domain) - self.endpoint = endpoint - self.result_key = result_key + self.rec_count = 0 async def 
_get_async(self, url, headers=None, params=None, retry_attempt=0): async with aiohttp.ClientSession() as session: @@ -257,10 +264,7 @@ async def _get_async(self, url, headers=None, params=None, retry_attempt=0): if response.status in range(500, 599): return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) else: - resp = await response.json() - if 'page' in params: - resp['page'] = params['page'] - return resp + return await response.json() async def _request_count(self): endpoint = '{}/count.json'.format(self.endpoint) @@ -282,24 +286,25 @@ async def _runner(self): jobs = [{'params': { **self.params, 'page': p+1 }} for p in range(num_pages)] chunked_jobs = utils.chunk(jobs, self.bucket_size) - all_jobs_results_dict = {} for chunk_of_jobs in chunked_jobs: futures = [self._request(i) for i in chunk_of_jobs] - resp_dict = {} for i, future in enumerate(asyncio.as_completed(futures)): result = await future - resp_dict[result['page']] = result[self.result_key] if self.result_key in result else [] - all_jobs_results_dict = {**all_jobs_results_dict, **resp_dict} - ordered_results = [] - for k,v in sorted(all_jobs_results_dict.items()): - if len(v) > 0: - ordered_results += v - return ordered_results + if self.result_key in result: + self._write_singer_records(result[self.result_key]) + + def _write_singer_records(self, recs): + with Transformer() as transformer: + for rec in recs: + extraction_time = singer.utils.now() + transformed_rec = transformer.transform(rec, self.schema) + singer.write_record(self.stream_id, transformed_rec, time_extracted=extraction_time) + self.rec_count += 1 def Run(self): - result = asyncio.run(self._runner()) - return result + asyncio.run(self._runner()) + return self.rec_count @classmethod - def find(cls, endpoint, result_key, retry_limit=5, results_per_page=250, **kwargs): - return RunAsync(endpoint, result_key, retry_limit, results_per_page, params=kwargs).Run() \ No newline at end of file + def sync(cls, schema, 
stream_id, endpoint, result_key, params, retry_limit=5, results_per_page=250): + return RunAsync(schema, stream_id, endpoint, result_key, params, retry_limit, results_per_page).Run() \ No newline at end of file diff --git a/tap_shopify/streams/customers.py b/tap_shopify/streams/customers.py index 54171a95..6c00f9d9 100755 --- a/tap_shopify/streams/customers.py +++ b/tap_shopify/streams/customers.py @@ -6,8 +6,8 @@ class Customers(Stream): name = 'customers' replication_object = shopify.Customer - replication_object_async = RunAsync endpoint = "/customers" result_key = "customers" + async_available = True Context.stream_objects['customers'] = Customers diff --git a/tap_shopify/streams/order_refunds.py b/tap_shopify/streams/order_refunds.py index 3bfb2621..bf9c27de 100755 --- a/tap_shopify/streams/order_refunds.py +++ b/tap_shopify/streams/order_refunds.py @@ -11,6 +11,8 @@ class OrderRefunds(Stream): name = 'order_refunds' replication_object = shopify.Refund replication_key = 'created_at' + result_key = "refunds" + async_available = False @shopify_error_handling def get_refunds(self, parent_object, since_id): @@ -58,4 +60,5 @@ def sync(self): refund_dict = refund.to_dict() yield refund_dict + Context.stream_objects['order_refunds'] = OrderRefunds diff --git a/tap_shopify/streams/orders.py b/tap_shopify/streams/orders.py index b59da156..142759e0 100755 --- a/tap_shopify/streams/orders.py +++ b/tap_shopify/streams/orders.py @@ -6,8 +6,8 @@ class Orders(Stream): name = 'orders' replication_object = shopify.Order - replication_object_async = RunAsync endpoint = "/orders" result_key = "orders" + async_available = True Context.stream_objects['orders'] = Orders diff --git a/tap_shopify/streams/products.py b/tap_shopify/streams/products.py index 8670ff32..d81b10fe 100755 --- a/tap_shopify/streams/products.py +++ b/tap_shopify/streams/products.py @@ -6,8 +6,8 @@ class Products(Stream): name = 'products' replication_object = shopify.Product - replication_object_async = 
RunAsync endpoint = "/products" result_key = "products" + async_available = True Context.stream_objects['products'] = Products From 6956e6cc631ac3c873daebddf966aaf1a622b21d Mon Sep 17 00:00:00 2001 From: Siddharth Sudheer Date: Tue, 2 Apr 2019 15:16:39 -0400 Subject: [PATCH 5/6] async spike --- spikes/async/first_comparison.py | 287 +++ spikes/async/orders-schema.json | 3665 +++++++++++++++++++++++++++++ spikes/async/original_tap.py | 95 + spikes/async/page_async_tap.py | 161 ++ spikes/async/sidd_async_tap.py | 191 ++ spikes/async/sidd_endpoint_tap.py | 120 + spikes/async/tester.py | 62 + tap_shopify/streams/base.py | 106 +- 8 files changed, 4659 insertions(+), 28 deletions(-) create mode 100755 spikes/async/first_comparison.py create mode 100644 spikes/async/orders-schema.json create mode 100755 spikes/async/original_tap.py create mode 100755 spikes/async/page_async_tap.py create mode 100755 spikes/async/sidd_async_tap.py create mode 100755 spikes/async/sidd_endpoint_tap.py create mode 100755 spikes/async/tester.py diff --git a/spikes/async/first_comparison.py b/spikes/async/first_comparison.py new file mode 100755 index 00000000..bccef14d --- /dev/null +++ b/spikes/async/first_comparison.py @@ -0,0 +1,287 @@ +#!/usr/bin/env python3 +import math +import shopify +from datetime import datetime +import time +import sys +import requests +import time +import asyncio +import aiohttp +import json +import singer +from singer import utils + +LOGGER = singer.get_logger() + +from urllib.parse import urlparse, urlencode + +SHOP_NAME = '' +API_KEY = '' +PASSWORD = '' +shop_url = "https://%s:%s@%s.myshopify.com/admin" % (API_KEY, PASSWORD, SHOP_NAME) +shopify.ShopifyResource.set_site(shop_url) +RESULTS_PER_PAGE = 250 +ORDERS_ENDPOINT='/orders' + + + + +def request_orders(params): + req_url = shop_url + '/orders.json' + resp = requests.get(req_url, params=params) + if resp.status_code == 429: + sleep_time_str = resp.headers['Retry-After'] + print("!!!!!!!!!!!!!!!!!!") + 
print("Received 429 -- sleeping for {} seconds".format(sleep_time_str)) + print("!!!!!!!!!!!!!!!!!!") + time.sleep(math.floor(float(sleep_time_str))) + resp = requests.get(params) + return resp.json()['orders'] if 'orders' in resp.json() else [] + + +def print_stats(ls, num_orders): + _fmt_duration = lambda x: str(round(x,2)) + "s" + num_times = len(ls) + total_run_time = sum([v['duration'] for v in ls.values()]) + avg_run_time = total_run_time/num_times + + print(" Total Num. Runs : {}".format(num_times)) + print(" Num. Orders/Run : {}".format(num_orders)) + print(" Total Run Time : {}".format(_fmt_duration(total_run_time))) + print(" Avg. Run Time : {}".format(_fmt_duration(avg_run_time))) + + div = " " + "-"*46 + print("{d}\n Granular Stats\n{d}".format(d=div)) + for i, v in ls.items(): + print(" Run {} (num_results={}) : {}".format(i+1, v['num_rows'], _fmt_duration(v['duration']))) + print("{d}\n".format(d=div)) + + +class Error(Exception): + """Base exception for the API interaction module""" + +class OutOfOrderIdsError(Error): + """Raised if our expectation of ordering by ID is violated""" + +def since_id_bs(): + updated_at_min = utils.strptime_with_tz("2019-02-01 00:00:00") + stop_time = utils.strptime_with_tz("2019-03-27 20:00:00") + results_per_page = 250 + curr_since_id = 1 + final = [] + + while updated_at_min < stop_time: + since_id = curr_since_id + + if since_id != 1: + LOGGER.info("Resuming sync from since_id %d", since_id) + + updated_at_max = stop_time + if updated_at_max > stop_time: + updated_at_max = stop_time + while True: + query_params = { + "since_id": since_id, + "updated_at_min": updated_at_min, + "updated_at_max": updated_at_max, + "limit": results_per_page, + "status": "any" + } + objects = shopify.Order.find(**query_params) + for obj in objects: + if obj.id < since_id: + raise OutOfOrderIdsError("obj.id < since_id: {} < {}".format( + obj.id, since_id)) + final.append(obj.to_dict()) + + if len(objects) < results_per_page: + return final 
+ + if objects[-1].id != max([o.id for o in objects]): + # This verifies the api behavior expectation we have + # that all pages are internally ordered by the + # `since_id`. + raise OutOfOrderIdsError("{} is not the max id in objects ({})".format( + objects[-1].id, max([o.id for o in objects]))) + since_id = objects[-1].id + + # Put since_id into the state. + curr_since_id = since_id + + updated_at_min = updated_at_max + + +class OrderAsync(shopify.Order): + def __init__(self, endpoint, retry_limit, results_per_page, params): + super() + self.retry_limit = retry_limit + self.results_per_page = results_per_page + self.params = params + self.params['limit'] = str(self.results_per_page) + self.base_url = shopify.ShopifyResource.get_site() + self.current_shop = shopify.Shop.current() + self.bucket_size = 80 if self.current_shop.plan_name == "shopify_plus" else 40 + self.shop_display_url = "https://{}".format(self.current_shop.myshopify_domain) + self.endpoint = endpoint + + async def _get_async(self, url, headers=None, params=None, retry_attempt=0): + async with aiohttp.ClientSession() as session: + async with session.get(url=url, headers=headers, params=params) as response: + if response.status == 429: + sleep_time_str = math.floor(float(response.headers.get('Retry-After'))) + LOGGER.info("Received 429 -- sleeping for {} seconds".format(sleep_time_str)) + await asyncio.sleep(sleep_time_str) + return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) + if response.status in range(500, 599): + return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) + else: + resp = await response.json() + resp['page'] = params['page'] if 'page' in params else None + return resp + + async def _request_count(self): + endpoint = '{}/count.json'.format(self.endpoint) + url = self.base_url + endpoint + resp = await self._get_async(url, params=self.params) + LOGGER.info("GET {}{}?{}".format(self.shop_display_url, endpoint, 
urlencode(self.params))) + return resp['count'] + + async def _request(self, job): + endpoint = '{}.json'.format(self.endpoint) + url = self.base_url + endpoint + resp = await self._get_async(url, params=job['params']) + LOGGER.info("GET {}{}?{}".format(self.shop_display_url, endpoint, urlencode(job['params']))) + return resp + + async def _runner(self): + result_set_size = await self._request_count() + num_pages = math.ceil(result_set_size/self.results_per_page) + jobs = [{'params': { **self.params, 'page': p+1 }} for p in range(num_pages)] + chunked_jobs = utils.chunk(jobs, self.bucket_size) + + all_jobs_results_dict = {} + for chunk_of_jobs in chunked_jobs: + futures = [self._request(i) for i in chunk_of_jobs] + resp_dict = {} + for i, future in enumerate(asyncio.as_completed(futures)): + result = await future + orders = result['orders'] if 'orders' in result else [] + resp_dict[result['page']] = orders + all_jobs_results_dict = {**all_jobs_results_dict, **resp_dict} + ordered_results = [] + for k,v in sorted(all_jobs_results_dict.items()): + if len(v) > 0: + ordered_results += v + return ordered_results + + def Run(self): + result = asyncio.run(self._runner()) + return result + + @classmethod + def find(cls, endpoint, retry_limit=5, results_per_page=250, **kwargs): + return OrderAsync(endpoint, retry_limit, results_per_page, params=kwargs).Run() + + +def main(): + # start_date = "2019-03-26T00:00:00-0:00" + # end_date = "2019-03-27T20:00:00-0:00" + start_date = "2019-03-26 00:00:00" + end_date = "2019-03-27 20:00:00" + updated_at_min = utils.strptime_with_tz(start_date) + updated_at_max = utils.strptime_with_tz(end_date) if end_date is not None else singer.utils.now().replace(microsecond=0) + DT_FMT = '%Y-%m-%dT%H:%M:%S' + query_params = { + "updated_at_min": utils.strftime(updated_at_min, format_str=DT_FMT), + "updated_at_max": utils.strftime(updated_at_max, format_str=DT_FMT), + "status": "any" + } + + print(query_params) + + # 2014-04-25T16:15:47-04:00 + + 
async_start_time = time.time() + orders = OrderAsync.find(endpoint=ORDERS_ENDPOINT, **query_params) + async_duration = time.time() - async_start_time + print("Orders: ", len(orders)) + print("Duration: ", async_duration) + + # pg = [o['page'] for o in orders] + # print(pg) + # for v in orders: + # print("{}".format(len(v))) + # for k,v in orders.items(): + # print("Page {}: {}".format(k, len(v))) + + # since_id_orders = since_id_bs() + + # print("NUM. ORDERS RECV Request: {}".format(len(orders))) + # print("NUM. ORDERS RECV Since_ID: {}".format(len(since_id_orders))) + + # orders_ids = [x['id'] for x in orders] + # since_id_orders = [x['id'] for x in since_id_orders] + + + # not_in_since_orders_ids = [] + # for o in orders_ids: + # if o not in since_id_orders: + # not_in_since_orders_ids.append(o) + + # not_in_orders_ids = [] + # for o in since_id_orders: + # if o not in orders_ids: + # not_in_orders_ids.append(o) + + # print('not_in_since_orders_ids: ', not_in_since_orders_ids) + # print('not_in_orders_ids: ', not_in_orders_ids) + + + # TOTAL ORDERS: 127407 +# def main(): +# NUM_TIMES_TO_RUN = 1 +# BUCKET_SIZE = 80 +# NUM_ORDERS = 40000 +# sync_durations = {} +# async_durations = {} + +# sleep_between_sync_and_async = 5 + +# for n in range(NUM_TIMES_TO_RUN): +# order_jobs = int(NUM_ORDERS/RESULTS_PER_PAGE) +# params = {"limit": str(RESULTS_PER_PAGE)} +# # Running Sync +# sync_start_time = time.time() +# sync_results = [] +# for j in range(order_jobs): +# sync_results += request_orders(params) +# sync_duration = time.time() - sync_start_time +# sync_durations[n] = {'duration': sync_duration, 'num_rows': len(sync_results)} + +# print("Sleeping {}s between sync and async just to clear out".format(sleep_between_sync_and_async)) +# time.sleep(sleep_between_sync_and_async) + +# Running Async +# async_start_time = time.time() +# async_jobs = [params for p in range(order_jobs)] +# async_jobs_chunked = chunks(async_jobs, BUCKET_SIZE) +# async_results = [] +# for chunk in 
async_jobs_chunked: +# async_results += GetOrdersAsync(shop_url, chunk).Run() +# async_duration = time.time() - async_start_time +# async_durations[n] = {'duration': async_duration, 'num_rows': len(async_results)} + +# div = "="*50 + +# print("{d}\n Sync Stats\n{d}".format(d=div)) +# print_stats(sync_durations, NUM_ORDERS) +# print("{d}\n".format(d=div)) + +# print("{d}\n Async Stats\n{d}".format(d=div)) +# print_stats(async_durations, NUM_ORDERS) +# print("{d}\n".format(d=div)) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/spikes/async/orders-schema.json b/spikes/async/orders-schema.json new file mode 100644 index 00000000..01146cec --- /dev/null +++ b/spikes/async/orders-schema.json @@ -0,0 +1,3665 @@ +{ + "stream": "orders", + "tap_stream_id": "orders", + "schema": { + "properties": { + "presentment_currency": { + "type": [ + "null", + "string" + ] + }, + "subtotal_price_set": {}, + "total_discounts_set": {}, + "total_line_items_price_set": {}, + "total_price_set": {}, + "total_shipping_price_set": {}, + "total_tax_set": {}, + "total_price": { + "type": [ + "null", + "number" + ] + }, + "line_items": { + "items": { + "properties": { + "applied_discounts": {}, + "total_discount_set": {}, + "pre_tax_price_set": {}, + "price_set": {}, + "grams": { + "type": [ + "null", + "integer" + ] + }, + "compare_at_price": { + "type": [ + "null", + "string" + ] + }, + "destination_location_id": { + "type": [ + "null", + "integer" + ] + }, + "key": { + "type": [ + "null", + "string" + ] + }, + "line_price": { + "type": [ + "null", + "string" + ] + }, + "origin_location_id": { + "type": [ + "null", + "integer" + ] + }, + "applied_discount": { + "type": [ + "null", + "integer" + ] + }, + "fulfillable_quantity": { + "type": [ + "null", + "integer" + ] + }, + "variant_title": { + "type": [ + "null", + "string" + ] + }, + "properties": { + "items": { + "properties": { + "name": { + "type": [ + "null", + "string" + ] + }, + "value": { + "type": [ + 
"null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "tax_code": { + "type": [ + "null", + "string" + ] + }, + "discount_allocations": { + "items": { + "properties": { + "discount_application_index": { + "type": [ + "null", + "integer" + ] + }, + "amount_set": {}, + "amount": { + "type": [ + "null", + "number" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "admin_graphql_api_id": { + "type": [ + "null", + "string" + ] + }, + "pre_tax_price": { + "type": [ + "null", + "number" + ] + }, + "sku": { + "type": [ + "null", + "string" + ] + }, + "product_exists": { + "type": [ + "null", + "boolean" + ] + }, + "total_discount": { + "type": [ + "null", + "number" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "fulfillment_status": { + "type": [ + "null", + "string" + ] + }, + "gift_card": { + "type": [ + "null", + "boolean" + ] + }, + "id": { + "type": [ + "null", + "integer", + "string" + ] + }, + "taxable": { + "type": [ + "null", + "boolean" + ] + }, + "vendor": { + "type": [ + "null", + "string" + ] + }, + "tax_lines": { + "items": { + "properties": { + "price_set": {}, + "price": { + "type": [ + "null", + "number" + ] + }, + "title": { + "type": [ + "null", + "string" + ] + }, + "rate": { + "type": [ + "null", + "number" + ] + }, + "compare_at": { + "type": [ + "null", + "string" + ] + }, + "position": { + "type": [ + "null", + "integer" + ] + }, + "source": { + "type": [ + "null", + "string" + ] + }, + "zone": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "origin_location": { + "properties": { + "country_code": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "address1": { + "type": [ + "null", + "string" + ] + }, + "city": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + 
"integer" + ] + }, + "address2": { + "type": [ + "null", + "string" + ] + }, + "province_code": { + "type": [ + "null", + "string" + ] + }, + "zip": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "price": { + "type": [ + "null", + "number" + ] + }, + "requires_shipping": { + "type": [ + "null", + "boolean" + ] + }, + "fulfillment_service": { + "type": [ + "null", + "string" + ] + }, + "variant_inventory_management": { + "type": [ + "null", + "string" + ] + }, + "title": { + "type": [ + "null", + "string" + ] + }, + "destination_location": { + "properties": { + "country_code": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "address1": { + "type": [ + "null", + "string" + ] + }, + "city": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "integer" + ] + }, + "address2": { + "type": [ + "null", + "string" + ] + }, + "province_code": { + "type": [ + "null", + "string" + ] + }, + "zip": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "quantity": { + "type": [ + "null", + "integer" + ] + }, + "product_id": { + "type": [ + "null", + "integer" + ] + }, + "variant_id": { + "type": [ + "null", + "integer" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "processing_method": { + "type": [ + "null", + "string" + ] + }, + "order_number": { + "type": [ + "null", + "integer" + ] + }, + "confirmed": { + "type": [ + "null", + "boolean" + ] + }, + "total_discounts": { + "type": [ + "null", + "number" + ] + }, + "total_line_items_price": { + "type": [ + "null", + "number" + ] + }, + "order_adjustments": { + "items": { + "properties": { + "order_id": { + "type": [ + "null", + "integer" + ] + }, + "tax_amount": { + "type": [ + "null", + "number" + ] + }, + "refund_id": { + "type": [ + "null", + "integer" + ] + }, + "amount": { + "type": [ + "null", + "number" + ] + }, + 
"kind": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "integer" + ] + }, + "reason": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "shipping_lines": { + "items": { + "properties": { + "tax_lines": { + "items": { + "properties": { + "price_set": {}, + "price": { + "type": [ + "null", + "number" + ] + }, + "title": { + "type": [ + "null", + "string" + ] + }, + "rate": { + "type": [ + "null", + "number" + ] + }, + "compare_at": { + "type": [ + "null", + "string" + ] + }, + "position": { + "type": [ + "null", + "integer" + ] + }, + "source": { + "type": [ + "null", + "string" + ] + }, + "zone": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "phone": { + "type": [ + "null", + "string" + ] + }, + "discounted_price_set": {}, + "price_set": {}, + "price": { + "type": [ + "null", + "number" + ] + }, + "title": { + "type": [ + "null", + "string" + ] + }, + "discount_allocations": { + "items": { + "properties": { + "discount_application_index": { + "type": [ + "null", + "integer" + ] + }, + "amount": { + "type": [ + "null", + "number" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "delivery_category": { + "type": [ + "null", + "string" + ] + }, + "discounted_price": { + "type": [ + "null", + "number" + ] + }, + "code": { + "type": [ + "null", + "string" + ] + }, + "requested_fulfillment_service_id": { + "type": [ + "null", + "string" + ] + }, + "carrier_identifier": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "integer" + ] + }, + "source": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "admin_graphql_api_id": { + "type": [ + "null", + "string" + ] + }, + "device_id": { + "type": [ + "null", + "integer" + ] 
+ }, + "cancel_reason": { + "type": [ + "null", + "string" + ] + }, + "currency": { + "type": [ + "null", + "string" + ] + }, + "payment_gateway_names": { + "items": { + "type": [ + "null", + "string" + ] + }, + "type": [ + "null", + "array" + ] + }, + "source_identifier": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "integer" + ] + }, + "processed_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "referring_site": { + "type": [ + "null", + "string" + ] + }, + "contact_email": { + "type": [ + "null", + "string" + ] + }, + "location_id": { + "type": [ + "null", + "integer" + ] + }, + "fulfillments": { + "items": { + "properties": { + "location_id": { + "type": [ + "null", + "integer" + ] + }, + "receipt": { + "type": [ + "null", + "object" + ], + "properties": { + "testcase": { + "type": [ + "null", + "boolean" + ] + }, + "authorization": { + "type": [ + "null", + "string" + ] + } + } + }, + "tracking_number": { + "type": [ + "null", + "string" + ] + }, + "created_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "shipment_status": { + "type": [ + "null", + "string" + ] + }, + "line_items": { + "items": { + "properties": { + "applied_discounts": {}, + "total_discount_set": {}, + "pre_tax_price_set": {}, + "price_set": {}, + "grams": { + "type": [ + "null", + "integer" + ] + }, + "compare_at_price": { + "type": [ + "null", + "string" + ] + }, + "destination_location_id": { + "type": [ + "null", + "integer" + ] + }, + "key": { + "type": [ + "null", + "string" + ] + }, + "line_price": { + "type": [ + "null", + "string" + ] + }, + "origin_location_id": { + "type": [ + "null", + "integer" + ] + }, + "applied_discount": { + "type": [ + "null", + "integer" + ] + }, + "fulfillable_quantity": { + "type": [ + "null", + "integer" + ] + }, + "variant_title": { + "type": [ + "null", + "string" + ] + }, + "properties": { + "items": { + "properties": { + "name": { + "type": [ + "null", + "string" 
+ ] + }, + "value": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "tax_code": { + "type": [ + "null", + "string" + ] + }, + "discount_allocations": { + "items": { + "properties": { + "discount_application_index": { + "type": [ + "null", + "integer" + ] + }, + "amount_set": {}, + "amount": { + "type": [ + "null", + "number" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "admin_graphql_api_id": { + "type": [ + "null", + "string" + ] + }, + "pre_tax_price": { + "type": [ + "null", + "number" + ] + }, + "sku": { + "type": [ + "null", + "string" + ] + }, + "product_exists": { + "type": [ + "null", + "boolean" + ] + }, + "total_discount": { + "type": [ + "null", + "number" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "fulfillment_status": { + "type": [ + "null", + "string" + ] + }, + "gift_card": { + "type": [ + "null", + "boolean" + ] + }, + "id": { + "type": [ + "null", + "integer", + "string" + ] + }, + "taxable": { + "type": [ + "null", + "boolean" + ] + }, + "vendor": { + "type": [ + "null", + "string" + ] + }, + "tax_lines": { + "items": { + "properties": { + "price_set": {}, + "price": { + "type": [ + "null", + "number" + ] + }, + "title": { + "type": [ + "null", + "string" + ] + }, + "rate": { + "type": [ + "null", + "number" + ] + }, + "compare_at": { + "type": [ + "null", + "string" + ] + }, + "position": { + "type": [ + "null", + "integer" + ] + }, + "source": { + "type": [ + "null", + "string" + ] + }, + "zone": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "origin_location": { + "properties": { + "country_code": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "address1": { + "type": [ + "null", + "string" + ] + }, + "city": { + "type": [ + "null", + "string" + ] + }, 
+ "id": { + "type": [ + "null", + "integer" + ] + }, + "address2": { + "type": [ + "null", + "string" + ] + }, + "province_code": { + "type": [ + "null", + "string" + ] + }, + "zip": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "price": { + "type": [ + "null", + "number" + ] + }, + "requires_shipping": { + "type": [ + "null", + "boolean" + ] + }, + "fulfillment_service": { + "type": [ + "null", + "string" + ] + }, + "variant_inventory_management": { + "type": [ + "null", + "string" + ] + }, + "title": { + "type": [ + "null", + "string" + ] + }, + "destination_location": { + "properties": { + "country_code": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "address1": { + "type": [ + "null", + "string" + ] + }, + "city": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "integer" + ] + }, + "address2": { + "type": [ + "null", + "string" + ] + }, + "province_code": { + "type": [ + "null", + "string" + ] + }, + "zip": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "quantity": { + "type": [ + "null", + "integer" + ] + }, + "product_id": { + "type": [ + "null", + "integer" + ] + }, + "variant_id": { + "type": [ + "null", + "integer" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "tracking_url": { + "type": [ + "null", + "string" + ] + }, + "service": { + "type": [ + "null", + "string" + ] + }, + "status": { + "type": [ + "null", + "string" + ] + }, + "admin_graphql_api_id": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "tracking_urls": { + "items": { + "type": [ + "null", + "string" + ] + }, + "type": [ + "null", + "array" + ] + }, + "tracking_numbers": { + "items": { + "type": [ + "null", + "string" + ] + }, + "type": [ + "null", + "array" + ] + }, + "id": { + "type": [ + "null", + "integer" + ] + 
}, + "tracking_company": { + "type": [ + "null", + "string" + ] + }, + "updated_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "customer": { + "type": "object", + "properties": { + "last_order_name": { + "type": [ + "null", + "string" + ] + }, + "currency": { + "type": [ + "null", + "string" + ] + }, + "email": { + "type": [ + "null", + "string" + ] + }, + "multipass_identifier": { + "type": [ + "null", + "string" + ] + }, + "default_address": { + "type": [ + "null", + "object" + ], + "properties": { + "city": { + "type": [ + "null", + "string" + ] + }, + "address1": { + "type": [ + "null", + "string" + ] + }, + "zip": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "integer" + ] + }, + "country_name": { + "type": [ + "null", + "string" + ] + }, + "province": { + "type": [ + "null", + "string" + ] + }, + "phone": { + "type": [ + "null", + "string" + ] + }, + "country": { + "type": [ + "null", + "string" + ] + }, + "first_name": { + "type": [ + "null", + "string" + ] + }, + "customer_id": { + "type": [ + "null", + "integer" + ] + }, + "default": { + "type": [ + "null", + "boolean" + ] + }, + "last_name": { + "type": [ + "null", + "string" + ] + }, + "country_code": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "province_code": { + "type": [ + "null", + "string" + ] + }, + "address2": { + "type": [ + "null", + "string" + ] + }, + "company": { + "type": [ + "null", + "string" + ] + } + } + }, + "orders_count": { + "type": [ + "null", + "integer" + ] + }, + "state": { + "type": [ + "null", + "string" + ] + }, + "verified_email": { + "type": [ + "null", + "boolean" + ] + }, + "total_spent": { + "type": [ + "null", + "string" + ] + }, + "last_order_id": { + "type": [ + "null", + "integer" + ] + }, + "first_name": { + "type": [ + "null", + "string" + ] + }, + "updated_at": { 
+ "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "note": { + "type": [ + "null", + "string" + ] + }, + "phone": { + "type": [ + "null", + "string" + ] + }, + "admin_graphql_api_id": { + "type": [ + "null", + "string" + ] + }, + "addresses": { + "type": [ + "null", + "array" + ], + "items": { + "type": [ + "null", + "object" + ], + "properties": { + "city": { + "type": [ + "null", + "string" + ] + }, + "address1": { + "type": [ + "null", + "string" + ] + }, + "zip": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "integer" + ] + }, + "country_name": { + "type": [ + "null", + "string" + ] + }, + "province": { + "type": [ + "null", + "string" + ] + }, + "phone": { + "type": [ + "null", + "string" + ] + }, + "country": { + "type": [ + "null", + "string" + ] + }, + "first_name": { + "type": [ + "null", + "string" + ] + }, + "customer_id": { + "type": [ + "null", + "integer" + ] + }, + "default": { + "type": [ + "null", + "boolean" + ] + }, + "last_name": { + "type": [ + "null", + "string" + ] + }, + "country_code": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "province_code": { + "type": [ + "null", + "string" + ] + }, + "address2": { + "type": [ + "null", + "string" + ] + }, + "company": { + "type": [ + "null", + "string" + ] + } + } + } + }, + "last_name": { + "type": [ + "null", + "string" + ] + }, + "tags": { + "type": [ + "null", + "string" + ] + }, + "tax_exempt": { + "type": [ + "null", + "boolean" + ] + }, + "id": { + "type": [ + "null", + "integer" + ] + }, + "accepts_marketing": { + "type": [ + "null", + "boolean" + ] + }, + "created_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + } + } + }, + "test": { + "type": [ + "null", + "boolean" + ] + }, + "total_tax": { + "type": [ + "null", + "number" + ] + }, + "payment_details": { + "properties": { + "avs_result_code": { + "type": [ + "null", + "string" + ] + }, + "credit_card_company": { 
+ "type": [ + "null", + "string" + ] + }, + "cvv_result_code": { + "type": [ + "null", + "string" + ] + }, + "credit_card_bin": { + "type": [ + "null", + "string" + ] + }, + "credit_card_number": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "number": { + "type": [ + "null", + "integer" + ] + }, + "email": { + "type": [ + "null", + "string" + ] + }, + "source_name": { + "type": [ + "null", + "string" + ] + }, + "landing_site_ref": { + "type": [ + "null", + "string" + ] + }, + "shipping_address": { + "properties": { + "phone": { + "type": [ + "null", + "string" + ] + }, + "country": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "address1": { + "type": [ + "null", + "string" + ] + }, + "longitude": { + "type": [ + "null", + "number" + ] + }, + "address2": { + "type": [ + "null", + "string" + ] + }, + "last_name": { + "type": [ + "null", + "string" + ] + }, + "first_name": { + "type": [ + "null", + "string" + ] + }, + "province": { + "type": [ + "null", + "string" + ] + }, + "city": { + "type": [ + "null", + "string" + ] + }, + "company": { + "type": [ + "null", + "string" + ] + }, + "latitude": { + "type": [ + "null", + "number" + ] + }, + "country_code": { + "type": [ + "null", + "string" + ] + }, + "province_code": { + "type": [ + "null", + "string" + ] + }, + "zip": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "total_price_usd": { + "type": [ + "null", + "number" + ] + }, + "closed_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "discount_applications": { + "items": { + "properties": { + "target_type": { + "type": [ + "null", + "string" + ] + }, + "code": { + "type": [ + "null", + "string" + ] + }, + "description": { + "type": [ + "null", + "string" + ] + }, + "type": { + "type": [ + "null", + "string" + ] + }, + "target_selection": { + "type": [ + "null", + "string" + ] + }, + 
"allocation_method": { + "type": [ + "null", + "string" + ] + }, + "title": { + "type": [ + "null", + "string" + ] + }, + "value_type": { + "type": [ + "null", + "string" + ] + }, + "value": { + "type": [ + "null", + "number" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "note": { + "type": [ + "null", + "string" + ] + }, + "user_id": { + "type": [ + "null", + "integer" + ] + }, + "source_url": { + "type": [ + "null", + "string" + ] + }, + "subtotal_price": { + "type": [ + "null", + "number" + ] + }, + "billing_address": { + "properties": { + "phone": { + "type": [ + "null", + "string" + ] + }, + "country": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "address1": { + "type": [ + "null", + "string" + ] + }, + "longitude": { + "type": [ + "null", + "number" + ] + }, + "address2": { + "type": [ + "null", + "string" + ] + }, + "last_name": { + "type": [ + "null", + "string" + ] + }, + "first_name": { + "type": [ + "null", + "string" + ] + }, + "province": { + "type": [ + "null", + "string" + ] + }, + "city": { + "type": [ + "null", + "string" + ] + }, + "company": { + "type": [ + "null", + "string" + ] + }, + "latitude": { + "type": [ + "null", + "number" + ] + }, + "country_code": { + "type": [ + "null", + "string" + ] + }, + "province_code": { + "type": [ + "null", + "string" + ] + }, + "zip": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "landing_site": { + "type": [ + "null", + "string" + ] + }, + "taxes_included": { + "type": [ + "null", + "boolean" + ] + }, + "token": { + "type": [ + "null", + "string" + ] + }, + "app_id": { + "type": [ + "null", + "integer" + ] + }, + "total_tip_received": { + "type": [ + "null", + "string" + ] + }, + "browser_ip": { + "type": [ + "null", + "string" + ] + }, + "discount_codes": { + "items": { + "properties": { + "code": { + 
"type": [ + "null", + "string" + ] + }, + "amount": { + "type": [ + "null", + "number" + ] + }, + "type": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "tax_lines": { + "items": { + "properties": { + "price_set": {}, + "price": { + "type": [ + "null", + "number" + ] + }, + "title": { + "type": [ + "null", + "string" + ] + }, + "rate": { + "type": [ + "null", + "number" + ] + }, + "compare_at": { + "type": [ + "null", + "string" + ] + }, + "position": { + "type": [ + "null", + "integer" + ] + }, + "source": { + "type": [ + "null", + "string" + ] + }, + "zone": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "phone": { + "type": [ + "null", + "string" + ] + }, + "note_attributes": { + "items": { + "properties": { + "name": { + "type": [ + "null", + "string" + ] + }, + "value": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "fulfillment_status": { + "type": [ + "null", + "string" + ] + }, + "order_status_url": { + "type": [ + "null", + "string" + ] + }, + "client_details": { + "properties": { + "session_hash": { + "type": [ + "null", + "string" + ] + }, + "accept_language": { + "type": [ + "null", + "string" + ] + }, + "browser_width": { + "type": [ + "null", + "integer" + ] + }, + "user_agent": { + "type": [ + "null", + "string" + ] + }, + "browser_ip": { + "type": [ + "null", + "string" + ] + }, + "browser_height": { + "type": [ + "null", + "integer" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "buyer_accepts_marketing": { + "type": [ + "null", + "boolean" + ] + }, + "checkout_token": { + "type": [ + "null", + "string" + ] + }, + "tags": { + "type": [ + "null", + "string" + ] + }, + "financial_status": { + "type": [ + "null", + "string" + ] + }, + "customer_locale": { + "type": [ + "null", + "string" 
+ ] + }, + "checkout_id": { + "type": [ + "null", + "integer" + ] + }, + "total_weight": { + "type": [ + "null", + "integer" + ] + }, + "gateway": { + "type": [ + "null", + "string" + ] + }, + "cart_token": { + "type": [ + "null", + "string" + ] + }, + "cancelled_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "refunds": { + "items": { + "properties": { + "admin_graphql_api_id": { + "type": [ + "null", + "string" + ] + }, + "refund_line_items": { + "items": { + "properties": { + "line_item": { + "properties": { + "applied_discounts": {}, + "total_discount_set": {}, + "pre_tax_price_set": {}, + "price_set": {}, + "grams": { + "type": [ + "null", + "integer" + ] + }, + "compare_at_price": { + "type": [ + "null", + "string" + ] + }, + "destination_location_id": { + "type": [ + "null", + "integer" + ] + }, + "key": { + "type": [ + "null", + "string" + ] + }, + "line_price": { + "type": [ + "null", + "string" + ] + }, + "origin_location_id": { + "type": [ + "null", + "integer" + ] + }, + "applied_discount": { + "type": [ + "null", + "integer" + ] + }, + "fulfillable_quantity": { + "type": [ + "null", + "integer" + ] + }, + "variant_title": { + "type": [ + "null", + "string" + ] + }, + "properties": { + "items": { + "properties": { + "name": { + "type": [ + "null", + "string" + ] + }, + "value": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "tax_code": { + "type": [ + "null", + "string" + ] + }, + "discount_allocations": { + "items": { + "properties": { + "discount_application_index": { + "type": [ + "null", + "integer" + ] + }, + "amount_set": {}, + "amount": { + "type": [ + "null", + "number" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "admin_graphql_api_id": { + "type": [ + "null", + "string" + ] + }, + "pre_tax_price": { + "type": [ + "null", + "number" + ] + }, + "sku": { + "type": [ + "null", + 
"string" + ] + }, + "product_exists": { + "type": [ + "null", + "boolean" + ] + }, + "total_discount": { + "type": [ + "null", + "number" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "fulfillment_status": { + "type": [ + "null", + "string" + ] + }, + "gift_card": { + "type": [ + "null", + "boolean" + ] + }, + "id": { + "type": [ + "null", + "integer", + "string" + ] + }, + "taxable": { + "type": [ + "null", + "boolean" + ] + }, + "vendor": { + "type": [ + "null", + "string" + ] + }, + "tax_lines": { + "items": { + "properties": { + "price_set": {}, + "price": { + "type": [ + "null", + "number" + ] + }, + "title": { + "type": [ + "null", + "string" + ] + }, + "rate": { + "type": [ + "null", + "number" + ] + }, + "compare_at": { + "type": [ + "null", + "string" + ] + }, + "position": { + "type": [ + "null", + "integer" + ] + }, + "source": { + "type": [ + "null", + "string" + ] + }, + "zone": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "origin_location": { + "properties": { + "country_code": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "address1": { + "type": [ + "null", + "string" + ] + }, + "city": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "integer" + ] + }, + "address2": { + "type": [ + "null", + "string" + ] + }, + "province_code": { + "type": [ + "null", + "string" + ] + }, + "zip": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "price": { + "type": [ + "null", + "number" + ] + }, + "requires_shipping": { + "type": [ + "null", + "boolean" + ] + }, + "fulfillment_service": { + "type": [ + "null", + "string" + ] + }, + "variant_inventory_management": { + "type": [ + "null", + "string" + ] + }, + "title": { + "type": [ + "null", + "string" + ] + }, + "destination_location": { + "properties": { + "country_code": { + "type": [ 
+ "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "address1": { + "type": [ + "null", + "string" + ] + }, + "city": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "integer" + ] + }, + "address2": { + "type": [ + "null", + "string" + ] + }, + "province_code": { + "type": [ + "null", + "string" + ] + }, + "zip": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "quantity": { + "type": [ + "null", + "integer" + ] + }, + "product_id": { + "type": [ + "null", + "integer" + ] + }, + "variant_id": { + "type": [ + "null", + "integer" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "location_id": { + "type": [ + "null", + "integer" + ] + }, + "line_item_id": { + "type": [ + "null", + "integer" + ] + }, + "quantity": { + "type": [ + "null", + "integer" + ] + }, + "id": { + "type": [ + "null", + "integer" + ] + }, + "total_tax": { + "type": [ + "null", + "number" + ] + }, + "restock_type": { + "type": [ + "null", + "string" + ] + }, + "subtotal": { + "type": [ + "null", + "number" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "restock": { + "type": [ + "null", + "boolean" + ] + }, + "note": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "integer" + ] + }, + "user_id": { + "type": [ + "null", + "integer" + ] + }, + "created_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "processed_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "order_adjustments": { + "items": { + "properties": { + "order_id": { + "type": [ + "null", + "integer" + ] + }, + "tax_amount": { + "type": [ + "null", + "number" + ] + }, + "refund_id": { + "type": [ + "null", + "integer" + ] + }, + "amount": { + "type": [ + "null", + "number" + ] + }, + "kind": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "integer" + ] + }, + 
"reason": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "created_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "updated_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "reference": { + "type": [ + "null", + "string" + ] + } + }, + "type": "object", + "selected": true + }, + "metadata": [ + { + "breadcrumb": [], + "metadata": { + "table-key-properties": [ + "id" + ], + "forced-replication-method": "INCREMENTAL", + "valid-replication-keys": [ + "updated_at" + ], + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "presentment_currency" + ], + "metadata": { + "inclusion": "available", + "selected": false + } + }, + { + "breadcrumb": [ + "properties", + "subtotal_price_set" + ], + "metadata": { + "inclusion": "available", + "selected": false + } + }, + { + "breadcrumb": [ + "properties", + "total_discounts_set" + ], + "metadata": { + "inclusion": "available", + "selected": false + } + }, + { + "breadcrumb": [ + "properties", + "total_line_items_price_set" + ], + "metadata": { + "inclusion": "available", + "selected": false + } + }, + { + "breadcrumb": [ + "properties", + "total_price_set" + ], + "metadata": { + "inclusion": "available", + "selected": false + } + }, + { + "breadcrumb": [ + "properties", + "total_shipping_price_set" + ], + "metadata": { + "inclusion": "available", + "selected": false + } + }, + { + "breadcrumb": [ + "properties", + "total_tax_set" + ], + "metadata": { + "inclusion": "available", + "selected": false + } + }, + { + "breadcrumb": [ + "properties", + "total_price" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "line_items" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + 
"properties", + "processing_method" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "order_number" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "confirmed" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "total_discounts" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "total_line_items_price" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "order_adjustments" + ], + "metadata": { + "inclusion": "available", + "selected": false + } + }, + { + "breadcrumb": [ + "properties", + "shipping_lines" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "admin_graphql_api_id" + ], + "metadata": { + "inclusion": "available", + "selected": false + } + }, + { + "breadcrumb": [ + "properties", + "device_id" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "cancel_reason" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "currency" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "payment_gateway_names" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "source_identifier" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "id" + ], + "metadata": { + "inclusion": "automatic", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "processed_at" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + 
"breadcrumb": [ + "properties", + "referring_site" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "contact_email" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "location_id" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "fulfillments" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "customer" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "test" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "total_tax" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "payment_details" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "number" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "email" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "source_name" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "landing_site_ref" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "shipping_address" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "total_price_usd" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "closed_at" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + 
"discount_applications" + ], + "metadata": { + "inclusion": "available", + "selected": false + } + }, + { + "breadcrumb": [ + "properties", + "name" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "note" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "user_id" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "source_url" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "subtotal_price" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "billing_address" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "landing_site" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "taxes_included" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "token" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "app_id" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "total_tip_received" + ], + "metadata": { + "inclusion": "available", + "selected": false + } + }, + { + "breadcrumb": [ + "properties", + "browser_ip" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "discount_codes" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "tax_lines" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "phone" + ], + "metadata": { + 
"inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "note_attributes" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "fulfillment_status" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "order_status_url" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "client_details" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "buyer_accepts_marketing" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "checkout_token" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "tags" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "financial_status" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "customer_locale" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "checkout_id" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "total_weight" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "gateway" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "cart_token" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "cancelled_at" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "refunds" + ], + "metadata": { + 
"inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "created_at" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "updated_at" + ], + "metadata": { + "inclusion": "automatic", + "selected": true + } + }, + { + "breadcrumb": [ + "properties", + "reference" + ], + "metadata": { + "inclusion": "available", + "selected": true + } + } + ], + "key_properties": [ + "id" + ], + "replication_key": "updated_at", + "replication_method": "INCREMENTAL" +} \ No newline at end of file diff --git a/spikes/async/original_tap.py b/spikes/async/original_tap.py new file mode 100755 index 00000000..7f4983f3 --- /dev/null +++ b/spikes/async/original_tap.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python3 +import math +import shopify +from datetime import datetime, timedelta +import time +import sys +import asyncio +import aiohttp +import json +from urllib.parse import urlparse, urlencode +import singer +from singer import utils, metadata +from singer import Transformer + +LOGGER = singer.get_logger() + + +DATE_WINDOW_SIZE=1 +WRITE_TO_TARGET = False + +class Error(Exception): + """Base exception for the API interaction module""" + +class OutOfOrderIdsError(Error): + """Raised if our expectation of ordering by ID is violated""" + + +def get_objects(config): + updated_at_min = utils.strptime_with_tz(config['start_date']) + end_date = config['end_date'] + stop_time = utils.strptime_with_tz(end_date) if end_date is not None else singer.utils.now().replace(microsecond=0) + date_window_size = DATE_WINDOW_SIZE + results_per_page = config['results_per_page'] + + while updated_at_min < stop_time: + since_id = 1 + + if since_id != 1: + LOGGER.info("Resuming sync from since_id %d", since_id) + + updated_at_max = updated_at_min + timedelta(days=date_window_size) + if updated_at_max > stop_time: + updated_at_max = stop_time + while True: + query_params = { + "since_id": since_id, + "updated_at_min": 
updated_at_min, + "updated_at_max": updated_at_max, + "limit": results_per_page, + "status": "any" + } + objects = shopify.Order.find(**query_params) + for obj in objects: + if obj.id < since_id: + raise OutOfOrderIdsError("obj.id < since_id: {} < {}".format( + obj.id, since_id)) + yield obj + + if len(objects) < results_per_page: + break + + if objects[-1].id != max([o.id for o in objects]): + raise OutOfOrderIdsError("{} is not the max id in objects ({})".format( + objects[-1].id, max([o.id for o in objects]))) + since_id = objects[-1].id + + + updated_at_min = updated_at_max + + +def sync(config): + for obj in get_objects(config): + yield obj.to_dict() + + +@utils.handle_top_exception(LOGGER) +def ORIGINAL_TAP(config): + shop_url = "https://{k}:{p}@{s}.myshopify.com/admin".format(k=config['api_key'],p=config['api_password'],s=config['shop_name']) + shopify.ShopifyResource.set_site(shop_url) + + start_time = time.time() + if WRITE_TO_TARGET: + singer.write_schema(config['stream_id'], config['stream_schema'], config['key_properties'], bookmark_properties=config['replication_key']) + rec_count = 0 + with Transformer() as transformer: + for rec in sync(config): + extraction_time = singer.utils.now() + record_metadata = metadata.to_map(config['stream_metadata']) + rec = transformer.transform(rec, config['stream_schema'], record_metadata) + if WRITE_TO_TARGET: + singer.write_record(config['stream_id'], rec, time_extracted=extraction_time) + rec_count += 1 + duration = time.strftime("%H:%M:%S", time.gmtime(time.time() - start_time)) + + return (rec_count, duration) \ No newline at end of file diff --git a/spikes/async/page_async_tap.py b/spikes/async/page_async_tap.py new file mode 100755 index 00000000..6d21d079 --- /dev/null +++ b/spikes/async/page_async_tap.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python3 +import math +import shopify +from datetime import datetime +import time +import sys +import asyncio +import aiohttp +import json +from urllib.parse import urlparse, 
urlencode +import singer +from singer import utils +from singer import Transformer + + +LOGGER = singer.get_logger() + + +SHOP_NAME = '' +API_KEY = '' +PASSWORD = '' +shop_url = "https://%s:%s@%s.myshopify.com/admin" % (API_KEY, PASSWORD, SHOP_NAME) +shopify.ShopifyResource.set_site(shop_url) + + +RESULT_KEY = 'orders' +ENDPOINT = '/orders' +MAX_RETRIES = 5 +RESULTS_PER_PAGE = 250 +START_DATE = "2019-03-26 00:00:00" +END_DATE = "2019-03-27 20:00:00" + +with open('orders-schema.json') as f: + STREAM = json.load(f) + +STREAM_ID = STREAM['tap_stream_id'] +SCHEMA = STREAM['schema'] +KEY_PROPS = STREAM['key_properties'] +REPLICATION_KEY = STREAM['replication_key'] + +# Streams that can run Async +ASYNC_AVAILABLE_STREAMS = ['orders', 'products', 'customers', 'abandoned_checkouts'] + +class RunAsync(): + def __init__(self, schema, stream_id, endpoint, result_key, params, retry_limit, results_per_page): + self.schema = schema + self.stream_id = stream_id + self.endpoint = endpoint + self.result_key = result_key + self.params = params + self.retry_limit = retry_limit + self.results_per_page = results_per_page + + self.params['limit'] = str(self.results_per_page) + self.base_url = shopify.ShopifyResource.get_site() + self.current_shop = shopify.Shop.current() + self.bucket_size = 80 if self.current_shop.plan_name == "shopify_plus" else 40 + self.shop_display_url = "https://{}".format(self.current_shop.myshopify_domain) + self.rec_count = 0 + + async def _get_async(self, url, headers=None, params=None, retry_attempt=0): + async with aiohttp.ClientSession() as session: + async with session.get(url=url, headers=headers, params=params) as response: + if response.status == 429: + sleep_time_str = math.floor(float(response.headers.get('Retry-After'))) + LOGGER.info("Received 429 -- sleeping for {} seconds".format(sleep_time_str)) + await asyncio.sleep(sleep_time_str) + return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) + if response.status in 
range(500, 599): + return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) + else: + return await response.json() + + async def _request_count(self): + endpoint = '{}/count.json'.format(self.endpoint) + url = self.base_url + endpoint + resp = await self._get_async(url, params=self.params) + LOGGER.info("GET {}{}?{}".format(self.shop_display_url, endpoint, urlencode(self.params))) + return resp['count'] + + async def _request(self, job): + endpoint = '{}.json'.format(self.endpoint) + url = self.base_url + endpoint + resp = await self._get_async(url, params=job['params']) + LOGGER.info("GET {}{}?{}".format(self.shop_display_url, endpoint, urlencode(job['params']))) + return resp + + async def _runner(self): + result_set_size = await self._request_count() + num_pages = math.ceil(result_set_size/self.results_per_page) + jobs = [{'params': { **self.params, 'page': p+1 }} for p in range(num_pages)] + chunked_jobs = utils.chunk(jobs, self.bucket_size) + + for chunk_of_jobs in chunked_jobs: + futures = [self._request(i) for i in chunk_of_jobs] + for i, future in enumerate(asyncio.as_completed(futures)): + result = await future + if self.result_key in result: + self._write_singer_records(result[self.result_key]) + + def _write_singer_records(self, recs): + with Transformer() as transformer: + for rec in recs: + extraction_time = singer.utils.now() + transformed_rec = transformer.transform(rec, self.schema) + singer.write_record(self.stream_id, transformed_rec, time_extracted=extraction_time) + self.rec_count += 1 + + def Run(self): + asyncio.run(self._runner()) + return self.rec_count + + @classmethod + def sync(cls, schema, stream_id, endpoint, result_key, params, retry_limit=5, results_per_page=250): + return RunAsync(schema, stream_id, endpoint, result_key, params, retry_limit, results_per_page).Run() + + +def sync_async(): + """ + Gets objects for endpoint, and writes singer records. 
+ Returns the total number of records received and + emitted to target. + """ + updated_at_min = utils.strptime_with_tz(START_DATE) + end_date = END_DATE + updated_at_max = utils.strptime_with_tz(end_date) if end_date is not None else singer.utils.now().replace(microsecond=0) + results_per_page = RESULTS_PER_PAGE + + DT_FMT = '%Y-%m-%dT%H:%M:%S' + query_params = { + "updated_at_min": utils.strftime(updated_at_min, format_str=DT_FMT), + "updated_at_max": utils.strftime(updated_at_max, format_str=DT_FMT), + "status": "any" + } + + return RunAsync.sync( + schema = SCHEMA, + stream_id = STREAM_ID, + endpoint = ENDPOINT, + result_key = RESULT_KEY, + params = query_params, + retry_limit = MAX_RETRIES, + results_per_page = results_per_page + ) + + +@utils.handle_top_exception(LOGGER) +def main(): + start_time = time.time() + singer.write_schema(STREAM_ID, SCHEMA, KEY_PROPS, bookmark_properties=REPLICATION_KEY) + rec_count = sync_async() + duration = time.strftime("%H:%M:%S", time.gmtime(time.time() - start_time)) + + LOGGER.info('--------------------------------------------') + LOGGER.info("{}: {}".format(STREAM_ID, rec_count)) + LOGGER.info("Duration: {}".format(duration)) + LOGGER.info('--------------------------------------------') + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/spikes/async/sidd_async_tap.py b/spikes/async/sidd_async_tap.py new file mode 100755 index 00000000..f7f8f4b9 --- /dev/null +++ b/spikes/async/sidd_async_tap.py @@ -0,0 +1,191 @@ +#!/usr/bin/env python3 +import math +import shopify +from datetime import datetime, timedelta +import time +import sys +import asyncio +import aiohttp +import json +from urllib.parse import urlparse, urlencode +import singer +from singer import utils +from singer import Transformer + + +LOGGER = singer.get_logger() +WRITE_TO_TARGET = False + + +def get_hourly_chunks(start, end, num_hours=1): + ranges = [] + NUM_SECONDS = num_hours * 60 * 60 + st = utils.strptime_with_tz(start) + ed = 
utils.strptime_with_tz(end) + + while st < ed: + curr_ed = st + timedelta(seconds=NUM_SECONDS) + if curr_ed > ed: + curr_ed = ed + ranges.append({'updated_at_min': st, 'updated_at_max': curr_ed}) + st = curr_ed + timedelta(seconds=1) + + return ranges + +class Error(Exception): + """Base exception for the API interaction module""" + +class OutOfOrderIdsError(Error): + """Raised if our expectation of ordering by ID is violated""" + +class RunAsync(): + def __init__(self, schema, stream_id, endpoint, result_key, params, retry_limit, results_per_page): + self.schema = schema + self.stream_id = stream_id + self.endpoint = endpoint + self.result_key = result_key + self.params = params + self.retry_limit = retry_limit + self.results_per_page = results_per_page + + self.params['limit'] = str(self.results_per_page) + self.base_url = shopify.ShopifyResource.get_site() + self.current_shop = shopify.Shop.current() + self.bucket_size = 80 if self.current_shop.plan_name == "shopify_plus" else 40 + self.shop_display_url = "https://{}".format(self.current_shop.myshopify_domain) + self.rec_count = 0 + self.DT_FMT = '%Y-%m-%dT%H:%M:%S' + + + async def _get_async(self, url, headers=None, params=None, retry_attempt=0): + headers = {**headers, "Connection": "close"} if headers else {"Connection": "close"} + try: + async with aiohttp.ClientSession() as session: + async with session.get(url=url, headers=headers, params=params) as response: + if response.status == 200: + return await response.json() + elif response.status == 429: + sleep_time_str = math.floor(float(response.headers.get('Retry-After'))) + LOGGER.info("Received 429 -- sleeping for {} seconds".format(sleep_time_str)) + await asyncio.sleep(sleep_time_str) + return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) + elif response.status in range(500, 599): + if retry_attempt <= self.retry_limit: + return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) + else: + msg = "Failed 
after {} retry attempts".format(self.retry_limit) + LOGGER.error(msg) + raise Exception(msg) + except Exception as e: + if retry_attempt <= self.retry_limit: + return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) + else: + msg = "Failed after {} retry attempts.\nError:\n{}\n".format(self.retry_limit, e) + LOGGER.error(msg) + raise Exception(msg) + + async def _request(self, job): + endpoint = '{}.json'.format(self.endpoint) + url = self.base_url + endpoint + since_id = 1 + DT_FMT = '%Y-%m-%dT%H:%M:%S' + results = [] + while True: + params = { + **self.params, + "since_id": since_id, + "updated_at_min": utils.strftime(job['updated_at_min'], format_str=DT_FMT), + "updated_at_max": utils.strftime(job['updated_at_max'], format_str=DT_FMT) + } + resp = await self._get_async(url, params=params) + LOGGER.info("GET {}{}?{}".format(self.shop_display_url, endpoint, urlencode(params))) + + objects = [] + if self.result_key in resp: + for obj in resp[self.result_key]: + if obj['id'] < since_id: + err_msg = "obj['id'] < since_id: {} < {}".format(obj['id'], since_id) + raise OutOfOrderIdsError(err_msg) + objects.append(obj) + results += objects + + if len(objects) < self.results_per_page: + break + + max_id = max([o['id'] for o in objects]) + if objects[-1]['id'] != max_id: + err_msg = "{} is not the max id in objects ({})".format(objects[-1]['id'], max_id) + raise OutOfOrderIdsError(err_msg) + + since_id = objects[-1]['id'] + return results + + async def _runner(self): + hour_windows = [h for h in get_hourly_chunks(self.params['updated_at_min'], self.params['updated_at_max'])] + chunked_jobs = utils.chunk(hour_windows, self.bucket_size) + + for chunk_of_jobs in chunked_jobs: + futures = [self._request(i) for i in chunk_of_jobs] + for i, future in enumerate(asyncio.as_completed(futures)): + results = await future + self._write_singer_records(results) + + def _write_singer_records(self, recs): + with Transformer() as transformer: + for rec in recs: + 
if WRITE_TO_TARGET: + extraction_time = singer.utils.now() + transformed_rec = transformer.transform(rec, self.schema) + singer.write_record(self.stream_id, transformed_rec, time_extracted=extraction_time) + self.rec_count += 1 + + def Run(self): + asyncio.run(self._runner()) + return self.rec_count + + @classmethod + def sync(cls, schema, stream_id, endpoint, result_key, params, retry_limit=5, results_per_page=250): + return RunAsync(schema, stream_id, endpoint, result_key, params, retry_limit, results_per_page).Run() + + +def sync_async(config): + """ + Gets objects for endpoint, and writes singer records. + Returns the total number of records received and + emitted to target. + """ + updated_at_min = utils.strptime_with_tz(config['start_date']) + end_date = config['end_date'] + updated_at_max = utils.strptime_with_tz(end_date) if end_date is not None else singer.utils.now().replace(microsecond=0) + results_per_page = config['results_per_page'] + + DT_FMT = '%Y-%m-%dT%H:%M:%S' + query_params = { + "updated_at_min": utils.strftime(updated_at_min, format_str=DT_FMT), + "updated_at_max": utils.strftime(updated_at_max, format_str=DT_FMT), + "status": "any" + } + + return RunAsync.sync( + schema = config['stream_schema'], + stream_id = config['stream_id'], + endpoint = config['stream_endpoint'], + result_key = config['stream_result_key'], + params = query_params, + retry_limit = config['max_retries'], + results_per_page = results_per_page + ) + + +@utils.handle_top_exception(LOGGER) +def SIDD_ASYNC_TAP(config): + shop_url = "https://{k}:{p}@{s}.myshopify.com/admin".format(k=config['api_key'],p=config['api_password'],s=config['shop_name']) + shopify.ShopifyResource.set_site(shop_url) + + start_time = time.time() + if WRITE_TO_TARGET: + singer.write_schema(config['stream_id'], config['stream_schema'], config['key_properties'], bookmark_properties=config['replication_key']) + rec_count = sync_async(config) + duration = time.strftime("%H:%M:%S", time.gmtime(time.time() - 
start_time)) + + return (rec_count, duration) \ No newline at end of file diff --git a/spikes/async/sidd_endpoint_tap.py b/spikes/async/sidd_endpoint_tap.py new file mode 100755 index 00000000..e1d18b6c --- /dev/null +++ b/spikes/async/sidd_endpoint_tap.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +import math +import shopify +from datetime import datetime, timedelta +import time +import sys +import asyncio +import aiohttp +import json +from urllib.parse import urlparse, urlencode +import singer +from singer import utils, metadata +from singer import Transformer +import requests + +LOGGER = singer.get_logger() + +DATE_WINDOW_SIZE=1 +WRITE_TO_TARGET = False + + +class Error(Exception): + """Base exception for the API interaction module""" + +class OutOfOrderIdsError(Error): + """Raised if our expectation of ordering by ID is violated""" + +def get(url, headers=None, params=None, retry_attempt=0): + with requests.get(url=url, headers=headers, params=params) as response: + if response.status_code == 429: + sleep_time_str = math.floor(float(response.headers.get('Retry-After'))) + LOGGER.info("Received 429 -- sleeping for {} seconds".format(sleep_time_str)) + time.sleep(sleep_time_str) + return get(url, headers, params, retry_attempt=retry_attempt+1) + if response.status_code in range(500, 599): + return get(url, headers, params, retry_attempt=retry_attempt+1) + else: + return response.json() + +def request(endpoint, params, config): + endpoint = '{}.json'.format(endpoint) + base_url = shopify.ShopifyResource.get_site() + current_shop = shopify.Shop.current() + shop_display_url = "https://{}".format(current_shop.myshopify_domain) + + url = base_url + endpoint + resp = get(url, params=params) + LOGGER.info("GET {}{}?{}".format(shop_display_url, endpoint, urlencode(params))) + return resp[config['stream_result_key']] if config['stream_result_key'] in resp else [] + + +def get_objects(config): + updated_at_min = utils.strptime_with_tz(config['start_date']) + end_date = 
config['end_date'] + stop_time = utils.strptime_with_tz(end_date) if end_date is not None else singer.utils.now().replace(microsecond=0) + date_window_size = DATE_WINDOW_SIZE + results_per_page = config['results_per_page'] + + while updated_at_min < stop_time: + since_id = 1 + + if since_id != 1: + LOGGER.info("Resuming sync from since_id %d", since_id) + + updated_at_max = updated_at_min + timedelta(days=date_window_size) + if updated_at_max > stop_time: + updated_at_max = stop_time + while True: + query_params = { + "since_id": since_id, + "updated_at_min": updated_at_min, + "updated_at_max": updated_at_max, + "limit": results_per_page, + "status": "any" + } + + objects = request(config['stream_endpoint'], query_params, config) + + for obj in objects: + if obj['id'] < since_id: + raise OutOfOrderIdsError("obj['id'] < since_id: {} < {}".format( + obj['id'], since_id)) + yield obj + + if len(objects) < results_per_page: + break + + if objects[-1]['id'] != max([o['id'] for o in objects]): + raise OutOfOrderIdsError("{} is not the max id in objects ({})".format( + objects[-1]['id'], max([o['id'] for o in objects]))) + since_id = objects[-1]['id'] + + updated_at_min = updated_at_max + + +def sync(config): + for obj in get_objects(config): + yield obj + + +@utils.handle_top_exception(LOGGER) +def SIDD_ENDPOINT_TAP(config): + shop_url = "https://{k}:{p}@{s}.myshopify.com/admin".format(k=config['api_key'],p=config['api_password'],s=config['shop_name']) + shopify.ShopifyResource.set_site(shop_url) + + start_time = time.time() + if WRITE_TO_TARGET: + singer.write_schema(config['stream_id'], config['stream_schema'], config['key_properties'], bookmark_properties=config['replication_key']) + rec_count = 0 + with Transformer() as transformer: + for rec in sync(config): + extraction_time = singer.utils.now() + record_metadata = metadata.to_map(config['stream_metadata']) + rec = transformer.transform(rec, config['stream_schema'], record_metadata) + if WRITE_TO_TARGET: + 
singer.write_record(config['stream_id'], rec, time_extracted=extraction_time) + rec_count += 1 + duration = time.strftime("%H:%M:%S", time.gmtime(time.time() - start_time)) + + return (rec_count, duration) \ No newline at end of file diff --git a/spikes/async/tester.py b/spikes/async/tester.py new file mode 100755 index 00000000..3347251f --- /dev/null +++ b/spikes/async/tester.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 +import json +import time +from prettytable import PrettyTable +from original_tap import ORIGINAL_TAP +from sidd_endpoint_tap import SIDD_ENDPOINT_TAP +from sidd_async_tap import SIDD_ASYNC_TAP + +RUN_ORIGINAL_TAP = False +RUN_SIDD_ENDPOINT_TAP = True +RUN_SIDD_ASYNC_TAP = True + + +def main(): + with open('credentials.json') as f: + credentials = json.load(f) + + with open('orders-schema.json') as f: + stream = json.load(f) + + config = { + 'shop_name': credentials['shop_name'], + 'api_key': credentials['api_key'], + 'api_password': credentials['api_password'], + 'start_date': "2019-02-28 00:00:00", + 'end_date': "2019-03-29 23:59:59", + 'max_retries': 15, + 'results_per_page': 250, + 'stream_result_key': 'orders', + 'stream_endpoint': '/orders', + 'stream_id': stream['tap_stream_id'], + 'stream_schema': stream['schema'], + 'stream_key_props': stream['key_properties'], + 'stream_replication_key': stream['replication_key'], + 'stream_metadata': stream['metadata'] + } + + results = {} + if RUN_ORIGINAL_TAP: + rec_count, duration = ORIGINAL_TAP(config) + results['original_tap'] = {'rec_count': rec_count, 'duration': duration} + + if RUN_SIDD_ENDPOINT_TAP: + rec_count, duration = SIDD_ENDPOINT_TAP(config) + results['sidd_endpoint_tap'] = {'rec_count': rec_count, 'duration': duration} + + if RUN_SIDD_ASYNC_TAP: + rec_count, duration = SIDD_ASYNC_TAP(config) + results['sidd_async_tap'] = {'rec_count': rec_count, 'duration': duration} + + for k,v in results.items(): + div = "-"*50 + info_msg = "{d}\n {name}\n{d}".format(name=k, d=div) + info_msg += "\n 
Num. Records: {r}".format(r=v['rec_count']) + info_msg += "\n Duration : {r}".format(r=v['duration']) + info_msg += "\n{d}".format(d=div) + print(info_msg) + + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/tap_shopify/streams/base.py b/tap_shopify/streams/base.py index 623aa2d7..75db90f5 100755 --- a/tap_shopify/streams/base.py +++ b/tap_shopify/streams/base.py @@ -1,6 +1,7 @@ import math import functools import datetime +from datetime import datetime, timedelta import sys import backoff import pyactiveresource @@ -252,46 +253,95 @@ def __init__(self, schema, stream_id, endpoint, result_key, params, retry_limit, self.bucket_size = 80 if self.current_shop.plan_name == "shopify_plus" else 40 self.shop_display_url = "https://{}".format(self.current_shop.myshopify_domain) self.rec_count = 0 + self.DT_FMT = '%Y-%m-%dT%H:%M:%S' + + def get_hourly_chunks(self, start, end, num_hours=1): + ranges = [] + NUM_SECONDS = num_hours * 60 * 60 + st = utils.strptime_with_tz(start) + ed = utils.strptime_with_tz(end) + + while st < ed: + curr_ed = st + timedelta(seconds=NUM_SECONDS) + if curr_ed > ed: + curr_ed = ed + ranges.append({'updated_at_min': st, 'updated_at_max': curr_ed}) + st = curr_ed + timedelta(seconds=1) + + return ranges async def _get_async(self, url, headers=None, params=None, retry_attempt=0): - async with aiohttp.ClientSession() as session: - async with session.get(url=url, headers=headers, params=params) as response: - if response.status == 429: - sleep_time_str = math.floor(float(response.headers.get('Retry-After'))) - LOGGER.info("Received 429 -- sleeping for {} seconds".format(sleep_time_str)) - await asyncio.sleep(sleep_time_str) - return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) - if response.status in range(500, 599): - return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) - else: - return await response.json() - - async def _request_count(self): - endpoint = 
'{}/count.json'.format(self.endpoint) - url = self.base_url + endpoint - resp = await self._get_async(url, params=self.params) - LOGGER.info("GET {}{}?{}".format(self.shop_display_url, endpoint, urlencode(self.params))) - return resp['count'] + headers = {**headers, "Connection": "close"} if headers else {"Connection": "close"} + try: + async with aiohttp.ClientSession() as session: + async with session.get(url=url, headers=headers, params=params) as response: + if response.status == 200: + return await response.json() + elif response.status == 429: + sleep_time_str = math.floor(float(response.headers.get('Retry-After'))) + LOGGER.info("Received 429 -- sleeping for {} seconds".format(sleep_time_str)) + await asyncio.sleep(sleep_time_str) + return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) + elif response.status in range(500, 599): + if retry_attempt <= self.retry_limit: + return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) + else: + msg = "Failed after {} retry attempts".format(self.retry_limit) + LOGGER.error(msg) + raise Exception(msg) + except Exception as e: + if retry_attempt <= self.retry_limit: + return await self._get_async(url, headers, params, retry_attempt=retry_attempt+1) + else: + msg = "Failed after {} retry attempts.\nError:\n{}\n".format(self.retry_limit, e) + LOGGER.error(msg) + raise Exception(msg) async def _request(self, job): endpoint = '{}.json'.format(self.endpoint) url = self.base_url + endpoint - resp = await self._get_async(url, params=job['params']) - LOGGER.info("GET {}{}?{}".format(self.shop_display_url, endpoint, urlencode(job['params']))) - return resp + since_id = 1 + DT_FMT = '%Y-%m-%dT%H:%M:%S' + results = [] + while True: + params = { + **self.params, + "since_id": since_id, + "updated_at_min": utils.strftime(job['updated_at_min'], format_str=DT_FMT), + "updated_at_max": utils.strftime(job['updated_at_max'], format_str=DT_FMT) + } + resp = await self._get_async(url, 
params=params) + LOGGER.info("GET {}{}?{}".format(self.shop_display_url, endpoint, urlencode(params))) + + objects = [] + if self.result_key in resp: + for obj in resp[self.result_key]: + if obj['id'] < since_id: + err_msg = "obj['id'] < since_id: {} < {}".format(obj['id'], since_id) + raise OutOfOrderIdsError(err_msg) + objects.append(obj) + results += objects + + if len(objects) < self.results_per_page: + break + + max_id = max([o['id'] for o in objects]) + if objects[-1]['id'] != max_id: + err_msg = "{} is not the max id in objects ({})".format(objects[-1]['id'], max_id) + raise OutOfOrderIdsError(err_msg) + + since_id = objects[-1]['id'] + return results async def _runner(self): - result_set_size = await self._request_count() - num_pages = math.ceil(result_set_size/self.results_per_page) - jobs = [{'params': { **self.params, 'page': p+1 }} for p in range(num_pages)] - chunked_jobs = utils.chunk(jobs, self.bucket_size) + hour_windows = [h for h in self.get_hourly_chunks(self.params['updated_at_min'], self.params['updated_at_max'])] + chunked_jobs = utils.chunk(hour_windows, self.bucket_size) for chunk_of_jobs in chunked_jobs: futures = [self._request(i) for i in chunk_of_jobs] for i, future in enumerate(asyncio.as_completed(futures)): - result = await future - if self.result_key in result: - self._write_singer_records(result[self.result_key]) + results = await future + self._write_singer_records(results) def _write_singer_records(self, recs): with Transformer() as transformer: From bb9044fc3c5bab5d5b29bbcc48dec6a1ca3d019a Mon Sep 17 00:00:00 2001 From: Siddharth Sudheer Date: Wed, 3 Apr 2019 15:40:32 -0400 Subject: [PATCH 6/6] made abandoned checkouts async capable --- tap_shopify/streams/abandoned_checkouts.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tap_shopify/streams/abandoned_checkouts.py b/tap_shopify/streams/abandoned_checkouts.py index 9dd7798d..fdbb2807 100755 --- a/tap_shopify/streams/abandoned_checkouts.py +++ 
b/tap_shopify/streams/abandoned_checkouts.py @@ -5,5 +5,8 @@ class AbandonedCheckouts(Stream): name = 'abandoned_checkouts' replication_object = shopify.Checkout + endpoint = "/checkouts" + result_key = "checkouts" + async_available = True Context.stream_objects['abandoned_checkouts'] = AbandonedCheckouts