Skip to content

Commit 7df0ad5

Browse files
authored
Merge pull request #79 from nextcloud/feat/taskprocessing-trigger
feat: Implement taskrocessing trigger event
2 parents 9f557d5 + 5224144 commit 7df0ad5

File tree

3 files changed

+355
-34
lines changed

3 files changed

+355
-34
lines changed

ex_app/lib/main.py

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
# SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
22
# SPDX-License-Identifier: AGPL-3.0-or-later
3+
import concurrent.futures
34
import os
45
import traceback
56
from contextlib import asynccontextmanager
67
from json import JSONDecodeError
78
from threading import Event
89
import asyncio
910

10-
import httpx
11+
from niquests import RequestException
1112
import json
1213
from fastapi import FastAPI
1314
from nc_py_api import NextcloudApp, NextcloudException
@@ -39,6 +40,9 @@
3940
fast_app = FastAPI(lifespan=http_mcp_app.lifespan)
4041

4142
app_enabled = Event()
43+
TRIGGER = Event()
44+
IDLE_POLLING_INTERVAL = 5
45+
IDLE_POLLING_INTERVAL_WITH_TRIGGER = 5 * 60
4246

4347
LOCALE_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "locale")
4448
current_translator = ContextVar("current_translator")
@@ -52,9 +56,14 @@ async def lifespan(app: FastAPI):
5256
async with exapp_lifespan(app):
5357
async with http_mcp_app.lifespan(app):
5458
yield
59+
5560
@asynccontextmanager
5661
async def exapp_lifespan(app: FastAPI):
57-
set_handlers(app, enabled_handler)
62+
set_handlers(
63+
app,
64+
enabled_handler,
65+
trigger_handler=trigger_handler,
66+
)
5867
start_bg_task()
5968
nc = NextcloudApp()
6069
if nc.enabled_state:
@@ -135,29 +144,23 @@ async def background_thread_task():
135144
try:
136145
response = nc.providers.task_processing.next_task([provider.id], [provider.task_type])
137146
if not response or not 'task' in response:
138-
await asyncio.sleep(2)
147+
await wait_for_task()
139148
continue
140-
except (NextcloudException, httpx.RequestError, JSONDecodeError) as e:
149+
except (NextcloudException, RequestException, JSONDecodeError) as e:
141150
tb_str = ''.join(traceback.format_exception(e))
142151
log(nc, LogLvl.WARNING, "Error fetching the next task " + tb_str)
143-
await asyncio.sleep(5)
152+
await wait_for_task(5)
144153
continue
145-
except (
146-
httpx.RemoteProtocolError,
147-
httpx.ReadError,
148-
httpx.LocalProtocolError,
149-
httpx.PoolTimeout,
150-
) as e:
154+
except RequestException as e:
151155
log(nc, LogLvl.DEBUG, "Ignored error during task polling")
152-
await asyncio.sleep(2)
156+
await wait_for_task(2)
153157
continue
154158

155159
task = response["task"]
156160
log(nc, LogLvl.INFO, 'New Task incoming')
157161
log(nc, LogLvl.DEBUG, str(task))
158162
log(nc, LogLvl.INFO, str({'input': task['input']['input'], 'confirmation': task['input']['confirmation'], 'conversation_token': '<skipped>'}))
159163
asyncio.create_task(handle_task(task, nc))
160-
await asyncio.sleep(5)
161164

162165

163166
async def handle_task(task, nc: NextcloudApp):
@@ -171,7 +174,7 @@ async def handle_task(task, nc: NextcloudApp):
171174
log(nc, LogLvl.ERROR, "Error: " + tb_str)
172175
try:
173176
nc.providers.task_processing.report_result(task["id"], error_message=str(e))
174-
except (NextcloudException, httpx.RequestError) as net_err:
177+
except (NextcloudException, RequestException) as net_err:
175178
tb_str = ''.join(traceback.format_exception(net_err))
176179
log(nc, LogLvl.WARNING, "Network error in reporting the error: " + tb_str)
177180
return
@@ -180,7 +183,7 @@ async def handle_task(task, nc: NextcloudApp):
180183
task["id"],
181184
output,
182185
)
183-
except (NextcloudException, httpx.RequestError, JSONDecodeError) as e:
186+
except (NextcloudException, RequestException, JSONDecodeError) as e:
184187
tb_str = ''.join(traceback.format_exception(e))
185188
log(nc, LogLvl.ERROR, "Network error trying to report the task result: " + tb_str)
186189

@@ -190,6 +193,29 @@ def start_bg_task():
190193
loop = asyncio.get_event_loop()
191194
loop.create_task(background_thread_task())
192195

196+
# Trigger event is available starting with nextcloud v33
197+
def trigger_handler(providerId: str):
198+
global TRIGGER
199+
TRIGGER.set()
200+
201+
# Waits for interval seconds or IDLE_POLLING_INTERVAL seconds
202+
# but can return earlier when TRIGGER event is received from nextcloud
203+
# if the trigger event is received, IDLE_POLLING_INTERVAL is set to IDLE_POLLING_INTERVAL_WITH_TRIGGER
204+
async def wait_for_task(interval = None):
205+
global TRIGGER
206+
global IDLE_POLLING_INTERVAL
207+
global IDLE_POLLING_INTERVAL_WITH_TRIGGER
208+
if interval is None:
209+
interval = IDLE_POLLING_INTERVAL
210+
# Call TRIGGER.wait() in a separate thread
211+
loop = asyncio.get_running_loop()
212+
with concurrent.futures.ThreadPoolExecutor() as pool:
213+
was_event = await loop.run_in_executor(pool, TRIGGER.wait, interval)
214+
if was_event:
215+
IDLE_POLLING_INTERVAL = IDLE_POLLING_INTERVAL_WITH_TRIGGER
216+
TRIGGER.clear()
217+
218+
193219
APP.mount("/mcp", http_mcp_app)
194220

195221
if __name__ == "__main__":

0 commit comments

Comments
 (0)