Skip to content

Commit

Permalink
give up SQLAlchemy 2.0 and use psycopg2 instead
Browse files Browse the repository at this point in the history
  • Loading branch information
lilydjwg committed Sep 9, 2024
1 parent 5017595 commit 53ed81a
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 143 deletions.
51 changes: 21 additions & 30 deletions lilac
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ from concurrent.futures import (
)
import subprocess
from functools import partial
import json

import prctl
import structlog
Expand Down Expand Up @@ -152,20 +153,17 @@ def packages_with_depends(
logger.info('building %s because of %r', p, build_reasons[p])

if db.USE:
from sqlalchemy import delete
with db.get_session() as s:
stmt = delete(db.PkgCurrent)
s.execute(stmt)
s.execute('delete from pkgcurrent')

rows = []
for idx, pkg in enumerate(packages):
rs = [r.to_dict() for r in build_reasons[pkg]]
pp = db.PkgCurrent(
pkgbase = pkg,
index = idx,
status = 'pending',
build_reasons = rs,
)
s.add(pp)
rs = json.dumps([r.to_dict() for r in build_reasons[pkg]])
rows.append((pkg, idx, 'pending', rs))
s.executemany(
'''insert into pkgcurrent
(pkgbase, index, status, build_reasons) values
(%s, %s, %s, %s)''', rows)
db.build_updated(s)

return sorter, dep_building_map
Expand Down Expand Up @@ -476,22 +474,16 @@ def build_it(
memory = r.rusage.memory
else:
cputime = memory = None
rs = [r.to_dict() for r in build_reasons[pkg]]
rs = json.dumps([r.to_dict() for r in build_reasons[pkg]])
with db.get_session() as s:
maintainers = repo.lilacinfos[pkg].maintainers
p = db.PkgLog(
pkgbase = pkg,
nv_version = newver,
pkg_version = version,
elapsed = elapsed,
result = r.__class__.__name__,
cputime = cputime,
memory = memory,
msg = msg,
build_reasons = rs,
maintainers = maintainers,
)
s.add(p)
maintainers = json.dumps(repo.lilacinfos[pkg].maintainers)
s.execute(
'''insert into pkglog
(pkgbase, nv_version, pkg_version, elapsed, result, cputime, memory,
msg, build_reasons, maintainers) values
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''',
(pkg, newver, version, elapsed, r.__class__.__name__, cputime, memory,
msg, rs, maintainers))
db.mark_pkg_as(s, pkg, 'done')
db.build_updated(s)

Expand Down Expand Up @@ -650,8 +642,8 @@ def main_may_raise(
if db.USE:
logdir_name = logdir.name
with db.get_session() as s:
b = db.Batch(event='start', logdir=logdir_name)
s.add(b)
s.execute('insert into batch (event, logdir) values (%s, %s)',
('start', logdir_name))
db.build_updated(s)
start_build(REPO, logdir, failed, update_succeeded,
config['lilac'].get('max_concurrency', 1))
Expand Down Expand Up @@ -692,8 +684,7 @@ def main_may_raise(
build_logger.info('build end')
if db.USE:
with db.get_session() as s:
b = db.Batch(event='stop')
s.add(b)
s.execute('''insert into batch (event) values ('stop')''')
db.build_updated(s)

git_reset_hard()
Expand Down
164 changes: 51 additions & 113 deletions lilac2/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,155 +2,93 @@
import datetime
import re
import logging
from typing import Optional, Any
from functools import partial

from sqlalchemy import update, select, func
from sqlalchemy.sql import functions
from sqlalchemy.types import BigInteger, Integer, String, DateTime
from sqlalchemy.orm import sessionmaker, MappedAsDataclass, DeclarativeBase, mapped_column, Mapped
from sqlalchemy.dialects.postgresql import JSONB
import psycopg2
import psycopg2.pool

from .typing import RUsage, OnBuildEntry

logger = logging.getLogger(__name__)

class Base(MappedAsDataclass, DeclarativeBase):
pass

class PkgLog(Base):
__tablename__ = 'pkglog'
__table_args__ = {'schema': 'lilac'}
id: Mapped[int] = mapped_column(Integer, init=False, primary_key=True)
ts: Mapped[datetime.datetime] = mapped_column(DateTime, init=False, nullable=False)
pkgbase: Mapped[str]
nv_version: Mapped[Optional[str]]
pkg_version: Mapped[Optional[str]]
elapsed: Mapped[int]
result: Mapped[str]
cputime: Mapped[Optional[int]]
memory: Mapped[Optional[int]] = mapped_column(BigInteger)
msg: Mapped[Optional[str]]
build_reasons: Mapped[list[dict[str, Any]]] = mapped_column(JSONB, nullable=False)
maintainers: Mapped[list[dict[str, Any]]] = mapped_column(JSONB, nullable=False)

class Batch(Base):
__tablename__ = 'batch'
__table_args__ = {'schema': 'lilac'}
id: Mapped[int] = mapped_column(Integer, init=False, primary_key=True)
ts: Mapped[datetime.datetime] = mapped_column(DateTime, init=False, nullable=False)
event: Mapped[str]
logdir: Mapped[Optional[str]] = mapped_column(String, default=None)

class PkgCurrent(Base):
__tablename__ = 'pkgcurrent'
__table_args__ = {'schema': 'lilac'}
id: Mapped[int] = mapped_column(Integer, init=False, primary_key=True)
ts: Mapped[datetime.datetime] = mapped_column(DateTime, init=False, nullable=False)
updated_at: Mapped[datetime.datetime] = mapped_column(DateTime, init=False, nullable=False)
pkgbase: Mapped[str]
index: Mapped[int]
status: Mapped[str]
build_reasons: Mapped[list[dict[str, Any]]] = mapped_column(JSONB, nullable=False)

USE = False
SCHEMA = None
Pool = None

def connect_with_schema(schema, dsn):
    """Open a psycopg2 connection whose ``search_path`` is set to *schema*.

    Intended to be used (via ``functools.partial``) as the connection
    factory of the connection pool, hence the ``(schema, dsn)`` order.

    Args:
        schema: Schema name to put on the search path; a falsy value
            selects the default ``'lilac'``.
        dsn: libpq connection string passed to ``psycopg2.connect``.

    Returns:
        The open connection, with no transaction in progress.

    Raises:
        ValueError: If *schema* contains a single quote and therefore
            cannot be safely embedded in the ``SET`` statement.
    """
    schema = schema or 'lilac'
    # Validate before connecting so a bad name never opens (and leaks)
    # a connection.
    if "'" in schema:
        raise ValueError('bad schema', schema)

    conn = psycopg2.connect(dsn)
    try:
        with conn.cursor() as cur:
            cur.execute(f"set search_path to '{schema}'")
        # psycopg2 implicitly opens a transaction for the SET.  Commit it
        # now: otherwise a rollback of the first real transaction on this
        # pooled connection would also undo the search_path setting.
        conn.commit()
    except BaseException:
        conn.close()
        raise
    return conn

def setup(engine, schema):
    """Create the module-wide connection pool and enable database use.

    Args:
        engine: Not used by the psycopg2 implementation; kept so existing
            callers keep working.
        schema: Schema name handed to ``connect_with_schema`` for every
            pooled connection; also stored in ``SCHEMA`` when truthy.
    """
    global USE, Pool, SCHEMA
    # An empty DSN is passed, so connection parameters are not supplied
    # here.  ``connect_with_schema`` is given as the connection factory
    # so each new connection gets its search_path set.
    Pool = psycopg2.pool.ThreadedConnectionPool(
        1, 10, '', partial(connect_with_schema, schema))
    USE = True
    if schema:
        SCHEMA = schema

Session = sessionmaker()

@contextmanager
def get_session():
    """Yield a cursor on a pooled connection, wrapped in one transaction.

    The connection is borrowed from ``Pool``.  ``with conn`` commits when
    the body finishes normally and rolls back when it raises; the
    connection is returned to the pool in either case.

    Yields:
        A psycopg2 cursor; callers use ``execute`` / ``fetchall`` on it.
    """
    conn = Pool.getconn()
    try:
        with conn:
            with conn.cursor() as cur:
                yield cur
    finally:
        Pool.putconn(conn)

def build_updated(s) -> None:
    """Send a ``build_updated`` notification through cursor *s*.

    Args:
        s: Cursor as yielded by ``get_session``; the NOTIFY is issued as
            part of that cursor's current transaction.
    """
    s.execute('notify build_updated')

def is_last_build_failed(pkgbase: str) -> bool:
    """Return whether the newest ``pkglog`` row for *pkgbase* is a failure.

    Args:
        pkgbase: Package base name to look up.

    Returns:
        True only if a log row exists and its ``result`` is ``'failed'``;
        False when the package has no log rows at all.
    """
    with get_session() as s:
        s.execute(
            '''select result from pkglog
               where pkgbase = %s
               order by ts desc limit 1''', (pkgbase,))
        # At most one row comes back; each row is a tuple of columns, so
        # the result string is row[0], not the row itself.
        row = s.fetchone()

    return row is not None and row[0] == 'failed'

def mark_pkg_as(s, pkg: str, status: str) -> None:
    """Set the ``status`` of *pkg* in the ``pkgcurrent`` table.

    Args:
        s: Cursor as yielded by ``get_session``.
        pkg: Package base name whose row(s) are updated.
        status: New status value to store.
    """
    s.execute(
        'update pkgcurrent set status = %s where pkgbase = %s',
        (status, pkg))

def get_pkgs_last_success_times(pkgs: list[str]) -> list[tuple[str, datetime.datetime]]:
    """Return the latest successful build time of each package in *pkgs*.

    Args:
        pkgs: Package base names to look up.

    Returns:
        ``(pkgbase, max(ts))`` rows for packages that have at least one
        ``'successful'`` or ``'staged'`` log entry; packages without one
        are absent.  An empty *pkgs* yields ``[]`` without touching the
        database.
    """
    if not pkgs:
        return []

    with get_session() as s:
        # psycopg2 adapts a Python list to a PostgreSQL ARRAY, which is
        # not valid after IN; "= any(array)" is the matching form.
        s.execute(
            '''select pkgbase, max(ts) from pkglog
               where pkgbase = any(%s) and result in ('successful', 'staged')
               group by pkgbase''', (list(pkgs),))
        r = s.fetchall()
    return r

def get_pkgs_last_rusage(pkgs: list[str]) -> dict[str, RUsage]:
    """Return resource usage of the latest successful build of each package.

    For every package in *pkgs*, the newest ``pkglog`` row whose result is
    ``'successful'`` or ``'staged'`` is selected (``row_number()`` over a
    per-package window ordered by ``ts`` descending, keeping row 1).

    Args:
        pkgs: Package base names to look up.

    Returns:
        Mapping of pkgbase to ``RUsage(cputime, memory)``; packages with
        no matching row are absent.  An empty *pkgs* yields ``{}``
        without touching the database.
    """
    if not pkgs:
        return {}

    with get_session() as s:
        # psycopg2 adapts a Python list to a PostgreSQL ARRAY, which is
        # not valid after IN; "= any(array)" is the matching form.
        s.execute('''
            select pkgbase, cputime, memory from (
              select pkgbase, cputime, memory,
                     row_number() over (partition by pkgbase order by ts desc) as k
              from pkglog
              where pkgbase = any(%s) and result in ('successful', 'staged')
            ) as w where k = 1''', (list(pkgs),))
        rs = s.fetchall()
        ret = {r[0]: RUsage(r[1], r[2]) for r in rs}

    return ret

def _get_last_two_versions(s, pkg: str) -> tuple[str, str]:
r = s.query(
PkgLog.pkg_version,
).filter(
PkgLog.pkgbase == pkg,
PkgLog.result.in_(['successful', 'staged']),
).order_by(PkgLog.ts.desc()).limit(2).all()
s.execute(
'''select pkg_version from pkglog
where pkgbase = %s and result in ('successful', 'staged')
order by ts desc limit 2''', (pkg,))
r = s.fetchall()

if len(r) == 1:
return '', r[0][0]
Expand Down

0 comments on commit 53ed81a

Please sign in to comment.