|
| 1 | +import contextlib |
| 2 | +import json |
| 3 | +import os |
| 4 | +import subprocess |
| 5 | +import tempfile |
| 6 | +from typing import Dict, Literal |
| 7 | + |
| 8 | +from fastapi import FastAPI, HTTPException, Query, Request |
| 9 | +from google.cloud import logging, secretmanager |
| 10 | + |
| 11 | +app = FastAPI() |
| 12 | + |
| 13 | +# Setup for Google Cloud clients and logging |
| 14 | +SECRET_CLIENT = secretmanager.SecretManagerServiceClient() |
| 15 | +LOGGING_CLIENT = logging.Client() |
| 16 | +SECRET_PROJECT = 'sample-metadata' |
| 17 | +SECRET_NAME = 'liquibase-schema-updater' |
| 18 | +log_name = 'lb_schema_update_log' |
| 19 | +logger = LOGGING_CLIENT.logger(log_name) |
| 20 | + |
| 21 | +# Important to maintain this filename otherwise Liquibase fails to recognise previous migrations |
| 22 | +changelog_file = 'project.xml' |
| 23 | + |
| 24 | + |
| 25 | +def read_db_credentials(env: Literal['prod', 'dev']) -> Dict[Literal['dbname', 'username', 'password', 'host'], str]: |
| 26 | + """Get database credentials from Secret Manager.""" |
| 27 | + try: |
| 28 | + secret_path = SECRET_CLIENT.secret_version_path(SECRET_PROJECT, SECRET_NAME, 'latest') |
| 29 | + response = SECRET_CLIENT.access_secret_version(request={'name': secret_path}) |
| 30 | + return json.loads(response.payload.data.decode('UTF-8'))[env] |
| 31 | + except Exception as e: # Broad exception for example; refine as needed |
| 32 | + text = f'Failed to retrieve or parse secrets: {e}' |
| 33 | + logger.log_text(text, severity='ERROR') |
| 34 | + raise HTTPException(status_code=500, detail=text) from e |
| 35 | + |
| 36 | + |
| 37 | +@app.post('/execute-liquibase') |
| 38 | +async def execute_liquibase(request: Request, environment: Literal['prod', 'dev'] = Query(default='dev', regex='^(prod|dev)$')): |
| 39 | + """Endpoint to remotely trigger Liquibase commands on a GCP VM using XML content.""" |
| 40 | + xml_content = await request.body() |
| 41 | + |
| 42 | + # Clean up the local temporary file |
| 43 | + credentials = read_db_credentials(env=environment) |
| 44 | + db_username = credentials['username'] |
| 45 | + db_password = credentials['password'] |
| 46 | + db_hostname = credentials['host'] |
| 47 | + db_name = credentials['dbname'] |
| 48 | + |
| 49 | + # Temporary file creation with XML content |
| 50 | + with tempfile.TemporaryDirectory() as tempdir: |
| 51 | + # Specify the file path within the temporary directory |
| 52 | + with contextlib.chdir(tempdir): # pylint: disable=E1101 |
| 53 | + with open(changelog_file, 'wb') as temp_file: |
| 54 | + temp_file.write(xml_content) |
| 55 | + temp_file_path = temp_file.name # Store file path to use later |
| 56 | + remote_file_path = os.path.basename(temp_file_path) |
| 57 | + |
| 58 | + # The actual command to run on the VM |
| 59 | + liquibase_command = [ |
| 60 | + '/opt/liquibase/liquibase', |
| 61 | + f'--changeLogFile={remote_file_path}', |
| 62 | + f'--url=jdbc:mariadb://{db_hostname}/{db_name}', |
| 63 | + f'--driver=org.mariadb.jdbc.Driver', |
| 64 | + f'--classpath=/opt/mariadb-java-client-3.0.3.jar', |
| 65 | + 'update', |
| 66 | + ] |
| 67 | + |
| 68 | + try: |
| 69 | + # Execute the gcloud command |
| 70 | + result = subprocess.run(liquibase_command, check=True, capture_output=True, text=True, env={'LIQUIBASE_COMMAND_PASSWORD': db_password, 'LIQUIBASE_COMMAND_USERNAME': db_username, **os.environ},) |
| 71 | + logger.log_text(f'Liquibase update successful: {result.stdout}', severity='INFO') |
| 72 | + os.remove(temp_file_path) |
| 73 | + return {'message': 'Liquibase update executed successfully', 'output': result.stdout} |
| 74 | + except subprocess.CalledProcessError as e: |
| 75 | + text = f'Failed to execute Liquibase update: {e.stderr}' |
| 76 | + logger.log_text(text, severity='ERROR') |
| 77 | + raise HTTPException(status_code=500, detail=text) from e |
| 78 | + |
| 79 | + |
| 80 | +if __name__ == '__main__': |
| 81 | + import uvicorn |
| 82 | + uvicorn.run(app, host='0.0.0.0', port=int(os.environ.get('PORT', 8080))) |
0 commit comments