Skip to content

nickyharpor/somnia_data_streams_sdk

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

6 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Somnia Data Streams Python SDK

The Somnia Data Streams Python SDK enables streaming data on-chain, integrated with off-chain reactivity to unlock new paradigms in the blockchain ecosystem.

PyPI Version

Features

  • Easy and intuitive interface and flow
  • Consistent with Somnia Data Streams JS/TS SDK
  • Schema encoding and decoding for structured data
  • Type-safe API with comprehensive type definitions
  • Asynchronized architecture for better CPU utilization on high load
  • Extensive unit tests and integration tests
  • Example code snippets for all functionalities

Installation

pip install somnia-data-streams-sdk

Quick Start

Initialize the SDK

from somnia_data_streams_sdk import SDK, SOMNIA_TESTNET

# Read-only access (no private key needed)
sdk = SDK.create_for_chain(SOMNIA_TESTNET["id"])

# With write access (provide private key for transaction signing)
sdk = SDK.create_for_chain(SOMNIA_TESTNET["id"], private_key="0x...")

Get All Registered Schemas

import asyncio

schemas = await sdk.streams.get_all_schemas()
for i, schema in enumerate(schemas):
    print(f"{i+1}. {schema}")

Compute Schema ID

test_schema = "uint256 balance, address owner"
schema_id = await sdk.streams.compute_schema_id(test_schema)
print(f"\nSchema ID for '{test_schema}': {schema_id}")

Check if Schema is Registered

is_registered = await sdk.streams.is_data_schema_registered(schema_id)
print(f"Schema registered: {is_registered}")

Schema Encoding/Decoding

from somnia_data_streams_sdk import SchemaEncoder, SchemaItem

encoder = SchemaEncoder("uint256 balance, address owner")
encoded = encoder.encode_data([
    SchemaItem(name="balance", type="uint256", value=666),
    SchemaItem(name="owner", type="address", value="0x..."),
])
print(f"Encoded Schema: {encoded}")

decoded = encoder.decode_data(encoded)
print("Decoded Schema:")
for item in decoded:
    print(f"  {item.name} ({item.type}): {item.value.value}")

Register a Data Schema (Consumes Gas)

from somnia_data_streams_sdk import DataSchemaRegistration

registrations = [
    DataSchemaRegistration(
        schema_name="your-unique-id-here-otherwise-wont-register",
        schema=test_schema,
        parent_schema_id=None
    )
]
tx_hash = await sdk.streams.register_data_schemas(registrations)
if tx_hash and isinstance(tx_hash, str) and tx_hash.startswith("0x"):
    print(f"Schema registered! TX: {tx_hash}")
else:
    print("Schema already registered or registration error")

Publish Data (Consumes Gas)

from eth_utils import to_hex, keccak
from somnia_data_streams_sdk import DataStream

data_id = to_hex(keccak(text="your-unique-id-here-for-this-data"))
data_streams = [
    DataStream(
        id=data_id,
        schema_id=schema_id,
        data=encoded,
    )
]
tx_hash = await sdk.streams.set(data_streams)
if tx_hash:
    print(f"Data published! TX: {tx_hash}")
else:
    print("Data publishing failed")

Read Data

data = await sdk.streams.get_all_publisher_data_for_schema(
    schema_id=schema_id,
    publisher=sdk.streams.web3_client.client.account.address,
)
    
if data:
    print(f"Found {len(data)} data points")
    if isinstance(data[0], list):  # Decoded data
        for i, decoded_items in enumerate(data):
            print(f"\nData point {i+1}:")
            for item in decoded_items:
                print(f"  {item.name}: {item.value.value}")
    else:  # Raw data
        print("Raw data (schema not public):", data)

Register Events (Consumes Gas)

from somnia_data_streams_sdk import EventSchema, EventParameter
from eth_utils import to_hex, keccak

event_signature = "TestV1(uint256 indexed x)"
event_id = "TestV1"

event_schemas = [
    EventSchema(
        params=[
            EventParameter(name="x", param_type="uint256", is_indexed=True)
        ],
        event_topic=to_hex(keccak(text=event_signature))
    )
]

tx_hash = await sdk.streams.register_event_schemas(
    ids=[event_id],
    schemas=event_schemas
)

if tx_hash and isinstance(tx_hash, str):
    print(f"Event schema registered! TX: 0x{tx_hash}")
else:
    print("Event schema registration failed or already registered")

time.sleep(5) # gives Somnia's blockchain a short time to register the event

tx_hash = await sdk.streams.manage_event_emitters_for_registered_streams_event(
    event_id, "0x...", True) # Replace 0x... with your public key
print(f"Event permission added! TX: 0x{tx_hash}")

Emit Events (Consumes Gas)

from eth_abi import encode
from somnia_data_streams_sdk import EventStream

event_data = encode(
    ["uint256"],
    [13] # Replace 13 with any unsigned int value you want to emit
)

events = [
    EventStream(
        id=event_id,
        argument_topics=[to_hex(keccak(text=event_signature))],
        data=event_data
    )
]

tx_hash = await sdk.streams.emit_events(events)
if tx_hash and isinstance(tx_hash, str):
    print(f"Events emitted! TX: 0x{tx_hash}")
else:
    print("Event emission failed")

Subscribe to Events

from somnia_data_streams_sdk import SubscriptionInitParams

def on_data(data):
    print(data)

def on_error(error):
    print(error)

subscription = await sdk.streams.subscribe(
    SubscriptionInitParams(
        somnia_streams_event_id="TestV1",
        eth_calls=[],
        on_data=on_data,
        on_error=on_error,
        only_push_changes=True
    )
)

print("Subscribed to TestV1. Press Ctrl+C to stop.")

try:
    await asyncio.Future()
except:
    await subscription.get("unsubscribe")

API Reference

Main Classes

  • SDK - Main SDK class for interacting with Somnia Data Streams
  • SchemaEncoder - Encode and decode data schemas

Chain Configuration

  • SOMNIA_TESTNET - Testnet configuration (Chain ID: 50312)
  • SOMNIA_MAINNET - Mainnet configuration (Chain ID: 5031)
  • get_chain_config(chain_id) - Get chain configuration by ID
  • get_default_rpc_url(chain_id) - Get default RPC URL for a chain

Frequently Used Types

  • SubscriptionInitParams
  • SchemaItem, SchemaDecodedItem
  • EventParameter, EventSchema, EventStream
  • DataStream, DataSchemaRegistration

Contribution Guide

If it's bug fix or code improvement (i.e. not a new feature), please make sure your code passes all tests before submitting a PR.

pytest -v -s

If it's a new feature, don't forget to write examples, unit tests, and integration tests for it.

About

Python SDK for Somnia Data Streams

Topics

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages