diff --git a/.gitignore b/.gitignore index cb2fd28d8..d7fb89118 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,8 @@ __pycache__ /pydantic/ node_modules *.squashfs +.vscode +rootfs/ /examples/example_http_rust/target/ /examples/example_django/static/admin/ /runtimes/aleph-debian-11-python/rootfs/ diff --git a/docker/run_vm_supervisor.sh b/docker/run_vm_supervisor.sh index 7b076783c..a193e5002 100755 --- a/docker/run_vm_supervisor.sh +++ b/docker/run_vm_supervisor.sh @@ -11,10 +11,11 @@ fi $DOCKER_COMMAND build -t alephim/vm-supervisor-dev -f docker/vm_supervisor-dev.dockerfile . $DOCKER_COMMAND run -ti --rm \ - -v "$(pwd)/runtimes/aleph-debian-11-python/rootfs.squashfs:/opt/aleph-vm/runtimes/aleph-debian-11-python/rootfs.squashfs:ro" \ - -v "$(pwd)/examples/volumes/volume-venv.squashfs:/opt/aleph-vm/examples/volumes/volume-venv.squashfs:ro" \ - -v "$(pwd)/vm_supervisor:/opt/aleph-vm/vm_supervisor:ro" \ - -v "$(pwd)/firecracker:/opt/aleph-vm/firecracker:ro" \ --device /dev/kvm \ -p 4020:4020 \ alephim/vm-supervisor-dev $@ + + # -v "$(pwd)/runtimes/aleph-debian-11-python/rootfs.squashfs:/opt/aleph-vm/runtimes/aleph-debian-11-python/rootfs.squashfs:ro" \ +# -v "$(pwd)/examples/volumes/volume-venv.squashfs:/opt/aleph-vm/examples/volumes/volume-venv.squashfs:ro" \ +# -v "$(pwd)/vm_supervisor:/opt/aleph-vm/vm_supervisor:ro" \ +# -v "$(pwd)/firecracker:/opt/aleph-vm/firecracker:ro" \ diff --git a/docker/run_vm_supervisor_2.sh b/docker/run_vm_supervisor_2.sh new file mode 100755 index 000000000..82175e0ad --- /dev/null +++ b/docker/run_vm_supervisor_2.sh @@ -0,0 +1,21 @@ +#!/bin/sh + +# Use Podman if installed, else use Docker +if hash podman 2> /dev/null +then + DOCKER_COMMAND=podman +else + DOCKER_COMMAND=docker +fi + +$DOCKER_COMMAND build -t alephim/vm-supervisor-dev -f docker/vm_supervisor-dev-docker.dockerfile . 
+ +$DOCKER_COMMAND run -ti --privileged --name=vm_supervisor_docker --rm \ + -v "$(pwd)/runtimes/aleph-docker/:/opt/aleph-vm/runtimes/aleph-docker/:ro" \ + -v "$(pwd)/examples/volumes/docker-data.squashfs:/opt/aleph-vm/examples/volumes/docker-data.squashfs:ro" \ + -v "$(pwd)/examples/example_docker_container:/opt/aleph-vm/examples/example_docker_container:ro" \ + -v "$(pwd)/vm_supervisor:/opt/aleph-vm/vm_supervisor:ro" \ + -v "$(pwd)/firecracker:/opt/aleph-vm/firecracker:ro" \ + --device /dev/kvm \ + -p 4020:4020 \ + alephim/vm-supervisor-dev $@ diff --git a/docker/vm_supervisor-dev-docker.dockerfile b/docker/vm_supervisor-dev-docker.dockerfile new file mode 100644 index 000000000..7bedfd13c --- /dev/null +++ b/docker/vm_supervisor-dev-docker.dockerfile @@ -0,0 +1,57 @@ +# This is mainly a copy of the installation instructions from [vm_supervisor/README.md] + +FROM debian:bullseye + +RUN apt-get update && apt-get -y upgrade && apt-get install -y \ + sudo acl curl squashfs-tools git \ + python3 python3-aiohttp python3-msgpack python3-pip python3-aiodns python3-aioredis \ + python3-psutil python3-setproctitle python3-sqlalchemy python3-packaging python3-cpuinfo \ + && rm -rf /var/lib/apt/lists/* + +RUN useradd jailman + +RUN mkdir /opt/firecracker +RUN chown $(whoami) /opt/firecracker +RUN curl -fsSL https://github.com/firecracker-microvm/firecracker/releases/download/v1.1.1/firecracker-v1.1.1-x86_64.tgz | tar -xz --directory /opt/firecracker +RUN curl -fsSL -o /opt/firecracker/vmlinux.bin https://s3.amazonaws.com/spec.ccfc.min/img/quickstart_guide/x86_64/kernels/vmlinux.bin + +# Link binaries on version-agnostic paths: +RUN ln /opt/firecracker/release-*/firecracker-v* /opt/firecracker/firecracker +RUN ln /opt/firecracker/release-*/jailer-v* /opt/firecracker/jailer + +RUN pip3 install typing-extensions 'aleph-message>=0.1.19' + +RUN mkdir -p /var/lib/aleph/vm/jailer + +ENV PYTHONPATH /mnt + +# Networking only works in privileged containers +ENV 
ALEPH_VM_ALLOW_VM_NETWORKING False +ENV ALEPH_VM_NETWORK_INTERFACE "tap0" +# Jailer does not work in Docker containers +ENV ALEPH_VM_USE_JAILER False +# Use fake test data +ENV ALEPH_VM_FAKE_DATA True +# Allow connections from host +ENV ALEPH_VM_SUPERVISOR_HOST "0.0.0.0" + +# Make it easy to enter this command from a shell script +RUN echo "python3 -m vm_supervisor --print-settings --very-verbose --system-logs --profile -f ./examples/example_docker_container" >> /root/.bash_history + + +ENV BENCHMARK_FAKE_DATA_PROGRAM="/opt/aleph-vm/examples/example_docker_container" +ENV FAKE_DATA_MESSAGE="/opt/aleph-vm/examples/message_from_aleph_docker_runtime.json" +ENV FAKE_DATA_DATA="/opt/aleph-vm/examples/data/" +ENV FAKE_DATA_RUNTIME="/opt/aleph-vm/runtimes/aleph-docker/rootfs.squashfs" +ENV FAKE_DATA_VOLUME="/opt/aleph-vm/examples/volumes/docker-data.squashfs" + +RUN mkdir /opt/aleph-vm/ +COPY ./vm_supervisor /opt/aleph-vm/vm_supervisor +COPY ./firecracker /opt/aleph-vm/firecracker +COPY ./guest_api /opt/aleph-vm/guest_api +COPY ./examples /opt/aleph-vm/examples +COPY ./runtimes /opt/aleph-vm/runtimes + +WORKDIR /opt/aleph-vm + +CMD "bash" \ No newline at end of file diff --git a/examples/example_docker_container/entrypoint.sh b/examples/example_docker_container/entrypoint.sh new file mode 100755 index 000000000..00b53a5d9 --- /dev/null +++ b/examples/example_docker_container/entrypoint.sh @@ -0,0 +1,3 @@ +#!/bin/sh +docker image ls +docker run --rm -p 8080:8080 amozpay/hello_node \ No newline at end of file diff --git a/examples/message_from_aleph_docker_runtime.json b/examples/message_from_aleph_docker_runtime.json new file mode 100644 index 000000000..30b218729 --- /dev/null +++ b/examples/message_from_aleph_docker_runtime.json @@ -0,0 +1,99 @@ +{ + "_id": { + "$oid": "6080402d7f44efefd611dc1e" + }, + "chain": "ETH", + "item_hash": "fake-hash-fake-hash-fake-hash-fake-hash-fake-hash-fake-hash-hash", + "sender": "0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba", + "type": 
"PROGRAM", + "channel": "Fun-dApps", + "confirmed": true, + "content": { + "type": "vm-function", + "address": "0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba", + "allow_amend": false, + "code": { + "encoding": "squashfs", + "entrypoint": "entrypoint.sh", + "ref": "7eb2eca2378ea8855336ed76c8b26219f1cb90234d04441de9cf8cb1c649d003", + "use_latest": false + }, + "variables": { + "VM_CUSTOM_NUMBER": "32", + "DOCKER_MOUNTPOINT": "/opt/docker" + }, + "on": { + "http": true, + "message": [ + { + "sender": "0xb5F010860b0964090d5414406273E6b3A8726E96", + "channel": "TEST" + }, + { + "content": { + "ref": "4d4db19afca380fdf06ba7f916153d0f740db9de9eee23ad26ba96a90d8a2920" + } + } + ] + }, + "environment": { + "reproducible": true, + "internet": true, + "aleph_api": true, + "shared_cache": true + }, + "resources": { + "vcpus": 1, + "memory": 512, + "seconds": 30 + }, + "runtime": { + "ref": "5f31b0706f59404fad3d0bff97ef89ddf24da4761608ea0646329362c662ba51", + "use_latest": false, + "comment": "Aleph Debian Linux with Docker" + }, + "volumes": [ + { + "mount": "/opt/docker/metadata", + "ref": "5f31b0706f59404fad3d0bff97ef89ddf24da4761608ea0646329362c662ba51", + "use_latest": false + }, + { + "mount": "/opt/docker/layers", + "ref": "5f31b0706f59404fad3d0bff97ef89ddf24da4761608ea0646329362c662ba51", + "use_latest": false + }, + { + "comment": "Working data persisted on the VM supervisor, not available on other nodes", + "mount": "/var/lib/example", + "name": "data", + "persistence": "host", + "size_mib": 5 + } + ], + "data": { + "encoding": "zip", + "mount": "/data", + "ref": "7eb2eca2378ea8855336ed76c8b26219f1cb90234d04441de9cf8cb1c649d003", + "use_latest": false + }, + "export": { + "encoding": "zip", + "mount": "/data" + }, + "replaces": "0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba", + "time": 1619017773.8950517 + }, + "item_content": "{\"type\": \"vm-function\", \"address\": \"0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba\", \"allow_amend\": false, \"code\": {\"encoding\": 
\"squashfs\", \"entrypoint\": \"main:app\", \"ref\": \"7eb2eca2378ea8855336ed76c8b26219f1cb90234d04441de9cf8cb1c649d003\", \"use_latest\": false}, \"on\": {\"http\": true, \"message\": [{\"sender\": \"0xB31B787AdA86c6067701d4C0A250c89C7f1f29A5\", \"channel\": \"TEST\"}, {\"content\": {\"ref\": \"4d4db19afca380fdf06ba7f916153d0f740db9de9eee23ad26ba96a90d8a2920\"}}]}, \"environment\": {\"reproducible\": true, \"internet\": true, \"aleph_api\": true, \"shared_cache\": false}, \"resources\": {\"vcpus\": 1, \"memory\": 128, \"seconds\": 30}, \"runtime\": {\"ref\": \"5f31b0706f59404fad3d0bff97ef89ddf24da4761608ea0646329362c662ba51\", \"use_latest\": false, \"comment\": \"Aleph Alpine Linux with Python 3.8\"}, \"volumes\": [{\"mount\": \"/opt/venv\", \"ref\": \"5f31b0706f59404fad3d0bff97ef89ddf24da4761608ea0646329362c662ba51\", \"use_latest\": false}, {\"comment\": \"Working data persisted on the VM supervisor, not available on other nodes\", \"mount\": \"/var/lib/sqlite\", \"name\": \"database\", \"persistence\": \"host\", \"size_mib\": 5}], \"data\": {\"encoding\": \"zip\", \"mount\": \"/data\", \"ref\": \"7eb2eca2378ea8855336ed76c8b26219f1cb90234d04441de9cf8cb1c649d003\", \"use_latest\": false}, \"export\": {\"encoding\": \"zip\", \"mount\": \"/data\"}, \"replaces\": \"0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba\", \"time\": 1619017773.8950517}", + "item_type": "inline", + "signature": "0x372da8230552b8c3e65c05b31a0ff3a24666d66c575f8e11019f62579bf48c2b7fe2f0bbe907a2a5bf8050989cdaf8a59ff8a1cbcafcdef0656c54279b4aa0c71b", + "size": 749, + "time": 1619017773.8950577, + "confirmations": [ + { + "chain": "ETH", + "height": 12284734, + "hash": "0x67f2f3cde5e94e70615c92629c70d22dc959a118f46e9411b29659c2fce87cdc" + } + ] +} diff --git a/examples/volumes/build_squashfs.sh b/examples/volumes/build_squashfs.sh index a48e133a0..4a546f8ed 100755 --- a/examples/volumes/build_squashfs.sh +++ b/examples/volumes/build_squashfs.sh @@ -10,5 +10,7 @@ else DOCKER_COMMAND=docker fi +echo 
DOCKER_COMMAND=$DOCKER_COMMAND + $DOCKER_COMMAND build -t aleph-vm-build-squashfs . $DOCKER_COMMAND run --rm -v "$(pwd)":/mnt aleph-vm-build-squashfs diff --git a/run_supervisor_host.sh b/run_supervisor_host.sh new file mode 100755 index 000000000..e7a547cc6 --- /dev/null +++ b/run_supervisor_host.sh @@ -0,0 +1,18 @@ +#!/bin/sh + +export PYTHONPATH=$(pwd) + +export ALEPH_VM_ALLOW_VM_NETWORKING=False +export ALEPH_VM_NETWORK_INTERFACE=tap0 +export ALEPH_VM_USE_JAILER=False +export ALEPH_VM_FAKE_DATA=True +export ALEPH_VM_SUPERVISOR_HOST=0.0.0.0 + +export BENCHMARK_FAKE_DATA_PROGRAM=$(pwd)/examples/example_docker_container +export FAKE_DATA_MESSAGE=$(pwd)/examples/message_from_aleph_docker_runtime.json +export FAKE_DATA_DATA=$(pwd)/examples/data/ +export FAKE_DATA_RUNTIME=$(pwd)/runtimes/aleph-docker/rootfs.squashfs +export FAKE_DATA_VOLUME=$(pwd)/examples/volumes/docker/layers:/opt/docker/layers,$(pwd)/examples/volumes/docker/metadata:/opt/docker/metadata + + +python3 -m vm_supervisor --print-settings --very-verbose --system-logs --profile -f ./examples/example_docker_container \ No newline at end of file diff --git a/runtimes/aleph-alpine-3.13-python/init0.sh b/runtimes/aleph-alpine-3.13-python/init0.sh index 914debaa8..a43ed7be5 100644 --- a/runtimes/aleph-alpine-3.13-python/init0.sh +++ b/runtimes/aleph-alpine-3.13-python/init0.sh @@ -43,5 +43,7 @@ log "Setup socat" socat UNIX-LISTEN:/tmp/socat-socket,fork,reuseaddr VSOCK-CONNECT:2:53 & log "Socat ready" +pip show aiohttp + # Replace this script with the manager exec /root/init1.py diff --git a/runtimes/aleph-alpine-3.13-python/init1.py b/runtimes/aleph-alpine-3.13-python/init1.py index 587b2bbed..368277016 100644 --- a/runtimes/aleph-alpine-3.13-python/init1.py +++ b/runtimes/aleph-alpine-3.13-python/init1.py @@ -10,22 +10,27 @@ logger.debug("Imports starting") -import ctypes -import asyncio -import os -import socket -from enum import Enum -import subprocess -import sys -import traceback -from contextlib import 
redirect_stdout -from dataclasses import dataclass, field -from io import StringIO -from os import system -from shutil import make_archive -from typing import Optional, Dict, Any, Tuple, List, NewType, Union, AsyncIterable - -import aiohttp +# import ctypes +# import asyncio +# import os +# import socket +# from enum import Enum +# import subprocess +# import sys +# import traceback +# from contextlib import redirect_stdout +# from dataclasses import dataclass, field +# from io import StringIO +# from os import system +# from shutil import make_archive +# from typing import Optional, Dict, Any, Tuple, List, NewType, Union, AsyncIterable + +from aiohttp import ( + ClientTimeout, + ClientConnectorError, + ClientSession +) + import msgpack logger.debug("Imports finished") @@ -326,13 +331,13 @@ async def run_executable_http(scope: dict) -> Tuple[Dict, Dict, str, Optional[by headers = None body = None - timeout = aiohttp.ClientTimeout(total=5) - async with aiohttp.ClientSession(timeout=timeout) as session: + timeout = ClientTimeout(total=5) + async with ClientSession(timeout=timeout) as session: while not body: try: tries += 1 headers, body = await make_request(session, scope) - except aiohttp.ClientConnectorError: + except ClientConnectorError: if tries > 20: raise await asyncio.sleep(0.05) @@ -361,7 +366,7 @@ async def process_instruction( # Close the cached session in aleph_client: from aleph_client.asynchronous import get_fallback_session - session: aiohttp.ClientSession = get_fallback_session() + session: ClientSession = get_fallback_session() await session.close() logger.debug("Aiohttp cached session closed") yield b"STOP\n" diff --git a/runtimes/aleph-debian-11-python/create_disk_image.sh b/runtimes/aleph-debian-11-python/create_disk_image.sh index 7fbb6209a..31b678cba 100755 --- a/runtimes/aleph-debian-11-python/create_disk_image.sh +++ b/runtimes/aleph-debian-11-python/create_disk_image.sh @@ -17,7 +17,6 @@ apt-get install -y --no-install-recommends 
--no-install-suggests \ python3-minimal \ openssh-server \ socat libsecp256k1-0 \ - \ python3-aiohttp python3-msgpack \ python3-setuptools \ python3-pip python3-cytoolz python3-pydantic \ diff --git a/runtimes/aleph-docker/create_disk_image.sh b/runtimes/aleph-docker/create_disk_image.sh new file mode 100755 index 000000000..ce6250ca3 --- /dev/null +++ b/runtimes/aleph-docker/create_disk_image.sh @@ -0,0 +1,112 @@ +#!/bin/sh + +rm -f ./rootfs.squashfs + +set -euf + +rm -fr ./rootfs +mkdir ./rootfs + +debootstrap --variant=minbase bullseye ./rootfs http://deb.debian.org/debian/ + +chroot ./rootfs /bin/sh <<EOT + +set -euf + +apt-get install -y --no-install-recommends --no-install-suggests \ + python3-minimal \ + openssh-server \ + socat libsecp256k1-0 \ + \ + python3-msgpack \ + python3-setuptools \ + python3-pip python3-cytoolz python3-pydantic \ + iproute2 unzip \ + curl\ + docker.io\ + cgroupfs-mount \ + build-essential python3-dev +pip3 install 'fastapi~=0.71.0' +pip3 install aiohttp + + + +echo "Pip installing aleph-client" +pip3 install 'aleph-client>=0.4.6' 'coincurve==15.0.0' + +# Compile all Python bytecode +python3 -m compileall -f /usr/local/lib/python3.9 + +echo "root:toor" | /usr/sbin/chpasswd + +mkdir -p /overlay + +mkdir -p /var/lib/docker +cd /var/lib/docker +mkdir -m 710 vfs +mkdir -m 700 image +mkdir -m 700 image/vfs +mkdir -m 700 plugins +mkdir -m 700 swarm +cmkdir -m 750 network +mkdir -m 700 trust +mkdir -m 701 volumes +mkdir -m 711 buildkit +mkdir -m 710 containers + +# Set up a login terminal on the serial console (ttyS0): +ln -s agetty /etc/init.d/agetty.ttyS0 +echo ttyS0 > /etc/securetty + +update-alternatives --set iptables /usr/sbin/iptables-legacy +EOT + +echo "PermitRootLogin yes" >> ./rootfs/etc/ssh/sshd_config + +# Generate SSH host keys +#systemd-nspawn -D ./rootfs/ ssh-keygen -q -N "" -t dsa -f /etc/ssh/ssh_host_dsa_key +#systemd-nspawn -D ./rootfs/ ssh-keygen -q -N "" -t rsa -b 4096 -f /etc/ssh/ssh_host_rsa_key +#systemd-nspawn -D 
./rootfs/ ssh-keygen -q -N "" -t ecdsa -f /etc/ssh/ssh_host_ecdsa_key +#systemd-nspawn -D ./rootfs/ ssh-keygen -q -N "" -t ed25519 -f /etc/ssh/ssh_host_ed25519_key + +cat <<EOT > ./rootfs/etc/inittab +# /etc/inittab + +::sysinit:/sbin/init sysinit +::sysinit:/sbin/init boot +::wait:/sbin/init default + +# Set up a couple of getty's +tty1::respawn:/sbin/getty 38400 tty1 +tty2::respawn:/sbin/getty 38400 tty2 +tty3::respawn:/sbin/getty 38400 tty3 +tty4::respawn:/sbin/getty 38400 tty4 +tty5::respawn:/sbin/getty 38400 tty5 +tty6::respawn:/sbin/getty 38400 tty6 + +# Put a getty on the serial port +ttyS0::respawn:/sbin/getty -L ttyS0 115200 vt100 + +# Stuff to do for the 3-finger salute +::ctrlaltdel:/sbin/reboot + +# Stuff to do before rebooting +::shutdown:/sbin/init shutdown +EOT + +# Reduce size +rm -fr ./rootfs/root/.cache +rm -fr ./rootfs/var/cache +mkdir -p ./rootfs/var/cache/apt/archives/partial +rm -fr ./rootfs/usr/share/doc +rm -fr ./rootfs/usr/share/man +rm -fr ./rootfs/var/lib/apt/lists/ + +# Custom init +rm -f ./rootfs/sbin/init +cp ./init0.sh ./rootfs/sbin/init +cp ./init1.py ./rootfs/root/init1.py +chmod +x ./rootfs/sbin/init +chmod +x ./rootfs/root/init1.py + +mksquashfs ./rootfs/ ./rootfs.squashfs diff --git a/runtimes/aleph-docker/init0.sh b/runtimes/aleph-docker/init0.sh new file mode 100644 index 000000000..cea122214 --- /dev/null +++ b/runtimes/aleph-docker/init0.sh @@ -0,0 +1,53 @@ +#!/bin/sh + +set -euf + +mount -t proc proc /proc -o nosuid,noexec,nodev + +log() { + echo "$(cat /proc/uptime | awk '{printf $1}')" '|S' "$@" +} +log "init0.sh is launching" + +# Switch root from read-only ext4 to to read-write overlay +mkdir -p /overlay +/bin/mount -t tmpfs -o noatime,mode=0755 tmpfs /overlay +mkdir -p /overlay/root/rw /overlay/root/work +/bin/mount -o noatime,lowerdir=/,upperdir=/overlay/root/rw,workdir=/overlay/root/work -t overlay "overlayfs:/overlay/root/rw" /mnt +mkdir -p /mnt/rom +pivot_root /mnt /mnt/rom + +mount --move /rom/proc /proc +mount 
--move /rom/dev /dev + +mkdir -p /dev/pts +mkdir -p /dev/shm + +mount -t sysfs sys /sys -o nosuid,noexec,nodev +mount -t tmpfs run /run -o mode=0755,nosuid,nodev +#mount -t devtmpfs dev /dev -o mode=0755,nosuid +mount -t devpts devpts /dev/pts -o mode=0620,gid=5,nosuid,noexec +mount -t tmpfs shm /dev/shm -omode=1777,nosuid,nodev + + +# List block devices +lsblk + +#cat /proc/sys/kernel/random/entropy_avail + +# TODO: Move in init1 +mkdir -p /run/sshd +/usr/sbin/sshd & +log "SSH UP" + +log "Setup socat" +socat UNIX-LISTEN:/tmp/socat-socket,fork,reuseaddr VSOCK-CONNECT:2:53 & +log "Socat ready" + +cgroupfs-mount + +export PATH=$PATH:/usr/local/bin:/usr/bin:/usr/sbin + +log "INIT 0 DONE2" +# Replace this script with the manager +exec /root/init1.py diff --git a/runtimes/aleph-docker/init1.py b/runtimes/aleph-docker/init1.py new file mode 100644 index 000000000..a9abcf845 --- /dev/null +++ b/runtimes/aleph-docker/init1.py @@ -0,0 +1,592 @@ +#!/usr/bin/python3 -OO + +import logging +from time import sleep + +logging.basicConfig( + level=logging.DEBUG, + format="%(relativeCreated)4f |V %(levelname)s | %(message)s", +) +logger = logging.getLogger(__name__) + +logger.debug("Imports starting") + +import ctypes +import asyncio +import os +import socket +from enum import Enum +import subprocess +import sys +import traceback +from contextlib import redirect_stdout +from dataclasses import dataclass, field +from io import StringIO +from os import system + +from shutil import make_archive +from typing import Optional, Dict, Any, Tuple, List, NewType, Union, AsyncIterable + +import aiohttp +import msgpack + +logger.debug("Imports finished") + +ASGIApplication = NewType("AsgiApplication", Any) + +class Encoding(str, Enum): + plain = "plain" + zip = "zip" + squashfs = "squashfs" + + +class Interface(str, Enum): + asgi = "asgi" + executable = "executable" + + +class ShutdownException(Exception): + pass + + +@dataclass +class Volume: + mount: str + device: str + read_only: bool + + 
+@dataclass +class ConfigurationPayload: + code: bytes + encoding: Encoding + entrypoint: str + input_data: bytes + interface: Interface + vm_hash: str + ip: Optional[str] = None + route: Optional[str] = None + dns_servers: List[str] = field(default_factory=list) + volumes: List[Volume] = field(default_factory=list) + variables: Optional[Dict[str, str]] = None + + +@dataclass +class RunCodePayload: + scope: Dict + + +# Open a socket to receive instructions from the host +s = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) +s.bind((socket.VMADDR_CID_ANY, 52)) +s.listen() + +# Send the host that we are ready +s0 = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) +s0.connect((2, 52)) +s0.close() + +# Configure aleph-client to use the guest API +os.environ["ALEPH_API_HOST"] = "http://localhost" +os.environ["ALEPH_API_UNIX_SOCKET"] = "/tmp/socat-socket" +os.environ["ALEPH_REMOTE_CRYPTO_HOST"] = "http://localhost" +os.environ["ALEPH_REMOTE_CRYPTO_UNIX_SOCKET"] = "/tmp/socat-socket" + +logger.debug("init1.py is launching") + + +def setup_hostname(hostname: str): + os.environ["ALEPH_ADDRESS_TO_USE"] = hostname + system(f"hostname {hostname}") + + +def setup_variables(variables: Optional[Dict[str, str]]): + if variables is None: + return + for key, value in variables.items(): + print(key) + os.environ[key] = value + + +def setup_network( + ip: Optional[str], route: Optional[str], dns_servers: Optional[List[str]] = None +): + """Setup the system with info from the host.""" + dns_servers = dns_servers or [] + if not os.path.exists("/sys/class/net/eth0"): + logger.info("No network interface eth0") + return + + if not ip: + logger.info("No network IP") + return + + logger.debug("Setting up networking") + logger.debug("IP ADDR:" + ip) + system("ip addr add 127.0.0.1/8 dev lo brd + scope host") + system("ip addr add ::1/128 dev lo") + system("ip link set lo up") + system(f"ip addr add {ip}/24 dev eth0") + system("ip link set eth0 up") + + if route: + system(f"ip route add 
default via {route} dev eth0") + logger.debug("IP and route set") + else: + logger.warning("IP set with no network route") + + with open("/etc/resolv.conf", "wb") as resolvconf_fd: + for server in dns_servers: + resolvconf_fd.write(f"nameserver {server}\n".encode()) + + +def setup_input_data(input_data: bytes): + logger.debug("Extracting data") + if input_data: + # Unzip in /data + if not os.path.exists("/opt/input.zip"): + open("/opt/input.zip", "wb").write(input_data) + os.makedirs("/data", exist_ok=True) + os.system("unzip -q /opt/input.zip -d /data") + + +def setup_volumes(volumes: List[Volume]): + for volume in volumes: + logger.debug(f"Mounting /dev/{volume.device} on {volume.mount}") + os.makedirs(volume.mount, exist_ok=True) + if volume.read_only: + system(f"mount -t squashfs -o ro /dev/{volume.device} {volume.mount}") + else: + system(f"mount -o rw /dev/{volume.device} {volume.mount}") + +def setup_code_asgi( + code: bytes, encoding: Encoding, entrypoint: str +) -> ASGIApplication: + # Allow importing packages from /opt/packages + sys.path.append("/opt/packages") + + logger.debug("Extracting code") + if encoding == Encoding.squashfs: + sys.path.append("/opt/code") + module_name, app_name = entrypoint.split(":", 1) + logger.debug("import module") + module = __import__(module_name) + for level in module_name.split(".")[1:]: + module = getattr(module, level) + app: ASGIApplication = getattr(module, app_name) + elif encoding == Encoding.zip: + # Unzip in /opt and import the entrypoint from there + if not os.path.exists("/opt/archive.zip"): + open("/opt/archive.zip", "wb").write(code) + logger.debug("Run unzip") + os.system("unzip -q /opt/archive.zip -d /opt") + sys.path.append("/opt") + module_name, app_name = entrypoint.split(":", 1) + logger.debug("import module") + module = __import__(module_name) + for level in module_name.split(".")[1:]: + module = getattr(module, level) + app: ASGIApplication = getattr(module, app_name) + elif encoding == Encoding.plain: 
+ # Execute the code and extract the entrypoint + locals: Dict[str, Any] = {} + exec(code, globals(), locals) + app: ASGIApplication = locals[entrypoint] + else: + raise ValueError(f"Unknown encoding '{encoding}'") + return app + + +def setup_code_executable( + code: bytes, encoding: Encoding, entrypoint: str +) -> subprocess.Popen: + logger.debug("Extracting code") + if encoding == Encoding.squashfs: + path = f"/opt/code/{entrypoint}" + if not os.path.isfile(path): + os.system("find /opt/code/") + raise FileNotFoundError(f"No such file: {path}") + os.system(f"chmod +x {path}") + elif encoding == Encoding.zip: + open("/opt/archive.zip", "wb").write(code) + logger.debug("Run unzip") + os.makedirs("/opt/code", exist_ok=True) + os.system("unzip /opt/archive.zip -d /opt/code") + path = f"/opt/code/{entrypoint}" + if not os.path.isfile(path): + os.system("find /opt/code") + raise FileNotFoundError(f"No such file: {path}") + os.system(f"chmod +x {path}") + elif encoding == Encoding.plain: + os.makedirs("/opt/code", exist_ok=True) + path = f"/opt/code/executable {entrypoint}" + open(path, "wb").write(code) + os.system(f"chmod +x {path}") + else: + raise ValueError(f"Unknown encoding '{encoding}'. This should never happen.") + + process = subprocess.Popen(path) + return process + + +def setup_code( + code: bytes, encoding: Encoding, entrypoint: str, interface: Interface +) -> Union[ASGIApplication, subprocess.Popen]: + + logger.debug("\n\n\nCODE") + logger.debug(interface) + logger.debug(code.decode() + "\n\n\n") + + if interface == Interface.asgi: + return setup_code_asgi(code=code, encoding=encoding, entrypoint=entrypoint) + elif interface == Interface.executable: + return setup_code_executable( + code=code, encoding=encoding, entrypoint=entrypoint + ) + else: + raise ValueError("Invalid interface. 
This should never happen.") + + +async def run_python_code_http( + application: ASGIApplication, scope: dict +) -> Tuple[Dict, Dict, str, Optional[bytes]]: + + logger.debug("Running code") + with StringIO() as buf, redirect_stdout(buf): + # Execute in the same process, saves ~20ms than a subprocess + + # The body should not be part of the ASGI scope itself + body: bytes = scope.pop("body") + + async def receive(): + type_ = ( + "http.request" + if scope["type"] in ("http", "websocket") + else "aleph.message" + ) + return {"type": type_, "body": body, "more_body": False} + + send_queue: asyncio.Queue = asyncio.Queue() + + async def send(dico): + await send_queue.put(dico) + + # TODO: Better error handling + logger.debug("Awaiting application...") + await application(scope, receive, send) + + logger.debug("Waiting for headers") + headers: Dict + if scope["type"] == "http": + headers = await send_queue.get() + else: + headers = {} + + logger.debug("Waiting for body") + body: Dict = await send_queue.get() + + logger.debug("Waiting for buffer") + output = buf.getvalue() + + logger.debug(f"Headers {headers}") + logger.debug(f"Body {body}") + logger.debug(f"Output {output}") + + logger.debug("Getting output data") + output_data: bytes + if os.path.isdir("/data") and os.listdir("/data"): + make_archive("/opt/output", "zip", "/data") + with open("/opt/output.zip", "rb") as output_zipfile: + output_data = output_zipfile.read() + else: + output_data = b"" + + logger.debug("Returning result") + return headers, body, output, output_data + + +async def make_request(session, scope): + async with session.request( + scope["method"], + url="http://localhost:8080{}".format(scope["path"]), + params=scope["query_string"], + headers=[(a.decode("utf-8"), b.decode("utf-8")) for a, b in scope["headers"]], + data=scope.get("body", None), + ) as resp: + headers = { + "headers": [ + (a.encode("utf-8"), b.encode("utf-8")) for a, b in resp.headers.items() + ], + "status": resp.status, + } + 
body = {"body": await resp.content.read()} + return headers, body + + +async def run_executable_http(scope: dict) -> Tuple[Dict, Dict, str, Optional[bytes]]: + logger.debug("Calling localhost") + + tries = 0 + headers = None + body = None + + timeout = aiohttp.ClientTimeout(total=5) + async with aiohttp.ClientSession(timeout=timeout) as session: + while not body: + try: + tries += 1 + headers, body = await make_request(session, scope) + except aiohttp.ClientConnectorError: + if tries > 20: + raise + await asyncio.sleep(0.05) + + output = "" # Process stdout is not captured per request + output_data = None + logger.debug("Returning result") + return headers, body, output, output_data + + +async def process_instruction( + instruction: bytes, + interface: Interface, + application: Union[ASGIApplication, subprocess.Popen], +) -> AsyncIterable[bytes]: + + if instruction == b"halt": + logger.info("Received halt command") + system("sync") + logger.debug("Filesystems synced") + if isinstance(application, subprocess.Popen): + application.terminate() + logger.debug("Application terminated") + # application.communicate() + else: + # Close the cached session in aleph_client: + from aleph_client.asynchronous import get_fallback_session + + session: aiohttp.ClientSession = get_fallback_session() + await session.close() + logger.debug("Aiohttp cached session closed") + yield b"STOP\n" + logger.debug("Supervisor informed of halt") + raise ShutdownException + elif instruction.startswith(b"!"): + # Execute shell commands in the form `!ls /` + msg = instruction[1:].decode() + try: + process_output = subprocess.check_output( + msg, stderr=subprocess.STDOUT, shell=True + ) + yield process_output + except subprocess.CalledProcessError as error: + yield str(error).encode() + b"\n" + error.output + else: + # Python + logger.debug("msgpack.loads (") + msg_ = msgpack.loads(instruction, raw=False) + logger.debug("msgpack.loads )") + payload = RunCodePayload(**msg_) + + output: Optional[str] 
= None + try: + headers: Dict + body: Dict + output_data: Optional[bytes] + + if interface == Interface.asgi: + headers, body, output, output_data = await run_python_code_http( + application=application, scope=payload.scope + ) + elif interface == Interface.executable: + headers, body, output, output_data = await run_executable_http( + scope=payload.scope + ) + else: + raise ValueError("Unknown interface. This should never happen") + + result = { + "headers": headers, + "body": body, + "output": output, + "output_data": output_data, + } + yield msgpack.dumps(result, use_bin_type=True) + except Exception as error: + yield msgpack.dumps( + { + "error": str(error), + "traceback": str(traceback.format_exc()), + "output": output, + } + ) + + +def receive_data_length(client) -> int: + """Receive the length of the data to follow.""" + buffer = b"" + for _ in range(9): + byte = client.recv(1) + if byte == b"\n": + break + else: + buffer += byte + return int(buffer) + + +def load_configuration(data: bytes) -> ConfigurationPayload: + msg_ = msgpack.loads(data, raw=False) + msg_["volumes"] = [Volume(**volume_dict) for volume_dict in msg_.get("volumes")] + return ConfigurationPayload(**msg_) + + +def receive_config(client) -> ConfigurationPayload: + length = receive_data_length(client) + data = b"" + while len(data) < length: + data += client.recv(1024 * 1024) + return load_configuration(data) + + +def setup_docker(): + logger.debug("Setting up docker") + docker_mountpoint = os.environ.get("DOCKER_MOUNTPOINT") + os.makedirs("/docker", exist_ok=True) + system("bin/mount -t tmpfs -o noatime,mode=0755 tmpfs /docker") + os.makedirs("/docker/persist/layers/work", exist_ok=True) + os.makedirs("/docker/persist/metadata/work", exist_ok=True) + os.makedirs("/docker/persist/layers/upper", exist_ok=True) + os.makedirs("/docker/persist/metadata/upper", exist_ok=True) + # docker_daemon = subprocess.Popen(["/usr/sbin/dockerd", "--storage-driver=vfs"], stderr=subprocess.PIPE, 
stdout=subprocess.PIPE, encoding='utf-8') + system(f'/bin/mount -o noatime,lowerdir={docker_mountpoint}/layers,upperdir=/docker/persist/layers/upper,workdir=/docker/persist/layers/work -t overlay "overlayfs:/docker/persist/layers/upper" /var/lib/docker/vfs') + system(f'/bin/mount -o noatime,lowerdir={docker_mountpoint}/metadata,upperdir=/docker/persist/metadata/upper,workdir=/docker/persist/metadata/work -t overlay "overlayfs:/docker/persist/metadata/upper" /var/lib/docker/image/vfs') + print("Before daemon:\n") + os.system("stat -f /var/lib/docker/image/vfs/repositories.json") + os.system("cat /var/lib/docker/image/vfs/repositories.json") + print("here") + # docker_daemon = subprocess.Popen(["/usr/sbin/dockerd", "--storage-driver=vfs"], stderr=subprocess.PIPE, stdout=subprocess.PIPE, encoding='utf-8') + print("there") + os.system("docker info") + docker_daemon = subprocess.Popen(["/usr/sbin/dockerd", "--storage-driver=vfs"], stderr=subprocess.PIPE, encoding='utf-8') + # os.system("/usr/sbin/dockerd --storage-driver=vfs") + while os.system("docker ps > /dev/null 2>&1") != 0: + # print("yulu") + # stderr = docker_daemon.communicate() + # print("stderr: " + stderr) + continue + print("After daemon:\n") + os.system("stat -f /var/lib/docker/image/vfs/repositories.json") + os.system("cat /var/lib/docker/image/vfs/repositories.json") + system("mount") + + +def setup_system(config: ConfigurationPayload): + setup_hostname(config.vm_hash) + setup_variables(config.variables) + setup_volumes(config.volumes) + setup_docker() + print("dameon ready") + setup_network(config.ip, config.route, config.dns_servers) + setup_input_data(config.input_data) + logger.debug("Setup finished") + + +def umount_volumes(volumes: List[Volume]): + "Umount user related filesystems" + system("sync") + for volume in volumes: + logger.debug(f"Umounting /dev/{volume.device} on {volume.mount}") + system(f"umount {volume.mount}") + + +async def main(): + client, addr = s.accept() + + 
    async def handle_instruction(reader, writer):
        """Handle one instruction connection from the supervisor.

        Reads the raw instruction bytes, forwards them to
        `process_instruction` and streams each result chunk back.
        A ShutdownException triggers an orderly server shutdown.
        """
        # NOTE(review): 1000_1000 == 10_001_000 bytes (~10 MB), not the
        # "1 Mo" the original comment claimed — likely meant 1_000_000.
        data = await reader.read(1000_1000)

        logger.debug("Init received msg")
        if logger.level <= logging.DEBUG:
            # Truncate very large payloads so debug logs stay readable.
            data_to_print = f"{data[:500]}..." if len(data) > 500 else data
            logger.debug(f"<<<\n\n{data_to_print}\n\n>>>")

        try:
            async for result in process_instruction(
                instruction=data, interface=config.interface, application=app
            ):
                writer.write(result)
                await writer.drain()

            logger.debug("Instruction processed")
        except ShutdownException:
            logger.info("Initiating shutdown")
            writer.write(b"STOPZ\n")
            await writer.drain()
            logger.debug("Shutdown confirmed to supervisor")
            # Closing the server makes serve_forever() in main() return.
            server_reference.server.close()
            logger.debug("Supervisor socket server closed")
        finally:
            writer.close()
#!/bin/sh
# Refresh the init scripts inside the rootfs and rebuild the squashfs image.

set -euf

# -f: do not fail on the first run, when no previous image exists yet.
# (Previously `rm` ran before `set -euf` and errored noisily when the
# file was missing.)
rm -f ./rootfs.squashfs

cp ./init0.sh ./rootfs/sbin/init
cp ./init1.py ./rootfs/root/init1.py
chmod +x ./rootfs/sbin/init
chmod +x ./rootfs/root/init1.py

mksquashfs ./rootfs/ ./rootfs.squashfs

echo "OK"
+ or abspath(join(__file__, "../../examples/data/")) ) FAKE_DATA_RUNTIME = Path( - abspath(join(__file__, "../../runtimes/aleph-debian-11-python/rootfs.squashfs")) + environ.get("FAKE_DATA_RUNTIME") + or abspath(join(__file__, "../../runtimes/aleph-debian-11-python/rootfs.squashfs")) ) FAKE_DATA_VOLUME: Optional[Path] = Path( - abspath(join(__file__, "../../examples/volumes/volume-venv.squashfs")) + environ.get("FAKE_DATA_VOLUME") + or abspath(join(__file__, "../../examples/volumes/volume-venv.squashfs")) ) CHECK_FASTAPI_VM_ID = ( @@ -176,9 +182,15 @@ def check(self): assert isfile( self.FAKE_DATA_RUNTIME ), "Local runtime .squashfs build is missing" - assert isfile( - self.FAKE_DATA_VOLUME - ), "Local data volume .squashfs is missing" + if "," in str(self.FAKE_DATA_VOLUME): # allow multiple volumes with format "host_path:mountpoint,host_path:mountpoint" + for volume_bind in str(self.FAKE_DATA_VOLUME).split(","): + assert isfile( + volume_bind.split(":")[0] + ), f"Local data volume {volume_bind.split(':')[0]} is missing" + else: + assert isfile( + self.FAKE_DATA_VOLUME + ), f"Local data volume {volume_bind.split(':')[0]} is missing" def setup(self): os.makedirs(self.MESSAGE_CACHE, exist_ok=True) diff --git a/vm_supervisor/storage.py b/vm_supervisor/storage.py index 2945b249a..953b28fbd 100644 --- a/vm_supervisor/storage.py +++ b/vm_supervisor/storage.py @@ -157,9 +157,14 @@ def create_ext4(path: Path, size_mib: int) -> bool: async def get_volume_path(volume: MachineVolume, namespace: str) -> Path: if isinstance(volume, ImmutableVolume): + print(volume) ref = volume.ref if settings.FAKE_DATA_PROGRAM and settings.FAKE_DATA_VOLUME: - return Path(settings.FAKE_DATA_VOLUME) + if "," not in str(settings.FAKE_DATA_VOLUME): + return Path(settings.FAKE_DATA_VOLUME) + for volume_bind in str(settings.FAKE_DATA_VOLUME).split(","): + if volume.mount == volume_bind.split(":")[1]: + return volume_bind.split(":")[0] cache_path = Path(join(settings.DATA_CACHE, ref)) url = 
f"{settings.CONNECTOR_URL}/download/data/{ref}"