%collect line magic for using inside loops #432

Open · wants to merge 12 commits into base: master
Changes from 3 commits
329 changes: 329 additions & 0 deletions examples/CollectMagic_example.ipynb
@@ -0,0 +1,329 @@
{
Contributor: This file is amazing! Way to go!

Author: Thanks! :)

"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Sample usage of the %collect line magic"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Load the magics:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"%load_ext sparkmagic.magics"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a new python session:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "3015273a12494389a3497888640c3cfc",
"version_major": 2,
"version_minor": 0
},
"text/html": [
"<p>Failed to display Jupyter Widget of type <code>MagicsControllerWidget</code>.</p>\n",
"<p>\n",
" If you're reading this message in the Jupyter Notebook or JupyterLab Notebook, it may mean\n",
" that the widgets JavaScript is still loading. If this message persists, it\n",
" likely means that the widgets JavaScript library is either not installed or\n",
" not enabled. See the <a href=\"https://ipywidgets.readthedocs.io/en/stable/user_install.html\">Jupyter\n",
" Widgets Documentation</a> for setup instructions.\n",
"</p>\n",
"<p>\n",
" If you're reading this message in another frontend (for example, a static\n",
" rendering on GitHub or <a href=\"https://nbviewer.jupyter.org/\">NBViewer</a>),\n",
" it may mean that your frontend doesn't currently support widgets.\n",
"</p>\n"
],
"text/plain": [
"MagicsControllerWidget(children=(Tab(children=(ManageSessionWidget(children=(HTML(value='<br/>'), HTML(value='No sessions yet.'))), CreateSessionWidget(children=(HTML(value='<br/>'), Dropdown(description='Endpoint:', options={'http://spark:8998': <sparkmagic.livyclientlib.endpoint.Endpoint object at 0x7f19a52a7f60>}, value=<sparkmagic.livyclientlib.endpoint.Endpoint object at 0x7f19a52a7f60>), Text(value='session-name', description='Name:'), ToggleButtons(description='Language:', options=('scala', 'python', 'python3'), value='scala'), Text(value='{\"driverMemory\": \"1000M\", \"executorCores\": 2}', description='Properties:'), HTML(value='<br/>'), SubmitButton(description='Create Session', style=ButtonStyle()))), AddEndpointWidget(children=(HTML(value='<br/>'), Text(value='http://example.com/livy', description='Address:'), Dropdown(description='Auth type:', options={'Kerberos': 'Kerberos', 'Basic_Access': 'Basic_Access', 'None': 'None'}, value='Kerberos'), Text(value='username', description='Username:', layout=Layout(display='none')), Text(value='password', description='Password:', layout=Layout(display='none')), HTML(value='<br/>'), SubmitButton(description='Add endpoint', style=ButtonStyle()))), ManageEndpointWidget(children=(HTML(value='<br/>'), HTML(value='Endpoint'), VBox(children=(HTML(value='<hr/>'), HBox(children=(VBox(children=(HTML(value='No sessions on this endpoint.'), HBox(children=(Text(value='0', description='Session to delete:'), Button(description='Delete', style=ButtonStyle()))))), Button(description='Clean Up', style=ButtonStyle()), Button(description='Remove', style=ButtonStyle()))))), HTML(value='<br/>')))), _titles={'0': 'Manage Sessions', '1': 'Create Session', '2': 'Add Endpoint', '3': 'Manage Endpoints'}),))"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Starting Spark application\n"
]
},
{
"data": {
"text/html": [
"<table>\n",
"<tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th><th>Current session?</th></tr><tr><td>0</td><td>None</td><td>pyspark</td><td>idle</td><td></td><td></td><td>✔</td></tr></table>"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"SparkSession available as 'spark'.\n"
]
}
],
"source": [
"%manage_spark"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create sample data in the remote spark cluster"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"%%spark\n",
"from pyspark.sql.functions import *\n",
"# make some test data\n",
"columns = ['id', 'dogs', 'cats']\n",
"vals = [\n",
" (1, 2, 0),\n",
" (2, 0, 1)\n",
"]\n",
"\n",
"vals2 = [\n",
" (1, 2, 0),\n",
" (3, 1, 2)\n",
"]\n",
"\n",
"# create DataFrame\n",
"df1 = spark.createDataFrame(vals, columns)\n",
"df2 = spark.createDataFrame(vals2, columns)\n",
"df1.registerTempTable(\"dfn\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, we use the %collect line magic to retrieve the dataframes. We do this by looping through the dataframe names."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"dfList = [\"df1\", \"df2\"]\n",
"for d in dfList:\n",
" %collect -o $d -n -1"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As we can see, the dataframes are now in the local namespace"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>id</th>\n",
" <th>dogs</th>\n",
" <th>cats</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" id dogs cats\n",
"0 1 2 0\n",
"1 2 0 1"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df1"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>id</th>\n",
" <th>dogs</th>\n",
" <th>cats</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>3</td>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" id dogs cats\n",
"0 1 2 0\n",
"1 3 1 2"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df2"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can also execute sql code within the %collect line magic by specifying the \"-c sql\" option."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"%collect -c sql -o df1 select * from dfn"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.14"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
34 changes: 34 additions & 0 deletions sparkmagic/sparkmagic/magics/remotesparkmagics.py
Expand Up @@ -38,6 +38,40 @@ def manage_spark(self, line, local_ns=None):
Then, create a session. You'll be able to select the session created from the %%spark magic."""
return self.manage_widget


@magic_arguments()
@line_magic
@needs_local_scope
@argument("-c", "--context", type=str, default=CONTEXT_NAME_SPARK,
help="Context to use: '{}' for spark and '{}' for sql queries. "
"Default is '{}'.".format(CONTEXT_NAME_SPARK, CONTEXT_NAME_SQL, CONTEXT_NAME_SPARK))
@argument("-o", "--output", type=str, default=None, help="If present, indicated variable will be stored in variable"
"of this name in user's local context.")
@argument("-m", "--samplemethod", type=str, default=None, help="Sample method for dataframe: either take or sample")
@argument("-n", "--maxrows", type=int, default=None, help="Maximum number of rows that will be pulled back "
"from the dataframe on the server for storing")
@argument("-r", "--samplefraction", type=float, default=None, help="Sample fraction for sampling from dataframe")
@argument("-e", "--coerce", type=str, default=None, help="Whether to automatically coerce the types (default, pass True if being explicit) "
"of the dataframe or not (pass False)")
@handle_expected_exceptions
def collect(self, line, local_ns=None):
Contributor: Should we call it spark_collect?

args = parse_argstring_or_throw(self.spark, line)
coerce = get_coerce_value(args.coerce)
if (len(args.command) > 0):
Contributor: When would args.command be > 0? Sorry if I'm missing something super obvious.

Author: As far as I understand, args.command is the part of the line that does not contain arguments. This part can be empty (for instance if I do %spark_collect -o my_dataframe), or not (%spark_collect -c sql -o my_df SELECT * from my_table).

If it is empty, the join command would throw an error, so this is a way to get around it, but maybe there's a better way...
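(Side note for illustration only, not part of this PR: a minimal sketch of how a positional command argument is commonly declared with IPython's magic_arguments so that parse_argstring collects the unflagged tokens into a list. The magic and class names below are hypothetical.)

```python
from IPython.core.magic import Magics, line_magic, magics_class
from IPython.core.magic_arguments import argument, magic_arguments, parse_argstring


@magics_class
class ExampleMagics(Magics):
    @magic_arguments()
    @line_magic
    @argument("-o", "--output", type=str, default=None,
              help="Name of the local variable to store the result in.")
    @argument("command", type=str, default=[""], nargs="*",
              help="Everything on the line that is not a flagged option.")
    def example_collect(self, line):
        # Flagged options become attributes (args.output); the remaining
        # tokens are gathered into the list args.command, which stays [""]
        # when nothing besides flags was given.
        args = parse_argstring(self.example_collect, line)
        command = " ".join(args.command)
        return args.output, command
```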

Contributor: I'm confused... where is args.command coming from? I thought that for you to be able to do that, you needed to define an @argument called command, and I don't see it there. Am I missing something?

command = " ".join(args.command)
Contributor: Could you please follow the 4-space indentation convention?

else:
command = ""
Contributor: Please add a new line after every code block, like after this if/else code block.

if args.context == CONTEXT_NAME_SPARK:
return self.execute_spark(command, args.output, args.samplemethod, args.maxrows, args.samplefraction, None, coerce)
Contributor: Please add a new line after every return call in this method.

elif args.context == CONTEXT_NAME_SQL:
args.session = None
args.quiet = None
return self.execute_sqlquery(command, args.samplemethod, args.maxrows, args.samplefraction,
args.session, args.output, args.quiet, coerce)
else:
self.ipython_display.send_error("Context '{}' not found".format(args.context))
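(Illustrative usage only, not part of the diff: given the arguments declared above, notebook-cell invocations of this magic might look like the following; the dataframe and table names are made up.)

```python
# Pull the remote dataframe 'remote_df' into the local IPython namespace,
# transferring at most 500 rows (default context is 'spark'):
%collect -o remote_df -n 500

# Run a SQL query in the remote session and store the result locally as
# 'local_df', sampling 10% of the rows:
%collect -c sql -o local_df -m sample -r 0.1 SELECT * FROM my_table
```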


@magic_arguments()
@argument("-c", "--context", type=str, default=CONTEXT_NAME_SPARK,
help="Context to use: '{}' for spark and '{}' for sql queries. "