Skip to content

Commit 7c8dca0

Browse files
committed
add async to query method
1 parent 6fa3cdc commit 7c8dca0

File tree

1 file changed

+29
-7
lines changed

1 file changed

+29
-7
lines changed

omniduct/databases/base.py

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import sqlparse
1111
from decorator import decorator
1212
from jinja2 import StrictUndefined, Template
13+
from concurrent.futures import ThreadPoolExecutor
1314

1415
from . import cursor_formatters
1516
from omniduct.caches.base import cached_method
@@ -18,6 +19,7 @@
1819
from omniduct.utils.docs import quirk_docs
1920
from omniduct.utils.magics import MagicsProvider, process_line_arguments, process_line_cell_arguments
2021

22+
2123
logging.getLogger('requests').setLevel(logging.WARNING)
2224

2325

@@ -236,15 +238,35 @@ def query(self, statement, format=None, format_opts={}, **kwargs):
236238
Returns:
237239
The results of the query formatted as nominated.
238240
"""
239-
cursor = self.execute(statement, async=False, template=False, **kwargs)
241+
async = kwargs.get('async', False)
242+
cursor = self.execute(statement, template=False, **kwargs)
243+
244+
def finish(cursor):
245+
if self._cursor_empty(cursor):
246+
return None
247+
# Some DBAPI2 cursor implementations error if attempting to extract
248+
# data from an empty cursor, and if so, we simply return None.
249+
formatter = self._get_formatter(format, cursor, **format_opts)
250+
return formatter.dump()
240251

241-
# Some DBAPI2 cursor implementations error if attempting to extract
242-
# data from an empty cursor, and if so, we simply return None.
243-
if self._cursor_empty(cursor):
244-
return None
252+
if not async:
253+
return finish(cursor)
245254

246-
formatter = self._get_formatter(format, cursor, **format_opts)
247-
return formatter.dump()
255+
from werkzeug.local import LocalProxy
256+
257+
class ResultProxy(LocalProxy):
258+
259+
def _get_future(self):
260+
return super(ResultProxy, self)._get_current_object()
261+
262+
def _get_current_object(self):
263+
future = self._get_future()
264+
if not future.done():
265+
raise Exception('query is not yet done')
266+
return future.result()
267+
268+
future = ThreadPoolExecutor(max_workers=1).submit(finish, cursor)
269+
return ResultProxy(lambda: future)
248270

249271
def stream(self, statement, format=None, format_opts={}, batch=None, **kwargs):
250272
"""

0 commit comments

Comments
 (0)