%collect line magic for using inside loops #432

Open · wants to merge 12 commits into base: master

Changes from 8 commits
303 changes: 303 additions & 0 deletions examples/spark_collect_magic_example.ipynb
@@ -0,0 +1,303 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Sample usage of the %spark_collect line magic"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Load the magics:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"%load_ext sparkmagic.magics"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a new python session:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "83ad58db89c449ee91d833bc4a39bc7a",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"A Jupyter Widget"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"ename": "AttributeError",
"evalue": "'NoneType' object has no attribute 'auth'",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mAttributeError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m/home/edoardo/Documents/sparkmagic/hdijupyterutils/hdijupyterutils/ipywidgetfactory.pyc\u001b[0m in \u001b[0;36msubmit_clicked\u001b[0;34m(self, button)\u001b[0m\n\u001b[1;32m 63\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 64\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0msubmit_clicked\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mbutton\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 65\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mparent_widget\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrun\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;32m/home/edoardo/anaconda2/lib/python2.7/site-packages/sparkmagic/controllerwidget/createsessionwidget.pyc\u001b[0m in \u001b[0;36mrun\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 56\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 57\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 58\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mspark_controller\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0madd_session\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0malias\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mendpoint\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mskip\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mproperties\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 59\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mValueError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 60\u001b[0m self.ipython_display.send_error(\"\"\"Could not add session with\n",
"\u001b[0;32m/home/edoardo/anaconda2/lib/python2.7/site-packages/sparkmagic/livyclientlib/sparkcontroller.pyc\u001b[0m in \u001b[0;36madd_session\u001b[0;34m(self, name, endpoint, skip_if_exists, properties)\u001b[0m\n\u001b[1;32m 81\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mlogger\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mdebug\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34mu\"Skipping {} because it already exists in list of sessions.\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mformat\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mname\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 82\u001b[0m \u001b[0;32mreturn\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 83\u001b[0;31m \u001b[0mhttp_client\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_http_client\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mendpoint\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 84\u001b[0m \u001b[0msession\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_livy_session\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mhttp_client\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mproperties\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mipython_display\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 85\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msession_manager\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0madd_session\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mname\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0msession\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/home/edoardo/anaconda2/lib/python2.7/site-packages/sparkmagic/livyclientlib/sparkcontroller.pyc\u001b[0m in \u001b[0;36m_http_client\u001b[0;34m(endpoint)\u001b[0m\n\u001b[1;32m 113\u001b[0m \u001b[0;34m@\u001b[0m\u001b[0mstaticmethod\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 114\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m_http_client\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mendpoint\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 115\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mLivyReliableHttpClient\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mfrom_endpoint\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mendpoint\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;32m/home/edoardo/anaconda2/lib/python2.7/site-packages/sparkmagic/livyclientlib/livyreliablehttpclient.pyc\u001b[0m in \u001b[0;36mfrom_endpoint\u001b[0;34m(endpoint)\u001b[0m\n\u001b[1;32m 22\u001b[0m \u001b[0mheaders\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mupdate\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mconf\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcustom_headers\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 23\u001b[0m \u001b[0mretry_policy\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mLivyReliableHttpClient\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_get_retry_policy\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 24\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mLivyReliableHttpClient\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mReliableHttpClient\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mendpoint\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mheaders\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mretry_policy\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mendpoint\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 25\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 26\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mpost_statement\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0msession_id\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdata\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/home/edoardo/anaconda2/lib/python2.7/site-packages/sparkmagic/livyclientlib/reliablehttpclient.pyc\u001b[0m in \u001b[0;36m__init__\u001b[0;34m(self, endpoint, headers, retry_policy)\u001b[0m\n\u001b[1;32m 21\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_headers\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mheaders\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 22\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_retry_policy\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mretry_policy\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 23\u001b[0;31m \u001b[0;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_endpoint\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mauth\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0mconstants\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mAUTH_KERBEROS\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 24\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_auth\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mHTTPKerberosAuth\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mmutual_authentication\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mREQUIRED\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 25\u001b[0m \u001b[0;32melif\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_endpoint\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mauth\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0mconstants\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mAUTH_BASIC\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mAttributeError\u001b[0m: 'NoneType' object has no attribute 'auth'"
]
}
],
"source": [
"%manage_spark"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create sample data in the remote spark cluster"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"%%spark\n",
"from pyspark.sql.functions import *\n",
"# make some test data\n",
"columns = ['id', 'dogs', 'cats']\n",
"vals = [\n",
" (1, 2, 0),\n",
" (2, 0, 1)\n",
"]\n",
"\n",
"vals2 = [\n",
" (1, 2, 0),\n",
" (3, 1, 2)\n",
"]\n",
"\n",
"# create DataFrame\n",
"df1 = spark.createDataFrame(vals, columns)\n",
"df2 = spark.createDataFrame(vals2, columns)\n",
"df1.registerTempTable(\"dfn\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, we use the %spark_collect line magic to retrieve the dataframes. We do this by looping through the dataframe names."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"dfList = [\"df1\", \"df2\"]\n",
"for d in dfList:\n",
" %spark_collect -o $d -n -1"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As we can see, the dataframes are now in the local namespace"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>id</th>\n",
" <th>dogs</th>\n",
" <th>cats</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" id dogs cats\n",
"0 1 2 0\n",
"1 2 0 1"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df1"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>id</th>\n",
" <th>dogs</th>\n",
" <th>cats</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>3</td>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" id dogs cats\n",
"0 1 2 0\n",
"1 3 1 2"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df2"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can also execute sql code within the %spark_collect line magic by specifying the \"-c sql\" option."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"%spark_collect -c sql -o df1 select * from dfn"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.14"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
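The essential pattern in the notebook above, as a standalone sketch: IPython expands "$d" in a line magic's argument string before the magic runs, so a line magic (unlike a cell magic) can be driven from an ordinary Python loop. This is a minimal sketch assuming the sparkmagic extension is loaded and a session is attached; %spark_collect and its flags come from this PR, while get_ipython().run_line_magic is the standard IPython equivalent of typing the magic.

from IPython import get_ipython

# df1/df2 are the dataframes created remotely in the notebook above.
dfList = ["df1", "df2"]
for d in dfList:
    # Equivalent to the notebook cell `%spark_collect -o $d -n -1`:
    # IPython substitutes the loop variable before dispatch, so each
    # iteration collects a different remote dataframe into a local
    # pandas DataFrame of the same name.
    get_ipython().run_line_magic("spark_collect", "-o {} -n -1".format(d))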
36 changes: 36 additions & 0 deletions sparkmagic/sparkmagic/magics/remotesparkmagics.py
@@ -38,6 +38,42 @@ def manage_spark(self, line, local_ns=None):
        Then, create a session. You'll be able to select the session created from the %%spark magic."""
        return self.manage_widget


    @magic_arguments()
    @line_magic
    @needs_local_scope
    @argument("-c", "--context", type=str, default=CONTEXT_NAME_SPARK,
              help="Context to use: '{}' for spark and '{}' for sql queries. "
                   "Default is '{}'.".format(CONTEXT_NAME_SPARK, CONTEXT_NAME_SQL, CONTEXT_NAME_SPARK))
    @argument("-o", "--output", type=str, default=None,
              help="If present, indicated variable will be stored in a variable "
                   "of this name in the user's local context.")
    @argument("-m", "--samplemethod", type=str, default=None,
              help="Sample method for dataframe: either take or sample")
    @argument("-n", "--maxrows", type=int, default=None,
              help="Maximum number of rows that will be pulled back "
                   "from the dataframe on the server for storing")
    @argument("-r", "--samplefraction", type=float, default=None,
              help="Sample fraction for sampling from dataframe")
    @argument("-e", "--coerce", type=str, default=None,
              help="Whether to automatically coerce the types of the dataframe "
                   "(default; pass True to be explicit) or not (pass False)")
    @handle_expected_exceptions
    def spark_collect(self, line, local_ns=None):
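        # NB: arguments are parsed against the %%spark magic (self.spark) rather
        # than this method's own decorators, presumably so the positional
        # `command` argument defined there is available as args.command below.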
        args = parse_argstring_or_throw(self.spark, line)
        coerce = get_coerce_value(args.coerce)
        if len(args.command) > 0:
            command = " ".join(args.command)
Contributor: Could you please follow the 4-space indentation convention?

        else:
            command = ""
Contributor: Please add a new line after every code block, like after this if/else code block.

        if args.context == CONTEXT_NAME_SPARK:
            return self.execute_spark(command, args.output, args.samplemethod,
                                      args.maxrows, args.samplefraction, None, coerce)
Contributor: Please add a new line after every return call in this method.


        elif args.context == CONTEXT_NAME_SQL:
            args.session = None
            args.quiet = None
            return self.execute_sqlquery(command, args.samplemethod, args.maxrows,
                                         args.samplefraction, args.session,
                                         args.output, args.quiet, coerce)

        else:
            self.ipython_display.send_error("Context '{}' not found".format(args.context))
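As a hedged usage sketch (not part of this diff), the three branches above can be exercised from IPython as follows; run_line_magic is standard IPython, the flags come from the decorators above, and a Livy session is assumed to already exist. df1 and dfn refer to the example notebook earlier in this PR.

from IPython import get_ipython

ip = get_ipython()

# Spark context (the default): collect the remote variable `df1` into the
# local namespace, sampling with `take` and capping the transfer at 100 rows.
ip.run_line_magic("spark_collect", "-o df1 -m take -n 100 df1")

# SQL context: run a query remotely and store the result locally as `result`.
ip.run_line_magic("spark_collect", "-c sql -o result SELECT * FROM dfn")

# Any other context name falls through to the final else and reports an error.
ip.run_line_magic("spark_collect", "-c scala -o df1 df1")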



    @magic_arguments()
    @argument("-c", "--context", type=str, default=CONTEXT_NAME_SPARK,
              help="Context to use: '{}' for spark and '{}' for sql queries. "