diff --git a/backend/src/crawling/crawling_service.py b/backend/src/crawling/crawling_service.py index 3a56736..bf5250a 100644 --- a/backend/src/crawling/crawling_service.py +++ b/backend/src/crawling/crawling_service.py @@ -8,7 +8,7 @@ from src.crawling.communicators import Communicators from src.database.database_schemas import Links, Notifications, Scripts -from src.crawling.scheduler import scheduler +from src.crawling.scheduler_api import scheduler from src.user import user_service from .models.crawl_data_model import CrawlData, CrawlDataCreate diff --git a/backend/src/crawling/scheduler.py b/backend/src/crawling/scheduler.py index 1e0cbb5..83b50be 100644 --- a/backend/src/crawling/scheduler.py +++ b/backend/src/crawling/scheduler.py @@ -20,21 +20,19 @@ class Scheduler: + __kill_switch = False __waiting_crawls: asyncio.PriorityQueue __crawls_not_in_queue_num: int __running_crawls_futures = [] __crawls_to_update_or_delete = {} - async def __create(self): + async def create(self): self.__waiting_crawls = asyncio.PriorityQueue( maxsize=MAX_WAITING_CRAWLS_QUEUE_SIZE ) self.__crawls_not_in_queue_num = 0 await self.reload_crawls() - async def run_scheduler(self): - await self.__create() - await self.__run() async def reload_crawls(self): self.__waiting_crawls = asyncio.PriorityQueue( @@ -58,20 +56,23 @@ async def reload_crawls(self): ) ) + async def add_crawl(self, crawl: CrawlData): await self.__waiting_crawls.put((time.time() + crawl.period, crawl)) + def update_crawl(self, crawl: CrawlData): self.__crawls_to_update_or_delete.update([(crawl.crawl_id, crawl)]) + def delete_crawl(self, crawl_id): self.__crawls_to_update_or_delete.update([(crawl_id, None)]) - async def __run(self): + async def run(self): loop = asyncio.get_event_loop() asyncio.create_task(self.__remove_done_futures()) - while True: + while not self.__kill_switch: logger.log(level=logging.DEBUG, msg=f"Number of waiting crawls = {self.__waiting_crawls.qsize()}.") moment_of_exec, crawl = await 
self.__waiting_crawls.get() @@ -87,7 +88,7 @@ async def __run(self): if time_to_wait > 0: await self.__waiting_crawls.put((moment_of_exec, crawl)) - self.__crawls_not_in_queue_num -=1 + self.__crawls_not_in_queue_num -= 1 await asyncio.sleep(1) else: logger.log(level=logging.DEBUG, msg=f"Running check for change for crawl {crawl.name}. Number of running crawls = {len(self.__running_crawls_futures)}.") @@ -97,40 +98,45 @@ async def __run(self): async def __run_check_for_change(self, crawl, loop): while len(self.__running_crawls_futures) >= MAX_RUNNING_CRAWLS: await asyncio.sleep(1) - self.__running_crawls_futures.append(asyncio.run_coroutine_threadsafe(self.__check_for_change(crawl), loop)) + crawl_future = asyncio.run_coroutine_threadsafe(self.__check_for_change(crawl), loop) + self.__running_crawls_futures.append(crawl_future) async def __check_for_change(self, crawl): - logger.log(level=logging.DEBUG, msg=f"Checking crawl {crawl.name}") - current_value = await data_selector(url=crawl.url, xpath=crawl.xpath) - if current_value != crawl.element_value: - msg = ( - " Name: " - + crawl.name - + "\n" - + " Old value: " - + crawl.element_value - + "\n" - + " New value: " - + str(current_value) - ) - logger.log(level=logging.DEBUG, msg="Value changed! \n" + msg) - - update_element_value_in_db(crawl.crawl_id, current_value) - crawl.element_value = current_value - - screenshot_name = crawl.name.replace(" ", "_") - screenshot_path = await take_screenshot(crawl.url, filename=screenshot_name) - send_email( - crawl.email, - screenshot_path, - f"Your crawl \"{crawl.name}\" has new value!" 
- ) - os.remove(screenshot_path) + try: + logger.log(level=logging.DEBUG, msg=f"Checking crawl {crawl.name}") + current_value = await data_selector(url=crawl.url, xpath=crawl.xpath) + + if current_value != crawl.element_value: + msg = ( + " Name: " + + crawl.name + + "\n" + + " Old value: " + + crawl.element_value + + "\n" + + " New value: " + + str(current_value) + ) + logger.log(level=logging.DEBUG, msg="Value changed! \n" + msg) - await self.__waiting_crawls.put((time.time() + crawl.period, crawl)) - self.__crawls_not_in_queue_num -=1 + update_element_value_in_db(crawl.crawl_id, current_value) + crawl.element_value = current_value + screenshot_name = crawl.name.replace(" ", "_") + screenshot_path = await take_screenshot(crawl.url, filename=screenshot_name) + send_email( + crawl.email, + screenshot_path, + f"Your crawl \"{crawl.name}\" has new value!" + ) + os.remove(screenshot_path) + + finally: + await self.__waiting_crawls.put((time.time() + crawl.period, crawl)) + self.__crawls_not_in_queue_num -= 1 + + async def __remove_done_futures(self): while True: for i, future in enumerate(self.__running_crawls_futures): @@ -139,6 +145,10 @@ async def __remove_done_futures(self): await asyncio.sleep(1) + def kill(self): + self.__kill_switch = True + + def update_element_value_in_db(crawl_id, element_value): db = next(get_db()) db.query(Scripts) \ @@ -153,6 +163,3 @@ def update_element_value_in_db(crawl_id, element_value): def get_all_links(): db = next(get_db()) return db.query(Links).all() - - -scheduler = Scheduler() diff --git a/backend/src/crawling/scheduler_api.py b/backend/src/crawling/scheduler_api.py new file mode 100644 index 0000000..0db41dd --- /dev/null +++ b/backend/src/crawling/scheduler_api.py @@ -0,0 +1,31 @@ +import logging + +from .models.crawl_data_model import CrawlData +from .scheduler import Scheduler + + +logger = logging.getLogger("Noticrawl") + +scheduler = Scheduler() + + +async def run_scheduler(): + while True: + try: + await 
scheduler.create() + await scheduler.run() + except Exception: + logger.log(level=logging.DEBUG, msg="Exception in scheduler!") + await scheduler.reload_crawls() + + + async def add_crawl(crawl: CrawlData): + await scheduler.add_crawl(crawl) + + + def update_crawl(crawl: CrawlData): + scheduler.update_crawl(crawl) + + + def delete_crawl(crawl_id): + scheduler.delete_crawl(crawl_id) diff --git a/backend/src/helpers/crawling.py b/backend/src/helpers/crawling.py index 00c4161..630decf 100644 --- a/backend/src/helpers/crawling.py +++ b/backend/src/helpers/crawling.py @@ -7,35 +7,40 @@ logger = logging.getLogger("Helpers") async def data_selector(url, xpath): - logging.getLogger("websockets").setLevel("WARN") - browser = await pyppeteer.launch( - headless=True, args=["--no-sandbox"], logLevel="WARN" - ) - page = await browser.newPage() - await page.goto(url, waitUntil="networkidle2", timeout=600000) - await page.waitForXPath(xpath) - xpath_content = await page.xpath(xpath) - text_content = await page.evaluate( - "(xpath_content) => xpath_content.textContent", xpath_content[0] - ) - await page.close() - await browser.close() - return text_content + try: + logging.getLogger("websockets").setLevel("WARN") + browser = await pyppeteer.launch( + headless=True, args=["--no-sandbox"], logLevel="WARN" + ) + page = await browser.newPage() + await page.goto(url, waitUntil="networkidle2", timeout=600000) + await page.waitForXPath(xpath) + xpath_content = await page.xpath(xpath) + text_content = await page.evaluate( + "(xpath_content) => xpath_content.textContent", xpath_content[0] + ) + return text_content + finally: + await page.close() + await browser.close() async def take_screenshot(url, filename="screenshot", directory="/app/logs/screenshots"): - filename = filename + datetime.now().strftime("_%d-%m-%Y_%H-%M-%S-%f") + ".png" - path = directory + "/" + filename - os.makedirs(os.path.dirname(path), exist_ok=True) - - logging.getLogger("websockets").setLevel("WARN") - browser = await
pyppeteer.launch( - headless=True, args=["--no-sandbox"], logLevel="WARN" - ) - page = await browser.newPage() - await page.goto(url, waitUntil="networkidle2", timeout=600000) - await page.screenshot(path=path, fullPage=True) - await page.close() - await browser.close() - - return path + try: + filename = filename + datetime.now().strftime("_%d-%m-%Y_%H-%M-%S-%f") + ".png" + path = directory + "/" + filename + os.makedirs(os.path.dirname(path), exist_ok=True) + + logging.getLogger("websockets").setLevel("WARN") + browser = await pyppeteer.launch( + headless=True, args=["--no-sandbox"], logLevel="WARN" + ) + page = await browser.newPage() + await page.goto(url, waitUntil="networkidle2", timeout=600000) + await page.screenshot(path=path, fullPage=True) + return path + + finally: + await page.close() + await browser.close() + diff --git a/backend/src/main.py b/backend/src/main.py index c4bfb11..cdc59df 100644 --- a/backend/src/main.py +++ b/backend/src/main.py @@ -8,7 +8,7 @@ from .auth.auth_controller import auth_router from .auth.auth_service import verify_token from .crawling.crawling_controller import crawling_router -from .crawling.scheduler import scheduler +from .crawling.scheduler_api import run_scheduler from .database import database_schemas from .helpers.status_code_model import StatusCodeBase from .user.user_controller import user_router @@ -22,7 +22,7 @@ database_schemas.create() -asyncio.create_task(scheduler.run_scheduler()) +asyncio.create_task(run_scheduler()) app.mount("/static", StaticFiles(directory="../frontend/build/static"), name="static") templates = Jinja2Templates(directory="../frontend/build")