Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proof of concept implementation of get_run via api (RFC). #1993

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/fishtest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def group_finder(username, request):
config.add_route("api_get_elo", "/api/get_elo/{id}")
config.add_route("api_actions", "/api/actions")
config.add_route("api_calc_elo", "/api/calc_elo")
config.add_route("api_serialize_run", "/api/serialize_run/{id}")

config.scan()
return config.make_wsgi_app()
28 changes: 25 additions & 3 deletions server/fishtest/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from fishtest.schemas import api_access_schema, api_schema, gzip_data
from fishtest.stats.stat_util import SPRT_elo, get_elo
from fishtest.util import strip_run, worker_name
from fishtest.util import serialize, strip_run, worker_name
from pyramid.httpexceptions import (
HTTPBadRequest,
HTTPException,
Expand Down Expand Up @@ -126,7 +126,7 @@ def validate_request(self):
# is a supplied run_id correct?
if "run_id" in self.request_body:
run_id = self.request_body["run_id"]
run = self.request.rundb.get_run(run_id)
run = self.request.rundb.get_run(run_id, db=False)
if run is None:
self.handle_error("Invalid run_id: {}".format(run_id))
self.__run = run
Expand Down Expand Up @@ -620,4 +620,26 @@ def download_nn(self):


class InternalApi(GenericApi):
pass
@view_config(route_name="api_serialize_run")
def serialize_run(self):
# Must be hosted on the primary instance!!
if not self.request.rundb.is_primary_instance():
self.handle_error("This api must be hosted on the primary instance")
host_url = self.request.host_url
if host_url != "http://localhost":
self.handle_error("Access denied", exception=HTTPUnauthorized)
try:
run_id = self.request.matchdict["id"]
except Exception as e:
self.handle_error(str(e))
try:
run = self.request.rundb.get_run(run_id, cache=True)
except Exception as e:
self.handle_error(str(e))
if run is None:
self.handle_error("Run does not exist")
try:
s = serialize(run)
except Exception as e:
self.handle_error(str(e))
return Response(body=s, content_type="application/octet-stream")
34 changes: 29 additions & 5 deletions server/fishtest/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from datetime import datetime, timezone

import fishtest.stats.stat_util
import requests
from bson.binary import Binary
from bson.codec_options import CodecOptions
from bson.errors import InvalidId
Expand Down Expand Up @@ -39,6 +40,7 @@
GeneratorAsFileReader,
Scheduler,
crash_or_time,
deserialize,
estimate_game_duration,
format_results,
get_bad_workers,
Expand Down Expand Up @@ -670,14 +672,26 @@ def exit_run(self, signum, frame):
self.actiondb.system_event(message=f"stop fishtest@{self.port}")
sys.exit(0)

def get_run(self, r_id):
r_id = str(r_id)
def get_run(self, r_id, db=True, cache=None):
"""
If the instance is non-primary then there are two strategies
to get the run depending on the value of "db". If db=True
then Fishtest will load the run from the db.
If db=False then Fishtest will request the run from the
primary instance via an internal api. The first option is
probably(?) faster but it may give a slightly outdated run
which may in particular be missing some recently created tasks.

The cache argument is for testing and should be left alone.
"""
try:
r_id_obj = ObjectId(r_id)
except InvalidId:
print(f"Invalid object id {r_id}", flush=True)
return None

if self.is_primary_instance():
if cache is None:
cache = self.is_primary_instance()
if cache:
with self.run_cache_lock:
if r_id in self.run_cache:
self.run_cache[r_id]["last_access_time"] = time.time()
Expand All @@ -692,7 +706,17 @@ def get_run(self, r_id):
}
return run
else:
return self.runs.find_one({"_id": r_id_obj})
if db:
return self.runs.find_one({"_id": r_id_obj})
else:
result = requests.get(f"http://localhost/api/serialize_run/{r_id}")
try:
result.raise_for_status()
run = deserialize(result.content)
except Exception as e:
print(f"Failed to deserialize {r_id}: {str(e)}", flush=True)
return None
return run

def buffer(self, run, flush):
if not self.is_primary_instance():
Expand Down
19 changes: 19 additions & 0 deletions server/fishtest/util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import base64
import copy
import gzip
import hashlib
import math
import re
Expand All @@ -7,9 +9,11 @@
from functools import cache
from random import uniform

import bson
import fishtest.stats.stat_util
import numpy
import scipy.stats
from bson.codec_options import CodecOptions
from email_validator import EmailNotValidError, caching_resolver, validate_email
from zxcvbn import zxcvbn

Expand Down Expand Up @@ -779,3 +783,18 @@ def __next_schedule(self):
else:
self.__event.wait()
self.__event.clear()


def serialize(run):
b = bson.encode(run)
g = gzip.compress(b)
return g


codec_options = CodecOptions(tz_aware=True, tzinfo=timezone.utc)


def deserialize(g):
b = gzip.decompress(g)
run = bson.decode(b, codec_options=codec_options)
return run
Loading