Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Remote python API for REST API & Websockets #46

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions appyter/parse/nb.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import nbformat as nbf

def nb_from_ipynb_string(string):
Expand All @@ -7,6 +8,9 @@ def nb_from_ipynb_file(filename):
with open(filename, 'r') as fr:
return nbf.read(fr, as_version=4)

def nb_from_json(j):
  ''' Construct a notebook object from a json-compatible dict.

  Round-trips through the ipynb string parser so the result goes through
  the same code path as a notebook read from disk.
  '''
  serialized = json.dumps(j)
  return nb_from_ipynb_string(serialized)

def nb_to_ipynb_string(nb):
  ''' Serialize a notebook object to its ipynb (json) string form via nbformat. '''
  return nbf.writes(nb)

Expand All @@ -16,3 +20,6 @@ def nb_to_ipynb_io(nb, io):
def nb_to_ipynb_file(nb, filename):
  ''' Write notebook `nb` to `filename` in ipynb format (overwrites the file). '''
  with open(filename, 'w') as fw:
    nb_to_ipynb_io(nb, fw)

def nb_to_json(nb):
  ''' Convert a notebook object into a json-compatible dict by parsing its
  serialized ipynb string form.
  '''
  serialized = nb_to_ipynb_string(nb)
  return json.loads(serialized)
5 changes: 5 additions & 0 deletions appyter/remote/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
''' Remote interaction with an appyter
'''
import os
from appyter.util import importdir
# Import every sibling module of this package so the click commands they
# declare get registered.  Note: `os.path.join(x)` with a single argument
# is a no-op, so the redundant wrapper call was removed.
importdir(os.path.dirname(__file__), __package__, globals())
7 changes: 7 additions & 0 deletions appyter/remote/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import click

from appyter.cli import cli

@cli.group(help='Interact with a remote appyter')
def remote():
  # Empty click group; subcommands in this package attach themselves via
  # `@remote.command(...)` (e.g. nbinspect, nbevaluate).
  pass
223 changes: 223 additions & 0 deletions appyter/remote/nbevalutate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
import os
import sys
import json
import click
import asyncio
import socketio
import requests

from appyter.remote.cli import remote
from appyter.util import join_routes, is_remote

def read_chunks(fr, chunk_size=1024*100):
  ''' Generate successive chunks of up to `chunk_size` from the file-like
  object `fr`, stopping at end-of-file (an empty read).
  '''
  chunk = fr.read(chunk_size)
  while chunk:
    yield chunk
    chunk = fr.read(chunk_size)

async def get_queue_assert(input_msg_queue, **conditions):
  ''' Pop the next message off the queue, failing fast on fatal message
  types, and check it satisfies every keyword condition given.

  :param input_msg_queue: (asyncio.Queue) queue of dict messages
  :param conditions: key/value pairs the message must match exactly
  :return: the validated message dict
  '''
  msg = await input_msg_queue.get()
  fatal_types = {'error', 'connection_error', 'disconnect'}
  assert msg['type'] not in fatal_types, msg['data']
  for field, expected in conditions.items():
    assert msg[field] == expected, f"Failure, expected {conditions} got {msg}"
  return msg

async def do_remote_download(sio, input_msg_queue, output_msg_queue, data):
  ''' Ask the remote to download a file, then relay progress events onto
  `output_msg_queue` until the remote reports completion.
  '''
  await sio.emit('download_start', data)
  # the remote first acknowledges by queueing the download
  queued = await get_queue_assert(input_msg_queue, type='download_queued')
  await output_msg_queue.put(queued)
  # then emits progress events, terminated by download_complete
  msg = await input_msg_queue.get()
  await output_msg_queue.put(msg)
  while msg['type'] != 'download_complete':
    assert msg['type'] == 'download_progress', f"Failure, expected `download_progress` got {msg}"
    msg = await input_msg_queue.get()
    await output_msg_queue.put(msg)

async def do_remote_upload(sio, input_msg_queue, output_msg_queue, data):
  ''' Handle upload to remote.

  Implements a socket.io-file-upload (siofu) style handshake: announce the
  upload, wait for the server's ready ack, stream the file chunk by chunk
  (waiting for a per-chunk ack before sending the next), then signal
  completion.  Every server message seen is mirrored onto
  `output_msg_queue` for progress reporting.

  :param sio: connected socket.io client used to emit siofu events
  :param input_msg_queue: (asyncio.Queue) messages arriving from the server
  :param output_msg_queue: (asyncio.Queue) progress messages for the consumer
  :param data: (dict) must contain `name` (local file path to upload) and
    `id` (upload identifier the server echoes back in each ack)
  '''
  with open(data['name'], 'rb') as fr:
    # request upload
    await sio.emit('siofu_start', data)
    # wait for server to be ready
    msg = await input_msg_queue.get()
    await output_msg_queue.put(msg)
    assert msg['type'] == 'siofu_ready' and msg['data']['id'] == data['id'], f"Error: received {msg}"
    # read file in chunks
    for chunk in read_chunks(fr):
      # NOTE(review): raw bytes are sent as-is; confirm the server expects
      # unencoded binary chunks rather than e.g. base64
      await sio.emit('siofu_progress', dict(id=data['id'], content=chunk))
      # block on the per-chunk acknowledgement before sending the next one
      msg = await input_msg_queue.get()
      await output_msg_queue.put(msg)
      assert msg['type'] == 'siofu_chunk' and msg['data']['id'] == data['id'], f"Error: received {msg}"
    #
    await sio.emit('siofu_done', data)
    msg = await input_msg_queue.get()
    await output_msg_queue.put(msg)
    assert msg['type'] == 'siofu_complete' and msg['data']['id'] == data['id'], f"Error: received {msg}"

async def remote_message_producer(sio, url, input_msg_queue):
  ''' Listen for events and put them on an async queue.

  Every event of interest is forwarded onto `input_msg_queue` as
  dict(type=<event name>, data=<payload>) so the consumer (evaluate_saga)
  can process them sequentially.  The original registered 17 near-identical
  handlers by hand; a single factory removes the duplication.
  '''
  def make_forwarder(event):
    # Must be a real `async def` (not a lambda returning a coroutine) so
    # python-socketio recognizes it as a coroutine handler and awaits it.
    # `data` defaults to '' for the payload-less connect/disconnect events,
    # matching the original handlers.
    async def forward(data=''):
      await input_msg_queue.put(dict(type=event, data=data))
    return forward
  # Step 1. Capture messages from remote onto a queue for processing
  for event in (
    # socketio lifecycle
    'connect', 'connect_error', 'disconnect',
    # api
    'session',
    # notebook generation
    'status', 'progress', 'nb', 'cell', 'error',
    # remote download
    'download_start', 'download_queued', 'download_complete', 'download_error',
    # remote upload
    'siofu_ready', 'siofu_chunk', 'siofu_complete', 'siofu_error',
  ):
    # `event` is bound per-iteration via the factory, avoiding the
    # late-binding closure pitfall
    sio.on(event, make_forwarder(event))
  #
  # Step 2. Establish a connection with remote
  await sio.connect(url)
  # wait until disconnect (pushing any received events onto the asyncio queue)
  await sio.wait()

async def evaluate_saga(sio, url, context, params, output, input_msg_queue, output_msg_queue):
  ''' Consume events from remote when necessary.

  Drives the full evaluation protocol against a remote appyter: establish a
  session, push/pull input files, submit the context, start execution, and
  relay progress until success.  All progress is mirrored onto
  `output_msg_queue`; any exception is converted into an `error` message.

  :param sio: connected socket.io client
  :param url: (str) base url of the remote appyter
  :param context: (dict) field name -> value mapping to evaluate
  :param params: (list of dict) field descriptors from remote inspection
  :param output: writable file handle for the final notebook json
  :param input_msg_queue: (asyncio.Queue) messages from remote_message_producer
  :param output_msg_queue: (asyncio.Queue) progress stream; None is the
    end-of-stream sentinel for output_msg_consumer
  '''
  try:
    # Ensure we connected properly
    response = await get_queue_assert(input_msg_queue, type='connect')
    # Step 3. Request session on remote
    await sio.emit('session', {})
    response = await get_queue_assert(input_msg_queue, type='session')
    session = response['data']
    # Step 4. Get file(s) onto remote -- only FileField params need transfer
    for param in params:
      if param['field'] != 'FileField':
        continue
      if param['args']['name'] in context:
        context_value = context[param['args']['name']]
        if is_remote(context_value):
          # have remote download the file
          await do_remote_download(sio, input_msg_queue, output_msg_queue, dict(session, name=param['args']['name'], url=context_value, filename=os.path.basename(context_value)))
        else:
          # upload the file
          await do_remote_upload(sio, input_msg_queue, output_msg_queue, dict(session, id=0, name=context_value))
      else:
        await output_msg_queue.put({ 'type': 'warning', 'data': f"Missing file for `{param['args']['name']}`" })
    # Step 5. Create notebook on remote
    await sio.emit('submit', dict(session, **context))
    # Step 6. Start execution on remote
    await sio.emit('init', session)
    # Process results until the remote reports Success
    while True:
      response = await get_queue_assert(input_msg_queue)
      await output_msg_queue.put(response)
      if response['type'] == 'status' and response['data'] == 'Success':
        # Step 7. Grab final notebook from remote
        # NOTE: not async but doesn't really matter here..
        # NOTE(review): the `[1:]` appears to strip a leading character from
        # the joined route (presumably a '/') -- confirm against join_routes
        response = requests.get(join_routes(url, session['_session'])[1:], headers={'Accept': 'application/json'})
        # NOTE(review): assert is stripped under `python -O`
        assert response.status_code <= 299, f"Error ({response.status_code}): {response.text}"
        json.dump(response.json(), output)
        break
  except Exception as e:
    # surface any failure to the consumer as an error message rather than
    # letting the saga die silently
    await output_msg_queue.put(dict(type='error', data=str(e)))
  #
  # Step 8. Disconnect from server
  await sio.disconnect()
  await output_msg_queue.put(None)
#
async def output_msg_consumer(output_msg_queue, emit):
  ''' Forward queued progress messages to `emit` as json lines.

  Terminates when the `None` end-of-stream sentinel is dequeued.
  '''
  while True:
    item = await output_msg_queue.get()
    if item is None:
      return
    print(json.dumps(item), file=emit)

@remote.command(help='Construct and execute the appyter provided parameters')
@click.option(
  '--context',
  envvar='CONTEXT',
  default='-',
  type=click.Path(readable=True, dir_okay=False, allow_dash=True),
  help='JSON serialized context mapping field names to values (- for stdin)',
)
@click.option(
  '--emit',
  envvar='EMIT',
  default='-',
  type=click.Path(writable=True, dir_okay=False, allow_dash=True),
  help='The output location of the progress stream (- for stderr)',
)
@click.option(
  '--output',
  envvar='OUTPUT',
  default='-',
  type=click.Path(writable=True, dir_okay=False, allow_dash=True),
  help='The output location of the inspection json (- for stdout)',
)
@click.argument('url', envvar='URL')
def nbevaluate(url, context, emit, output):
  ''' Submit `context` to the remote appyter at `url`, stream execution
  progress to `emit`, and write the final notebook json to `output`.
  '''
  context = json.load(sys.stdin if context == '-' else open(context, 'r'))
  emit = sys.stderr if emit == '-' else open(emit, 'w')
  output = sys.stdout if output == '-' else open(output, 'w')
  # Step 1. Inspect remote ensuring it's up and grabbing notebook parameters
  response = requests.get(url, headers={'Accept': 'application/json'})
  assert response.status_code <= 299, f"Error ({response.status_code}): {response.text}"
  params = response.json()
  # Execute async saga for evaluation
  sio = socketio.AsyncClient()
  # FIX: asyncio.Queue's `loop` keyword was deprecated in 3.8 and removed in
  # Python 3.10; create an explicit event loop and plain queues instead --
  # the queues bind to the set/running loop on their own.
  loop = asyncio.new_event_loop()
  asyncio.set_event_loop(loop)
  input_msg_queue = asyncio.Queue()
  output_msg_queue = asyncio.Queue()
  loop.run_until_complete(asyncio.gather(
    remote_message_producer(sio, join_routes(url, 'socket.io')[1:], input_msg_queue),
    evaluate_saga(sio, url, context, params, output, input_msg_queue, output_msg_queue),
    output_msg_consumer(output_msg_queue, emit),
  ))
  loop.close()
12 changes: 12 additions & 0 deletions appyter/remote/nbinspect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import json

import click
import requests

from appyter.remote.cli import remote

@remote.command(help='Inspect appyter for arguments (fields)')
@click.option('--output', envvar='OUTPUT', default='-', type=click.File('w'), help='The output location of the inspection json')
@click.argument('url', envvar='URL')
def nbinspect(url, output):
  ''' Fetch the remote appyter's field definitions as json and write them to `output`. '''
  # content negotiation: the appyter server returns the field spec for json requests
  response = requests.get(url, headers={'Accept': 'application/json'})
  # NOTE(review): assert is stripped under `python -O`
  assert response.status_code <= 299, f"Error ({response.status_code}): {response.text}"
  # NOTE(review): `json` is never imported in this module -- this line raises
  # NameError until `import json` is added to the top-of-file imports
  json.dump(response.json(), output)
30 changes: 30 additions & 0 deletions appyter/remote/nbviewer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import sys
import json
import tempfile
from subprocess import Popen, PIPE

from appyter.context import get_env, get_jinja2_env
from appyter.render.nbviewer import render_nb_from_stream

def jupyter_inline_evaluate(url, context):
  ''' Evaluate a remote appyter inline in a jupyter notebook.

  Spawns `python -m appyter remote nbevaluate` as a subprocess, passing the
  serialized `context` through a temporary file, and renders the progress
  stream the subprocess emits on stderr via render_nb_from_stream.

  :param url: (str) the remote appyter url
  :param context: (dict) field name -> value mapping for the appyter
  :return: the final notebook json as returned by render_nb_from_stream
  '''
  import os
  def stream_generator():
    # FIX: NamedTemporaryFile deletes its file on close(), so the original
    # close-then-reopen dance left a re-created file behind forever.
    # mkstemp + finally-unlink guarantees cleanup once the subprocess exits.
    fd, path = tempfile.mkstemp(suffix='.json')
    try:
      with os.fdopen(fd, 'w') as fw:
        json.dump(context, fw)
      with Popen([
        sys.executable,
        '-u',
        '-m', 'appyter',
        'remote', 'nbevaluate',
        f"--context={path}",
        url,
      ], stdout=PIPE, stderr=PIPE) as proc:
        # relay the progress stream line-by-line as it is produced
        packet = proc.stderr.readline()
        while packet:
          yield packet.decode()
          packet = proc.stderr.readline()
    finally:
      os.unlink(path)
  env = get_jinja2_env(get_env(**dict(ipynb='app.ipynb')))
  # FIX: return the final notebook instead of discarding it
  return render_nb_from_stream(env, stream_generator())
3 changes: 2 additions & 1 deletion appyter/render/nbexecute.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from appyter.cli import cli
from appyter.ext.nbclient import NotebookClientIOPubHook
from appyter.render.nbviewer import render_nbviewer_from_nb
from appyter.parse.nb import nb_from_ipynb_file, nb_to_ipynb_file
from appyter.parse.nb import nb_from_ipynb_file, nb_to_ipynb_file, nb_to_json


def cell_is_code(cell):
Expand Down Expand Up @@ -48,6 +48,7 @@ def nbexecute(ipynb='', emit=json_emitter, cwd=''):
resources={ 'metadata': {'path': cwd} },
iopub_hook=iopub_hook_factory(nb, emit),
)
emit({ 'type': 'nb', 'data': nb_to_json(nb) })
with client.setup_kernel():
n_cells = len(nb.cells)
emit({ 'type': 'status', 'data': 'Executing...' })
Expand Down
60 changes: 59 additions & 1 deletion appyter/render/nbviewer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import nbconvert
from bs4 import BeautifulSoup

def render_nbviewer_from_nb(env, nb):
def render_nbviewer_from_nb_soup(env, nb):
''' Render an nbviewer (HTML serialization of the notebook)
'''
exporter = nbconvert.HTMLExporter()
Expand All @@ -15,7 +15,65 @@ def render_nbviewer_from_nb(env, nb):
soup.find('script').decompose()
# soup.find('style').decompose() # remove first style (bootstrap)
soup.find('link').decompose() # remove link to custom stylesheet
return soup

def render_nbviewer_cell_from_nb(env, nb, cell_index):
  ''' Render a single cell from a notebook by exporting the whole notebook
  to HTML and extracting the cell at `cell_index`.

  TODO: Surely this can be more efficient..
  '''
  html, _resources = nbconvert.HTMLExporter().from_notebook_node(nb)
  cells = BeautifulSoup(html, 'html.parser').select('.cell')
  return cells[cell_index]

def render_nbviewer_from_nb(env, nb):
  ''' Render an nbviewer (HTML serialization of the notebook).

  Blanks out the notebook container's class and id so the markup can be
  embedded without clashing with host-page styles/anchors.
  '''
  soup = render_nbviewer_from_nb_soup(env, nb)
  container = soup.select('#notebook-container')[0]
  container['class'] = ''
  container['id'] = ''
  return str(soup)

def render_nb_from_stream(env, stream):
  ''' Render a jupyter notebook and update it *in* a jupyter notebook from an nbexecute progress stream.

  :param env: (jinja2 environment) passed through to the nbviewer renderers
  :param stream: (generator) The stream of messages coming from nbexecute.
  :return: the final notebook (json-compatible dict), or None if no `nb`
    message was ever received
  :raises Exception: when an `error` message arrives on the stream
  '''
  import uuid
  from IPython.display import display, update_display, HTML
  from appyter.util import try_json_loads
  from appyter.parse.nb import nb_from_json
  # FIX: renamed `id` -> `uid` to stop shadowing the builtin; also dropped
  # an unused local `import json`
  uid = '_' + str(uuid.uuid4())
  nb = None
  display(HTML('Starting...'), display_id=uid+'_status')
  for msg in stream:
    msg = try_json_loads(msg)
    if isinstance(msg, dict):  # FIX: was `type(msg) == dict`
      if nb is None and msg['type'] == 'nb':
        # received the constructed notebook: parse and render it
        nb = msg['data']
        nb_html = render_nbviewer_from_nb_soup(env, nb_from_json(nb))
        nb_html.select('#notebook-container')[0]['id'] = '#' + uid
        # show each cell separately with an id based on the index
        for cell_index, cell in enumerate(nb_html.select('.cell')):
          display(HTML(str(cell)), display_id=uid+'_%d' % (cell_index))
      elif nb is not None and msg['type'] == 'cell':
        cell, cell_index = msg['data']
        # when a cell updates, we'll update the notebook and update the cell display
        nb['cells'][cell_index] = cell
        cell_rendered = str(render_nbviewer_cell_from_nb(env, nb_from_json(nb), cell_index))
        update_display(HTML(cell_rendered), display_id=uid+'_%d' % (cell_index))
      elif msg['type'] == 'status':
        update_display(HTML(str(msg['data'])), display_id=uid+'_status')
      elif msg['type'] == 'error':
        update_display(HTML('Error'), display_id=uid+'_status')
        raise Exception(msg['data'])
    else:
      # non-dict payloads (plain strings etc.) are shown as raw status text
      update_display(HTML(str(msg)), display_id=uid+'_status')
  update_display(HTML(''), display_id=uid+'_status')
  #
  return nb
Loading