Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add pipe_once special command #193

Merged
merged 1 commit into from
Nov 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## Upcoming - TBD

### Features

* Add `\pipe_once` / `\|` commands for sending output to a command

### Bug Fixes

### Internal Changes

## 1.12.4 - 2024-11-11

### Bug Fixes
Expand Down
2 changes: 2 additions & 0 deletions litecli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ def one_iteration(text=None):
result_count += 1
mutating = mutating or is_mutating(status)
special.unset_once_if_written()
special.unset_pipe_once_if_written()
except EOFError as e:
raise e
except KeyboardInterrupt:
Expand Down Expand Up @@ -658,6 +659,7 @@ def output(self, output, status=None):
self.log_output(line)
special.write_tee(line)
special.write_once(line)
special.write_pipe_once(line)

if fits or output_via_pager:
# buffering
Expand Down
74 changes: 60 additions & 14 deletions litecli/packages/special/iocommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
use_expanded_output = False
PAGER_ENABLED = True
tee_file = None
once_file = once_file_args = written_to_once_file = None
once_file = None
written_to_once_file = None
pipe_once_process = None
written_to_pipe_once_process = False
favoritequeries = FavoriteQueries(ConfigObj())


Expand Down Expand Up @@ -376,36 +379,79 @@ def write_tee(output):
aliases=("\\o", "\\once"),
)
def set_once(arg, **_):
global once_file_args

once_file_args = parseargfile(arg)
global once_file, written_to_once_file
try:
once_file = open(**parseargfile(arg))
except (IOError, OSError) as e:
raise OSError("Cannot write to file '{}': {}".format(e.filename, e.strerror))
written_to_once_file = False

return [(None, None, None, "")]


@export
def write_once(output):
global once_file, written_to_once_file
if output and once_file_args:
if once_file is None:
try:
once_file = open(**once_file_args)
except (IOError, OSError) as e:
once_file = None
raise OSError("Cannot write to file '{}': {}".format(e.filename, e.strerror))

if output and once_file:
click.echo(output, file=once_file, nl=False)
click.echo("\n", file=once_file, nl=False)
once_file.flush()
written_to_once_file = True


@export
def unset_once_if_written():
"""Unset the once file, if it has been written to."""
global once_file, once_file_args, written_to_once_file
global once_file, written_to_once_file
if once_file and written_to_once_file:
once_file.close()
once_file = once_file_args = written_to_once_file = None
once_file = written_to_once_file = None


@special_command("\\pipe_once", "\\| command", "Send next result to a subprocess.", aliases=("\\|",))
def set_pipe_once(arg, **_):
global pipe_once_process, written_to_pipe_once_process
pipe_once_cmd = shlex.split(arg)
if len(pipe_once_cmd) == 0:
raise OSError("pipe_once requires a command")
written_to_pipe_once_process = False
pipe_once_process = subprocess.Popen(
pipe_once_cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1,
encoding="UTF-8",
universal_newlines=True,
)
return [(None, None, None, "")]


@export
def write_pipe_once(output):
global pipe_once_process, written_to_pipe_once_process
if output and pipe_once_process:
try:
click.echo(output, file=pipe_once_process.stdin, nl=False)
click.echo("\n", file=pipe_once_process.stdin, nl=False)
except (IOError, OSError) as e:
pipe_once_process.terminate()
raise OSError("Failed writing to pipe_once subprocess: {}".format(e.strerror))
written_to_pipe_once_process = True


@export
def unset_pipe_once_if_written():
"""Unset the pipe_once cmd, if it has been written to."""
global pipe_once_process, written_to_pipe_once_process
if written_to_pipe_once_process:
(stdout_data, stderr_data) = pipe_once_process.communicate()
if len(stdout_data) > 0:
print(stdout_data.rstrip("\n"))
if len(stderr_data) > 0:
print(stderr_data.rstrip("\n"))
pipe_once_process = None
written_to_pipe_once_process = False


@special_command(
Expand Down
59 changes: 59 additions & 0 deletions tests/test_special_iocommands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import os
import tempfile

import pytest

import litecli.packages.special


def test_once_command():
with pytest.raises(TypeError):
litecli.packages.special.execute(None, ".once")

with pytest.raises(OSError):
litecli.packages.special.execute(None, ".once /proc/access-denied")

litecli.packages.special.write_once("hello world") # write without file set
# keep Windows from locking the file with delete=False
with tempfile.NamedTemporaryFile(delete=False) as f:
litecli.packages.special.execute(None, ".once " + f.name)
litecli.packages.special.write_once("hello world")
if os.name == "nt":
assert f.read() == b"hello world\r\n"
else:
assert f.read() == b"hello world\n"

litecli.packages.special.execute(None, ".once -o " + f.name)
litecli.packages.special.write_once("hello world line 1")
litecli.packages.special.write_once("hello world line 2")
f.seek(0)
if os.name == "nt":
assert f.read() == b"hello world line 1\r\nhello world line 2\r\n"
else:
assert f.read() == b"hello world line 1\nhello world line 2\n"
# delete=False means we should try to clean up
try:
if os.path.exists(f.name):
os.remove(f.name)
except Exception as e:
print(f"An error occurred while attempting to delete the file: {e}")


def test_pipe_once_command():
with pytest.raises(IOError):
litecli.packages.special.execute(None, "\\pipe_once")

with pytest.raises(OSError):
litecli.packages.special.execute(None, "\\pipe_once /proc/access-denied")

if os.name == "nt":
litecli.packages.special.execute(None, '\\pipe_once python -c "import sys; print(len(sys.stdin.read().strip()))"')
litecli.packages.special.write_pipe_once("hello world")
litecli.packages.special.unset_pipe_once_if_written()
else:
with tempfile.NamedTemporaryFile() as f:
litecli.packages.special.execute(None, "\\pipe_once tee " + f.name)
litecli.packages.special.write_pipe_once("hello world")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this up from mycli, using tee to write the output to a file which can then be checked. But I did notice around here that mycli is using write_once in this test instead of write_pipe_once.

litecli.packages.special.unset_pipe_once_if_written()
f.seek(0)
assert f.read() == b"hello world\n"
Loading