Skip to content

Commit

Permalink
add pipe_once special command
Browse files Browse the repository at this point in the history
Mostly copied from mycli. Add tests for once as part of getting that
going.

Updates dbcli#192
  • Loading branch information
danp committed Nov 23, 2024
1 parent da590d8 commit 3143963
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 14 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## Upcoming - TBD

### Features

* Add `\pipe_once` / `\|` commands for sending output to a command

### Bug Fixes

### Internal Changes

## 1.12.4 - 2024-11-11

### Bug Fixes
Expand Down
2 changes: 2 additions & 0 deletions litecli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ def one_iteration(text=None):
result_count += 1
mutating = mutating or is_mutating(status)
special.unset_once_if_written()
special.unset_pipe_once_if_written()
except EOFError as e:
raise e
except KeyboardInterrupt:
Expand Down Expand Up @@ -658,6 +659,7 @@ def output(self, output, status=None):
self.log_output(line)
special.write_tee(line)
special.write_once(line)
special.write_pipe_once(line)

if fits or output_via_pager:
# buffering
Expand Down
74 changes: 60 additions & 14 deletions litecli/packages/special/iocommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
use_expanded_output = False
PAGER_ENABLED = True
tee_file = None
once_file = once_file_args = written_to_once_file = None
once_file = None
written_to_once_file = None
pipe_once_process = None
written_to_pipe_once_process = False
favoritequeries = FavoriteQueries(ConfigObj())


Expand Down Expand Up @@ -376,36 +379,79 @@ def write_tee(output):
aliases=("\\o", "\\once"),
)
def set_once(arg, **_):
global once_file_args

once_file_args = parseargfile(arg)
global once_file, written_to_once_file
try:
once_file = open(**parseargfile(arg))
except (IOError, OSError) as e:
raise OSError("Cannot write to file '{}': {}".format(e.filename, e.strerror))
written_to_once_file = False

return [(None, None, None, "")]


@export
def write_once(output):
global once_file, written_to_once_file
if output and once_file_args:
if once_file is None:
try:
once_file = open(**once_file_args)
except (IOError, OSError) as e:
once_file = None
raise OSError("Cannot write to file '{}': {}".format(e.filename, e.strerror))

if output and once_file:
click.echo(output, file=once_file, nl=False)
click.echo("\n", file=once_file, nl=False)
once_file.flush()
written_to_once_file = True


@export
def unset_once_if_written():
"""Unset the once file, if it has been written to."""
global once_file, once_file_args, written_to_once_file
global once_file, written_to_once_file
if once_file and written_to_once_file:
once_file.close()
once_file = once_file_args = written_to_once_file = None
once_file = written_to_once_file = None


@special_command("\\pipe_once", "\\| command", "Send next result to a subprocess.", aliases=("\\|",))
def set_pipe_once(arg, **_):
global pipe_once_process, written_to_pipe_once_process
pipe_once_cmd = shlex.split(arg)
if len(pipe_once_cmd) == 0:
raise OSError("pipe_once requires a command")
written_to_pipe_once_process = False
pipe_once_process = subprocess.Popen(
pipe_once_cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1,
encoding="UTF-8",
universal_newlines=True,
)
return [(None, None, None, "")]


@export
def write_pipe_once(output):
global pipe_once_process, written_to_pipe_once_process
if output and pipe_once_process:
try:
click.echo(output, file=pipe_once_process.stdin, nl=False)
click.echo("\n", file=pipe_once_process.stdin, nl=False)
except (IOError, OSError) as e:
pipe_once_process.terminate()
raise OSError("Failed writing to pipe_once subprocess: {}".format(e.strerror))
written_to_pipe_once_process = True


@export
def unset_pipe_once_if_written():
"""Unset the pipe_once cmd, if it has been written to."""
global pipe_once_process, written_to_pipe_once_process
if written_to_pipe_once_process:
(stdout_data, stderr_data) = pipe_once_process.communicate()
if len(stdout_data) > 0:
print(stdout_data.rstrip("\n"))
if len(stderr_data) > 0:
print(stderr_data.rstrip("\n"))
pipe_once_process = None
written_to_pipe_once_process = False


@special_command(
Expand Down
59 changes: 59 additions & 0 deletions tests/test_special_iocommands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import os
import tempfile

import pytest

import litecli.packages.special


def test_once_command():
with pytest.raises(TypeError):
litecli.packages.special.execute(None, ".once")

with pytest.raises(OSError):
litecli.packages.special.execute(None, ".once /proc/access-denied")

litecli.packages.special.write_once("hello world") # write without file set
# keep Windows from locking the file with delete=False
with tempfile.NamedTemporaryFile(delete=False) as f:
litecli.packages.special.execute(None, ".once " + f.name)
litecli.packages.special.write_once("hello world")
if os.name == "nt":
assert f.read() == b"hello world\r\n"
else:
assert f.read() == b"hello world\n"

litecli.packages.special.execute(None, ".once -o " + f.name)
litecli.packages.special.write_once("hello world line 1")
litecli.packages.special.write_once("hello world line 2")
f.seek(0)
if os.name == "nt":
assert f.read() == b"hello world line 1\r\nhello world line 2\r\n"
else:
assert f.read() == b"hello world line 1\nhello world line 2\n"
# delete=False means we should try to clean up
try:
if os.path.exists(f.name):
os.remove(f.name)
except Exception as e:
print(f"An error occurred while attempting to delete the file: {e}")


def test_pipe_once_command():
with pytest.raises(IOError):
litecli.packages.special.execute(None, "\\pipe_once")

with pytest.raises(OSError):
litecli.packages.special.execute(None, "\\pipe_once /proc/access-denied")

if os.name == "nt":
litecli.packages.special.execute(None, '\\pipe_once python -c "import sys; print(len(sys.stdin.read().strip()))"')
litecli.packages.special.write_pipe_once("hello world")
litecli.packages.special.unset_pipe_once_if_written()
else:
with tempfile.NamedTemporaryFile() as f:
litecli.packages.special.execute(None, "\\pipe_once tee " + f.name)
litecli.packages.special.write_pipe_once("hello world")
litecli.packages.special.unset_pipe_once_if_written()
f.seek(0)
assert f.read() == b"hello world\n"

0 comments on commit 3143963

Please sign in to comment.