Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Executing cell in specific language #598

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions sparkmagic/sparkmagic/kernels/kernelmagics.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from sparkmagic.utils import constants
from sparkmagic.utils.utils import parse_argstring_or_throw, get_coerce_value
from sparkmagic.utils.sparkevents import SparkEvents
from sparkmagic.utils.constants import LANGS_SUPPORTED
from sparkmagic.utils.constants import LANGS_SUPPORTED, LANG_PYTHON, LANG_R, LANG_SCALA
from sparkmagic.livyclientlib.command import Command
from sparkmagic.livyclientlib.endpoint import Endpoint
from sparkmagic.magics.sparkmagicsbase import SparkMagicBase
Expand Down Expand Up @@ -248,6 +248,8 @@ def configure(self, line, cell="", local_ns=None):
@needs_local_scope
@argument("-o", "--output", type=str, default=None, help="If present, indicated variable will be stored in variable"
"of this name in user's local context.")
@argument("-l", "--language", type=str, default=None,
help="Language for command; one of {}".format(', '.join([LANG_PYTHON, LANG_SCALA, LANG_R])))
@argument("-m", "--samplemethod", type=str, default=None, help="Sample method for dataframe: either take or sample")
@argument("-n", "--maxrows", type=int, default=None, help="Maximum number of rows that will be pulled back "
"from the dataframe on the server for storing")
Expand All @@ -262,7 +264,7 @@ def spark(self, line, cell="", local_ns=None):

coerce = get_coerce_value(args.coerce)

self.execute_spark(cell, args.output, args.samplemethod, args.maxrows, args.samplefraction, None, coerce)
self.execute_spark(cell, args.language, args.output, args.samplemethod, args.maxrows, args.samplefraction, None, coerce)
else:
return

Expand Down
14 changes: 10 additions & 4 deletions sparkmagic/sparkmagic/livyclientlib/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
from sparkmagic.utils.sparkevents import SparkEvents
from sparkmagic.utils.constants import MAGICS_LOGGER_NAME, FINAL_STATEMENT_STATUS, \
MIMETYPE_IMAGE_PNG, MIMETYPE_TEXT_HTML, MIMETYPE_TEXT_PLAIN
from .exceptions import LivyUnexpectedStatusException
from .exceptions import LivyUnexpectedStatusException, \
BadUserConfigurationException


class Command(ObjectWithGuid):
def __init__(self, code, spark_events=None):
def __init__(self, code, spark_events=None, language=None):
super(Command, self).__init__()
self.code = textwrap.dedent(code)
self.language = language
self.logger = SparkLog(u"Command")
if spark_events is None:
spark_events = SparkEvents()
Expand All @@ -38,7 +40,11 @@ def execute(self, session):
statement_id = -1
try:
session.wait_for_idle()
data = {u"code": self.code}
try:
codekind = conf.get_livy_kind(self.language)
except BadUserConfigurationException as e:
codekind = session.kind
Comment on lines +45 to +46

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to default this to the session kind? A statement submitted without a `kind` attribute already uses the session kind by default, so this fallback looks redundant. Reference: https://livy.incubator.apache.org/docs/latest/rest-api.html#session-kind

data = {u"code": self.code, u"kind": codekind}
response = session.http_client.post_statement(session.id, data)
statement_id = response[u'id']
output = self._get_statement_output(session, statement_id)
Expand Down Expand Up @@ -97,4 +103,4 @@ def _get_statement_output(self, session, statement_id):
MIMETYPE_TEXT_PLAIN)
else:
raise LivyUnexpectedStatusException(u"Unknown output status from Livy: '{}'"
.format(statement_output[u"status"]))
.format(statement_output[u"status"]))
2 changes: 1 addition & 1 deletion sparkmagic/sparkmagic/magics/remotesparkmagics.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def spark(self, line, cell="", local_ns=None):
elif len(subcommand) == 0:
coerce = get_coerce_value(args.coerce)
if args.context == CONTEXT_NAME_SPARK:
return self.execute_spark(cell, args.output, args.samplemethod,
return self.execute_spark(cell, args.language, args.output, args.samplemethod,
args.maxrows, args.samplefraction, args.session, coerce)
elif args.context == CONTEXT_NAME_SQL:
return self.execute_sqlquery(cell, args.samplemethod, args.maxrows, args.samplefraction,
Expand Down
4 changes: 2 additions & 2 deletions sparkmagic/sparkmagic/magics/sparkmagicsbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ def do_send_to_spark(self, cell, input_variable_name, var_type, output_variable_
self.ipython_display.write(u'Successfully passed \'{}\' as \'{}\' to Spark'
u' kernel'.format(input_variable_name, output_variable_name))

def execute_spark(self, cell, output_var, samplemethod, maxrows, samplefraction, session_name, coerce):
(success, out, mimetype) = self.spark_controller.run_command(Command(cell), session_name)
def execute_spark(self, cell, language, output_var, samplemethod, maxrows, samplefraction, session_name, coerce):
(success, out, mimetype) = self.spark_controller.run_command(Command(cell, language=language), session_name)
if not success:
if conf.shutdown_session_on_spark_statement_errors():
self.spark_controller.cleanup()
Expand Down
6 changes: 3 additions & 3 deletions sparkmagic/sparkmagic/tests/test_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_execute():

result = command.execute(session)

http_client.post_statement.assert_called_with(0, {"code": command.code})
http_client.post_statement.assert_called_with(0, {"code": command.code, "kind": kind})
http_client.get_statement.assert_called_with(0, 0)
assert result[0]
assert_equals(tls.TestLivySession.pi_result, result[1])
Expand Down Expand Up @@ -84,7 +84,7 @@ def test_execute_waiting():

result = command.execute(session)

http_client.post_statement.assert_called_with(0, {"code": command.code})
http_client.post_statement.assert_called_with(0, {"code": command.code, "kind": kind})
http_client.get_statement.assert_called_with(0, 0)
assert result[0]
assert_equals(tls.TestLivySession.pi_result, result[1])
Expand All @@ -111,7 +111,7 @@ def test_execute_null_ouput():

result = command.execute(session)

http_client.post_statement.assert_called_with(0, {"code": command.code})
http_client.post_statement.assert_called_with(0, {"code": command.code, "kind": kind})
http_client.get_statement.assert_called_with(0, 0)
assert result[0]
assert_equals(u"", result[1])
Expand Down
4 changes: 2 additions & 2 deletions sparkmagic/sparkmagic/tests/test_kernel_magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ def test_spark_sample_options():
magic.execute_spark = MagicMock()
ret = magic.spark(line, cell)

magic.execute_spark.assert_called_once_with(cell, "var_name", "sample", 142, 0.3, None, True)
magic.execute_spark.assert_called_once_with(cell, None, "var_name", "sample", 142, 0.3, None, True)


@with_setup(_setup, _teardown)
Expand All @@ -605,7 +605,7 @@ def test_spark_false_coerce():
magic.execute_spark = MagicMock()
ret = magic.spark(line, cell)

magic.execute_spark.assert_called_once_with(cell, "var_name", "sample", 142, 0.3, None, False)
magic.execute_spark.assert_called_once_with(cell, None, "var_name", "sample", 142, 0.3, None, False)


@with_setup(_setup, _teardown)
Expand Down
8 changes: 4 additions & 4 deletions sparkmagic/sparkmagic/tests/test_remotesparkmagics.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ def test_run_spark_command_parses():
result = magic.spark(line, cell)

magic.execute_spark.assert_called_once_with("cell code",
None, "sample", None, None, "sessions_name", None)
None, None, "sample", None, None, "sessions_name", None)


@with_setup(_setup, _teardown)
Expand All @@ -305,7 +305,7 @@ def test_run_spark_command_parses_with_coerce():
result = magic.spark(line, cell)

magic.execute_spark.assert_called_once_with("cell code",
None, "sample", None, None, "sessions_name", True)
None, None, "sample", None, None, "sessions_name", True)


@with_setup(_setup, _teardown)
Expand All @@ -326,7 +326,7 @@ def test_run_spark_command_parses_with_coerce_false():
result = magic.spark(line, cell)

magic.execute_spark.assert_called_once_with("cell code",
None, "sample", None, None, "sessions_name", False)
None, None, "sample", None, None, "sessions_name", False)


@with_setup(_setup, _teardown)
Expand Down Expand Up @@ -367,7 +367,7 @@ def test_run_spark_with_store_command_parses():

result = magic.spark(line, cell)
magic.execute_spark.assert_called_once_with("cell code",
"var_name", "sample", None, None, "sessions_name", None)
None, "var_name", "sample", None, None, "sessions_name", None)


@with_setup(_setup, _teardown)
Expand Down
14 changes: 7 additions & 7 deletions sparkmagic/sparkmagic/tests/test_sparkmagicsbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,12 @@ def test_spark_execution_without_output_var():
output_var = None

magic.spark_controller.run_command.return_value = (True,'out',MIMETYPE_TEXT_PLAIN)
magic.execute_spark("", output_var, None, None, None, session, None)
magic.execute_spark("", None, output_var, None, None, None, session, None)
magic.ipython_display.write.assert_called_once_with('out')
assert not magic.spark_controller._spark_store_command.called

magic.spark_controller.run_command.return_value = (False,'out',MIMETYPE_TEXT_PLAIN)
assert_raises(SparkStatementException, magic.execute_spark,"", output_var, None, None, None, session, True)
assert_raises(SparkStatementException, magic.execute_spark,"", None, output_var, None, None, None, session, True)
assert not magic.spark_controller._spark_store_command.called

@with_setup(_setup, _teardown)
Expand All @@ -245,14 +245,14 @@ def test_spark_execution_with_output_var():
df = 'df'

magic.spark_controller.run_command.side_effect = [(True,'out',MIMETYPE_TEXT_PLAIN), df]
magic.execute_spark("", output_var, None, None, None, session, True)
magic.execute_spark("", None, output_var, None, None, None, session, True)
magic.ipython_display.write.assert_called_once_with('out')
magic._spark_store_command.assert_called_once_with(output_var, None, None, None, True)
assert shell.user_ns[output_var] == df

magic.spark_controller.run_command.side_effect = None
magic.spark_controller.run_command.return_value = (False,'out',MIMETYPE_TEXT_PLAIN)
assert_raises(SparkStatementException, magic.execute_spark,"", output_var, None, None, None, session, True)
assert_raises(SparkStatementException, magic.execute_spark,"", None, output_var, None, None, None, session, True)


@with_setup(_setup, _teardown)
Expand All @@ -264,7 +264,7 @@ def test_spark_exception_with_output_var():
df = 'df'

magic.spark_controller.run_command.side_effect = [(True,'out',MIMETYPE_TEXT_PLAIN), exception]
assert_raises(BadUserDataException, magic.execute_spark,"", output_var, None, None, None, session, True)
assert_raises(BadUserDataException, magic.execute_spark,"", None, output_var, None, None, None, session, True)
magic.ipython_display.write.assert_called_once_with('out')
magic._spark_store_command.assert_called_once_with(output_var, None, None, None, True)
assert shell.user_ns == {}
Expand All @@ -276,7 +276,7 @@ def test_spark_statement_exception():
exception = BadUserDataException("Ka-boom!")

magic.spark_controller.run_command.side_effect = [(False, 'out', "text/plain"), exception]
assert_raises(SparkStatementException, magic.execute_spark,"", None, None, None, None, session, True)
assert_raises(SparkStatementException, magic.execute_spark,"", None, None, None, None, None, session, True)
magic.spark_controller.cleanup.assert_not_called()

@with_setup(_setup, _teardown)
Expand All @@ -290,5 +290,5 @@ def test_spark_statement_exception_shutdowns_livy_session():
exception = BadUserDataException("Ka-boom!")

magic.spark_controller.run_command.side_effect = [(False, 'out', "text/plain"), exception]
assert_raises(SparkStatementException, magic.execute_spark,"", None, None, None, None, session, True)
assert_raises(SparkStatementException, magic.execute_spark,"", None, None, None, None, None, session, True)
magic.spark_controller.cleanup.assert_called_once()