diff --git a/lilac b/lilac
index e9cfbc1..084ddb7 100755
--- a/lilac
+++ b/lilac
@@ -20,6 +20,7 @@ from concurrent.futures import (
 )
 import subprocess
 from functools import partial
+import json
 
 import prctl
 import structlog
@@ -152,20 +153,17 @@ def packages_with_depends(
     logger.info('building %s because of %r', p, build_reasons[p])
 
   if db.USE:
-    from sqlalchemy import delete
     with db.get_session() as s:
-      stmt = delete(db.PkgCurrent)
-      s.execute(stmt)
+      s.execute('delete from pkgcurrent')
+      rows = []
       for idx, pkg in enumerate(packages):
-        rs = [r.to_dict() for r in build_reasons[pkg]]
-        pp = db.PkgCurrent(
-          pkgbase = pkg,
-          index = idx,
-          status = 'pending',
-          build_reasons = rs,
-        )
-        s.add(pp)
+        rs = json.dumps([r.to_dict() for r in build_reasons[pkg]])
+        rows.append((pkg, idx, 'pending', rs))
+      s.executemany(
+        '''insert into pkgcurrent
+           (pkgbase, index, status, build_reasons) values
+           (%s, %s, %s, %s)''', rows)
       db.build_updated(s)
 
   return sorter, dep_building_map
@@ -476,22 +474,16 @@ def build_it(
       memory = r.rusage.memory
     else:
       cputime = memory = None
-    rs = [r.to_dict() for r in build_reasons[pkg]]
+    rs = json.dumps([r.to_dict() for r in build_reasons[pkg]])
     with db.get_session() as s:
-      maintainers = repo.lilacinfos[pkg].maintainers
-      p = db.PkgLog(
-        pkgbase = pkg,
-        nv_version = newver,
-        pkg_version = version,
-        elapsed = elapsed,
-        result = r.__class__.__name__,
-        cputime = cputime,
-        memory = memory,
-        msg = msg,
-        build_reasons = rs,
-        maintainers = maintainers,
-      )
-      s.add(p)
+      maintainers = json.dumps(repo.lilacinfos[pkg].maintainers)
+      s.execute(
+        '''insert into pkglog
+           (pkgbase, nv_version, pkg_version, elapsed, result, cputime, memory,
+            msg, build_reasons, maintainers) values
+           (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''',
+        (pkg, newver, version, elapsed, r.__class__.__name__, cputime, memory,
+         msg, rs, maintainers))
       db.mark_pkg_as(s, pkg, 'done')
       db.build_updated(s)
@@ -650,8 +642,8 @@ def main_may_raise(
   if db.USE:
     logdir_name = logdir.name
     with db.get_session() as s:
-      b = db.Batch(event='start', logdir=logdir_name)
-      s.add(b)
+      s.execute('insert into batch (event, logdir) values (%s, %s)',
+                ('start', logdir_name))
       db.build_updated(s)
   start_build(REPO, logdir, failed, update_succeeded,
               config['lilac'].get('max_concurrency', 1))
@@ -692,8 +684,7 @@ def main_may_raise(
   build_logger.info('build end')
   if db.USE:
     with db.get_session() as s:
-      b = db.Batch(event='stop')
-      s.add(b)
+      s.execute('''insert into batch (event) values ('stop')''')
       db.build_updated(s)
 
   git_reset_hard()
diff --git a/lilac2/db.py b/lilac2/db.py
index bdf461d..27d7380 100644
--- a/lilac2/db.py
+++ b/lilac2/db.py
@@ -2,155 +2,93 @@
 import datetime
 import re
 import logging
-from typing import Optional, Any
+from functools import partial
 
-from sqlalchemy import update, select, func
-from sqlalchemy.sql import functions
-from sqlalchemy.types import BigInteger, Integer, String, DateTime
-from sqlalchemy.orm import sessionmaker, MappedAsDataclass, DeclarativeBase, mapped_column, Mapped
-from sqlalchemy.dialects.postgresql import JSONB
+import psycopg2
+import psycopg2.pool
 
 from .typing import RUsage, OnBuildEntry
 
 logger = logging.getLogger(__name__)
 
-class Base(MappedAsDataclass, DeclarativeBase):
-  pass
-
-class PkgLog(Base):
-  __tablename__ = 'pkglog'
-  __table_args__ = {'schema': 'lilac'}
-  id: Mapped[int] = mapped_column(Integer, init=False, primary_key=True)
-  ts: Mapped[datetime.datetime] = mapped_column(DateTime, init=False, nullable=False)
-  pkgbase: Mapped[str]
-  nv_version: Mapped[Optional[str]]
-  pkg_version: Mapped[Optional[str]]
-  elapsed: Mapped[int]
-  result: Mapped[str]
-  cputime: Mapped[Optional[int]]
-  memory: Mapped[Optional[int]] = mapped_column(BigInteger)
-  msg: Mapped[Optional[str]]
-  build_reasons: Mapped[list[dict[str, Any]]] = mapped_column(JSONB, nullable=False)
-  maintainers: Mapped[list[dict[str, Any]]] = mapped_column(JSONB, nullable=False)
-
-class Batch(Base):
-  __tablename__ = 'batch'
-  __table_args__ = {'schema': 'lilac'}
-  id: Mapped[int] = mapped_column(Integer, init=False, primary_key=True)
-  ts: Mapped[datetime.datetime] = mapped_column(DateTime, init=False, nullable=False)
-  event: Mapped[str]
-  logdir: Mapped[Optional[str]] = mapped_column(String, default=None)
-
-class PkgCurrent(Base):
-  __tablename__ = 'pkgcurrent'
-  __table_args__ = {'schema': 'lilac'}
-  id: Mapped[int] = mapped_column(Integer, init=False, primary_key=True)
-  ts: Mapped[datetime.datetime] = mapped_column(DateTime, init=False, nullable=False)
-  updated_at: Mapped[datetime.datetime] = mapped_column(DateTime, init=False, nullable=False)
-  pkgbase: Mapped[str]
-  index: Mapped[int]
-  status: Mapped[str]
-  build_reasons: Mapped[list[dict[str, Any]]] = mapped_column(JSONB, nullable=False)
-
 USE = False
-SCHEMA = None
+Pool = None
+
+def connect_with_schema(schema, dsn):
+  conn = psycopg2.connect(dsn)
+  schema = schema or 'lilac'
+  if "'" in schema:
+    raise ValueError('bad schema', schema)
+  with conn.cursor() as cur:
+    cur.execute(f"set search_path to '{schema}'")
+  return conn
 
 def setup(engine, schema):
-  global USE, SCHEMA
-  Session.configure(bind=engine)
+  global USE, Pool
+  Pool = psycopg2.pool.ThreadedConnectionPool(
+    1, 10, '', partial(connect_with_schema, schema))
   USE = True
-  if schema:
-    SCHEMA = schema
-
-Session = sessionmaker()
 
 @contextmanager
 def get_session():
-  session = Session()
-  if SCHEMA:
-    session.connection(
-      execution_options = {
-        "schema_translate_map": {"lilac": SCHEMA}
-      }
-    )
+  conn = Pool.getconn()
   try:
-    yield session
-    session.commit()
-  except:
-    session.rollback()
-    raise
+    with conn:
+      with conn.cursor() as cur:
+        yield cur
   finally:
-    session.close()
+    Pool.putconn(conn)
 
 def build_updated(s) -> None:
-  if s.bind.dialect.name != 'postgresql':
-    return
-
-  from sqlalchemy import text
-  s.execute(text('notify build_updated'))
+  s.execute('notify build_updated')
 
 def is_last_build_failed(pkgbase: str) -> bool:
   with get_session() as s:
-    r = s.query(PkgLog.result).filter(
-      PkgLog.pkgbase == pkgbase,
-    ).order_by(PkgLog.ts.desc()).limit(1).one_or_none()
+    s.execute(
+      '''select result from pkglog
+         where pkgbase = %s
+         order by ts desc limit 1''', (pkgbase,))
+    r = s.fetchone()
   return r and r[0] == 'failed'
 
 def mark_pkg_as(s, pkg: str, status: str) -> None:
-  stmt = update(
-    PkgCurrent
-  ).where(
-    PkgCurrent.pkgbase == pkg,
-  ).values(
-    status = status,
-  )
-  s.execute(stmt)
+  s.execute('update pkgcurrent set status = %s where pkgbase = %s', (status, pkg))
 
 def get_pkgs_last_success_times(pkgs: list[str]) -> list[tuple[str, datetime.datetime]]:
+  if not pkgs:
+    return []
+
   with get_session() as s:
-    r = s.query(
-      PkgLog.pkgbase, functions.max(PkgLog.ts),
-    ).filter(
-      PkgLog.pkgbase.in_(pkgs),
-      PkgLog.result.in_(['successful', 'staged']),
-    ).group_by(PkgLog.pkgbase).all()
+    s.execute(
+      '''select pkgbase, max(ts) from pkglog
+         where pkgbase in %s and result in ('successful', 'staged')
+         group by pkgbase''', (tuple(pkgs),))
+    r = s.fetchall()
   return r
 
 def get_pkgs_last_rusage(pkgs: list[str]) -> dict[str, RUsage]:
-  # select pkgbase, cputime from (
-  #   select id, pkgbase, row_number() over (partition by pkgbase order by ts desc) as k
-  #   from pkglog
-  #   where pkgbase in ('vim-lily', 'julia-git')
-  # ) as w where k = 1
+  if not pkgs:
+    return {}
+
   with get_session() as s:
-    w = select(
-      func.row_number().over(
-        partition_by = PkgLog.pkgbase,
-        order_by = PkgLog.ts.desc(),
-      ).label('k'),
-      PkgLog.pkgbase, PkgLog.cputime, PkgLog.memory,
-    ).where(
-      PkgLog.pkgbase.in_(pkgs),
-      PkgLog.result.in_(['successful', 'staged']),
-    ).subquery()
-
-    stmt = select(
-      w.c.pkgbase, w.c.cputime, w.c.memory,
-    ).select_from(w).where(w.c.k == 1)
-
-    rs = s.execute(stmt).all()
+    s.execute('''
+      select pkgbase, cputime, memory from (
+        select pkgbase, cputime, memory, row_number() over (partition by pkgbase order by ts desc) as k
+        from pkglog
+        where pkgbase in %s and result in ('successful', 'staged')
+      ) as w where k = 1''', (tuple(pkgs),))
+    rs = s.fetchall()
 
   ret = {r[0]: RUsage(r[1], r[2]) for r in rs}
   return ret
 
 def _get_last_two_versions(s, pkg: str) -> tuple[str, str]:
-  r = s.query(
-    PkgLog.pkg_version,
-  ).filter(
-    PkgLog.pkgbase == pkg,
-    PkgLog.result.in_(['successful', 'staged']),
-  ).order_by(PkgLog.ts.desc()).limit(2).all()
+  s.execute(
+    '''select pkg_version from pkglog
+       where pkgbase = %s and result in ('successful', 'staged')
+       order by ts desc limit 2''', (pkg,))
+  r = s.fetchall()
 
   if len(r) == 1:
     return '', r[0][0]