diff --git a/.gitignore b/.gitignore index 8d35cb3..d118619 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,5 @@ __pycache__ *.pyc + +.vscode +.devcontainer diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..ec30735 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,26 @@ +# Base image +FROM python:3.9.5-slim as base + +# Build stage +FROM base as builder + +# Dependency install directory +RUN mkdir /install +WORKDIR /install + +# Install dependencies +COPY ./requirements.txt . +RUN pip install --prefix /install -r requirements.txt + +# Run stage +FROM base + +WORKDIR /usr/src/app + +# Fetch dependencies from the build stage +COPY --from=builder /install /usr/local + +COPY ./doctoshotgun.py . + +# Entrypoint - Run the main script +ENTRYPOINT ["./doctoshotgun.py"] diff --git a/README.md b/README.md index 94fb807..4a4c86e 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,10 @@ # DOCTOSHOTGUN -This script lets you automatically book a vaccine slot on Doctolib for today or -tomorrow, following rules from the French Government. - +This script lets you automatically book a vaccine slot on Doctolib in France and in Germany in +the next seven days.

- +

## Python dependencies @@ -14,9 +13,13 @@ tomorrow, following rules from the French Government. - cloudscraper - dateutil - termcolor +- colorama +- playsound (optional) ## How to use it +You need python3 for this script. If you don't have it, please [install it first](https://www.python.org/). + Install dependencies: ``` @@ -26,15 +29,54 @@ pip install -r requirements.txt Run: ``` -./doctoshotgun.py [password] +./doctoshotgun.py [] ``` -Optional arguments: +Further optional arguments: + +``` +--debug, -d show debug information +--pfizer, -z select only Pfizer vaccine +--moderna, -m select only Moderna vaccine +--janssen, -j select only Janssen vaccine +--astrazeneca, -a select only AstraZeneca vaccine +--only-second, -2 select only second dose +--only-third, -3 select only third dose +--patient PATIENT, -p PATIENT + give patient ID +--time-window TIME_WINDOW, -t TIME_WINDOW + set how many next days the script look for slots (default = 7) +--center CENTER, -c CENTER + filter centers +--center-regex CENTER_REGEX + filter centers by regex +--center-exclude CENTER_EXCLUDE, -x CENTER_EXCLUDE + exclude centers +--center-exclude-regex CENTER_EXCLUDE_REGEX + exclude centers by regex +--include-neighbor-city, -n + include neighboring cities +--start-date START_DATE + first date on which you want to book the first slot (format should be DD/MM/YYYY) +--end-date END_DATE last date on which you want to book the first slot (format should be DD/MM/YYYY) +--dry-run do not really book the slot +--code CODE 2FA code +--proxy PROXY, -P PROXY + configure a network proxy to use +``` + +### With Docker + +Build the image: ``` ---center "" [--center "" …] : filter centers to only choose one from the provided list ---patient : select patient for which book a slot ---debug : display debug information +docker build . -t doctoshotgun +``` + +Run the container: + +``` +docker run -it doctoshotgun [] ``` ### Multiple cities @@ -42,15 +84,15 @@ Optional arguments: You can also look for slot in multiple cities at the same time. Cities must be separated by commas: ``` -$ ./doctoshotgun.py ,, [password] +$ ./doctoshotgun.py ,, [] ``` ### Filter on centers -You can give name of centers in which you want specifictly looking for: +You can give name of centers in which you want specifically looking for: ``` -$ ./doctoshotgun.py paris roger.philibert@gmail.com \ +$ ./doctoshotgun.py fr paris roger.philibert@gmail.com \ --center "Centre de Vaccination Covid 19 - Ville de Paris" \ --center "Centre de Vaccination du 7eme arrondissement de Paris - Gymnase Camou" ``` @@ -60,7 +102,7 @@ $ ./doctoshotgun.py paris roger.philibert@gmail.com \ For doctolib accounts with more thant one patient, you can select patient just after launching the script: ``` -$ ./doctoshotgun.py paris roger.philibert@gmail.com PASSWORD +$ ./doctoshotgun.py fr paris roger.philibert@gmail.com PASSWORD Available patients are: * [0] Roger Philibert * [1] Luce Philibert @@ -70,10 +112,44 @@ For which patient do you want to book a slot? You can also give the patient id as argument: ``` -$ ./doctoshotgun.py paris roger.philibert@gmail.com PASSWORD -p 1 +$ ./doctoshotgun.py fr paris roger.philibert@gmail.com PASSWORD -p 1 +Starting to look for vaccine slots for Luce Philibert in 1 next day(s) starting today... +``` + +### Set time window + +By default, the script looks for slots between now and next day at 23:59:59. If you belong to a category of patients that is allowed to book a slot in a more distant future, you can expand the time window. For exemple, if you want to search in the next 5 days : + +``` +$ ./doctoshotgun.py fr paris roger.philibert@gmail.com -t 5 +Password: +Starting to look for vaccine slots for Roger Philibert in 5 next day(s) starting today... +This may take a few minutes/hours, be patient! +``` + +### Look on specific date + +By default, the script looks for slots between now and next day at 23:59:59. If you can't be vaccinated right now (e.g covid in the last 3 months or out of town) and you are looking for an appointment in a distant future, you can pass a starting date: + +``` +$ ./doctoshotgun.py fr paris roger.philibert@gmail.com --start-date 17/06/2021 +Password: +Starting to look for vaccine slots for Roger Philibert in 7 next day(s) starting 17/06/2021... +This may take a few minutes/hours, be patient! +``` + +### Filter by vaccine + +The Pfizer vaccine is the only vaccine allowed in France for people between 16 and 18. For this case, you can use the -z option. + +``` +$ ./doctoshotgun.py fr paris roger.philibert@gmail.com PASSWORD -z Starting to look for vaccine slots for Luce Philibert... +Vaccines: Pfizer +This may take a few minutes/hours, be patient! ``` +It is also possible to filter on Moderna vaccine with the -m option and Janssen with the -j option. ## Development diff --git a/ding.mp3 b/ding.mp3 new file mode 100644 index 0000000..d94c09f Binary files /dev/null and b/ding.mp3 differ diff --git a/doctoshotgun.py b/doctoshotgun.py index 5811594..b7c9acd 100755 --- a/doctoshotgun.py +++ b/doctoshotgun.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 import sys +import os import re import logging import tempfile @@ -8,6 +9,7 @@ from urllib.parse import urlparse import datetime import argparse +from pathlib import Path import getpass import unicodedata @@ -15,14 +17,35 @@ from dateutil.relativedelta import relativedelta import cloudscraper +import colorama +from requests.adapters import ReadTimeout, ConnectionError from termcolor import colored +from urllib import parse +from urllib3.exceptions import NewConnectionError -from woob.browser.exceptions import ClientError, ServerError -from woob.browser.browsers import LoginBrowser +from woob.browser.exceptions import ClientError, ServerError, HTTPNotFound +from woob.browser.browsers import LoginBrowser, StatesMixin from woob.browser.url import URL from woob.browser.pages import JsonPage, HTMLPage from woob.tools.log import createColoredFormatter +SLEEP_INTERVAL_AFTER_CONNECTION_ERROR = 5 +SLEEP_INTERVAL_AFTER_LOGIN_ERROR = 10 +SLEEP_INTERVAL_AFTER_CENTER = 1 +SLEEP_INTERVAL_AFTER_RUN = 5 + +try: + from playsound import playsound as _playsound, PlaysoundException + + def playsound(*args): + try: + return _playsound(*args) + except (PlaysoundException, ModuleNotFoundError): + pass # do not crash if, for one reason or another, something wrong happens +except ImportError: + def playsound(*args): + pass + def log(text, *args, **kwargs): args = (colored(arg, 'yellow') for arg in args) @@ -32,6 +55,14 @@ def log(text, *args, **kwargs): print(text, **kwargs) +def log_ts(text=None, *args, **kwargs): + ''' Log with timestamp''' + now = datetime.datetime.now() + print("[%s]" % now.isoformat(" ", "seconds")) + if text: + log(text, *args, **kwargs) + + class Session(cloudscraper.CloudScraper): def send(self, *args, **kwargs): callback = kwargs.pop('callback', lambda future, response: response) @@ -46,15 +77,61 @@ def send(self, *args, **kwargs): class LoginPage(JsonPage): - pass + def redirect(self): + return self.doc['redirection'] + + +class SendAuthCodePage(JsonPage): + def build_doc(self, content): + return "" # Do not choke on empty response from server + + +class ChallengePage(JsonPage): + def build_doc(self, content): + return "" # Do not choke on empty response from server class CentersPage(HTMLPage): + def on_load(self): + try: + v = self.doc.xpath('//input[@id="wait-time-value"]')[0] + except IndexError: + return + raise WaitingInQueue(int(v.attrib['value'])) + def iter_centers_ids(self): for div in self.doc.xpath('//div[@class="js-dl-search-results-calendar"]'): data = json.loads(div.attrib['data-props']) yield data['searchResultId'] + def get_next_page(self): + # French doctolib uses data-u attribute of span-element to create the link when user hovers span + for span in self.doc.xpath('//div[contains(@class, "next")]/span'): + if not span.attrib.has_key('data-u'): + continue + + # How to find the corresponding javascript-code: + # Press F12 to open dev-tools, select elements-tab, find div.next, right click on element and enable break on substructure change + # Hover "Next" element and follow callstack upwards + # JavaScript: + # var t = (e = r()(e)).data("u") + # , n = atob(t.replace(/\s/g, '').split('').reverse().join('')); + + import base64 + href = base64.urlsafe_b64decode(''.join(span.attrib['data-u'].split())[::-1]).decode() + query = dict(parse.parse_qsl(parse.urlsplit(href).query)) + + if 'page' in query: + return int(query['page']) + + for a in self.doc.xpath('//div[contains(@class, "next")]/a'): + href = a.attrib['href'] + query = dict(parse.parse_qsl(parse.urlsplit(href).query)) + + if 'page' in query: + return int(query['page']) + + return None class CenterResultPage(JsonPage): pass @@ -65,9 +142,18 @@ class CenterPage(HTMLPage): class CenterBookingPage(JsonPage): - def find_motive(self, regex): + def find_motive(self, regex, singleShot=False): for s in self.doc['data']['visit_motives']: - if re.search(regex, s['name']): + # ignore case as some doctors use their own spelling + if re.search(regex, s['name'], re.IGNORECASE): + if s['allow_new_patients'] == False: + log('Motive %s not allowed for new patients at this center. Skipping vaccine...', + s['name'], flush=True) + continue + if not singleShot and not s['first_shot_motive']: + log('Skipping second shot motive %s...', + s['name'], flush=True) + continue return s['id'] return None @@ -96,11 +182,11 @@ def get_profile_id(self): class AvailabilitiesPage(JsonPage): - def find_best_slot(self, limit=True): + def find_best_slot(self, start_date=None, end_date=None): for a in self.doc['availabilities']: - if limit and parse_date(a['date']).date() > datetime.date.today() + relativedelta(days=1): + date = parse_date(a['date']).date() + if start_date and date < start_date or end_date and date > end_date: continue - if len(a['slots']) == 0: continue return a['slots'][-1] @@ -133,19 +219,34 @@ def get_name(self): return '%s %s' % (self.doc[0]['first_name'], self.doc[0]['last_name']) -class Doctolib(LoginBrowser): - BASEURL = 'https://www.doctolib.fr' +class WaitingInQueue(Exception): + pass + +class CityNotFound(Exception): + pass + + +class Doctolib(LoginBrowser, StatesMixin): + # individual properties for each country. To be defined in subclasses + BASEURL = "" + vaccine_motives = {} + centers = URL('') + center = URL('') + # common properties login = URL('/login.json', LoginPage) - centers = URL(r'/vaccination-covid-19/(?P\w+)', CentersPage) + send_auth_code = URL('/api/accounts/send_auth_code', SendAuthCodePage) + challenge = URL('/login/challenge', ChallengePage) center_result = URL(r'/search_results/(?P\d+).json', CenterResultPage) - center = URL(r'/centre-de-sante/.*', CenterPage) center_booking = URL(r'/booking/(?P.+).json', CenterBookingPage) availabilities = URL(r'/availabilities.json', AvailabilitiesPage) - second_shot_availabilities = URL(r'/second_shot_availabilities.json', AvailabilitiesPage) + second_shot_availabilities = URL( + r'/second_shot_availabilities.json', AvailabilitiesPage) appointment = URL(r'/appointments.json', AppointmentPage) - appointment_edit = URL(r'/appointments/(?P.+)/edit.json', AppointmentEditPage) - appointment_post = URL(r'/appointments/(?P.+).json', AppointmentPostPage) + appointment_edit = URL( + r'/appointments/(?P.+)/edit.json', AppointmentEditPage) + appointment_post = URL( + r'/appointments/(?P.+).json', AppointmentPostPage) master_patient = URL(r'/account/master_patients.json', MasterPatientPage) def _setup_session(self, profile): @@ -157,7 +258,6 @@ def _setup_session(self, profile): self.session = session - def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.session.headers['sec-fetch-dest'] = 'document' @@ -165,15 +265,23 @@ def __init__(self, *args, **kwargs): self.session.headers['sec-fetch-site'] = 'same-origin' self.session.headers['User-Agent'] = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36' - self._logged = False self.patient = None - @property - def logged(self): - return self._logged + def locate_browser(self, state): + # When loading state, do not locate browser on the last url. + pass - def do_login(self): - self.open('https://www.doctolib.fr/sessions/new') + def do_login(self, code): + try: + self.open(self.BASEURL + '/sessions/new') + except ServerError as e: + if e.response.status_code in [503] \ + and 'text/html' in e.response.headers['Content-Type'] \ + and ('cloudflare' in e.response.text or 'Checking your browser before accessing' in e .response.text): + log('Request blocked by CloudFlare', color='red') + if e.response.status_code in [520]: + log('Cloudflare is unable to connect to Doctolib server. Please retry later.', color='red') + raise try: self.login.go(json={'kind': 'patient', 'username': self.username, @@ -181,79 +289,135 @@ def do_login(self): 'remember': True, 'remember_username': True}) except ClientError: + print('Wrong login/password') return False + if self.page.redirect() == "/sessions/two-factor": + print("Requesting 2fa code...") + if not code: + if not sys.__stdin__.isatty(): + log("Auth Code input required, but no interactive terminal available. Please provide it via command line argument '--code'.", color='red') + return False + self.send_auth_code.go( + json={'two_factor_auth_method': 'email'}, method="POST") + code = input("Enter auth code: ") + try: + self.challenge.go( + json={'auth_code': code, 'two_factor_auth_method': 'email'}, method="POST") + except HTTPNotFound: + print("Invalid auth code") + return False + return True - def find_centers(self, where): + def find_centers(self, where, motives=None, page=1): + if motives is None: + motives = self.vaccine_motives.keys() for city in where: try: - self.centers.go(where=city, params={'ref_visit_motive_ids[]': ['6970', '7005']}) + self.centers.go(where=city, params={ + 'ref_visit_motive_ids[]': motives, 'page': page}) except ServerError as e: if e.response.status_code in [503]: - return None - else: - raise e + if 'text/html' in e.response.headers['Content-Type'] \ + and ('cloudflare' in e.response.text or + 'Checking your browser before accessing' in e .response.text): + log('Request blocked by CloudFlare', color='red') + return + if e.response.status_code in [520]: + log('Cloudflare is unable to connect to Doctolib server. Please retry later.', color='red') + return + raise + except HTTPNotFound as e: + raise CityNotFound(city) from e + + next_page = self.page.get_next_page() for i in self.page.iter_centers_ids(): - page = self.center_result.open(id=i, params={'limit': '4', 'ref_visit_motive_ids[]': ['6970', '7005'], 'speciality_id': '5494', 'search_result_format': 'json'}) - # XXX return all pages even if there are no indicated availabilities. - #for a in page.doc['availabilities']: - # if len(a['slots']) > 0: - # yield page.doc['search_result'] + page = self.center_result.open( + id=i, + params={ + 'limit': '4', + 'ref_visit_motive_ids[]': motives, + 'speciality_id': '5494', + 'search_result_format': 'json' + } + ) try: yield page.doc['search_result'] except KeyError: pass + if next_page: + for center in self.find_centers(where, motives, next_page): + yield center + def get_patients(self): self.master_patient.go() return self.page.get_patients() - def normalize(self, string): + @classmethod + def normalize(cls, string): nfkd = unicodedata.normalize('NFKD', string) - normalized = u"".join([c for c in nfkd if not unicodedata.combining(c)]) + normalized = u"".join( + [c for c in nfkd if not unicodedata.combining(c)]) normalized = re.sub(r'\W', '-', normalized) return normalized.lower() - def try_to_book(self, center): - self.open(center['url']) + def try_to_book(self, center, vaccine_list, start_date, end_date, only_second, only_third, dry_run=False): + try: + self.open(center['url']) + except ClientError as e: + # Sometimes there are referenced centers which are not available anymore (410 Gone) + log('Error: %s', e, color='red') + return False + p = urlparse(center['url']) center_id = p.path.split('/')[-1] center_page = self.center_booking.go(center_id=center_id) profile_id = self.page.get_profile_id() - motive_id = self.page.find_motive(r'1re.*(Pfizer|Moderna)') - - if not motive_id: - log('Unable to find mRNA motive') - log('Motives: %s', ', '.join(self.page.get_motives())) + # extract motive ids based on the vaccine names + motives_id = dict() + for vaccine in vaccine_list: + motives_id[vaccine] = self.page.find_motive( + r'.*({})'.format(vaccine), singleShot=(vaccine == self.vaccine_motives[self.KEY_JANSSEN] or only_second or only_third)) + + motives_id = dict((k, v) + for k, v in motives_id.items() if v is not None) + if len(motives_id.values()) == 0: + log('Unable to find requested vaccines in motives', color='red') + log('Motives: %s', ', '.join(self.page.get_motives()), color='red') return False for place in self.page.get_places(): - log('– %s...', place['name'], end=' ', flush=True) + if place['name']: + log('– %s...', place['name']) practice_id = place['practice_ids'][0] - agenda_ids = center_page.get_agenda_ids(motive_id, practice_id) - if len(agenda_ids) == 0: - # do not filter to give a chance - agenda_ids = center_page.get_agenda_ids(motive_id) + for vac_name, motive_id in motives_id.items(): + log(' Vaccine %s...', vac_name, end=' ', flush=True) + agenda_ids = center_page.get_agenda_ids(motive_id, practice_id) + if len(agenda_ids) == 0: + # do not filter to give a chance + agenda_ids = center_page.get_agenda_ids(motive_id) - if self.try_to_book_place(profile_id, motive_id, practice_id, agenda_ids): - return True + if self.try_to_book_place(profile_id, motive_id, practice_id, agenda_ids, vac_name.lower(), start_date, end_date, only_second, only_third, dry_run): + return True return False - def try_to_book_place(self, profile_id, motive_id, practice_id, agenda_ids): - date = datetime.date.today().strftime('%Y-%m-%d') + def try_to_book_place(self, profile_id, motive_id, practice_id, agenda_ids, vac_name, start_date, end_date, only_second, only_third, dry_run=False): + date = start_date.strftime('%Y-%m-%d') while date is not None: - self.availabilities.go(params={'start_date': date, - 'visit_motive_ids': motive_id, - 'agenda_ids': '-'.join(agenda_ids), - 'insurance_sector': 'public', - 'practice_ids': practice_id, - 'destroy_temporary': 'true', - 'limit': 3}) + self.availabilities.go( + params={'start_date': date, + 'visit_motive_ids': motive_id, + 'agenda_ids': '-'.join(agenda_ids), + 'insurance_sector': 'public', + 'practice_ids': practice_id, + 'destroy_temporary': 'true', + 'limit': 3}) if 'next_slot' in self.page.doc: date = self.page.doc['next_slot'] else: @@ -263,65 +427,105 @@ def try_to_book_place(self, profile_id, motive_id, practice_id, agenda_ids): log('no availabilities', color='red') return False - slot = self.page.find_best_slot() + slot = self.page.find_best_slot(start_date, end_date) if not slot: - log('first slot not found :(', color='red') - return False - if type(slot) != dict: - log('error while fetching first slot.', color='red') + if only_second == False and only_third == False: + log('First slot not found :(', color='red') + else: + log('Slot not found :(', color='red') return False + # depending on the country, the slot is returned in a different format. Go figure... + if isinstance(slot, dict) and 'start_date' in slot: + slot_date_first = slot['start_date'] + if vac_name != "janssen": + slot_date_second = slot['steps'][1]['start_date'] + elif isinstance(slot, str): + if vac_name != "janssen" and not only_second and not only_third: + log('Only one slot for multi-shot vaccination found') + # should be for Janssen, second or third shots only, otherwise it is a list + slot_date_first = slot + elif isinstance(slot, list): + slot_date_first = slot[0] + if vac_name != "janssen": # maybe redundant? + slot_date_second = slot[1] + else: + log('Error while fetching first slot.', color='red') + return False + if vac_name != "janssen" and not only_second and not only_third: + assert slot_date_second log('found!', color='green') - log(' ├╴ Best slot found: %s', parse_date(slot['start_date']).strftime('%c')) + log(' ├╴ Best slot found: %s', parse_date( + slot_date_first).strftime('%c')) appointment = {'profile_id': profile_id, 'source_action': 'profile', - 'start_date': slot['start_date'], + 'start_date': slot_date_first, 'visit_motive_ids': str(motive_id), - } + } data = {'agenda_ids': '-'.join(agenda_ids), 'appointment': appointment, 'practice_ids': [practice_id]} headers = { - 'content-type': 'application/json', - } + 'content-type': 'application/json', + } self.appointment.go(data=json.dumps(data), headers=headers) if self.page.is_error(): log(' └╴ Appointment not available anymore :( %s', self.page.get_error()) return False - self.second_shot_availabilities.go(params={'start_date': slot['steps'][1]['start_date'].split('T')[0], - 'visit_motive_ids': motive_id, - 'agenda_ids': '-'.join(agenda_ids), - 'first_slot': slot['start_date'], - 'insurance_sector': 'public', - 'practice_ids': practice_id, - 'limit': 3}) - - second_slot = self.page.find_best_slot(limit=False) - if not second_slot: - log(' └╴ No second shot found') - return False + playsound('ding.mp3') + + if vac_name != "janssen" and not only_second and not only_third: # janssen has only one shot + self.second_shot_availabilities.go( + params={'start_date': slot_date_second.split('T')[0], + 'visit_motive_ids': motive_id, + 'agenda_ids': '-'.join(agenda_ids), + 'first_slot': slot_date_first, + 'insurance_sector': 'public', + 'practice_ids': practice_id, + 'limit': 3}) + + second_slot = self.page.find_best_slot() + if not second_slot: + log(' └╴ No second shot found') + return False + + # in theory we could use the stored slot_date_second result from above, + # but we refresh with the new results to play safe + if isinstance(second_slot, dict) and 'start_date' in second_slot: + slot_date_second = second_slot['start_date'] + elif isinstance(slot, str): + slot_date_second = second_slot + # TODO: is this else needed? + # elif isinstance(slot, list): + # slot_date_second = second_slot[1] + else: + log('Error while fetching second slot.', color='red') + return False - log(' ├╴ Second shot: %s', parse_date(second_slot['start_date']).strftime('%c')) + log(' ├╴ Second shot: %s', parse_date( + slot_date_second).strftime('%c')) - data['second_slot'] = second_slot['start_date'] - self.appointment.go(data=json.dumps(data), headers=headers) + data['second_slot'] = slot_date_second + self.appointment.go(data=json.dumps(data), headers=headers) - if self.page.is_error(): - log(' └╴ Appointment not available anymore :( %s', self.page.get_error()) - return False + if self.page.is_error(): + log(' └╴ Appointment not available anymore :( %s', + self.page.get_error()) + return False a_id = self.page.doc['id'] self.appointment_edit.go(id=a_id) - log(' ├╴ Booking for %s %s...', self.patient['first_name'], self.patient['last_name']) + log(' ├╴ Booking for %(first_name)s %(last_name)s...' % self.patient) - self.appointment_edit.go(id=a_id, params={'master_patient_id': self.patient['id']}) + self.appointment_edit.go( + id=a_id, params={'master_patient_id': self.patient['id']}) custom_fields = {} for field in self.page.get_custom_fields(): @@ -330,28 +534,37 @@ def try_to_book_place(self, profile_id, motive_id, practice_id, agenda_ids): elif field['placeholder']: value = field['placeholder'] else: - print('%s (%s):' % (field['label'], field['placeholder']), end=' ', flush=True) + for key, value in field.get('options', []): + print(' │ %s %s' % (colored(key, 'green'), colored(value, 'yellow'))) + print(' ├╴ %s%s:' % (field['label'], (' (%s)' % field['placeholder']) if field['placeholder'] else ''), + end=' ', flush=True) value = sys.stdin.readline().strip() custom_fields[field['id']] = value + if dry_run: + log(' └╴ Booking status: %s', 'fake') + return True + data = {'appointment': {'custom_fields_values': custom_fields, 'new_patient': True, 'qualification_answers': {}, 'referrer_id': None, - }, + }, 'bypass_mandatory_relative_contact_info': False, 'email': None, 'master_patient': self.patient, 'new_patient': True, 'patient': None, 'phone_number': None, - } + } - self.appointment_post.go(id=a_id, data=json.dumps(data), headers=headers, method='PUT') + self.appointment_post.go(id=a_id, data=json.dumps( + data), headers=headers, method='PUT') if 'redirection' in self.page.doc and not 'confirmed-appointment' in self.page.doc['redirection']: - log(' ├╴ Open %s to complete', 'https://www.doctolib.fr' + self.page.doc['redirection']) + log(' ├╴ Open %s to complete', self.BASEURL + + self.page.doc['redirection']) self.appointment_post.go(id=a_id) @@ -359,7 +572,64 @@ def try_to_book_place(self, profile_id, motive_id, practice_id, agenda_ids): return self.page.doc['confirmed'] + +class DoctolibDE(Doctolib): + BASEURL = 'https://www.doctolib.de' + KEY_PFIZER = '6768' + KEY_PFIZER_SECOND = '6769' + KEY_PFIZER_THIRD = '9039' + KEY_MODERNA = '6936' + KEY_MODERNA_SECOND = '6937' + KEY_MODERNA_THIRD = '9040' + KEY_JANSSEN = '7978' + KEY_ASTRAZENECA = '7109' + KEY_ASTRAZENECA_SECOND = '7110' + vaccine_motives = { + KEY_PFIZER: 'Pfizer', + KEY_PFIZER_SECOND: 'Zweit.*Pfizer|Pfizer.*Zweit', + KEY_PFIZER_THIRD: 'Auffrischung.*Pfizer|Pfizer.*Auffrischung|Dritt.*Pfizer|Booster.*Pfizer', + KEY_MODERNA: 'Moderna', + KEY_MODERNA_SECOND: 'Zweit.*Moderna|Moderna.*Zweit', + KEY_MODERNA_THIRD: 'Auffrischung.*Moderna|Moderna.*Auffrischung|Dritt.*Moderna|Booster.*Moderna', + KEY_JANSSEN: 'Janssen', + KEY_ASTRAZENECA: 'AstraZeneca', + KEY_ASTRAZENECA_SECOND: 'Zweit.*AstraZeneca|AstraZeneca.*Zweit', + } + centers = URL(r'/impfung-covid-19-corona/(?P\w+)', CentersPage) + center = URL(r'/praxis/.*', CenterPage) + + +class DoctolibFR(Doctolib): + BASEURL = 'https://www.doctolib.fr' + KEY_PFIZER = '6970' + KEY_PFIZER_SECOND = '6971' + KEY_PFIZER_THIRD = '8192' + KEY_MODERNA = '7005' + KEY_MODERNA_SECOND = '7004' + KEY_MODERNA_THIRD = '8193' + KEY_JANSSEN = '7945' + KEY_ASTRAZENECA = '7107' + KEY_ASTRAZENECA_SECOND = '7108' + vaccine_motives = { + KEY_PFIZER: 'Pfizer', + KEY_PFIZER_SECOND: '2de.*Pfizer', + KEY_PFIZER_THIRD: '3e.*Pfizer', + KEY_MODERNA: 'Moderna', + KEY_MODERNA_SECOND: '2de.*Moderna', + KEY_MODERNA_THIRD: '3e.*Moderna', + KEY_JANSSEN: 'Janssen', + KEY_ASTRAZENECA: 'AstraZeneca', + KEY_ASTRAZENECA_SECOND: '2de.*AstraZeneca', + } + + centers = URL(r'/vaccination-covid-19/(?P\w+)', CentersPage) + center = URL(r'/centre-de-sante/.*', CenterPage) + + class Application: + DATA_DIRNAME = (Path(os.environ.get("XDG_DATA_HOME") or Path.home() / ".local" / "share")) / 'doctoshotgun' + STATE_FILENAME = DATA_DIRNAME / 'state.json' + @classmethod def create_default_logger(cls): # stderr logger @@ -375,15 +645,74 @@ def setup_loggers(self, level): logging.root.setLevel(level) logging.root.addHandler(self.create_default_logger()) - def main(self): - parser = argparse.ArgumentParser(description="Book a vaccine slot on Doctolib") - parser.add_argument('--debug', '-d', action='store_true', help='show debug information') - parser.add_argument('--patient', '-p', type=int, default=-1, help='give patient ID') - parser.add_argument('--center', '-c', action='append', help='filter centers') + def load_state(self): + try: + with open(self.STATE_FILENAME, 'r') as fp: + state = json.load(fp) + except IOError: + return {} + else: + return state + + def save_state(self, state): + if not os.path.exists(self.DATA_DIRNAME): + os.makedirs(self.DATA_DIRNAME) + with open(self.STATE_FILENAME, 'w') as fp: + json.dump(state, fp) + + def main(self, cli_args=None): + colorama.init() # needed for windows + + doctolib_map = { + "fr": DoctolibFR, + "de": DoctolibDE + } + + parser = argparse.ArgumentParser( + description="Book a vaccine slot on Doctolib") + parser.add_argument('--debug', '-d', action='store_true', + help='show debug information') + parser.add_argument('--pfizer', '-z', action='store_true', + help='select only Pfizer vaccine') + parser.add_argument('--moderna', '-m', action='store_true', + help='select only Moderna vaccine') + parser.add_argument('--janssen', '-j', action='store_true', + help='select only Janssen vaccine') + parser.add_argument('--astrazeneca', '-a', action='store_true', + help='select only AstraZeneca vaccine') + parser.add_argument('--only-second', '-2', + action='store_true', help='select only second dose') + parser.add_argument('--only-third', '-3', + action='store_true', help='select only third dose') + parser.add_argument('--patient', '-p', type=int, + default=-1, help='give patient ID') + parser.add_argument('--time-window', '-t', type=int, default=7, + help='set how many next days the script look for slots (default = 7)') + parser.add_argument( + '--center', '-c', action='append', help='filter centers') + parser.add_argument('--center-regex', + action='append', help='filter centers by regex') + parser.add_argument('--center-exclude', '-x', + action='append', help='exclude centers') + parser.add_argument('--center-exclude-regex', + action='append', help='exclude centers by regex') + parser.add_argument( + '--include-neighbor-city', '-n', action='store_true', help='include neighboring cities') + parser.add_argument('--start-date', type=str, default=None, + help='first date on which you want to book the first slot (format should be DD/MM/YYYY)') + parser.add_argument('--end-date', type=str, default=None, + help='last date on which you want to book the first slot (format should be DD/MM/YYYY)') + parser.add_argument('--dry-run', action='store_true', + help='do not really book the slot') + parser.add_argument( + 'country', help='country where to book', choices=list(doctolib_map.keys())) parser.add_argument('city', help='city where to book') parser.add_argument('username', help='Doctolib username') parser.add_argument('password', nargs='?', help='Doctolib password') - args = parser.parse_args() + parser.add_argument('--code', type=str, default=None, help='2FA code') + parser.add_argument('--proxy', '-P', type=str, default=None, + help='define a proxy to use') + args = parser.parse_args(cli_args if cli_args else sys.argv[1:]) if args.debug: responses_dirname = tempfile.mkdtemp(prefix='woob_session_') @@ -395,60 +724,193 @@ def main(self): if not args.password: args.password = getpass.getpass() - docto = Doctolib(args.username, args.password, responses_dirname=responses_dirname) - if not docto.do_login(): - print('Wrong login/password') - return 1 - - patients = docto.get_patients() - if len(patients) == 0: - print("It seems that you don't have any Patient registered in your Doctolib account. Please fill your Patient data on Doctolib Website.") - return 1 - if args.patient >= 0 and args.patient < len(patients): - docto.patient = patients[args.patient] - elif len(patients) > 1: - print('Available patients are:') - for i, patient in enumerate(patients): - print('* [%s] %s %s' % (i, patient['first_name'], patient['last_name'])) - while True: - print('For which patient do you want to book a slot?', end=' ', flush=True) - try: - docto.patient = patients[int(sys.stdin.readline().strip())] - except (ValueError, IndexError): - continue - else: - break - else: - docto.patient = patients[0] + docto = doctolib_map[args.country]( + args.username, args.password, responses_dirname=responses_dirname) + + if args.proxy: + docto.PROXIES = {'https': args.proxy} - log('Starting to look for vaccine slots for %s %s...', docto.patient['first_name'], docto.patient['last_name']) - log('This may take a few minutes/hours, be patient!') - cities = [docto.normalize(city) for city in args.city.split(',')] + docto.load_state(self.load_state()) - while True: - for center in docto.find_centers(cities): - if args.center: - if center['name_with_title'] not in args.center: - logging.debug("Skipping center '%s'", center['name_with_title']) + try: + if not docto.do_login(args.code): + return 1 + + patients = docto.get_patients() + if len(patients) == 0: + print("It seems that you don't have any Patient registered in your Doctolib account. Please fill your Patient data on Doctolib Website.") + return 1 + if args.patient >= 0 and args.patient < len(patients): + docto.patient = patients[args.patient] + elif len(patients) > 1: + print('Available patients are:') + for i, patient in enumerate(patients): + print('* [%s] %s %s' % + (i, patient['first_name'], patient['last_name'])) + while True: + print('For which patient do you want to book a slot?', + end=' ', flush=True) + try: + docto.patient = patients[int(sys.stdin.readline().strip())] + except (ValueError, IndexError): continue + else: + break + else: + docto.patient = patients[0] + + motives = [] + if not args.pfizer and not args.moderna and not args.janssen and not args.astrazeneca: + if args.only_second: + motives.append(docto.KEY_PFIZER_SECOND) + motives.append(docto.KEY_MODERNA_SECOND) + # motives.append(docto.KEY_ASTRAZENECA_SECOND) #do not add AstraZeneca by default + elif args.only_third: + if not docto.KEY_PFIZER_THIRD and not docto.KEY_MODERNA_THIRD: + print('Invalid args: No third shot vaccinations in this country') + return 1 + motives.append(docto.KEY_PFIZER_THIRD) + motives.append(docto.KEY_MODERNA_THIRD) else: - if docto.normalize(center['city']) not in cities: - logging.debug("Skipping city '%(city)s' %(name_with_title)s", center) - continue - - log('') - log('Center %s:', center['name_with_title']) - - if docto.try_to_book(center): - log('') - log('💉 %s Congratulations.' % colored('Booked!', 'green', attrs=('bold',))) - return 0 + motives.append(docto.KEY_PFIZER) + motives.append(docto.KEY_MODERNA) + motives.append(docto.KEY_JANSSEN) + # motives.append(docto.KEY_ASTRAZENECA) #do not add AstraZeneca by default + if args.pfizer: + if args.only_second: + motives.append(docto.KEY_PFIZER_SECOND) + elif args.only_third: + if not docto.KEY_PFIZER_THIRD: # not available in all countries + print('Invalid args: Pfizer has no third shot in this country') + return 1 + motives.append(docto.KEY_PFIZER_THIRD) + else: + motives.append(docto.KEY_PFIZER) + if args.moderna: + if args.only_second: + motives.append(docto.KEY_MODERNA_SECOND) + elif args.only_third: + if not docto.KEY_MODERNA_THIRD: # not available in all countries + print('Invalid args: Moderna has no third shot in this country') + return 1 + motives.append(docto.KEY_MODERNA_THIRD) + else: + motives.append(docto.KEY_MODERNA) + if args.janssen: + if args.only_second or args.only_third: + print('Invalid args: Janssen has no second or third shot') + return 1 + else: + motives.append(docto.KEY_JANSSEN) + if args.astrazeneca: + if args.only_second: + motives.append(docto.KEY_ASTRAZENECA_SECOND) + elif args.only_third: + print('Invalid args: AstraZeneca has no third shot') + return 1 + else: + motives.append(docto.KEY_ASTRAZENECA) - sleep(1) + vaccine_list = [docto.vaccine_motives[motive] for motive in motives] - sleep(5) + if args.start_date: + try: + start_date = datetime.datetime.strptime( + args.start_date, '%d/%m/%Y').date() + except ValueError as e: + print('Invalid value for --start-date: %s' % e) + return 1 + else: + start_date = datetime.date.today() + if args.end_date: + try: + end_date = datetime.datetime.strptime( + args.end_date, '%d/%m/%Y').date() + except ValueError as e: + print('Invalid value for --end-date: %s' % e) + return 1 + else: + end_date = start_date + relativedelta(days=args.time_window) + log('Starting to look for vaccine slots for %s %s between %s and %s...', + docto.patient['first_name'], docto.patient['last_name'], start_date, end_date) + log('Vaccines: %s', ', '.join(vaccine_list)) + log('Country: %s ', args.country) + log('This may take a few minutes/hours, be patient!') + cities = [docto.normalize(city) for city in args.city.split(',')] - return 0 + while True: + log_ts() + try: + for center in docto.find_centers(cities, motives): + if args.center: + if center['name_with_title'] not in args.center: + logging.debug("Skipping center '%s'" % + center['name_with_title']) + continue + if args.center_regex: + center_matched = False + for center_regex in args.center_regex: + if re.match(center_regex, center['name_with_title']): + center_matched = True + else: + logging.debug( + "Skipping center '%(name_with_title)s'" % center) + if not center_matched: + continue + if args.center_exclude: + if center['name_with_title'] in args.center_exclude: + logging.debug( + "Skipping center '%(name_with_title)s' because it's excluded" % center) + continue + if args.center_exclude_regex: + center_excluded = False + for center_exclude_regex in args.center_exclude_regex: + if re.match(center_exclude_regex, center['name_with_title']): + logging.debug( + "Skipping center '%(name_with_title)s' because it's excluded" % center) + center_excluded = True + if center_excluded: + continue + if not args.include_neighbor_city and not docto.normalize(center['city']).startswith(tuple(cities)): + logging.debug( + "Skipping city '%(city)s' %(name_with_title)s" % center) + continue + + log('') + + log('Center %(name_with_title)s (%(city)s):' % center) + + if docto.try_to_book(center, vaccine_list, start_date, end_date, args.only_second, args.only_third, args.dry_run): + log('') + log('💉 %s Congratulations.' % + colored('Booked!', 'green', attrs=('bold',))) + return 0 + + sleep(SLEEP_INTERVAL_AFTER_CENTER) + + log('') + log('No free slots found at selected centers. Trying another round in %s sec...', SLEEP_INTERVAL_AFTER_RUN) + sleep(SLEEP_INTERVAL_AFTER_RUN) + except CityNotFound as e: + print('\n%s: City %s not found. Make sure you selected a city from the available countries.' % ( + colored('Error', 'red'), colored(e, 'yellow'))) + return 1 + except WaitingInQueue as waiting_time: + log('Within the queue, estimated waiting time %s minutes', waiting_time) + sleep(30) + except (ReadTimeout, ConnectionError, NewConnectionError) as e: + print('\n%s' % (colored( + 'Connection error. Check your internet connection. Retrying ...', 'red'))) + print(str(e)) + sleep(SLEEP_INTERVAL_AFTER_CONNECTION_ERROR) + except Exception as e: + template = "An unexpected exception of type {0} occurred. Arguments:\n{1!r}" + message = template.format(type(e).__name__, e.args) + print(message) + return 1 + return 0 + finally: + self.save_state(docto.dump_state()) if __name__ == '__main__': diff --git a/example.svg b/example.svg index 7ce8047..1459a7e 100644 --- a/example.svg +++ b/example.svg @@ -1 +1 @@ -rom1@money(master)~/src/doctoshotgun$rom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisromain@bignon.merom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisromain@bignon.me-prom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisromain@bignon.me-p0Password:StartingtolookforvaccineslotsforRomainBignon...Thismaytakeafewminutes/hours,bepatient!CenterCentredevaccinationCovid-19-CPAMdeParis75:CentredevaccinationAmelot-CPAMdeParis75...noavailabilitiesCentredevaccinationMaroc-CPAMdeParis75...noavailabilitiesCenterCentredevaccinationCovid-19-Paris17ᵉ:CentredeVaccinationCovid-19-Paris17e...noavailabilitiesCenterCentredeVaccinationCovid-19-VilledeParis:CentredeVaccination-Mairiedu10e...found!├╴Bestslotfound:MonMay1716:30:002021├╴Secondshot:SatJun2617:00:002021├╴BookingforRomainBignon...└╴Bookingstatus:True💉Booked!Congratulations.rom1@money(master)~/src/doctoshotgun$.rom1@money(master)~/src/doctoshotgun$./rom1@money(master)~/src/doctoshotgun$./drom1@money(master)~/src/doctoshotgun$./dorom1@money(master)~/src/doctoshotgun$./docrom1@money(master)~/src/doctoshotgun$./doctrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyprom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparirom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisrrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisrorom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisromrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisromarom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisromairom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisromainrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisromain@rom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisromain@brom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisromain@birom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisromain@bigrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisromain@bignrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisromain@bignorom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisromain@bignonrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisromain@bignon.rom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisromain@bignon.mrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyparisromain@bignon.me-CentredevaccinationAmelot-CPAMdeParis75...CentredevaccinationMaroc-CPAMdeParis75...CentredeVaccinationCovid-19-Paris17e...CentredeVaccination-Mairiedu10e... \ No newline at end of file +rom1@money(master)~/src/doctoshotgun$rom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisromain@bignon.merom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisromain@bignon.me-prom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisromain@bignon.me-p0Password:StartingtolookforvaccineslotsforRomainBignon...Thismaytakeafewminutes/hours,bepatient!CenterCentredevaccinationCovid-19-CPAMdeParis75:CentredevaccinationAmelot-CPAMdeParis75...noavailabilitiesCentredevaccinationMaroc-CPAMdeParis75...noavailabilitiesCenterCentredevaccinationCovid-19-Paris17ᵉ:CentredeVaccinationCovid-19-Paris17e...noavailabilitiesCenterCentredeVaccinationCovid-19-VilledeParis:CentredeVaccination-Mairiedu10e...found!├╴Bestslotfound:MonMay1716:30:002021├╴Secondshot:SatJun2617:00:002021├╴BookingforRomainBignon...└╴Bookingstatus:True💉Booked!Congratulations.rom1@money(master)~/src/doctoshotgun$.rom1@money(master)~/src/doctoshotgun$./rom1@money(master)~/src/doctoshotgun$./drom1@money(master)~/src/doctoshotgun$./dorom1@money(master)~/src/doctoshotgun$./docrom1@money(master)~/src/doctoshotgun$./doctrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrprom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparirom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisrrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisrorom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisromrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisromarom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisromairom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisromainrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisromain@rom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisromain@brom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisromain@birom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisromain@bigrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisromain@bignrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisromain@bignorom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisromain@bignonrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisromain@bignon.rom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisromain@bignon.mrom1@money(master)~/src/doctoshotgun$./doctoshotgun.pyfrparisromain@bignon.me-CentredevaccinationAmelot-CPAMdeParis75...CentredevaccinationMaroc-CPAMdeParis75...CentredeVaccinationCovid-19-Paris17e...CentredeVaccination-Mairiedu10e... \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 96a3fb5..2e3b0cb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ woob cloudscraper python-dateutil -termcolor \ No newline at end of file +termcolor +colorama diff --git a/test_browser.py b/test_browser.py index d2a29a8..eda1472 100644 --- a/test_browser.py +++ b/test_browser.py @@ -1,21 +1,60 @@ import pytest +from requests.adapters import Response import responses +from html import escape +import lxml.html as html +import json +import datetime +from woob.browser.browsers import Browser from woob.browser.exceptions import ServerError +from doctoshotgun import CentersPage, DoctolibDE, DoctolibFR, CenterBookingPage -from doctoshotgun import Doctolib +# globals +FIXTURES_FOLDER = "test_fixtures" + +# URL to be mocked using responses +SEARCH_URL_FOR_KOLN = ( + 'https://127.0.0.1/search_results/1234567.json?limit=4' + '&ref_visit_motive_ids%5B%5D=6768' + '&ref_visit_motive_ids%5B%5D=6769' + '&ref_visit_motive_ids%5B%5D=9039' + '&ref_visit_motive_ids%5B%5D=6936' + '&ref_visit_motive_ids%5B%5D=6937' + '&ref_visit_motive_ids%5B%5D=9040' + '&ref_visit_motive_ids%5B%5D=7978' + '&ref_visit_motive_ids%5B%5D=7109' + '&ref_visit_motive_ids%5B%5D=7110' + '&speciality_id=5494' + '&search_result_format=json' +) + +SEARCH_URL_FOR_MUNCHEN=( + 'https://127.0.0.1/impfung-covid-19-corona/M%C3%BCnchen' + '?ref_visit_motive_ids%5B%5D=6768' + '&ref_visit_motive_ids%5B%5D=6769' + '&ref_visit_motive_ids%5B%5D=9039' + '&ref_visit_motive_ids%5B%5D=6936' + '&ref_visit_motive_ids%5B%5D=6937' + '&ref_visit_motive_ids%5B%5D=9040' + '&ref_visit_motive_ids%5B%5D=7978' + '&ref_visit_motive_ids%5B%5D=7109' + '&ref_visit_motive_ids%5B%5D=7110' + '&page=1' +) @responses.activate -def test_find_centers_returns_503_should_continue(tmp_path): +def test_find_centers_fr_returns_503_should_continue(tmp_path): """ Check that find_centers doesn't raise a ServerError in case of 503 HTTP response """ - docto = Doctolib("roger.phillibert@gmail.com", "1234", responses_dirname=tmp_path) + docto = DoctolibFR("roger.phillibert@gmail.com", + "1234", responses_dirname=tmp_path) docto.BASEURL = "https://127.0.0.1" responses.add( responses.GET, - "https://127.0.0.1/vaccination-covid-19/Paris?ref_visit_motive_ids%5B%5D=6970&ref_visit_motive_ids%5B%5D=7005", + "https://127.0.0.1/vaccination-covid-19/Paris?ref_visit_motive_ids%5B%5D=6970&ref_visit_motive_ids%5B%5D=6971&ref_visit_motive_ids%5B%5D=8192&ref_visit_motive_ids%5B%5D=7005&ref_visit_motive_ids%5B%5D=7004&ref_visit_motive_ids%5B%5D=8193&ref_visit_motive_ids%5B%5D=7945&ref_visit_motive_ids%5B%5D=7107&ref_visit_motive_ids%5B%5D=7108&page=1", status=503 ) @@ -25,16 +64,57 @@ def test_find_centers_returns_503_should_continue(tmp_path): @responses.activate -def test_find_centers_returns_502_should_fail(tmp_path): +def test_find_centers_de_returns_503_should_continue(tmp_path): + """ + Check that find_centers doesn't raise a ServerError in case of 503 HTTP response + """ + docto = DoctolibDE("roger.phillibert@gmail.com", + "1234", responses_dirname=tmp_path) + docto.BASEURL = "https://127.0.0.1" + + responses.add( + responses.GET, + SEARCH_URL_FOR_MUNCHEN, + status=503 + ) + + # this should not raise an exception + for _ in docto.find_centers(["München"]): + pass + + +@responses.activate +def test_find_centers_de_returns_520_should_continue(tmp_path): + """ + Check that find_centers doesn't raise a ServerError in case of 503 HTTP response + """ + docto = DoctolibDE("roger.phillibert@gmail.com", + "1234", responses_dirname=tmp_path) + docto.BASEURL = "https://127.0.0.1" + + responses.add( + responses.GET, + SEARCH_URL_FOR_MUNCHEN, + status=520 + ) + + # this should not raise an exception + for _ in docto.find_centers(["München"]): + pass + + +@responses.activate +def test_find_centers_fr_returns_502_should_fail(tmp_path): """ Check that find_centers raises an error in case of non-whitelisted status code """ - docto = Doctolib("roger.phillibert@gmail.com", "1234", responses_dirname=tmp_path) + docto = DoctolibFR("roger.phillibert@gmail.com", + "1234", responses_dirname=tmp_path) docto.BASEURL = "https://127.0.0.1" responses.add( responses.GET, - "https://127.0.0.1/vaccination-covid-19/Paris?ref_visit_motive_ids%5B%5D=6970&ref_visit_motive_ids%5B%5D=7005", + "https://127.0.0.1/vaccination-covid-19/Paris?ref_visit_motive_ids%5B%5D=6970&ref_visit_motive_ids%5B%5D=6971&ref_visit_motive_ids%5B%5D=8192&ref_visit_motive_ids%5B%5D=7005&ref_visit_motive_ids%5B%5D=7004&ref_visit_motive_ids%5B%5D=8193&ref_visit_motive_ids%5B%5D=7945&ref_visit_motive_ids%5B%5D=7107&ref_visit_motive_ids%5B%5D=7108&page=1", status=502 ) @@ -42,3 +122,507 @@ def test_find_centers_returns_502_should_fail(tmp_path): with pytest.raises(ServerError): for _ in docto.find_centers(["Paris"]): pass + + +@responses.activate +def test_find_centers_de_returns_502_should_fail(tmp_path): + """ + Check that find_centers raises an error in case of non-whitelisted status code + """ + docto = DoctolibDE("roger.phillibert@gmail.com", + "1234", responses_dirname=tmp_path) + docto.BASEURL = "https://127.0.0.1" + + responses.add( + responses.GET, + SEARCH_URL_FOR_MUNCHEN, + status=502 + ) + + # this should raise an exception + with pytest.raises(ServerError): + for _ in docto.find_centers(["München"]): + pass + + +@responses.activate +def test_get_next_page_fr_should_return_2_on_page_1(tmp_path): + """ + Check that get_next_page returns 2 when we are on page 1 and there is a next page available + """ + + """ + Next (data-u decoded): /vaccination-covid-19-autres-professions-prioritaires/france?page=2&ref_visit_motive_ids%5B%5D=6970&ref_visit_motive_ids%5B%5D=7005 + """ + + htmlString = """ + + """ + doc = html.document_fromstring(htmlString) + + response = Response() + response._content = b'{}' + + centers_page = CentersPage(browser=Browser(), response=response) + centers_page.doc = doc + next_page = centers_page.get_next_page() + assert next_page == 2 + + +@responses.activate +def test_get_next_page_fr_should_return_3_on_page_2(tmp_path): + """ + Check that get_next_page returns 3 when we are on page 2 and next page is available + """ + + """ + Previous (data-u decoded): /vaccination-covid-19-autres-professions-prioritaires/france?ref_visit_motive_ids%5B%5D=6970&ref_visit_motive_ids%5B%5D=7005 + Next (data-u decoded): /vaccination-covid-19-autres-professions-prioritaires/france?page=3&ref_visit_motive_ids%5B%5D=6970&ref_visit_motive_ids%5B%5D=7005 + """ + + htmlString = """ + + """ + doc = html.document_fromstring(htmlString) + + response = Response() + response._content = b'{}' + + centers_page = CentersPage(browser=Browser(), response=response) + centers_page.doc = doc + next_page = centers_page.get_next_page() + assert next_page == 3 + + +@responses.activate +def test_get_next_page_fr_should_return_4_on_page_3(tmp_path): + """ + Check that get_next_page returns 4 when we are on page 3 and next page is available + """ + + """ + Previous (data-u decoded): /vaccination-covid-19-autres-professions-prioritaires/france?page=2&ref_visit_motive_ids%5B%5D=6970&ref_visit_motive_ids%5B%5D=7005 + Next (data-u decoded): /vaccination-covid-19-autres-professions-prioritaires/france?page=4&ref_visit_motive_ids%5B%5D=6970&ref_visit_motive_ids%5B%5D=7005 + """ + + htmlString = """ + + """ + doc = html.document_fromstring(htmlString) + + response = Response() + response._content = b'{}' + + centers_page = CentersPage(browser=Browser(), response=response) + centers_page.doc = doc + next_page = centers_page.get_next_page() + assert next_page == 4 + + +def test_get_next_page_fr_should_return_None_on_last_page(tmp_path): + """ + Check that get_next_page returns None when we are on the last page + """ + """ + Previous (data-u decoded): /vaccination-covid-19-autres-professions-prioritaires/france?page=7&ref_visit_motive_ids%5B%5D=6970&ref_visit_motive_ids%5B%5D=7005 + """ + + htmlString = """ + + """ + doc = html.document_fromstring(htmlString) + + response = Response() + response._content = b'{}' + + centers_page = CentersPage(browser=Browser(), response=response) + centers_page.doc = doc + next_page = centers_page.get_next_page() + assert next_page == None + + +@responses.activate +def test_get_next_page_de_should_return_2_on_page_1(tmp_path): + """ + Check that get_next_page returns 2 when we are on page 1 and next page is available + """ + + htmlString = """ + + """ + doc = html.document_fromstring(htmlString) + + response = Response() + response._content = b'{}' + + centers_page = CentersPage(browser=Browser(), response=response) + centers_page.doc = doc + next_page = centers_page.get_next_page() + assert next_page == 2 + + +@responses.activate +def test_get_next_page_de_should_return_3_on_page_2(tmp_path): + """ + Check that get_next_page returns 3 when we are on page 2 and next page is available + """ + + htmlString = """ + + """ + doc = html.document_fromstring(htmlString) + + response = Response() + response._content = b'{}' + + centers_page = CentersPage(browser=Browser(), response=response) + centers_page.doc = doc + next_page = centers_page.get_next_page() + assert next_page == 3 + + +@responses.activate +def test_get_next_page_de_should_return_4_on_page_3(tmp_path): + """ + Check that get_next_page returns 4 when we are on page 3 and next page is available + """ + + htmlString = """ + + """ + doc = html.document_fromstring(htmlString) + + response = Response() + response._content = b'{}' + + centers_page = CentersPage(browser=Browser(), response=response) + centers_page.doc = doc + next_page = centers_page.get_next_page() + assert next_page == 4 + + +def test_get_next_page_de_should_return_None_on_last_page(tmp_path): + """ + Check that get_next_page returns None when we are on the last page + """ + + htmlString = """ + + """ + doc = html.document_fromstring(htmlString) + + response = Response() + response._content = b'{}' + + centers_page = CentersPage(browser=Browser(), response=response) + centers_page.doc = doc + next_page = centers_page.get_next_page() + assert next_page == None + + +@responses.activate +def test_book_slots_should_succeed(tmp_path): + """ + Check that try_to_book calls all services successfully + """ + docto = DoctolibDE("roger.phillibert@gmail.com", + "1234", responses_dirname=tmp_path) + docto.BASEURL = "https://127.0.0.1" + docto.patient = { + "id": "patient-id", + "first_name": "Roger", + "last_name": "Phillibert" + } + + mock_search_result_id = { + "searchResultId": 1234567 + } + + mock_search_result_id_escaped_json = escape( + json.dumps(mock_search_result_id, separators=(',', ':'))) + + responses.add( + responses.GET, + ("https://127.0.0.1/impfung-covid-19-corona/K%C3%B6ln" + "?ref_visit_motive_ids%5B%5D=6768" + "&ref_visit_motive_ids%5B%5D=6769" + "&ref_visit_motive_ids%5B%5D=9039" + "&ref_visit_motive_ids%5B%5D=6936" + "&ref_visit_motive_ids%5B%5D=6937" + "&ref_visit_motive_ids%5B%5D=9040" + "&ref_visit_motive_ids%5B%5D=7978" + "&ref_visit_motive_ids%5B%5D=7109" + "&ref_visit_motive_ids%5B%5D=7110" + "&page=1"), + status=200, + body="
".format( + dataProps=mock_search_result_id_escaped_json) + ) + + with open(FIXTURES_FOLDER + '/search_result.json') as json_file: + mock_search_result = json.load(json_file) + + responses.add( + responses.GET, + SEARCH_URL_FOR_KOLN, + status=200, + body=json.dumps(mock_search_result) + ) + + with open(FIXTURES_FOLDER + '/doctor_response.json') as json_file: + mock_doctor_response = json.load(json_file) + + responses.add( + responses.GET, + "https://127.0.0.1/allgemeinmedizin/koeln/dr-dre?insurance_sector=public", + status=200, + body=json.dumps(mock_doctor_response) + ) + + responses.add( + responses.GET, + "https://127.0.0.1/booking/dr-dre.json", + status=200, + body=json.dumps(mock_doctor_response) + ) + + with open(FIXTURES_FOLDER + '/availabilities.json') as json_file: + mock_availabilities = json.load(json_file) + + responses.add( + responses.GET, + "https://127.0.0.1/availabilities.json?start_date=2021-06-01&visit_motive_ids=2920448&agenda_ids=&insurance_sector=public&practice_ids=234567&destroy_temporary=true&limit=3", + status=200, + body=json.dumps(mock_availabilities) + ) + responses.add( + responses.GET, + "https://127.0.0.1/availabilities.json?start_date=2021-06-01&visit_motive_ids=2746983&agenda_ids=&insurance_sector=public&practice_ids=234567&destroy_temporary=true&limit=3", + status=200, + body=json.dumps(mock_availabilities) + ) + + mock_appointments = { + "id": "appointment-id" + } + + responses.add( + responses.POST, + "https://127.0.0.1/appointments.json", + status=200, + body=json.dumps(mock_appointments) + ) + + mock_appointments_edit = { + "id": "appointment-edit-id", + "appointment": { + "custom_fields": {} + } + } + + responses.add( + responses.GET, + "https://127.0.0.1/appointments/appointment-id/edit.json", + status=200, + body=json.dumps(mock_appointments_edit) + ) + + responses.add( + responses.GET, + "https://127.0.0.1/second_shot_availabilities.json?start_date=2021-07-20&visit_motive_ids=2746983&agenda_ids=&first_slot=2021-06-10T08%3A40%3A00.000%2B02%3A00&insurance_sector=public&practice_ids=234567&limit=3", + status=200, + body=json.dumps(mock_availabilities) + ) + + mock_appointment_id_put = { + } + + responses.add( + responses.PUT, + "https://127.0.0.1/appointments/appointment-id.json", + status=200, + body=json.dumps(mock_appointment_id_put) + ) + + mock_appointment_id = { + "confirmed": True + } + + responses.add( + responses.GET, + "https://127.0.0.1/appointments/appointment-id.json", + status=200, + body=json.dumps(mock_appointment_id) + ) + + result_handled = False + for result in docto.find_centers(["Köln"]): + result_handled = True + + center = result['search_result'] + + # single shot vaccination + assert docto.try_to_book(center=center, + vaccine_list=["Janssen"], + start_date=datetime.date( + year=2021, month=6, day=1), + end_date=datetime.date( + year=2021, month=6, day=14), + only_second=False, + only_third=False, + dry_run=False) + assert len(responses.calls) == 10 + + # two shot vaccination + assert docto.try_to_book(center=center, + vaccine_list=["Pfizer"], + start_date=datetime.date( + year=2021, month=6, day=1), + end_date=datetime.date( + year=2021, month=6, day=14), + only_second=False, + only_third=False, + dry_run=False) + assert len(responses.calls) == 20 + pass + + assert result_handled + + +@responses.activate +def test_find_motive_should_ignore_second_shot(tmp_path): + """ + Check that find_motive ignores second shot motives + """ + + with open(FIXTURES_FOLDER + '/doctor_response.json') as json_file: + mock_doctor_response = json.load(json_file) + + response = Response() + response._content = b'{}' + + booking_page = CenterBookingPage(browser=Browser(), response=response) + booking_page.doc = mock_doctor_response + visit_motive_id = CenterBookingPage.find_motive( + booking_page, '.*(Pfizer)', False) + assert visit_motive_id == mock_doctor_response['data']['visit_motives'][1]['id'] + + visit_motive_id = CenterBookingPage.find_motive( + booking_page, '.*(Janssen)', True) + assert visit_motive_id == mock_doctor_response['data']['visit_motives'][3]['id'] diff --git a/test_cli_args.py b/test_cli_args.py new file mode 100644 index 0000000..72a21af --- /dev/null +++ b/test_cli_args.py @@ -0,0 +1,94 @@ +import responses +from unittest.mock import patch, MagicMock + +from doctoshotgun import Application, DoctolibDE, DoctolibFR, MasterPatientPage + +CENTERS = [ + { + "name_with_title": "Doktor", + "city": "koln", + }, + { + "name_with_title": "Doktor2", + "city": "koln", + }, + { + "name_with_title": "Doktor", + "city": "neuss", + }, +] + + +@responses.activate +@patch('doctoshotgun.DoctolibDE') +def test_center_arg_should_filter_centers(MockDoctolibDE, tmp_path): + """ + Check that booking is performed in correct city + """ + # prepare + mock_doctolib_de = get_mocked_doctolib(MockDoctolibDE) + + # call + center = 'Doktor' + city = 'koln' + call_application(city, cli_args=['--center', center]) + + # assert + assert mock_doctolib_de.get_patients.called + assert mock_doctolib_de.try_to_book.called + for call_args_list in mock_doctolib_de.try_to_book.call_args_list: + assert call_args_list.args[0]['name_with_title'] == center + assert call_args_list.args[0]['city'] == city + + +@responses.activate +@patch('doctoshotgun.DoctolibDE') +def test_center_exclude_arg_should_filter_excluded_centers(MockDoctolibDE, tmp_path): + """ + Check that booking is performed in correct city + """ + # prepare + mock_doctolib_de = get_mocked_doctolib(MockDoctolibDE) + + # call + excluded_center = 'Doktor' + city = 'koln' + call_application(city, cli_args=['--center-exclude', excluded_center]) + + # assert + assert mock_doctolib_de.get_patients.called + assert mock_doctolib_de.try_to_book.called + for call_args_list in mock_doctolib_de.try_to_book.call_args_list: + assert call_args_list.args[0]['name_with_title'] != excluded_center + assert call_args_list.args[0]['city'] == city + + +def get_mocked_doctolib(MockDoctolibDE): + mock_doctolib_de = MagicMock(wraps=DoctolibDE) + MockDoctolibDE.return_value = mock_doctolib_de + + mock_doctolib_de.vaccine_motives = DoctolibDE.vaccine_motives + mock_doctolib_de.KEY_PFIZER = DoctolibDE.KEY_PFIZER + mock_doctolib_de.KEY_MODERNA = DoctolibDE.KEY_MODERNA + mock_doctolib_de.KEY_JANSSEN = DoctolibDE.KEY_JANSSEN + + mock_doctolib_de.get_patients.return_value = [ + {"first_name": 'First', "last_name": 'Name'} + ] + mock_doctolib_de.do_login.return_value = True + + mock_doctolib_de.find_centers.return_value = CENTERS + + mock_doctolib_de.try_to_book.return_value = True + + mock_doctolib_de.load_state.return_value = None + mock_doctolib_de.dump_state.return_value = {} + + return mock_doctolib_de + + +def call_application(city, cli_args=[]): + assert 0 == Application.main( + Application(), + cli_args=["de", city, "roger.phillibert@gmail.com", "1234"] + cli_args + ) diff --git a/test_fixtures/availabilities.json b/test_fixtures/availabilities.json new file mode 100644 index 0000000..51bd17e --- /dev/null +++ b/test_fixtures/availabilities.json @@ -0,0 +1,20 @@ +{ + "availabilities": [ + { + "date": "2021-06-10", + "slots": [ + { + "start_date": "2021-06-10T08:30:00.000+02:00", + "steps": [{}, { + "start_date": "2021-07-20T08:30:00.000+02:00" + }] + },{ + "start_date": "2021-06-10T08:40:00.000+02:00", + "steps": [{}, { + "start_date": "2021-07-20T08:40:00.000+02:00" + }] + } + ] + } + ] +} \ No newline at end of file diff --git a/test_fixtures/doctor_response.json b/test_fixtures/doctor_response.json new file mode 100644 index 0000000..f5beec3 --- /dev/null +++ b/test_fixtures/doctor_response.json @@ -0,0 +1,51 @@ +{ + "data": { + "profile": { + "id": 9876543 + }, + "visit_motives": [ + { + "id": 2741702, + "name": "Erstimpfung Covid-19 (AstraZeneca)", + "vaccination_days_range": 42, + "first_shot_motive": true, + "allow_new_patients": true + }, + { + "id": 2746983, + "name": "Erstimpfung Covid-19 (BioNTech-Pfizer)", + "vaccination_days_range": 21, + "first_shot_motive": true, + "allow_new_patients": true + }, + { + "id": 2746984, + "name": "Zweitimpfung Covid-19 (BioNTech-Pfizer)", + "vaccination_days_range": 0, + "first_shot_motive": false, + "allow_new_patients": true + }, + { + "id": 2920448, + "name": "Einzelimpfung Covid-19 (Janssen)", + "vaccination_days_range": 0, + "first_shot_motive": false, + "allow_new_patients": true + } + ], + "agendas": [ + { + "booking_disabled": false, + "visit_motive_ids": [] + } + ], + "places": [ + { + "name": "Praxis Prof. Dr. med. Dre", + "practice_ids": [ + 234567 + ] + } + ] + } +} \ No newline at end of file diff --git a/test_fixtures/search_result.json b/test_fixtures/search_result.json new file mode 100644 index 0000000..53c02ee --- /dev/null +++ b/test_fixtures/search_result.json @@ -0,0 +1,7 @@ +{ + "search_result": { + "search_result": { + "url": "/allgemeinmedizin/koeln/dr-dre?insurance_sector=public" + } + } +}