diff --git a/click_odoo_contrib/_dbutils.py b/click_odoo_contrib/_dbutils.py index f375952..ad450e8 100644 --- a/click_odoo_contrib/_dbutils.py +++ b/click_odoo_contrib/_dbutils.py @@ -5,7 +5,7 @@ import hashlib from contextlib import contextmanager -from click_odoo import odoo +from click_odoo import OdooEnvironment, odoo @contextmanager @@ -61,3 +61,34 @@ def advisory_lock(cr, name): yield finally: cr.execute("SELECT pg_advisory_unlock(%s::bigint)", (lock_id,)) + + +def reset_config_parameters(dbname): + """ + Reset config parameters to default value. This is useful to avoid + conflicts between databases on copy or restore + (dbuuid, ...) + """ + with OdooEnvironment(dbname) as env: + if odoo.release.version_info < (10, 0): + env.registry("ir.config_parameter").init(env.cr, force=True) + else: + env["ir.config_parameter"].init(force=True) + + # reset enterprise keys if exists + env.cr.execute( + """ + DELETE FROM ir_config_parameter + WHERE key = 'database.enterprise_code'; + + UPDATE ir_config_parameter + SET value = 'copy' + WHERE key = 'database.expiration_reason' + AND value != 'demo'; + + UPDATE ir_config_parameter + SET value = CURRENT_DATE + INTERVAL '2 month' + WHERE key = 'database.expiration_date'; + + """ + ) diff --git a/click_odoo_contrib/copydb.py b/click_odoo_contrib/copydb.py index 71d1666..09adb6b 100644 --- a/click_odoo_contrib/copydb.py +++ b/click_odoo_contrib/copydb.py @@ -11,7 +11,12 @@ from click_odoo import odoo from psycopg2.extensions import AsIs, quote_ident -from ._dbutils import db_exists, pg_connect, terminate_connections +from ._dbutils import ( + db_exists, + pg_connect, + reset_config_parameters, + terminate_connections, +) def _copy_db(cr, source, dest): @@ -74,6 +79,7 @@ def main(env, source, dest, force_disconnect, unless_dest_exists, if_source_exis if force_disconnect: terminate_connections(source) _copy_db(cr, source, dest) + reset_config_parameters(dest) _copy_filestore(source, dest) diff --git a/newsfragments/25.feature b/newsfragments/25.feature new file mode 100644 index 0000000..65de2be --- /dev/null +++ b/newsfragments/25.feature @@ -0,0 +1,2 @@ +Ensures reset of ir.config_params into target database when db are copied +This will prevent conflicts between databases (db.uuid, db.secret, db.enterprise_code, ...) diff --git a/tests/test_copydb.py b/tests/test_copydb.py index fe04dd7..beb220b 100644 --- a/tests/test_copydb.py +++ b/tests/test_copydb.py @@ -4,10 +4,11 @@ import os import shutil import subprocess +from collections import defaultdict import pytest from click.testing import CliRunner -from click_odoo import odoo +from click_odoo import OdooEnvironment, odoo from click_odoo_contrib._dbutils import db_exists from click_odoo_contrib.copydb import main @@ -15,6 +16,12 @@ TEST_DBNAME = "click-odoo-contrib-testcopydb" TEST_DBNAME_NEW = "click-odoo-contrib-testcopydb-new" +ENTERPRISE_IR_CONGIG_KEYS = [ + "database.enterprise_code", + "database.expiration_reason", + "database.expiration_date", +] + def _dropdb(dbname): subprocess.check_call(["dropdb", "--if-exists", dbname]) @@ -24,6 +31,31 @@ def _createdb(dbname): subprocess.check_call(["createdb", dbname]) +def _get_reset_config_params_key(): + major = odoo.release.version_info[0] + default = ["database.uuid", "database.create_date", "web.base.url"] + default = default + ENTERPRISE_IR_CONGIG_KEYS + if major >= 9: + default.append("database.secret") + if major >= 12: + default.extend(["base.login_cooldown_after", "base.login_cooldown_duration"]) + return default + + +def _assert_ir_config_reset(db1, db2): + params_by_db = defaultdict(dict) + for db in (db1, db2): + with OdooEnvironment(database=db) as env: + IrConfigParameters = env["ir.config_parameter"] + for key in _get_reset_config_params_key(): + params_by_db[db][key] = IrConfigParameters.get_param(key) + params1 = params_by_db[db1] + params2 = params_by_db[db2] + assert set(params1.keys()) == set(params2.keys()) + for k, v in params1.items(): + assert v != params2[k] + + @pytest.fixture def filestore(): filestore_dir = odoo.tools.config.filestore(TEST_DBNAME) @@ -43,17 +75,50 @@ def pgdb(): _dropdb(TEST_DBNAME) -def tests_copydb(pgdb, filestore): +@pytest.fixture +def ir_config_param_test_values(odoodb): + with OdooEnvironment(database=odoodb) as env: + for key in _get_reset_config_params_key(): + env.cr.execute( + """ + UPDATE + ir_config_parameter + SET + value='test value' + WHERE + key=%s + RETURNING id + """, + (key,), + ) + if not env.cr.fetchall(): + # Config parameter doesn't exist: create (ex enterprise params) + env.cr.execute( + """ + INSERT INTO + ir_config_parameter + (key, value) + VALUES + (%s, 'test value') + """, + (key,), + ) + + +def tests_copydb(odoodb, ir_config_param_test_values): filestore_dir_new = odoo.tools.config.filestore(TEST_DBNAME_NEW) + filestore_dir_original = odoo.tools.config.filestore(odoodb) + if not os.path.exists(filestore_dir_original): + os.makedirs(filestore_dir_original) try: assert not db_exists(TEST_DBNAME_NEW) assert not os.path.exists(filestore_dir_new) result = CliRunner().invoke( - main, ["--force-disconnect", TEST_DBNAME, TEST_DBNAME_NEW] + main, ["--force-disconnect", odoodb, TEST_DBNAME_NEW] ) assert result.exit_code == 0 - # this dropdb will indirectly test that the new db exists - subprocess.check_call(["dropdb", TEST_DBNAME_NEW]) + # this assert will indirectly test that the new db exists + _assert_ir_config_reset(odoodb, TEST_DBNAME_NEW) assert os.path.isdir(filestore_dir_new) finally: _dropdb(TEST_DBNAME_NEW) @@ -110,15 +175,18 @@ def test_copydb_template_not_exists_target_exists(): _dropdb(TEST_DBNAME_NEW) -def test_copydb_no_source_filestore(pgdb): +def test_copydb_no_source_filestore(odoodb, ir_config_param_test_values): filestore_dir_new = odoo.tools.config.filestore(TEST_DBNAME_NEW) + filestore_dir_original = odoo.tools.config.filestore(odoodb) + if os.path.exists(filestore_dir_original): + shutil.rmtree(filestore_dir_original) try: result = CliRunner().invoke( - main, ["--force-disconnect", TEST_DBNAME, TEST_DBNAME_NEW] + main, ["--force-disconnect", odoodb, TEST_DBNAME_NEW] ) assert result.exit_code == 0 - # this dropdb will indirectly test that the new db exists - subprocess.check_call(["dropdb", TEST_DBNAME_NEW]) + # this assert will indirectly test that the new db exists + _assert_ir_config_reset(odoodb, TEST_DBNAME_NEW) assert not os.path.isdir(filestore_dir_new) finally: _dropdb(TEST_DBNAME_NEW)