Tensor Neural Engine Kompanion. An util library based on PyTorch and PyTorch Lightning.
The tensorneko requires pytorch and pytorch-lightning (optional), and you can install it with below command.
pip install tensorneko # for PyTorch only
pip install tensorneko[lightning] # for PyTorch and Lightning
To use the library without PyTorch and PyTorch Lightning, you can install the util library (support Python 3.7 ~ 3.12 with limited features) with following command.
pip install tensorneko_util
Some cpu bound functions are implemented by rust-based pyo3
, and you can install the optimized version with below command.
pip install tensorneko_lib
Some CLI tools are provided in the tensorneko_tool
package, and you can install it with below command.
pipx install tensorneko_tool # or `pip install tensorneko_tool`
Then you can use the CLI tools tensorneko
in the terminal.
Build an MLP with linear layers. The activation and normalization will be placed in the hidden layers.
784 -> 1024 -> 512 -> 10
import tensorneko as neko
import torch.nn
mlp = neko.module.MLP(
neurons=[784, 1024, 512, 10],
build_activation=torch.nn.ReLU,
build_normalization=[
lambda: torch.nn.BatchNorm1d(1024),
lambda: torch.nn.BatchNorm1d(512)
],
dropout_rate=0.5
)
Build a Conv2d with activation and normalization.
import tensorneko as neko
import torch.nn
conv2d = neko.layer.Conv2d(
in_channels=256,
out_channels=1024,
kernel_size=(3, 3),
padding=(1, 1),
build_activation=torch.nn.ReLU,
build_normalization=lambda: torch.nn.BatchNorm2d(256),
normalization_after_activation=False
)
Layers:
Aggregation
Concatenate
Conv
,Conv1d
,Conv2d
,Conv3d
GaussianNoise
ImageAttention
,SeqAttention
MaskedConv2d
,MaskedConv2dA
,MaskedConv2dB
Linear
Log
PatchEmbedding2d
PositionalEmbedding
Reshape
Stack
VectorQuantizer
Modules:
DenseBlock
InceptionModule
MLP
ResidualBlock
andResidualModule
AttentionModule
,TransformerEncoderBlock
andTransformerEncoder
GatedConv
Architectures:
AutoEncoder
GAN
WGAN
VQVAE
All tensorneko.layer
and tensorneko.module
are NekoModule
. They can be used in
fn.py pipe operation.
from tensorneko.layer import Linear
from torch.nn import ReLU
import torch
linear0 = Linear(16, 128, build_activation=ReLU)
linear1 = Linear(128, 1)
f = linear0 >> linear1
print(f(torch.rand(16)).shape)
# torch.Size([1])
Easily load and save different modal data.
import tensorneko as neko
from tensorneko.io import json_data
from typing import List
# read video (Temporal, Channel, Height, Width)
video_tensor, audio_tensor, video_info = neko.io.read.video("path/to/video.mp4")
# write video
neko.io.write.video("path/to/video.mp4",
video_tensor, video_info.video_fps,
audio_tensor, video_info.audio_fps
)
# read audio (Channel, Temporal)
audio_tensor, sample_rate = neko.io.read.audio("path/to/audio.wav")
# write audio
neko.io.write.audio("path/to/audio.wav", audio_tensor, sample_rate)
# read image (Channel, Height, Width) with float value in range [0, 1]
image_tensor = neko.io.read.image("path/to/image.png")
# write image
neko.io.write.image("path/to/image.png", image_tensor)
neko.io.write.image("path/to/image.jpg", image_tensor)
# read plain text
text_string = neko.io.read.text("path/to/text.txt")
# write plain text
neko.io.write.text("path/to/text.txt", text_string)
# read json as python dict or list
json_dict = neko.io.read.json("path/to/json.json")
# read json as an object
@json_data
class JsonData:
x: int
y: int
json_obj: List[JsonData] = neko.io.read.json("path/to/json.json", cls=List[JsonData])
# write json from python dict/list or json_data decorated object
neko.io.write.json("path/to/json.json", json_dict)
neko.io.write.json("path/to/json.json", json_obj)
Besides, the read/write for mat
and pickle
files is also supported.
import tensorneko as neko
# A video tensor with (120, 3, 720, 1280)
video = neko.io.read.video("example/video.mp4").video
# Get a resized tensor with (120, 3, 256, 256)
resized_video = neko.preprocess.resize_video(video, (256, 256))
resize_video
resize_image
padding_video
padding_audio
crop_with_padding
frames2video
if ffmpeg
is available, you can use below ffmpeg wrappers.
video2frames
merge_video_audio
resample_video_fps
mp32wav
Start a web server to watch the variable status when the program (e.g. training, inference, data preprocessing) is running.
import time
from tensorneko.visualization.watcher import *
data_list = ... # a list of data
def preprocessing(d): ...
# initialize the components
pb = ProgressBar("Processing", total=len(data_list))
logger = Logger("Log message")
var = Variable("Some Value", 0)
line_chart = LineChart("Line Chart", x_label="x", y_label="y")
view = View("Data preprocessing").add_all()
t0 = time.time()
# open server when the code block in running.
with Server(view, port=8000):
for i, data in enumerate(data_list):
preprocessing(data) # do some processing here
x = time.time() - t0 # time since the start of the program
y = i # processed number of data
line_chart.add(x, y) # add to the line chart
logger.log("Some messages") # log messages to the server
var.value = ... # keep tracking a variable
pb.add(1) # update the progress bar by add 1
When the script is running, go to 127.0.0.1:8000
to keep tracking the status.
Simply run tensorboard server in Python script.
import tensorneko as neko
with neko.visualization.tensorboard.Server(port=6006):
trainer.fit(model, dm)
Display an image of (C, H, W) shape by plt.imshow
wrapper.
import tensorneko as neko
import matplotlib.pyplot as plt
image_tensor = ... # an image tensor with shape (C, H, W)
neko.visualization.matplotlib.imshow(image_tensor)
plt.show()
Several aesthetic colors are predefined.
import tensorneko as neko
import matplotlib.pyplot as plt
# use with matplotlib
plt.plot(..., color=neko.visualization.Colors.RED)
# the palette for seaborn is also available
from tensorneko_util.visualization.seaborn import palette
import seaborn as sns
sns.set_palette(palette)
Build and train a simple model for classifying MNIST with MLP.
from typing import Optional, Union, Sequence, Dict, List
import torch.nn
from torch import Tensor
from torch.optim import Adam
from torchmetrics import Accuracy
from lightning.pytorch.callbacks import ModelCheckpoint
import tensorneko as neko
from tensorneko.util import get_activation, get_loss
class MnistClassifier(neko.NekoModel):
def __init__(self, name: str, mlp_neurons: List[int], activation: str, dropout_rate: float, loss: str,
learning_rate: float, weight_decay: float
):
super().__init__(name)
self.weight_decay = weight_decay
self.learning_rate = learning_rate
self.flatten = torch.nn.Flatten()
self.mlp = neko.module.MLP(
neurons=mlp_neurons,
build_activation=get_activation(activation),
dropout_rate=dropout_rate
)
self.loss_func = get_loss(loss)()
self.acc_func = Accuracy()
def forward(self, x):
# (batch, 28, 28)
x = self.flatten(x)
# (batch, 768)
x = self.mlp(x)
# (batch, 10)
return x
def training_step(self, batch: Optional[Union[Tensor, Sequence[Tensor]]] = None, batch_idx: Optional[int] = None,
optimizer_idx: Optional[int] = None, hiddens: Optional[Tensor] = None
) -> Dict[str, Tensor]:
x, y = batch
logit = self(x)
prob = logit.sigmoid()
loss = self.loss_func(logit, y)
acc = self.acc_func(prob.max(dim=1)[1], y)
return {"loss": loss, "acc": acc}
def validation_step(self, batch: Optional[Union[Tensor, Sequence[Tensor]]] = None, batch_idx: Optional[int] = None,
dataloader_idx: Optional[int] = None
) -> Dict[str, Tensor]:
x, y = batch
logit = self(x)
prob = logit.sigmoid()
loss = self.loss_func(logit, y)
acc = self.acc_func(prob.max(dim=1)[1], y)
return {"loss": loss, "acc": acc}
def configure_optimizers(self):
optimizer = Adam(self.parameters(), lr=self.learning_rate, betas=(0.5, 0.9), weight_decay=self.weight_decay)
return {
"optimizer": optimizer
}
model = MnistClassifier("mnist_mlp_classifier", [784, 1024, 512, 10], "ReLU", 0.5, "CrossEntropyLoss", 1e-4, 1e-4)
dm = ... # The MNIST datamodule from PyTorch Lightning
trainer = neko.NekoTrainer(log_every_n_steps=100, gpus=1, logger=model.name, precision=32,
callbacks=[ModelCheckpoint(dirpath="./ckpt",
save_last=True, filename=model.name + "-{epoch}-{val_acc:.3f}", monitor="val_acc", mode="max"
)])
trainer.fit(model, dm)
Some simple but useful pytorch-lightning callbacks are provided.
DisplayMetricsCallback
EarlyStoppingLR
: Early stop training when learning rate reaches threshold.
Here are some helper functions to better interact with Jupyter Notebook.
import tensorneko as neko
# display a video
neko.notebook.display.video("path/to/video.mp4")
# display an audio
neko.notebook.display.audio("path/to/audio.wav")
# display a code file
neko.notebook.display.code("path/to/code.java")
Get the default values from ArgumentParser
args. It's convenient to use this in the notebook.
from argparse import ArgumentParser
from tensorneko.debug import get_parser_default_args
parser = ArgumentParser()
parser.add_argument("integers", type=int, nargs="+", default=[1, 2, 3])
parser.add_argument("--sum", dest="accumulate", action="store_const", const=sum, default=max)
args = get_parser_default_args(parser)
print(args.integers) # [1, 2, 3]
print(args.accumulate) # <function sum at ...>
Some metrics function for evaluation are provided.
iou_1d
iou_2d
psnr_video
psnr_image
ssim_video
ssim_image
Send a message to the Gotify server.
The title, URL and APP_TOKEN is the environment variable GOTIFY_TITLE
, GOTIFY_URL
and GOTIFY_TOKEN
, or overwritten
in the function arguments.
from tensorneko.msg import gotify
gotify.push("This is a test message", "<URL>", "<APP_TOKEN>")
# then the message will be sent to the Gotify server.
# title = "<HOST_NAME>", message = "This is a test message", priority = 0
Require the psycopg
package. Provide one single function to execute one SQL query with a temp connection.
The database URL is the environment variable DB_URL
, or overwritten in the function arguments.
from tensorneko.msg import postgres
result = postgres.execute("<SQL>", "<DB_URL>")
# also async version is provided
result = await postgres.execute_async("<SQL>", "<DB_URL>")
__
: The arguments to pipe operator. (Inspired from fn.py)
from tensorneko.util import __, _
result = __(20) >> (_ + 1) >> (_ * 2) >> __.get
print(result)
# 42
Seq
and Stream
: A collection wrapper for method chaining with concurrent supporting.
from tensorneko.util import Seq, Stream, _
from tensorneko_util.backend.parallel import ParallelType
# using method chaining
seq = Seq.of(1, 2, 3).map(_ + 1).filter(_ % 2 == 0).map(_ * 2).take(2).to_list()
# return [4, 8]
# using bit shift operator to chain the sequence
seq = Seq.of(1, 2, 3) << Seq.of(2, 3, 4) << [3, 4, 5]
# return Seq(1, 2, 3, 2, 3, 4, 3, 4, 5)
# run concurrent with `for_each` for Stream
if __name__ == '__main__':
Stream.of(1, 2, 3, 4).for_each(print, progress_bar=True, parallel_type=ParallelType.PROCESS)
Option
: A monad for dealing with data.
from tensorneko.util import return_option
@return_option
def get_data():
if some_condition:
return 1
else:
return None
def process_data(n: int):
if condition(n):
return n
else:
return None
data = get_data()
data = data.map(process_data).get_or_else(-1) # if the response is None, return -1
Eval
: A monad for lazy evaluation.
from tensorneko.util import Eval
@Eval.always
def call_by_name_var():
return 42
@Eval.later
def call_by_need_var():
return 43
@Eval.now
def call_by_value_var():
return 44
print(call_by_name_var.value) # 42
This library provides event bus based reactive tools. The API integrates the Python type annotation syntax.
# useful decorators for default event bus
from tensorneko.util import subscribe
# Event base type
from tensorneko.util import Event, EventBus
class LogEvent(Event):
def __init__(self, message: str):
self.message = message
# the event argument should be annotated correctly
@subscribe # run in the main thread
def log_information(event: LogEvent):
print(event.message)
@subscribe.thread # run in a new thread
def log_information_thread(event: LogEvent):
print(event.message, "in another thread")
@subscribe.coro # run with async
async def log_information_async(event: LogEvent):
print(event.message, "async")
@subscribe.process # run in a new process
def log_information_process(event: LogEvent):
print(event.message, "in a new process")
if __name__ == '__main__':
# emit an event, and then the event handler will be invoked
# The sequential order is not guaranteed
LogEvent("Hello world!")
EventBus.default.wait() # it's not blocking, need to call wait manually before exit.
# one possible output:
# Hello world! in another thread
# Hello world! async
# Hello world!
# Hello world! in a new process
dispatch
: Multi-dispatch implementation for Python.
To my knowledge, 3 popular multi-dispatch libraries still have critical limitations. plum doesn't support static methods, mutipledispatch doesn't support Python type annotation syntax and multimethod doesn't support default argument. TensorNeko can do it all.
from tensorneko.util import dispatch
class DispatchExample:
@staticmethod
@dispatch
def go() -> None:
print("Go0")
@staticmethod
@dispatch
def go(x: int) -> None:
print("Go1")
@staticmethod
@dispatch
def go(x: float, y: float = 1.0) -> None:
print("Go2")
@dispatch
def come(x: int) -> str:
return "Come1"
@dispatch.of(str)
def come(x) -> str:
return "Come2"
StringGetter
: Get PyTorch class from string.
import tensorneko as neko
activation = neko.util.get_activation("leakyRelu")()
Seed
: The universal seed for numpy
, torch
and Python random
.
from tensorneko.util import Seed
from torch.utils.data import DataLoader
# set seed to 42 for all numpy, torch and python random
Seed.set(42)
# Apply seed to parallel workers of DataLoader
DataLoader(
train_dataset,
batch_size=batch_size,
num_workers=num_workers,
worker_init_fn=Seed.get_loader_worker_init(),
generator=Seed.get_torch_generator()
)
Timer
: A timer for measuring the time.
from tensorneko.util import Timer
import time
# use as a context manager with single time
with Timer():
time.sleep(1)
# use as a context manager with multiple segments
with Timer() as t:
time.sleep(1)
t.time("sleep A")
time.sleep(1)
t.time("sleep B")
time.sleep(1)
# use as a decorator
@Timer()
def f():
time.sleep(1)
print("f")
Singleton
: A decorator to make a class as a singleton. Inspired from Scala/Kotlin.
from tensorneko.util import Singleton
@Singleton
class MyObject:
def __init__(self):
self.value = 0
def add(self, value):
self.value += value
return self.value
print(MyObject.value) # 0
MyObject.add(1)
print(MyObject.value) # 1
Besides, many miscellaneous functions are also provided.
Functions list (in tensorneko_util
):
generate_inf_seq
compose
listdir
with_printed
ifelse
dict_add
as_list
identity
list_to_dict
get_tensorneko_util_path
Functions list (in tensorneko
):
reduce_dict_by
summarize_dict_by
with_printed_shape
is_bad_num
count_parameters
Some CLI tools are provided in the tensorneko_tool
package.
The gotify
can send a message to the Gotify server, with the environment variables GOTIFY_URL
and GOTIFY_TOKEN
set.
tensorneko gotify "Script finished!"