66from typing import TYPE_CHECKING , Any
77
88import sentry_sdk
9- import uvloop
109from sanic import Request , Sanic
1110from sanic .log import logger
1211from sanic .response import BaseHTTPResponse
13- from sanic .worker . loader import AppLoader
12+ from sanic .signals import Event
1413from sentry_sdk .integrations .asyncio import AsyncioIntegration
1514from sentry_sdk .integrations .grpc import GRPCIntegration
1615from sentry_sdk .integrations .sanic import SanicIntegration , _context_enter , _context_exit , _set_transaction
2120from renku_data_services .authz .admin_sync import sync_admins_from_keycloak
2221from renku_data_services .base_models .core import APIUser
2322from renku_data_services .data_api .app import register_all_handlers
24- from renku_data_services .data_api .prometheus import collect_system_metrics , setup_app_metrics , setup_prometheus
23+ from renku_data_services .data_api .prometheus import collect_system_metrics , setup_prometheus
2524from renku_data_services .errors .errors import (
2625 ForbiddenError ,
2726 MissingResourceError ,
3837 import sentry_sdk ._types
3938
4039
41- async def _send_messages (app : Sanic ) -> None :
42- config = Config .from_env ()
40+ async def send_pending_events (app : Sanic , config : Config ) -> None :
41+ """Send messages to the message queue."""
42+ logger .info ("Sending events to message queue." )
4343 while True :
4444 try :
4545 await config .event_repo .send_pending_events ()
46- # we need to collect metrics for this background process separately from the task we add to the
47- # server processes
48- await collect_system_metrics (app , "send_events_worker" )
4946 await asyncio .sleep (1 )
5047 except (asyncio .CancelledError , KeyboardInterrupt ) as e :
5148 logger .warning (f"Exiting: { e } " )
@@ -55,8 +52,9 @@ async def _send_messages(app: Sanic) -> None:
5552 raise
5653
5754
58- async def _update_search (app : Sanic ) -> None :
59- config = Config .from_env ()
55+ async def update_search (app : Sanic , config : Config ) -> None :
56+ """Send updates to solr."""
57+ logger .info ("Sending updates to search" )
6058 while True :
6159 try :
6260 async with DefaultSolrClient (config .solr_config ) as client :
@@ -73,49 +71,17 @@ async def _update_search(app: Sanic) -> None:
7371 raise
7472
7573
76- async def _solr_reindex (app : Sanic ) -> None :
74+ async def solr_reindex (app : Sanic , config : Config ) -> None :
7775 """Run a solr reindex of all data.
7876
7977 This might be required after migrating the solr schema.
8078 """
81- config = Config . from_env ( )
79+ logger . info ( "starting solr reindex" )
8280 reprovision = config .search_reprovisioning
8381 admin = APIUser (is_admin = True )
8482 await reprovision .run_reprovision (admin )
8583
8684
87- def send_pending_events (app_name : str ) -> None :
88- """Send pending messages to redis."""
89- app = Sanic (app_name )
90- setup_app_metrics (app )
91-
92- logger .info ("running events sending loop." )
93-
94- asyncio .set_event_loop (uvloop .new_event_loop ())
95- asyncio .run (_send_messages (app ))
96-
97-
98- def update_search (app_name : str ) -> None :
99- """Update the SOLR with data from the search staging table."""
100- app = Sanic (app_name )
101- setup_app_metrics (app )
102-
103- logger .info ("Running update search loop." )
104-
105- asyncio .set_event_loop (uvloop .new_event_loop ())
106- asyncio .run (_update_search (app ))
107-
108-
109- def solr_reindex (app_name : str ) -> None :
110- """Runs a solr reindex."""
111- app = Sanic (app_name )
112- setup_app_metrics (app )
113-
114- logger .info ("Running SOLR reindex triggered by a migration" )
115- asyncio .set_event_loop (uvloop .new_event_loop ())
116- asyncio .run (_solr_reindex (app ))
117-
118-
11985def create_app () -> Sanic :
12086 """Create a Sanic application."""
12187 config = Config .from_env ()
@@ -207,29 +173,30 @@ async def setup_rclone_validator(app: Sanic) -> None:
207173 validator = RCloneValidator ()
208174 app .ext .dependency (validator )
209175
210- @app .main_process_ready
211- async def ready (app : Sanic ) -> None :
176+ @app .signal ( Event . SERVER_INIT_AFTER )
177+ async def ready (app : Sanic , loop : asyncio . AbstractEventLoop ) -> None :
212178 """Application ready event handler."""
213- logger .info ("starting events background job ." )
214- app .manager . manage ( "SendEvents" , send_pending_events , { "app_name" : app . name }, transient = True )
215- app .manager . manage ( "UpdateSearch" , update_search , { "app_name" : app . name }, transient = True )
179+ logger .info ("Starting background tasks.. ." )
180+ app .add_task ( send_pending_events ( app , config ), name = "SendEvents" )
181+ app .add_task ( update_search ( app , config ), name = "UpdateSearch" )
216182 if getattr (app .ctx , "solr_reindex" , False ):
217- app .manager . manage ( "SolrReindex" , solr_reindex , { "app_name" : app . name }, transient = True )
183+ app .add_task ( solr_reindex ( app , config ), name = "SolrReindex" )
218184
219185 return app
220186
221187
188+ app = create_app ()
189+
222190if __name__ == "__main__" :
223191 parser = argparse .ArgumentParser (prog = "Renku Data Services" )
224192 # NOTE: K8s probes will fail if listening only on 127.0.0.1 - so we listen on 0.0.0.0
225193 parser .add_argument ("-H" , "--host" , default = "0.0.0.0" , help = "Host to listen on" ) # nosec B104
226194 parser .add_argument ("-p" , "--port" , default = 8000 , type = int , help = "Port to listen on" )
227195 parser .add_argument ("--debug" , action = "store_true" , help = "Enable Sanic debug mode" )
228196 parser .add_argument ("--fast" , action = "store_true" , help = "Enable Sanic fast mode" )
197+ parser .add_argument ("--workers" , default = 1 , type = int , help = "The number of workers to use." )
229198 parser .add_argument ("-d" , "--dev" , action = "store_true" , help = "Enable Sanic development mode" )
230199 parser .add_argument ("--single-process" , action = "store_true" , help = "Do not use multiprocessing." )
231200 args : dict [str , Any ] = vars (parser .parse_args ())
232- loader = AppLoader (factory = create_app )
233- app = loader .load ()
234- app .prepare (** args )
235- Sanic .serve (primary = app , app_loader = loader )
201+ logger .info (f"The args for serving are: { args } " )
202+ app .run (** args )
0 commit comments