diff --git a/finance_dl/amazon.py b/finance_dl/amazon.py index f59f81a..a68d76f 100644 --- a/finance_dl/amazon.py +++ b/finance_dl/amazon.py @@ -108,7 +108,6 @@ class Domain(): # Find invoices. your_orders: str - archived_orders: str invoice: str invoice_link: List[str] order_summary: str @@ -140,7 +139,6 @@ def __init__(self) -> None: sign_out='Sign Out', your_orders='Your Orders', - archived_orders='Archived Orders', invoice='Invoice', invoice_link=["View order", "View invoice"], # View invoice -> regular/digital order, View order -> Amazon Fresh @@ -173,7 +171,6 @@ def __init__(self) -> None: sign_out='Sign out', your_orders='Your Orders', - archived_orders='Archived Orders', invoice='Invoice', invoice_link=["View order", "View invoice"], # View invoice -> regular/digital order, View order -> Amazon Fresh @@ -205,7 +202,6 @@ def __init__(self) -> None: sign_out='Abmelden', your_orders='Meine Bestellungen', - archived_orders='Archivierte Bestellungen', invoice='Rechnung', invoice_link=["Bestelldetails anzeigen"], fresh_fallback=None, @@ -329,7 +325,7 @@ def get_invoice_path(self, year, order_id): return os.path.join(self.output_directory, order_id + '.html') def get_order_id(self, href) -> str: - m = re.match('.*[&?]orderI[Dd]=((?:D)?[0-9\\-]+)(?:&.*)?$', href) + m = re.match('.*[&?]orderID=((?:D)?[0-9\\-]+)(?:&.*)?$', href) if m is None: raise RuntimeError( 'Failed to parse order ID from href %r' % (href, )) @@ -363,30 +359,34 @@ def get_invoice_urls(): # order summary is hidden behind submenu which requires a click to be visible def invoice_finder(): - # order summary link is visible on page - elements_raw = self.driver.find_elements( - By.XPATH, '//a[contains(@href, "orderID=")]') - elements = [] - for invoice_link in elements_raw: - if invoice_link.text not in self.domain.invoice_link: - # skip invoice if label is not known - # different labels are possible e.g. for regular orders vs. Amazon fresh - if invoice_link.text != "": - # log non-empty link texts -> may be new type - logger.debug( - 'Skipping invoice due to unknown invoice_link.text: %s', - invoice_link.text) - else: - elements.append(invoice_link) - return elements - + if not self.domain.order_summary_hidden: + # order summary link is visible on page + return self.driver.find_elements( + By.XPATH, '//a[contains(@href, "orderID=")]') + else: + # order summary link is hidden in submenu for each order + elements = self.driver.find_elements(By.XPATH, + '//a[@class="a-popover-trigger a-declarative"]') + return [a for a in elements if a.text == self.domain.invoice] + if initial_iteration: invoices = invoice_finder() else: invoices, = self.wait_and_return(invoice_finder) initial_iteration = False + last_order_id = None + def invoice_link_finder(invoice_link): + if invoice_link.text not in self.domain.invoice_link: + # skip invoice if label is not known + # different labels are possible e.g. for regular orders vs. Amazon fresh + if invoice_link.text != "": + # log non-empty link texts -> may be new type + logger.debug( + 'Skipping invoice due to unknown invoice_link.text: %s', + invoice_link.text) + return (False, False) href = invoice_link.get_attribute('href') order_id = self.get_order_id(href) if self.domain.fresh_fallback is not None and invoice_link.text == self.domain.fresh_fallback: @@ -397,39 +397,26 @@ def invoice_link_finder(invoice_link): tokens[-1] = f"gp/css/summary/print.html?orderID={order_id}" href = "/".join(tokens) return (order_id, href) - - def invoice_link_finder_hidden(invoice_link): - # get order id to later find the correct summary link - order_id=self.get_order_id(invoice_link.get_attribute('href')) - - # get parent element to search for invoice menu button (has no orderID specified) - parent=invoice_link.find_element(By.XPATH,"./..") - # leading dot in './/' specifies to only search in children - popover=parent.find_elements(By.XPATH,'.//a[contains(@href, "invoice/invoice.html")]') - # depending on the order group the XPATH may be different - if len(popover) == 0: - popover=parent.find_elements( - By.XPATH, - f'.//a[contains(text(), {self.domain.invoice}) and @class="a-popover-trigger a-declarative"]') - - # open invoice popover to extract invoice link - popover[0].click() - - # submenu containing order summary takes some time to load after click - summary_link, = self.wait_and_locate( - (By.XPATH,'//a[contains(@href,"{}") and contains(text(),"{}")]'.format(order_id, self.domain.order_summary))) - if summary_link: - href = summary_link.get_attribute('href') - return (order_id, href) - else: - logger.info('Link extraction failed for order id: %r', order_id) - return (False, False) + + def invoice_link_finder_hidden(): + # submenu containing order summary takes some time to load after click + # search for order summary link and compare order_id + # repeat until order_id is different to last order_id + summary_links = self.driver.find_elements(By.LINK_TEXT, + self.domain.order_summary) + if summary_links: + href = summary_links[0].get_attribute('href') + order_id = self.get_order_id(href) + if order_id != last_order_id: + return (order_id, href) + return False for invoice_link in invoices: if not self.domain.order_summary_hidden: (order_id, href) = invoice_link_finder(invoice_link) else: - (order_id, href) = invoice_link_finder_hidden(invoice_link) + invoice_link.click() + (order_id, href), = self.wait_and_return(invoice_link_finder_hidden) if order_id: if order_id in order_ids_seen: logger.info('Skipping already-seen order id: %r', order_id) @@ -440,6 +427,7 @@ def invoice_link_finder_hidden(invoice_link): logger.info('Found order \'{}\''.format(order_id)) invoice_hrefs.append((href, order_id)) order_ids_seen.add(order_id) + last_order_id = order_id # Find next link next_links = self.find_elements_by_descendant_text_match( @@ -457,9 +445,7 @@ def retrieve_all_order_groups(): order_select_index = 0 while True: - (order_filter,), = self.wait_and_return( - lambda: self.find_visible_elements(By.XPATH, '//select[@name="timeFilter"]') - ) + order_filter, = self.wait_and_locate((By.CSS_SELECTOR, '#time-filter, #orderFilter')) order_select = Select(order_filter) num_options = len(order_select.options) if order_select_index >= num_options: @@ -468,7 +454,7 @@ def retrieve_all_order_groups(): order_select_index] option_text = option.text.strip() order_select_index += 1 - if option_text == self.domain.archived_orders: + if option_text == 'Archived Orders': continue if self.order_groups is not None and option_text not in self.order_groups: logger.info('Skipping order group: %r', option_text)