Skip to content

Commit

Permalink
Add Credentials and service-key to Cli
Browse files Browse the repository at this point in the history
  • Loading branch information
stegm committed Dec 4, 2024
1 parent 1cf9bdc commit e4f3949
Show file tree
Hide file tree
Showing 2 changed files with 254 additions and 43 deletions.
188 changes: 145 additions & 43 deletions pykoplenti/cli.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from ast import literal_eval
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from inspect import iscoroutinefunction
import os
from pathlib import Path
from pprint import pprint
import re
import tempfile
import traceback
from typing import Any, Awaitable, Callable, Dict, Union
from typing import Any, Awaitable, Callable, Dict, Optional, Union
import warnings

from aiohttp import ClientSession, ClientTimeout
import click
Expand All @@ -20,35 +23,38 @@
class SessionCache:
"""Persistent the session in a temporary file."""

def __init__(self, host):
self.host = host
def __init__(self, host: str, user: str):
self._cache_file = Path(
tempfile.gettempdir(), f"pykoplenti-session-{host}-{user}"
)

def read_session_id(self) -> Union[str, None]:
file = os.path.join(tempfile.gettempdir(), f"pykoplenti-session-{self.host}")
if os.path.isfile(file):
with open(file, "rt") as f:
if self._cache_file.is_file():
with self._cache_file.open("rt") as f:
return f.readline(256)
else:
return None

def write_session_id(self, id: str):
file = os.path.join(tempfile.gettempdir(), f"pykoplenti-session-{self.host}")
f = os.open(file, os.O_WRONLY | os.O_TRUNC | os.O_CREAT, mode=0o600)
f = os.open(self._cache_file, os.O_WRONLY | os.O_TRUNC | os.O_CREAT, mode=0o600)
try:
os.write(f, id.encode("ascii"))
finally:
os.close(f)

def remove(self):
self._cache_file.unlink(missing_ok=True)


class ApiShell:
"""Provides a shell-like access to the inverter."""

def __init__(self, client: ApiClient):
def __init__(self, client: ApiClient, user: str):
super().__init__()
self.client = client
self._session_cache = SessionCache(self.client.host)
self._session_cache = SessionCache(self.client.host, user)

async def prepare_client(self, passwd):
async def prepare_client(self, key: Optional[str], service_code: Optional[str]):
# first try to reuse existing session
session_id = self._session_cache.read_session_id()
if session_id is not None:
Expand All @@ -61,17 +67,19 @@ async def prepare_client(self, passwd):

print_formatted_text("Failed")

if passwd is not None:
if key is not None:
print_formatted_text("Logging in... ", end=None)
await self.client.login(passwd)
await self.client.login(key=key, service_code=service_code)
self._session_cache.write_session_id(self.client.session_id)
print_formatted_text("Success")
else:
print_formatted_text("Session could not be reused and no key given")

def print_exception(self):
"""Prints an excpetion from executing a method."""
print_formatted_text(traceback.format_exc())

async def run(self, passwd):
async def run(self, key: Optional[str], service_code: Optional[str]):
session = PromptSession()
print_formatted_text(flush=True) # Initialize output

Expand All @@ -83,7 +91,7 @@ async def run(self, passwd):
# get_setting_values 'scb:time'
# set_setting_values 'devices:local' {'Battery:MinSoc':'15'}

await self.prepare_client(passwd)
await self.prepare_client(key, service_code)

while True:
try:
Expand Down Expand Up @@ -153,82 +161,164 @@ async def _execute(self, method, args):
self.print_exception()


async def repl_main(host, port, passwd):
async def repl_main(
host: str, port: int, key: Optional[str], service_code: Optional[str]
):
async with ClientSession(timeout=ClientTimeout(total=10)) as session:
client = ExtendedApiClient(session, host=host, port=port)

shell = ApiShell(client)
await shell.run(passwd)
shell = ApiShell(client, "user" if service_code is None else "master")
await shell.run(key, service_code)


async def command_main(
host: str, port: int, passwd: str, fn: Callable[[ApiClient], Awaitable[Any]]
host: str,
port: int,
key: Optional[str],
service_code: Optional[str],
fn: Callable[[ApiClient], Awaitable[Any]],
):
async with ClientSession(timeout=ClientTimeout(total=10)) as session:
client = ExtendedApiClient(session, host=host, port=port)
session_cache = SessionCache(host)
session_cache = SessionCache(host, "user" if service_code is None else "master")

# Try to reuse an existing session
client.session_id = session_cache.read_session_id()
me = await client.get_me()
if not me.is_authenticated:
if key is None:
raise ValueError("Could not reuse session and no login key is given.")

# create a new session
await client.login(passwd)
await client.login(key=key, service_code=service_code)

if client.session_id is not None:
session_cache.write_session_id(client.session_id)

await fn(client)


@dataclass
class GlobalArgs:
"""Global arguments over all sub commands."""

def __init__(self):
self.host = None
self.port = None
self.password = None
self.password_file = None
host: str = ""
"""The hostname or ip of the inverter."""

port: int = 0
"""The port on which the API listens on the inverter."""

key: Optional[str] = None
"""The key (password or master key) to login into the API.
If None, a previous session cache is used. If the session
cache has no valid session, no login is executed.
"""

service_code: Optional[str] = None
"""The service code for master access.
Only necessary for master access. If missing, user acess is used.
"""


pass_global_args = click.make_pass_decorator(GlobalArgs, ensure=True)


@click.group()
@click.option("--host", help="hostname or ip of the inverter")
@click.option("--port", default=80, help="port of the inverter (default 80)")
@click.option("--password", default=None, help="the password")
@click.option("--port", default=80, help="port of the inverter", show_default=True)
@click.option(
"--password", default=None, help="the password or master key (also device id)"
)
@click.option("--service-code", default=None, help="service code for installer access")
@click.option(
"--password-file",
default="secrets",
help='password file (default "secrets" in the current working directory)',
help="password file - deprecated, use --secret-file",
show_default=True,
type=click.Path(exists=False, dir_okay=False, readable=True, path_type=Path),
)
@click.option(
"--credentials",
default=None,
help="file with credentials to login",
type=click.Path(exists=True, dir_okay=False, readable=True, path_type=Path),
)
@pass_global_args
def cli(global_args, host, port, password, password_file):
def cli(
global_args: GlobalArgs,
host: str,
port: int,
password: Optional[str],
service_code: Optional[str],
password_file: Path,
credentials: Path,
):
"""Handling of global arguments with click"""
global_args.host = host
global_args.port = port

if password is not None:
global_args.passwd = password
elif os.path.isfile(password_file):
global_args.key = password
elif password_file.is_file():
with open(password_file, "rt") as f:
global_args.passwd = f.readline()
else:
global_args.passwd = None
global_args.key = f.readline()
warnings.warn(
"--password-file is deprecated. Use --credentials instead.",
DeprecationWarning,
)

if service_code is not None:
global_args.service_code = service_code

if credentials is not None:
if password is not None:
raise click.BadOptionUsage(
"password", "password cannot be used with credentials"
)
if password_file is not None and password_file.is_file():
raise click.BadOptionUsage(
"password-file", "password-file cannot be used with credentials"
)
if service_code is not None:
raise click.BadOptionUsage(
"service_code", "service_code cannot be used with credentials"
)

global_args.host = host
global_args.port = port
for line in credentials.read_text().splitlines():
# Fallback to previous password file
if "=" not in line:
global_args.key = line
break

name, _, value = line.partition("=")
name = name.strip()
if name in ("password", "key", "master-key"):
global_args.key = value.strip()
elif name == "service-code":
global_args.service_code = value.strip()


@cli.command()
@pass_global_args
def repl(global_args):
def repl(global_args: GlobalArgs):
"""Provides a simple REPL for executing API requests to the inverter."""
asyncio.run(repl_main(global_args.host, global_args.port, global_args.passwd))
asyncio.run(
repl_main(
global_args.host,
global_args.port,
global_args.key,
global_args.service_code,
)
)


@cli.command()
@click.option("--lang", default=None, help="language for events")
@click.option("--count", default=10, help="number of events to read")
@pass_global_args
def read_events(global_args, lang, count):
def read_events(global_args: GlobalArgs, lang, count):
"""Returns the last events"""

async def fn(client: ApiClient):
Expand All @@ -240,7 +330,13 @@ async def fn(client: ApiClient):
)

asyncio.run(
command_main(global_args.host, global_args.port, global_args.passwd, fn)
command_main(
global_args.host,
global_args.port,
global_args.key,
global_args.service_code,
fn,
)
)


Expand All @@ -267,7 +363,7 @@ async def fn(client: ApiClient):

@cli.command()
@pass_global_args
def all_processdata(global_args):
def all_processdata(global_args: GlobalArgs):
"""Returns a list of all available process data."""

async def fn(client: ApiClient):
Expand All @@ -277,7 +373,13 @@ async def fn(client: ApiClient):
print(f"{k}/{x}")

asyncio.run(
command_main(global_args.host, global_args.port, global_args.passwd, fn)
command_main(
global_args.host,
global_args.port,
global_args.key,
global_args.service_code,
fn,
)
)


Expand Down
Loading

0 comments on commit e4f3949

Please sign in to comment.