Skip to content

Commit

Permalink
Merge pull request #2019 from akhilmhdh/feat/read-replica
Browse files Browse the repository at this point in the history
Postgres read replica support
  • Loading branch information
maidul98 committed Jun 27, 2024
2 parents 15b4c39 + 5d59fe8 commit 1b2a1f2
Show file tree
Hide file tree
Showing 48 changed files with 603 additions and 224 deletions.
33 changes: 19 additions & 14 deletions backend/e2e-test/vitest-environment-knex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import "ts-node/register";

import dotenv from "dotenv";
import jwt from "jsonwebtoken";
import knex from "knex";
import path from "path";

import { seedData1 } from "@app/db/seed-data";
Expand All @@ -15,6 +14,7 @@ import { AuthMethod, AuthTokenType } from "@app/services/auth/auth-type";
import { mockQueue } from "./mocks/queue";
import { mockSmtpServer } from "./mocks/smtp";
import { mockKeyStore } from "./mocks/keystore";
import { initDbConnection } from "@app/db";

dotenv.config({ path: path.join(__dirname, "../../.env.test"), debug: true });
export default {
Expand All @@ -23,23 +23,21 @@ export default {
async setup() {
const logger = await initLogger();
const cfg = initEnvConfig(logger);
const db = knex({
client: "pg",
connection: cfg.DB_CONNECTION_URI,
migrations: {
const db = initDbConnection({
dbConnectionUri: cfg.DB_CONNECTION_URI,
dbRootCert: cfg.DB_ROOT_CERT
});

try {
await db.migrate.latest({
directory: path.join(__dirname, "../src/db/migrations"),
extension: "ts",
tableName: "infisical_migrations"
},
seeds: {
});
await db.seed.run({
directory: path.join(__dirname, "../src/db/seeds"),
extension: "ts"
}
});

try {
await db.migrate.latest();
await db.seed.run();
});
const smtp = mockSmtpServer();
const queue = mockQueue();
const keyStore = mockKeyStore();
Expand Down Expand Up @@ -74,7 +72,14 @@ export default {
// @ts-expect-error type
delete globalThis.jwtToken;
// called after all tests with this env have been run
await db.migrate.rollback({}, true);
await db.migrate.rollback(
{
directory: path.join(__dirname, "../src/db/migrations"),
extension: "ts",
tableName: "infisical_migrations"
},
true
);
await db.destroy();
}
};
Expand Down
239 changes: 146 additions & 93 deletions backend/src/@types/knex.d.ts

Large diffs are not rendered by default.

52 changes: 49 additions & 3 deletions backend/src/db/instance.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,38 @@
import knex from "knex";
import knex, { Knex } from "knex";

export type TDbClient = ReturnType<typeof initDbConnection>;
export const initDbConnection = ({ dbConnectionUri, dbRootCert }: { dbConnectionUri: string; dbRootCert?: string }) => {
const db = knex({
export const initDbConnection = ({
dbConnectionUri,
dbRootCert,
readReplicas = []
}: {
dbConnectionUri: string;
dbRootCert?: string;
readReplicas?: {
dbConnectionUri: string;
dbRootCert?: string;
}[];
}) => {
// akhilmhdh: the default Knex is knex.Knex<any, any[]>. but when assigned with knex({<config>}) the value is knex.Knex<any, unknown[]>
// this was causing issue with files like `snapshot-dal` `findRecursivelySnapshots` this i am explicitly putting the any and unknown[]
// eslint-disable-next-line
let db: Knex<any, unknown[]>;
// eslint-disable-next-line
let readReplicaDbs: Knex<any, unknown[]>[];
// @ts-expect-error the querybuilder type is expected but our intension is to return a knex instance
knex.QueryBuilder.extend("primaryNode", () => {
return db;
});

// @ts-expect-error the querybuilder type is expected but our intension is to return a knex instance
knex.QueryBuilder.extend("replicaNode", () => {
if (!readReplicaDbs.length) return db;

const selectedReplica = readReplicaDbs[Math.floor(Math.random() * readReplicaDbs.length)];
return selectedReplica;
});

db = knex({
client: "pg",
connection: {
connectionString: dbConnectionUri,
Expand All @@ -22,5 +52,21 @@ export const initDbConnection = ({ dbConnectionUri, dbRootCert }: { dbConnection
}
});

readReplicaDbs = readReplicas.map((el) => {
const replicaDbCertificate = el.dbRootCert || dbRootCert;
return knex({
client: "pg",
connection: {
connectionString: el.dbConnectionUri,
ssl: replicaDbCertificate
? {
rejectUnauthorized: true,
ca: Buffer.from(replicaDbCertificate, "base64").toString("ascii")
}
: false
}
});
});

return db;
};
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export const accessApprovalPolicyDALFactory = (db: TDbClient) => {

const findById = async (id: string, tx?: Knex) => {
try {
const doc = await accessApprovalPolicyFindQuery(tx || db, {
const doc = await accessApprovalPolicyFindQuery(tx || db.replicaNode(), {
[`${TableName.AccessApprovalPolicy}.id` as "id"]: id
});
const formatedDoc = mergeOneToManyRelation(
Expand All @@ -54,7 +54,7 @@ export const accessApprovalPolicyDALFactory = (db: TDbClient) => {

const find = async (filter: TFindFilter<TAccessApprovalPolicies & { projectId: string }>, tx?: Knex) => {
try {
const docs = await accessApprovalPolicyFindQuery(tx || db, filter);
const docs = await accessApprovalPolicyFindQuery(tx || db.replicaNode(), filter);
const formatedDoc = mergeOneToManyRelation(
docs,
"id",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ export const accessApprovalRequestDALFactory = (db: TDbClient) => {

const findRequestsWithPrivilegeByPolicyIds = async (policyIds: string[]) => {
try {
const docs = await db(TableName.AccessApprovalRequest)
const docs = await db
.replicaNode()(TableName.AccessApprovalRequest)
.whereIn(`${TableName.AccessApprovalRequest}.policyId`, policyIds)

.leftJoin(
Expand Down Expand Up @@ -170,7 +171,7 @@ export const accessApprovalRequestDALFactory = (db: TDbClient) => {

const findById = async (id: string, tx?: Knex) => {
try {
const sql = findQuery({ [`${TableName.AccessApprovalRequest}.id` as "id"]: id }, tx || db);
const sql = findQuery({ [`${TableName.AccessApprovalRequest}.id` as "id"]: id }, tx || db.replicaNode());
const docs = await sql;
const formatedDoc = sqlNestRelationships({
data: docs,
Expand Down Expand Up @@ -207,7 +208,8 @@ export const accessApprovalRequestDALFactory = (db: TDbClient) => {

const getCount = async ({ projectId }: { projectId: string }) => {
try {
const accessRequests = await db(TableName.AccessApprovalRequest)
const accessRequests = await db
.replicaNode()(TableName.AccessApprovalRequest)
.leftJoin(
TableName.AccessApprovalPolicy,
`${TableName.AccessApprovalRequest}.policyId`,
Expand Down
2 changes: 1 addition & 1 deletion backend/src/ee/services/audit-log/audit-log-dal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export const auditLogDALFactory = (db: TDbClient) => {
tx?: Knex
) => {
try {
const sqlQuery = (tx || db)(TableName.AuditLog)
const sqlQuery = (tx || db.replicaNode())(TableName.AuditLog)
.where(
stripUndefinedInWhere({
projectId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ export const dynamicSecretLeaseDALFactory = (db: TDbClient) => {

const countLeasesForDynamicSecret = async (dynamicSecretId: string, tx?: Knex) => {
try {
const doc = await (tx || db)(TableName.DynamicSecretLease).count("*").where({ dynamicSecretId }).first();
const doc = await (tx || db.replicaNode())(TableName.DynamicSecretLease)
.count("*")
.where({ dynamicSecretId })
.first();
return parseInt(doc || "0", 10);
} catch (error) {
throw new DatabaseError({ error, name: "DynamicSecretCountLeases" });
Expand All @@ -21,7 +24,7 @@ export const dynamicSecretLeaseDALFactory = (db: TDbClient) => {

const findById = async (id: string, tx?: Knex) => {
try {
const doc = await (tx || db)(TableName.DynamicSecretLease)
const doc = await (tx || db.replicaNode())(TableName.DynamicSecretLease)
.where({ [`${TableName.DynamicSecretLease}.id` as "id"]: id })
.first()
.join(
Expand Down
11 changes: 6 additions & 5 deletions backend/src/ee/services/group/group-dal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export const groupDALFactory = (db: TDbClient) => {

const findGroups = async (filter: TFindFilter<TGroups>, { offset, limit, sort, tx }: TFindOpt<TGroups> = {}) => {
try {
const query = (tx || db)(TableName.Groups)
const query = (tx || db.replicaNode())(TableName.Groups)
// eslint-disable-next-line
.where(buildFindFilter(filter))
.select(selectAllTableCols(TableName.Groups));
Expand All @@ -32,7 +32,7 @@ export const groupDALFactory = (db: TDbClient) => {

const findByOrgId = async (orgId: string, tx?: Knex) => {
try {
const docs = await (tx || db)(TableName.Groups)
const docs = await (tx || db.replicaNode())(TableName.Groups)
.where(`${TableName.Groups}.orgId`, orgId)
.leftJoin(TableName.OrgRoles, `${TableName.Groups}.roleId`, `${TableName.OrgRoles}.id`)
.select(selectAllTableCols(TableName.Groups))
Expand Down Expand Up @@ -74,11 +74,12 @@ export const groupDALFactory = (db: TDbClient) => {
username?: string;
}) => {
try {
let query = db(TableName.OrgMembership)
let query = db
.replicaNode()(TableName.OrgMembership)
.where(`${TableName.OrgMembership}.orgId`, orgId)
.join(TableName.Users, `${TableName.OrgMembership}.userId`, `${TableName.Users}.id`)
.leftJoin(TableName.UserGroupMembership, function () {
this.on(`${TableName.UserGroupMembership}.userId`, "=", `${TableName.Users}.id`).andOn(
.leftJoin(TableName.UserGroupMembership, (bd) => {
bd.on(`${TableName.UserGroupMembership}.userId`, "=", `${TableName.Users}.id`).andOn(
`${TableName.UserGroupMembership}.groupId`,
"=",
db.raw("?", [groupId])
Expand Down
15 changes: 8 additions & 7 deletions backend/src/ee/services/group/user-group-membership-dal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export const userGroupMembershipDALFactory = (db: TDbClient) => {
*/
const filterProjectsByUserMembership = async (userId: string, groupId: string, projectIds: string[], tx?: Knex) => {
try {
const userProjectMemberships: string[] = await (tx || db)(TableName.ProjectMembership)
const userProjectMemberships: string[] = await (tx || db.replicaNode())(TableName.ProjectMembership)
.where(`${TableName.ProjectMembership}.userId`, userId)
.whereIn(`${TableName.ProjectMembership}.projectId`, projectIds)
.pluck(`${TableName.ProjectMembership}.projectId`);
Expand All @@ -43,7 +43,8 @@ export const userGroupMembershipDALFactory = (db: TDbClient) => {
// special query
const findUserGroupMembershipsInProject = async (usernames: string[], projectId: string) => {
try {
const usernameDocs: string[] = await db(TableName.UserGroupMembership)
const usernameDocs: string[] = await db
.replicaNode()(TableName.UserGroupMembership)
.join(
TableName.GroupProjectMembership,
`${TableName.UserGroupMembership}.groupId`,
Expand Down Expand Up @@ -73,7 +74,7 @@ export const userGroupMembershipDALFactory = (db: TDbClient) => {
try {
// get list of groups in the project with id [projectId]
// that that are not the group with id [groupId]
const groups: string[] = await (tx || db)(TableName.GroupProjectMembership)
const groups: string[] = await (tx || db.replicaNode())(TableName.GroupProjectMembership)
.where(`${TableName.GroupProjectMembership}.projectId`, projectId)
.whereNot(`${TableName.GroupProjectMembership}.groupId`, groupId)
.pluck(`${TableName.GroupProjectMembership}.groupId`);
Expand All @@ -83,8 +84,8 @@ export const userGroupMembershipDALFactory = (db: TDbClient) => {
.where(`${TableName.UserGroupMembership}.groupId`, groupId)
.where(`${TableName.UserGroupMembership}.isPending`, false)
.join(TableName.Users, `${TableName.UserGroupMembership}.userId`, `${TableName.Users}.id`)
.leftJoin(TableName.ProjectMembership, function () {
this.on(`${TableName.Users}.id`, "=", `${TableName.ProjectMembership}.userId`).andOn(
.leftJoin(TableName.ProjectMembership, (bd) => {
bd.on(`${TableName.Users}.id`, "=", `${TableName.ProjectMembership}.userId`).andOn(
`${TableName.ProjectMembership}.projectId`,
"=",
db.raw("?", [projectId])
Expand All @@ -107,9 +108,9 @@ export const userGroupMembershipDALFactory = (db: TDbClient) => {
db.ref("publicKey").withSchema(TableName.UserEncryptionKey)
)
.where({ isGhost: false }) // MAKE SURE USER IS NOT A GHOST USER
.whereNotIn(`${TableName.UserGroupMembership}.userId`, function () {
.whereNotIn(`${TableName.UserGroupMembership}.userId`, (bd) => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.select(`${TableName.UserGroupMembership}.userId`)
bd.select(`${TableName.UserGroupMembership}.userId`)
.from(TableName.UserGroupMembership)
.whereIn(`${TableName.UserGroupMembership}.groupId`, groups);
});
Expand Down
3 changes: 2 additions & 1 deletion backend/src/ee/services/ldap-config/ldap-group-map-dal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ export const ldapGroupMapDALFactory = (db: TDbClient) => {

const findLdapGroupMapsByLdapConfigId = async (ldapConfigId: string) => {
try {
const docs = await db(TableName.LdapGroupMap)
const docs = await db
.replicaNode()(TableName.LdapGroupMap)
.where(`${TableName.LdapGroupMap}.ldapConfigId`, ldapConfigId)
.join(TableName.Groups, `${TableName.LdapGroupMap}.groupId`, `${TableName.Groups}.id`)
.select(selectAllTableCols(TableName.LdapGroupMap))
Expand Down
2 changes: 1 addition & 1 deletion backend/src/ee/services/license/license-dal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export type TLicenseDALFactory = ReturnType<typeof licenseDALFactory>;
export const licenseDALFactory = (db: TDbClient) => {
const countOfOrgMembers = async (orgId: string | null, tx?: Knex) => {
try {
const doc = await (tx || db)(TableName.OrgMembership)
const doc = await (tx || db.replicaNode())(TableName.OrgMembership)
.where({ status: OrgMembershipStatus.Accepted })
.andWhere((bd) => {
if (orgId) {
Expand Down
15 changes: 10 additions & 5 deletions backend/src/ee/services/permission/permission-dal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ export type TPermissionDALFactory = ReturnType<typeof permissionDALFactory>;
export const permissionDALFactory = (db: TDbClient) => {
const getOrgPermission = async (userId: string, orgId: string) => {
try {
const membership = await db(TableName.OrgMembership)
const membership = await db
.replicaNode()(TableName.OrgMembership)
.leftJoin(TableName.OrgRoles, `${TableName.OrgMembership}.roleId`, `${TableName.OrgRoles}.id`)
.join(TableName.Organization, `${TableName.OrgMembership}.orgId`, `${TableName.Organization}.id`)
.where("userId", userId)
Expand All @@ -28,7 +29,8 @@ export const permissionDALFactory = (db: TDbClient) => {

const getOrgIdentityPermission = async (identityId: string, orgId: string) => {
try {
const membership = await db(TableName.IdentityOrgMembership)
const membership = await db
.replicaNode()(TableName.IdentityOrgMembership)
.leftJoin(TableName.OrgRoles, `${TableName.IdentityOrgMembership}.roleId`, `${TableName.OrgRoles}.id`)
.join(TableName.Organization, `${TableName.IdentityOrgMembership}.orgId`, `${TableName.Organization}.id`)
.where("identityId", identityId)
Expand All @@ -45,11 +47,13 @@ export const permissionDALFactory = (db: TDbClient) => {

const getProjectPermission = async (userId: string, projectId: string) => {
try {
const groups: string[] = await db(TableName.GroupProjectMembership)
const groups: string[] = await db
.replicaNode()(TableName.GroupProjectMembership)
.where(`${TableName.GroupProjectMembership}.projectId`, projectId)
.pluck(`${TableName.GroupProjectMembership}.groupId`);

const groupDocs = await db(TableName.UserGroupMembership)
const groupDocs = await db
.replicaNode()(TableName.UserGroupMembership)
.where(`${TableName.UserGroupMembership}.userId`, userId)
.whereIn(`${TableName.UserGroupMembership}.groupId`, groups)
.join(
Expand Down Expand Up @@ -231,7 +235,8 @@ export const permissionDALFactory = (db: TDbClient) => {

const getProjectIdentityPermission = async (identityId: string, projectId: string) => {
try {
const docs = await db(TableName.IdentityProjectMembership)
const docs = await db
.replicaNode()(TableName.IdentityProjectMembership)
.join(
TableName.IdentityProjectMembershipRole,
`${TableName.IdentityProjectMembershipRole}.projectMembershipId`,
Expand Down
3 changes: 2 additions & 1 deletion backend/src/ee/services/saml-config/saml-config-dal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ export const samlConfigDALFactory = (db: TDbClient) => {

const findEnforceableSamlCfg = async (orgId: string) => {
try {
const samlCfg = await db(TableName.SamlConfig)
const samlCfg = await db
.replicaNode()(TableName.SamlConfig)
.where({
orgId,
isActive: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export const secretApprovalPolicyDALFactory = (db: TDbClient) => {

const findById = async (id: string, tx?: Knex) => {
try {
const doc = await sapFindQuery(tx || db, {
const doc = await sapFindQuery(tx || db.replicaNode(), {
[`${TableName.SecretApprovalPolicy}.id` as "id"]: id
});
const formatedDoc = mergeOneToManyRelation(
Expand All @@ -52,7 +52,7 @@ export const secretApprovalPolicyDALFactory = (db: TDbClient) => {

const find = async (filter: TFindFilter<TSecretApprovalPolicies & { projectId: string }>, tx?: Knex) => {
try {
const docs = await sapFindQuery(tx || db, filter);
const docs = await sapFindQuery(tx || db.replicaNode(), filter);
const formatedDoc = mergeOneToManyRelation(
docs,
"id",
Expand Down
Loading

0 comments on commit 1b2a1f2

Please sign in to comment.