Skip to content

Commit

Permalink
Update fetching to be sync for now
Browse files Browse the repository at this point in the history
  • Loading branch information
SinaKhalili committed Nov 21, 2024
1 parent 4c29154 commit fb5215d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 43 deletions.
56 changes: 13 additions & 43 deletions backend/scripts/generate_ucache.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ class Endpoint:
params: dict


def process_multiple_endpoints(state_pickle_path: str, endpoints: list[Endpoint]):
async def process_multiple_endpoints(state_pickle_path: str, endpoints: list[Endpoint]):
"""Process a single endpoint in its own process"""
state = BackendState()
state.initialize(os.getenv("RPC_URL"))
asyncio.run(state.load_pickle_snapshot(state_pickle_path))
await state.load_pickle_snapshot(state_pickle_path)

results = []

Expand Down Expand Up @@ -93,76 +93,46 @@ async def run_request():
json.dump(response_data, f)
return f"Generated cache for {endpoint}"

asyncio.run(run_request())
await run_request()

await state.close()
return results


async def generate_ucache(state: BackendState, endpoints: list[Endpoint]):
async def generate_ucache(endpoints: list[Endpoint]):
"""Generate ucache files by splitting endpoints across processes"""
ucache_dir = "ucache"
use_snapshot = os.getenv("USE_SNAPSHOT", "false").lower() == "true"

print("Generating ucache")
if not os.path.exists(ucache_dir):
os.makedirs(ucache_dir)

state_pickle_path = sorted(glob.glob("pickles/*"))[-1]
# n_processes = max(1, multiprocessing.cpu_count() - 1)
n_processes = 1
with ProcessPoolExecutor(max_workers=n_processes) as executor:
futures = [
executor.submit(process_multiple_endpoints, state_pickle_path, list(chunk))
for chunk in chunk_list(endpoints, n_processes)
]

for future in futures:
for result in future.result():
print(result)
await process_multiple_endpoints(state_pickle_path, endpoints)


async def main():
load_dotenv()
state = BackendState()
state.initialize(os.getenv("RPC_URL"))

use_snapshot = os.getenv("USE_SNAPSHOT", "false").lower() == "true"
print(f"use_snapshot: {use_snapshot}")

if not use_snapshot:
state = BackendState()
state.initialize(os.getenv("RPC_URL"))
print("Taking snapshot")
await state.bootstrap()
await state.take_pickle_snapshot()
await state.close()

endpoints = [
# Endpoint(
# endpoint="price-shock/usermap",
# params={
# "asset_group": "ignore+stables",
# "oracle_distortion": 0.05,
# "n_scenarios": 5,
# },
# ),
Endpoint(
endpoint="asset-liability/matrix",
params={"mode": 0, "perp_market_index": 0},
),
# Endpoint(
# endpoint="asset-liability/matrix",
# params={"mode": 0, "perp_market_index": 2},
# ),
# Endpoint(
# endpoint="asset-liability/matrix",
# params={"mode": 0, "perp_market_index": 3},
# ),
# Endpoint(
# endpoint="asset-liability/matrix",
# params={"mode": 0, "perp_market_index": 4},
# ),
# Endpoint(
# endpoint="asset-liability/matrix",
# params={"mode": 2, "perp_market_index": 30},
# ),
]

await generate_ucache(state, endpoints)
await generate_ucache(endpoints)


if __name__ == "__main__":
Expand Down
4 changes: 4 additions & 0 deletions backend/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ async def load_pickle_snapshot(self, directory: str):
self.last_oracle_slot = pickle_map["perporacles"].split("_")[-1].split(".")[0]
return pickle_map

async def close(self):
await self.dc.unsubscribe()
await self.connection.close()


class BackendRequest(Request):
@property
Expand Down

0 comments on commit fb5215d

Please sign in to comment.