Skip to content

Commit 3b96719

Browse files
authored
Merge pull request #57 from nextcloud/feat/trigger-taskprocessing
feat: Add support for taskprocessing trigger event
2 parents c6c9b2d + 980c2c2 commit 3b96719

4 files changed

Lines changed: 99 additions & 72 deletions

File tree

.github/workflows/integration_test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ jobs:
2626
# do not stop on another job's failure
2727
fail-fast: false
2828
matrix:
29-
php-versions: [ '8.1' ]
29+
php-versions: [ '8.2' ]
3030
databases: [ 'sqlite' ]
3131
server-versions: [ 'master' ]
3232

lib/main.py

Lines changed: 90 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import threading
2+
import traceback
23
from contextlib import asynccontextmanager
4+
from threading import Event
35
from time import perf_counter, sleep
46
import os
57
import logging
@@ -40,7 +42,6 @@
4042
)
4143
LOGGER = logging.getLogger(os.environ["APP_ID"])
4244
LOGGER.setLevel(logging.DEBUG)
43-
ENABLED_FLAG = NextcloudApp().enabled_state
4445

4546

4647
def load_models():
@@ -62,18 +63,26 @@ def create_model_loader(file_path):
6263

6364
return lambda: WhisperModel(file_path, device=device)
6465

66+
ENABLED = Event()
6567

68+
TRIGGER = Event()
69+
WAIT_INTERVAL = 5
70+
WAIT_INTERVAL_WITH_TRIGGER = 5 * 60
6671
models = load_models()
6772

6873
@asynccontextmanager
6974
async def lifespan(_app: FastAPI):
75+
global ENABLED
7076
setup_nextcloud_logging("stt_whisper2", logging_level=logging.WARNING)
7177
set_handlers(
7278
APP,
7379
enabled_handler,
80+
trigger_handler=trigger_handler
7481
)
75-
t = BackgroundProcessTask()
76-
t.start()
82+
nc = NextcloudApp()
83+
if nc.enabled_state:
84+
ENABLED.set()
85+
start_bg_task()
7786
yield
7887

7988

@@ -85,73 +94,76 @@ def get_file(nc, task_id, file_id):
8594
LAST_MODEL_NAME = None
8695
LAST_MODEL = None
8796

88-
class BackgroundProcessTask(threading.Thread):
89-
def run(self, *args, **kwargs): # pylint: disable=unused-argument
90-
global ENABLED_FLAG
91-
global LAST_MODEL_NAME
92-
global LAST_MODEL
93-
94-
nc = NextcloudApp()
95-
while True:
96-
if not ENABLED_FLAG:
97-
sleep(30)
98-
ENABLED_FLAG = nc.enabled_state
99-
continue
97+
def start_bg_task():
98+
t = threading.Thread(target=background_thread_task)
99+
t.start()
100100

101-
try:
102-
next = nc.providers.task_processing.next_task([f'stt_whisper2:{model_name}' for model_name, _ in models.items()], ['core:audio2text'])
103-
if not 'task' in next or next is None:
104-
sleep(5)
105-
continue
106-
task = next.get('task')
107-
except Exception as e:
108-
LOGGER.error(str(e))
109-
sleep(5)
101+
def background_thread_task():
102+
global ENABLED
103+
global LAST_MODEL_NAME
104+
global LAST_MODEL
105+
106+
nc = NextcloudApp()
107+
while True:
108+
while not ENABLED.is_set():
109+
sleep(5)
110+
111+
try:
112+
next = nc.providers.task_processing.next_task([f'stt_whisper2:{model_name}' for model_name, _ in models.items()], ['core:audio2text'])
113+
if not 'task' in next or next is None:
114+
wait_for_task()
110115
continue
116+
task = next.get('task')
117+
except Exception as e:
118+
LOGGER.error(str(e) + "\n" + "".join(traceback.format_exception(e)))
119+
wait_for_task(10)
120+
continue
121+
try:
122+
LOGGER.info(f"Next task: {task['id']}")
123+
model_name = next.get("provider").get('name').split(':', 2)[1]
124+
LOGGER.info( f"model: {model_name}")
125+
if LAST_MODEL_NAME == model_name:
126+
model = LAST_MODEL
127+
else:
128+
model_load = models.get(model_name)
129+
if model_load is None:
130+
nc.providers.task_processing.report_result(
131+
task["id"], None, "Requested model is not available"
132+
)
133+
continue
134+
model = model_load()
135+
LAST_MODEL_NAME = model_name
136+
LAST_MODEL = model
137+
138+
LOGGER.info("generating transcription")
139+
time_start = perf_counter()
140+
file_name = get_file(nc, task["id"], task.get("input").get('input'))
141+
segments, info = model.transcribe(file_name)
142+
transcript = ''
143+
for segment in segments:
144+
transcript += segment.text
145+
percentage = ( segment.start / info.duration ) * 100
146+
nc.providers.task_processing.set_progress(task['id'], percentage)
147+
del model
148+
LOGGER.info(f"transcription generated: {perf_counter() - time_start}s")
149+
150+
nc.providers.task_processing.report_result(
151+
task["id"],
152+
{'output': str(transcript)},
153+
)
154+
except Exception as e: # noqa
111155
try:
112-
LOGGER.info(f"Next task: {task['id']}")
113-
model_name = next.get("provider").get('name').split(':', 2)[1]
114-
LOGGER.info( f"model: {model_name}")
115-
if LAST_MODEL_NAME == model_name:
116-
model = LAST_MODEL
117-
else:
118-
model_load = models.get(model_name)
119-
if model_load is None:
120-
NextcloudApp().providers.task_processing.report_result(
121-
task["id"], None, "Requested model is not available"
122-
)
123-
continue
124-
model = model_load()
125-
LAST_MODEL_NAME = model_name
126-
LAST_MODEL = model
127-
128-
LOGGER.info("generating transcription")
129-
time_start = perf_counter()
130-
file_name = get_file(nc, task["id"], task.get("input").get('input'))
131-
segments, _ = model.transcribe(file_name)
132-
del model
133-
LOGGER.info(f"transcription generated: {perf_counter() - time_start}s")
134-
135-
transcript = ''
136-
for segment in segments:
137-
transcript += segment.text
138-
NextcloudApp().providers.task_processing.report_result(
139-
task["id"],
140-
{'output': str(transcript)},
141-
)
142-
except Exception as e: # noqa
143-
try:
144-
LOGGER.error(str(e))
145-
nc.providers.task_processing.report_result(task["id"], None, str(e))
146-
except:
147-
pass
156+
LOGGER.error(str(e) + "\n" + "".join(traceback.format_exception(e)))
157+
nc.providers.task_processing.report_result(task["id"], None, str(e))
158+
except:
159+
pass
148160

149161

150162
async def enabled_handler(enabled: bool, nc: AsyncNextcloudApp) -> str:
151-
global ENABLED_FLAG
163+
global ENABLED
152164

153-
ENABLED_FLAG = enabled
154165
if enabled is True:
166+
ENABLED.set()
155167
LOGGER.info("Hello from %s", nc.app_cfg.app_name)
156168
for model_name, _ in models.items():
157169
await nc.providers.task_processing.register(TaskProcessingProvider(
@@ -161,11 +173,27 @@ async def enabled_handler(enabled: bool, nc: AsyncNextcloudApp) -> str:
161173
expected_runtime=120,
162174
))
163175
else:
176+
ENABLED.clear()
164177
LOGGER.info("Bye bye from %s", nc.app_cfg.app_name)
165178
for model_name, _ in models.items():
166179
await nc.providers.task_processing.unregister(f'stt_whisper2:{model_name}', True)
167180
return ""
168181

169182

183+
def trigger_handler(providerId: str):
184+
global TRIGGER
185+
TRIGGER.set()
186+
187+
# Waits for `interval` seconds or WAIT_INTERVAL
188+
# In case a TRIGGER event comes in, WAIT_INTERVAL is set (increased) to WAIT_INTERVAL_WITH_TRIGGER
189+
def wait_for_task(interval = None):
190+
global WAIT_INTERVAL
191+
global WAIT_INTERVAL_WITH_TRIGGER
192+
if interval is None:
193+
interval = WAIT_INTERVAL
194+
if TRIGGER.wait(timeout=interval):
195+
WAIT_INTERVAL = WAIT_INTERVAL_WITH_TRIGGER
196+
TRIGGER.clear()
197+
170198
if __name__ == "__main__":
171199
run_app("main:APP", log_level="trace")

lib/ocs.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import typing
33
from json import loads
44

5-
from httpx import Response, codes, ReadTimeout
5+
from niquests import Response, codes, RequestException
66
from nc_py_api import NextcloudException
77

88

@@ -24,7 +24,7 @@ def check_error(response: Response, info: str = ""):
2424
else:
2525
phrase = "Unknown error"
2626
raise NextcloudException(status_code, reason=phrase, info=info)
27-
if not codes.is_error(status_code):
27+
if status_code < 400:
2828
return
2929
raise NextcloudException(status_code, reason=codes(status_code).phrase, info=info)
3030

@@ -42,12 +42,9 @@ def ocs(
4242
ncSession.init_adapter()
4343
info = f"request: {method} {path}"
4444
nested_req = kwargs.pop("nested_req", False)
45-
try:
46-
response: Response = ncSession.adapter.request(
47-
method, path, content=content, json=json, params=params, files=files, **kwargs
48-
)
49-
except ReadTimeout:
50-
raise NextcloudException(408, info=info) from None
45+
response: Response = ncSession.adapter.request(
46+
method, path, data=content, json=json, params=params, files=files, **kwargs
47+
)
5148
if response.status_code >= 400:
5249
print(loads(response.text))
5350
check_error(response, info)

requirements.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
nc_py_api[app]==0.20.2
1+
nc_py_api[app]==0.22.0
22
pydantic
33
faster-whisper
44
python-multipart
55
ctranslate2==4.4.0
6+
niquests
7+
requests

0 commit comments

Comments
 (0)