Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] enable xcvrd with xcvr emulation in the VS environment #21242

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
vs: extend platform_module to emulate xcvrs
Signed-off-by: Wataru Ishida <wataru.ishid@gmail.com>
  • Loading branch information
ishidawataru committed Dec 20, 2024
commit cb538b53919c5a00653f1854b13f8088a1edcf54
51 changes: 48 additions & 3 deletions platform/vs/sonic-platform-modules-vs/sonic_platform/chassis.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,22 @@
#

try:
from sonic_platform_base.chassis_base import ChassisBase
from sonic_platform_base.module_base import ModuleBase
import os
import time
import json
import threading
import sys

import grpc

from xcvr_emu.proto import emulator_pb2 as pb2
# see https://github.com/grpc/grpc/issues/29459#issuecomment-1641587881
proto_dir = os.path.dirname(pb2.__file__)
sys.path.append(proto_dir)

from xcvr_emu.proto import emulator_pb2_grpc

from sonic_platform_base.chassis_base import ChassisBase
from .sfp import Sfp

except ImportError as e:
raise ImportError(str(e) + "- required module not found")
Expand All @@ -24,6 +35,9 @@ def __init__(self):
ChassisBase.__init__(self)
self.metadata_file = '/etc/sonic/vs_chassis_metadata.json'
self.metadata = self._read_metadata()
channel = grpc.insecure_channel("localhost:50051")
self.xcvr_emu = emulator_pb2_grpc.SfpEmulatorServiceStub(channel)
self.sfps = {}

def _read_metadata(self):
metadata = {}
Expand Down Expand Up @@ -53,3 +67,34 @@ def get_my_slot(self):
else:
raise ValueError("Invalid configuration: Neither supervisor nor line card")

def get_sfp(self, index):
if index not in self.sfps:
sfp = Sfp(index, self.xcvr_emu)
self.sfps[index] = (sfp, sfp.get_presence())

return self.sfps[index][0]

def get_change_event(self, timeout=0):

port_dict = {}
change_dict = {"sfp": port_dict}

start = time.time()

while True:
for (sfp, present) in self.sfps.values():
current = sfp.get_presence()
if current != present:
port_dict[sfp.index] = '1' if current else '0'
self.sfps[sfp.index] = (sfp, current)

if len(port_dict):
return True, change_dict

if timeout > 0 and (time.time() - start > (timeout / 1000)):
return True, change_dict

time.sleep(0.5)

def get_reboot_cause(self):
return ("REBOOT_CAUSE_NON_HARDWARE", "Unknown")
95 changes: 95 additions & 0 deletions platform/vs/sonic-platform-modules-vs/sonic_platform/sfp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import grpc

from xcvr_emu.proto.emulator_pb2 import ReadRequest, WriteRequest, GetInfoRequest

from sonic_platform_base.sfp_base import SfpBase


class Sfp(SfpBase):
def __init__(self, index, xcvr_emu):
self.index = index
self.xcvr_emu = xcvr_emu
super().__init__()

def get_model(self):
api = self.get_xcvr_api()
return api.get_model() if api is not None else None

def get_serial(self):
api = self.get_xcvr_api()
return api.get_serial() if api is not None else None

def get_transceiver_info(self):
api = self.get_xcvr_api()
return api.get_transceiver_info() if api is not None else None

def get_transceiver_info_firmware_versions(self):
api = self.get_xcvr_api()
return api.get_transceiver_info_firmware_versions() if api is not None else None

def get_transceiver_bulk_status(self):
api = self.get_xcvr_api()
return api.get_transceiver_bulk_status() if api is not None else None

def get_transceiver_threshold_info(self):
api = self.get_xcvr_api()
return api.get_transceiver_threshold_info() if api is not None else None

def get_transceiver_status(self):
api = self.get_xcvr_api()
return api.get_transceiver_status() if api is not None else None

def get_transceiver_loopback(self):
api = self.get_xcvr_api()
return api.get_transceiver_loopback() if api is not None else None

def is_coherent_module(self):
api = self.get_xcvr_api()
return api.is_coherent_module() if api is not None else None

def get_transceiver_pm(self):
api = self.get_xcvr_api()
return api.get_transceiver_pm() if api is not None else None

def get_presence(self):
try:
info = self.xcvr_emu.GetInfo(GetInfoRequest(index=self.index))
except grpc.RpcError:
return False
return info.present

def is_replaceable(self):
return True

def read_eeprom(self, offset, num_bytes):
if not self.get_presence():
return None
# convert optoe offset to SFF page and offset
# optoe maps the SFF 2D address to a linear address
page = offset // 128
if page > 0:
page = page - 1

if offset > 128:
offset = (offset % 128) + 128

return self.xcvr_emu.Read(
ReadRequest(index=self.index, offset=offset, page=page, length=num_bytes)
).data

def write_eeprom(self, offset, num_bytes, write_buffer):
assert len(write_buffer) <= num_bytes
# convert optoe offset to SFF page and offset
# optoe maps the SFF 2D address to a linear address
page = offset // 128
if page > 0:
page = page - 1

if offset > 128:
offset = (offset % 128) + 128

return self.xcvr_emu.Write(
WriteRequest(
index=self.index, page=page, offset=offset, length=num_bytes, data=bytes(write_buffer)
)
)