%collect line magic for using inside loops #432
Changes from 6 commits
78025ab
2d04c51
a24c615
04ca3ec
74faded
3fc6183
0cfba59
101d27b
3768547
51b0c15
3187f1f
e65afd7
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,329 @@ | ||
{ | ||
"cells": [ | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"# Sample usage of the %collect line magic" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"Load the magics:" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": 1, | ||
"metadata": { | ||
"collapsed": true | ||
}, | ||
"outputs": [], | ||
"source": [ | ||
"%load_ext sparkmagic.magics" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"Create a new python session:" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": 2, | ||
"metadata": {}, | ||
"outputs": [ | ||
{ | ||
"data": { | ||
"application/vnd.jupyter.widget-view+json": { | ||
"model_id": "3015273a12494389a3497888640c3cfc", | ||
"version_major": 2, | ||
"version_minor": 0 | ||
}, | ||
"text/html": [ | ||
"<p>Failed to display Jupyter Widget of type <code>MagicsControllerWidget</code>.</p>\n", | ||
"<p>\n", | ||
" If you're reading this message in the Jupyter Notebook or JupyterLab Notebook, it may mean\n", | ||
" that the widgets JavaScript is still loading. If this message persists, it\n", | ||
" likely means that the widgets JavaScript library is either not installed or\n", | ||
" not enabled. See the <a href=\"https://ipywidgets.readthedocs.io/en/stable/user_install.html\">Jupyter\n", | ||
" Widgets Documentation</a> for setup instructions.\n", | ||
"</p>\n", | ||
"<p>\n", | ||
" If you're reading this message in another frontend (for example, a static\n", | ||
" rendering on GitHub or <a href=\"https://nbviewer.jupyter.org/\">NBViewer</a>),\n", | ||
" it may mean that your frontend doesn't currently support widgets.\n", | ||
"</p>\n" | ||
], | ||
"text/plain": [ | ||
"MagicsControllerWidget(children=(Tab(children=(ManageSessionWidget(children=(HTML(value='<br/>'), HTML(value='No sessions yet.'))), CreateSessionWidget(children=(HTML(value='<br/>'), Dropdown(description='Endpoint:', options={'http://spark:8998': <sparkmagic.livyclientlib.endpoint.Endpoint object at 0x7f19a52a7f60>}, value=<sparkmagic.livyclientlib.endpoint.Endpoint object at 0x7f19a52a7f60>), Text(value='session-name', description='Name:'), ToggleButtons(description='Language:', options=('scala', 'python', 'python3'), value='scala'), Text(value='{\"driverMemory\": \"1000M\", \"executorCores\": 2}', description='Properties:'), HTML(value='<br/>'), SubmitButton(description='Create Session', style=ButtonStyle()))), AddEndpointWidget(children=(HTML(value='<br/>'), Text(value='http://example.com/livy', description='Address:'), Dropdown(description='Auth type:', options={'Kerberos': 'Kerberos', 'Basic_Access': 'Basic_Access', 'None': 'None'}, value='Kerberos'), Text(value='username', description='Username:', layout=Layout(display='none')), Text(value='password', description='Password:', layout=Layout(display='none')), HTML(value='<br/>'), SubmitButton(description='Add endpoint', style=ButtonStyle()))), ManageEndpointWidget(children=(HTML(value='<br/>'), HTML(value='Endpoint'), VBox(children=(HTML(value='<hr/>'), HBox(children=(VBox(children=(HTML(value='No sessions on this endpoint.'), HBox(children=(Text(value='0', description='Session to delete:'), Button(description='Delete', style=ButtonStyle()))))), Button(description='Clean Up', style=ButtonStyle()), Button(description='Remove', style=ButtonStyle()))))), HTML(value='<br/>')))), _titles={'0': 'Manage Sessions', '1': 'Create Session', '2': 'Add Endpoint', '3': 'Manage Endpoints'}),))" | ||
] | ||
}, | ||
"metadata": {}, | ||
"output_type": "display_data" | ||
}, | ||
{ | ||
"name": "stdout", | ||
"output_type": "stream", | ||
"text": [ | ||
"Starting Spark application\n" | ||
] | ||
}, | ||
{ | ||
"data": { | ||
"text/html": [ | ||
"<table>\n", | ||
"<tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th><th>Current session?</th></tr><tr><td>0</td><td>None</td><td>pyspark</td><td>idle</td><td></td><td></td><td>✔</td></tr></table>" | ||
], | ||
"text/plain": [ | ||
"<IPython.core.display.HTML object>" | ||
] | ||
}, | ||
"metadata": {}, | ||
"output_type": "display_data" | ||
}, | ||
{ | ||
"name": "stdout", | ||
"output_type": "stream", | ||
"text": [ | ||
"SparkSession available as 'spark'.\n" | ||
] | ||
} | ||
], | ||
"source": [ | ||
"%manage_spark" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"Create sample data in the remote spark cluster" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": 3, | ||
"metadata": { | ||
"collapsed": true | ||
}, | ||
"outputs": [], | ||
"source": [ | ||
"%%spark\n", | ||
"from pyspark.sql.functions import *\n", | ||
"# make some test data\n", | ||
"columns = ['id', 'dogs', 'cats']\n", | ||
"vals = [\n", | ||
" (1, 2, 0),\n", | ||
" (2, 0, 1)\n", | ||
"]\n", | ||
"\n", | ||
"vals2 = [\n", | ||
" (1, 2, 0),\n", | ||
" (3, 1, 2)\n", | ||
"]\n", | ||
"\n", | ||
"# create DataFrame\n", | ||
"df1 = spark.createDataFrame(vals, columns)\n", | ||
"df2 = spark.createDataFrame(vals2, columns)\n", | ||
"df1.registerTempTable(\"dfn\")" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"Now, we use the %collect line magic to retrieve the dataframes. We do this by looping through the dataframe names." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": 5, | ||
"metadata": { | ||
"collapsed": true | ||
}, | ||
"outputs": [], | ||
"source": [ | ||
"dfList = [\"df1\", \"df2\"]\n", | ||
"for d in dfList:\n", | ||
" %collect -o $d -n -1" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"As we can see, the dataframes are now in the local namespace" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": 6, | ||
"metadata": {}, | ||
"outputs": [ | ||
{ | ||
"data": { | ||
"text/html": [ | ||
"<div>\n", | ||
"<style scoped>\n", | ||
" .dataframe tbody tr th:only-of-type {\n", | ||
" vertical-align: middle;\n", | ||
" }\n", | ||
"\n", | ||
" .dataframe tbody tr th {\n", | ||
" vertical-align: top;\n", | ||
" }\n", | ||
"\n", | ||
" .dataframe thead th {\n", | ||
" text-align: right;\n", | ||
" }\n", | ||
"</style>\n", | ||
"<table border=\"1\" class=\"dataframe\">\n", | ||
" <thead>\n", | ||
" <tr style=\"text-align: right;\">\n", | ||
" <th></th>\n", | ||
" <th>id</th>\n", | ||
" <th>dogs</th>\n", | ||
" <th>cats</th>\n", | ||
" </tr>\n", | ||
" </thead>\n", | ||
" <tbody>\n", | ||
" <tr>\n", | ||
" <th>0</th>\n", | ||
" <td>1</td>\n", | ||
" <td>2</td>\n", | ||
" <td>0</td>\n", | ||
" </tr>\n", | ||
" <tr>\n", | ||
" <th>1</th>\n", | ||
" <td>2</td>\n", | ||
" <td>0</td>\n", | ||
" <td>1</td>\n", | ||
" </tr>\n", | ||
" </tbody>\n", | ||
"</table>\n", | ||
"</div>" | ||
], | ||
"text/plain": [ | ||
" id dogs cats\n", | ||
"0 1 2 0\n", | ||
"1 2 0 1" | ||
] | ||
}, | ||
"execution_count": 6, | ||
"metadata": {}, | ||
"output_type": "execute_result" | ||
} | ||
], | ||
"source": [ | ||
"df1" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": 7, | ||
"metadata": {}, | ||
"outputs": [ | ||
{ | ||
"data": { | ||
"text/html": [ | ||
"<div>\n", | ||
"<style scoped>\n", | ||
" .dataframe tbody tr th:only-of-type {\n", | ||
" vertical-align: middle;\n", | ||
" }\n", | ||
"\n", | ||
" .dataframe tbody tr th {\n", | ||
" vertical-align: top;\n", | ||
" }\n", | ||
"\n", | ||
" .dataframe thead th {\n", | ||
" text-align: right;\n", | ||
" }\n", | ||
"</style>\n", | ||
"<table border=\"1\" class=\"dataframe\">\n", | ||
" <thead>\n", | ||
" <tr style=\"text-align: right;\">\n", | ||
" <th></th>\n", | ||
" <th>id</th>\n", | ||
" <th>dogs</th>\n", | ||
" <th>cats</th>\n", | ||
" </tr>\n", | ||
" </thead>\n", | ||
" <tbody>\n", | ||
" <tr>\n", | ||
" <th>0</th>\n", | ||
" <td>1</td>\n", | ||
" <td>2</td>\n", | ||
" <td>0</td>\n", | ||
" </tr>\n", | ||
" <tr>\n", | ||
" <th>1</th>\n", | ||
" <td>3</td>\n", | ||
" <td>1</td>\n", | ||
" <td>2</td>\n", | ||
" </tr>\n", | ||
" </tbody>\n", | ||
"</table>\n", | ||
"</div>" | ||
], | ||
"text/plain": [ | ||
" id dogs cats\n", | ||
"0 1 2 0\n", | ||
"1 3 1 2" | ||
] | ||
}, | ||
"execution_count": 7, | ||
"metadata": {}, | ||
"output_type": "execute_result" | ||
} | ||
], | ||
"source": [ | ||
"df2" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"We can also execute sql code within the %collect line magic by specifying the \"-c sql\" option." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": { | ||
"collapsed": true | ||
}, | ||
"outputs": [], | ||
"source": [ | ||
"%collect -c sql -o df1 select * from dfn" | ||
] | ||
} | ||
], | ||
"metadata": { | ||
"kernelspec": { | ||
"display_name": "Python 2", | ||
"language": "python", | ||
"name": "python2" | ||
}, | ||
"language_info": { | ||
"codemirror_mode": { | ||
"name": "ipython", | ||
"version": 2 | ||
}, | ||
"file_extension": ".py", | ||
"mimetype": "text/x-python", | ||
"name": "python", | ||
"nbconvert_exporter": "python", | ||
"pygments_lexer": "ipython2", | ||
"version": "2.7.14" | ||
} | ||
}, | ||
"nbformat": 4, | ||
"nbformat_minor": 2 | ||
} |
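
The loop in the notebook above relies on IPython filling in `$d` from the user namespace before the line magic receives its arguments. A minimal sketch of that substitution, using `string.Template` as a stand-in for IPython's own variable expansion (the helper name `expand_magic_line` is hypothetical and not part of sparkmagic):

```python
from string import Template


def expand_magic_line(line, namespace):
    """Fill in $name references in a magic line from a namespace dict.

    Stand-in for IPython's variable expansion; not sparkmagic code.
    """
    return Template(line).substitute(namespace)


df_names = ["df1", "df2"]

# Each loop iteration hands the magic a different, fully expanded line.
expanded = [expand_magic_line("-o $d -n -1", {"d": name}) for name in df_names]

for line in expanded:
    print(line)
```

Under this reading, each iteration calls the magic with a distinct `-o` target, which is why both `df1` and `df2` end up as local variables after the loop.
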
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,42 @@ def manage_spark(self, line, local_ns=None): | |
Then, create a session. You'll be able to select the session created from the %%spark magic.""" | ||
return self.manage_widget | ||
|
||
|
||
@magic_arguments() | ||
@line_magic | ||
@needs_local_scope | ||
@argument("-c", "--context", type=str, default=CONTEXT_NAME_SPARK, | ||
help="Context to use: '{}' for spark and '{}' for sql queries. " | ||
"Default is '{}'.".format(CONTEXT_NAME_SPARK, CONTEXT_NAME_SQL, CONTEXT_NAME_SPARK)) | ||
@argument("-o", "--output", type=str, default=None, help="If present, indicated variable will be stored in variable " | ||
"of this name in user's local context.") | ||
@argument("-m", "--samplemethod", type=str, default=None, help="Sample method for dataframe: either take or sample") | ||
@argument("-n", "--maxrows", type=int, default=None, help="Maximum number of rows that will be pulled back " | ||
"from the dataframe on the server for storing") | ||
@argument("-r", "--samplefraction", type=float, default=None, help="Sample fraction for sampling from dataframe") | ||
@argument("-e", "--coerce", type=str, default=None, help="Whether to automatically coerce the types (default, pass True if being explicit) " | ||
"of the dataframe or not (pass False)") | ||
@handle_expected_exceptions | ||
def spark_collect(self, line, local_ns=None): | ||
args = parse_argstring_or_throw(self.spark, line) | ||
coerce = get_coerce_value(args.coerce) | ||
if (len(args.command) > 0): | ||
command = " ".join(args.command) | ||
Review comment: Could you please follow the 4-space indentation convention? |
||
else: | ||
command = "" | ||
Review comment: Please add a new line after every code block, like after this if/else code block. |
||
if args.context == CONTEXT_NAME_SPARK: | ||
return self.execute_spark(command, args.output, args.samplemethod, args.maxrows, args.samplefraction, None, coerce) | ||
Review comment: Please add a new line after every return call in this method. |
||
|
||
elif args.context == CONTEXT_NAME_SQL: | ||
args.session = None | ||
args.quiet = None | ||
return self.execute_sqlquery(command, args.samplemethod, args.maxrows, args.samplefraction, | ||
args.session, args.output, args.quiet, coerce) | ||
|
||
else: | ||
self.ipython_display.send_error("Context '{}' not found".format(args.context)) | ||
|
||
|
||
Review comment: Could you please add unit tests in the style of: |
||
@magic_arguments() | ||
@argument("-c", "--context", type=str, default=CONTEXT_NAME_SPARK, | ||
help="Context to use: '{}' for spark and '{}' for sql queries. " | ||
|
Review comment: This file is amazing! Way to go!
Review comment: Thanks! :)
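
For readers following the `spark_collect` diff, the option handling can be approximated with plain `argparse`. This is only a sketch: the real magic is declared with IPython's `@magic_arguments` and `@argument` decorators, the value `"spark"` for `CONTEXT_NAME_SPARK` is an assumption (only `sql` appears in the notebook), and `build_parser` and `parse_collect_line` are hypothetical names.

```python
import argparse

CONTEXT_NAME_SPARK = "spark"  # assumed value; not shown in the diff
CONTEXT_NAME_SQL = "sql"      # matches the "-c sql" usage in the notebook


def build_parser():
    """Mirror the options declared on spark_collect with plain argparse."""
    parser = argparse.ArgumentParser(prog="collect", add_help=False)
    parser.add_argument("-c", "--context", type=str, default=CONTEXT_NAME_SPARK)
    parser.add_argument("-o", "--output", type=str, default=None)
    parser.add_argument("-m", "--samplemethod", type=str, default=None)
    parser.add_argument("-n", "--maxrows", type=int, default=None)
    parser.add_argument("-r", "--samplefraction", type=float, default=None)
    parser.add_argument("-e", "--coerce", type=str, default=None)
    # Everything left over is treated as the command (for example a SQL query).
    parser.add_argument("command", nargs="*", default=[])
    return parser


def parse_collect_line(line):
    """Return the parsed options and the command joined back into one string."""
    args = build_parser().parse_args(line.split())
    command = " ".join(args.command)
    return args, command


args, command = parse_collect_line("-c sql -o df1 select * from dfn")
print(args.context, args.output, command)
```

A unit test for the magic could start from the same two cases the notebook exercises: a line with only options (`-o df2 -n -1`), which leaves the command empty, and a line with `-c sql` followed by a query.
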