Quick Start Guide: https://github.com/LeoPlatform/Leo
- Creating a checksum bot
- Creating a Domain Object transform bot
- Support
- You know how to create bots in the Leo Platform. (https://github.com/LeoPlatform/cli)
- You know how to create AWS permissions.
- You know how to do AWS networking.
- leo-sdk: 1.1.0+
- leo-connector-common: 1.1.2+
- leo-connector-(mysql|postgres|sqlserver): 1.3.0+
- leo-connector-mysql:
npm install leo-connector-mysql
- leo-connector-postgres:
npm install leo-connector-postgres
- leo-connector-sqlserver:
npm install leo-connector-sqlserver
If you are using AWS Secrets Manager, create secrets for your databases. The secret names will be used in step 2.
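For example, a secret for a MySQL connection can be created with the AWS CLI. The secret name and the JSON field names below are illustrative assumptions; use whatever connection fields your connector expects:

```shell
# Hypothetical example: store MySQL connection details as a secret.
# The field names (host, username, password, database, port) are an
# assumption; match them to what your connector reads from the secret.
aws secretsmanager create-secret \
  --name database_secret_key_name \
  --secret-string '{"host":"mydb.example.com","username":"leo","password":"YOUR_PASSWORD","database":"orders","port":3306}'
```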
If you already have a connector set up for this database connection, skip this step.
Using the CLI, create a connector bot for each database you need to connect to. If one or more of your connections is an endpoint or a database type we don't support, see the basicConnector section.
leo-cli create leo-connector-{connector type} checksum {bot name}
leo-cli create leo-connector-mysql checksum mysqlConnector
Now browse to your new bot (bots/mysqlConnector), open package.json, and replace the dbsecret key name with the one you created in AWS Secrets Manager.
If you are using a VPC for access to your database, or are using an AWS RDS instance, add the VpcConfig to the package.json under the config.leo object.
"config": {
"leo": {
"type": "bot",
"memory": 256,
"timeout": 300,
"role": "ApiRole",
"env": {
"dbsecret": "database_secret_key_name"
},
"VpcConfig": {
"SecurityGroupIds": [
"sg-123456ab"
],
"SubnetIds": [
"subnet-abc12345",
"subnet-def67890",
"subnet-ghi45679"
]
}
}
}
Repeat this step for each master or slave database you will run a checksum against.
In your service, be sure to install the NPM modules for the connectors you are using.
Now publish and deploy the bots.
Congratulations! You now have connectors set up to run a checksum. Next we'll create a checksum runner.
const leo = require('leo-sdk');
const checksum = require('leo-connector-common/checksum');
const moment = require('moment');
Use lambdaConnector to connect to the two database connectors you created in the previous section and build out the data you want to compare between them. For this example, I'm using a MySQL connector for the master and a Postgres connector for the slave. We're going to compare id and status from the orders tables in both databases. (The __IDCOLUMNLIMIT__ token in the SQL is replaced by the connector at runtime with the id-range restriction for the block of records currently being compared.)
exports.handler = function(event, context, callback) {
let db1 = checksum.lambdaConnector('MySQL DB Lead checksum', process.env.mysql_lambda, {
sql: `SELECT id, status FROM orders WHERE id __IDCOLUMNLIMIT__`,
table: 'orders',
id_column: 'id',
key_column: 'primary'
});
let db2 = checksum.lambdaConnector('Postgres DB Lead checksum', process.env.postgres_lambda, {
sql: `SELECT id, status FROM orders WHERE id __IDCOLUMNLIMIT__`,
table: 'orders',
id_column: 'id',
key_column: 'primary'
});
// checksum code in step 3 (below) goes here
}
Now create the checksum with parameters.
let system = 'default';
checksum.checksum(system, event.botId, db1, db2, {
stopOnStreak: 1750000, // if the checksum finds this many identical records in sequence, it stops and marks itself as completed
stop_at: moment().add({minutes: 4}), // Lambda has a 5-minute limit, so we set this to 4 minutes so the bot has time to clean up. It will restart right after this and continue where it left off.
limit: 20000, // the number of records to start comparing between the 2 databases
maxLimit: 500000, // if a "block" of 20,000 or more records is identical, increase the comparison block size from limit up to this max limit
shouldDelete: false, // set this to true if you want records that exist in the slave database but not in the master to be deleted
loadSize: 50000, // this is the recommended load size
reverse: true, // processes records from highest to lowest; set to false to process from lowest to highest
sample: true, // return a sample of the records that differ between the two databases
queue: { // this controls the queue where the IDs go that are marked as missing from the slave database
name: event.destination, // queue name
transform: leo.streams.through((obj, done) => { // how to transform the IDs before sending them into the queue
done(null, {
Orders: obj.missing.concat(obj.incorrect)
});
})
}
//skipBatch: true, // only set to true if you need the 2 connectors to compare individual records instead of batches
//showOutput: false
})
.then(data=>{ console.log(data); callback()})
.catch(callback);
{
"name": "OrdersChecksum",
"version": "1.0.0",
"description": "Checksum for the Orders table",
"main": "index.js",
"directories": {
"test": "test"
},
"scripts": {
"test": "leo-cli test . "
},
"config": {
"leo": {
"type": "cron",
"memory": 256,
"timeout": 300,
"role": "ApiRole",
"env": {
"mysql_lambda": {
"Fn::Sub": "${MysqlConnector}"
},
"postgres_lambda": {
"Fn::Sub": "${PostgresConnector}"
}
},
"cron": {
"settings": {
"source": "system:mysqlConnector",
"destination": "orderChanges"
}
},
"time": "0 0 0 * * * "
}
}
}
Your CloudFormation stack will now need to be configured to be able to invoke Lambda. Skip this step if you already have this set.
In your cloudformation.json, find the configuration for the role you're using. In the package.json example above, we're using "ApiRole". Find the ApiRole in the Resources:
"Resources": {
"ApiRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": [...],
"ManagedPolicyArns": [...],
"Policies": [...Add Invoke Lambda policy here...]
}
},
Add policies to invoke Lambda and to access KMS and Secrets Manager.
{
"PolicyName": "Invoke_Lambda",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": "lambda:*",
"Resource": "*"
}
]
}
},
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"secretsmanager:GetSecretValue",
"secretsmanager:DescribeSecret"
],
"Resource": "arn:aws:secretsmanager:*:*:secret:*"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": "secretsmanager:ListSecrets",
"Resource": "*"
}
]
},
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"kms:GetParametersForImport",
"kms:ListKeyPolicies",
"kms:GetKeyRotationStatus",
"kms:ListRetirableGrants",
"kms:GetKeyPolicy",
"kms:DescribeKey",
"kms:ListResourceTags",
"kms:ListGrants",
"kms:Decrypt"
],
"Resource": "arn:aws:kms:*:*:key/*"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"kms:ListKeys",
"kms:GenerateRandom",
"kms:ListAliases",
"kms:ReEncryptTo",
"kms:ReEncryptFrom"
],
"Resource": "*"
}
]
}
Make sure the checksum runner is not in a VPC (no VpcConfig in the package.json), then publish and deploy the checksum runner.
You can either wait for the checksum to run at the cron time set, or force it to run through botmon. After the bot has run once, the checksum tab will appear when you open it in botmon, where you can see the current status while it's running, or the results from the last run.
If one of your endpoints is not a database (e.g. an API), or is a database we don't support, you can create a custom connector using basicConnector. A custom connector differs from a database connector in that you don't create a specific connector bot for it; instead you add it directly into the runner. You can use a custom connector as the master with a database connector as the slave, a database connector as the master with a custom connector as the slave, or custom connectors for both the master and the slave.
let customConnector = checksum.basicConnector('< Checksum name >', {
id_column: 'id'
}, {
// custom handlers go here
});
Now add the handlers to handle the data.
- batch
- individual
- range
- nibble
- delete
- sample (required if sample is set to true)
- initialize
- destroy
- batch
- individual
- delete
- sample (required if sample is set to true)
- initialize
- destroy
- range
- nibble
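Taken together, the handler object you pass as the third argument to basicConnector can be sketched as stubs (each handler is detailed in the sections that follow):

```javascript
// Stub sketch of the full handler object for checksum.basicConnector.
// Each stub returns the shape the checksum expects; fill in real logic
// per the handler sections below.
const handlers = {
    initialize: data => Promise.resolve({}), // open a session (e.g. authorization)
    range: (start, end) => Promise.resolve({min: 0, max: 0, total: 0}), // ID bounds and count
    batch: (start, end) => Promise.resolve([]), // records between start and end
    individual: function(start, end) { return this.batch(start, end); }, // per-record compare
    nibble: (start, end, limit, reverse) => Promise.resolve({current: null, next: null}),
    delete: ids => Promise.resolve(), // only called when shouldDelete is true
    sample: ids => Promise.resolve([]), // records for the differing IDs
    destroy: data => Promise.resolve() // cleanup when the checksum completes
};
```

You would then pass these as `checksum.basicConnector('My custom checksum', {id_column: 'id'}, handlers)`.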
Called when the checksum starts (not on restarts after a Lambda 5-minute timeout).
/**
* Called when checksum starts.
* Used for situations such as when your endpoint requires authorization.
* Called with data, return a session
*/
initialize: function(data) {
return Promise.resolve({});
}
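For instance, if your endpoint requires an auth token, initialize can fetch it and keep it in the session. A minimal sketch, where fetchAuthToken is a hypothetical stand-in for your endpoint's real authentication call:

```javascript
// fetchAuthToken is a hypothetical stand-in for a real authentication call.
function fetchAuthToken() {
    return Promise.resolve('example-token-123');
}

const handlers = {
    initialize: function(data) {
        return fetchAuthToken().then(token => ({
            token,                // available to later handlers via the session
            startedAt: Date.now() // extra session state, e.g. for logging
        }));
    }
};

// the resolved object becomes the checksum session
handlers.initialize({}).then(session => console.log(session.token));
// → example-token-123
```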
Called after initialize. Range gets the maximum and minimum IDs, as well as the total number of IDs. This is stored in the session until the checksum completes; each restart of the checksum after a Lambda timeout will use the range stored in the session.
/**
* @int start
* @int end
* @object options (optional)
* @return object {min: int, max: int, total: int}
*/
// Respond to start and end -- options
// Return object with min, max, total
range: function(start, end) {
let min = null;
let max = null;
let total = 0;
/************************************************
 * Begin example code to get min, max, and total.
 * This example loops through records returned
 * into "db" and creates a start and end from
 * the greatest and least IDs.
 ************************************************/
// db: object containing the records to compare (truncated example)
let db = [{id: 1, name: 'foo', etc: 'etc'} /* , ...more records */];
Object.keys(db).map(id => {
id = db[id][this.settings.id_column];
if ((start === undefined || id >= start) && (end === undefined || id <= end)) {
total++;
if (min == null || id < min) {
min = id;
}
if (max == null || id > max) {
max = id;
}
}
});
/**********************************************
* End example code to get min, max, and total *
***********************************************/
// return a min, max and total
return Promise.resolve({
min,
max,
total
});
}
Gets a chunk of data between a specified start and end to compare against a master or slave set of data.
/**
* Respond to a start and end, and build an array of data returned into “db”
*
* @int start
* @int end
* @return mixed (Stream|Array|hash)
*/
batch: function(start, end) {
let data = [];
// db: object containing the records to compare (truncated example)
let db = [{id: 1, name: 'foo', etc: 'etc'} /* , ...more records */];
/***********************************************************************************
* Example code to put together an array of data using the data returned from “db” *
***********************************************************************************/
for (let v of db) {
data.push(v);
}
/**********************************************************************************************************
* Alternatively, if you cannot pass in a start and end and just get a chunk of data back, build an array *
* with the data having id’s between start and end *
***********************************************************************************************************/
for (let i = start; i <= end; i++) {
if (typeof db[i] !== 'undefined') {
data.push(db[i]);
}
}
// return the array of data
return Promise.resolve(data);
}
/**
* Nibble handler: Uses a start, end, limit, and reverse; and gives a “next” and “current” to continue checking data
* @int start
* @int end
* @int limit
* @bool reverse
*/
// Responds to start, end, limit, reverse
// Returns object with next, current
nibble: function(start, end, limit, reverse) {
// db: object containing the records to compare (truncated example)
let db = [{id: 1, name: 'foo', etc: 'etc'} /* , ...more records */];
let current = null;
let next = null;
let dir = 1;
let ostart = start;
let oend = end;
if (reverse) {
start = end;
end = ostart;
dir = -1;
}
let cnt = 0;
for (let i = start; i >= ostart && i <= oend; i += dir) {
if (typeof db[i] !== 'undefined') {
let v = db[i];
cnt++;
if (cnt >= limit) {
if (!current) {
current = v[this.settings.id_column];
} else {
next = v[this.settings.id_column];
break;
}
}
}
}
return Promise.resolve({
current,
next
});
}
The code required is the same as "batch" above. If you have already created batch, just call batch in the return and you're done here. If you don't have batch, follow the batch example but name the handler individual.
individual: function(start, end) {
return this.batch(start, end);
}
Delete records in the slave database that do not exist in the master database. This only runs if shouldDelete is set to true.
delete: function(ids) {
// db: object containing the records to compare (truncated example)
let db = [{id: 1, name: 'foo', etc: 'etc'} /* , ...more records */];
ids.map(id => {
if (id in db) {
delete db[id];
}
});
return Promise.resolve();
}
Used to return a sample of the records whose IDs differ between the master and slave.
// Respond to ids
// Return Stream, Array
sample: function(ids) {
let data = [];
// db: object containing the records to compare (truncated example)
let db = [{id: 1, name: 'foo', etc: 'etc'} /* , ...more records */];
ids.map(id => {
let v = db[id];
if (v !== undefined) {
data.push(v);
console.log(v);
}
});
return Promise.resolve(data);
}
Destroy runs once on checksum completion. Use this if you need to shut down a session or add additional logging.
destroy: function(data) {
return Promise.resolve();
}
// include the required handlers
const leo = require("leo-sdk");
const helperFactory = require("leo-connector-common/botHelper");
// config requires leo-sdk 2.x
const config = require('leo-config');
// use the connector for your database type:
const connector = require('leo-connector-<dbtype>');
// use this handler for leo-sdk 2.x
exports.handler = require("leo-sdk/wrappers/cron.js")(async function(event, context, callback) {
// use this handler for leo-sdk 1.x
exports.handler = async function(event, context, callback) {
// create the helper
const helper = new helperFactory(event, context, leo);
Example:
"use strict";
const leo = require("leo-sdk");
const ls = require("leo-streams");
const config = require("leo-config");
const connector = require('leo-connector-mysql');
exports.handler = require("leo-sdk/wrappers/cron.js")(async function (event, context, callback) {
let source = 'system:mysql';
let destination = 'people';
// these settings are specific to snapshot.
let snapshotSettings = {
table: 'people', // table name that has our primary keys
pk: 'id', // primary key of the table we're using
limit: 5000, // max number of id's to insert into the select query
};
let domainObjectBuilder = connector.domainObjectBuilder(config.db);
let dol = domainObjectBuilder; // "dol" alias used in the pipeline below
let snapshotter = connector.snapshotter(config.db);
// translate source-table IDs into the IDs of the domain object we're creating.
// In the translation object below, we get people.id from an array of people IDs, as well as from an array of contact IDs.
let tableIdTranslations = {
people: true, // no translation needed; expects IDs in a format like: {payload.[update.]people: [1, 2, 3, 5, 25, ...]}
contacts: data => `SELECT people_id FROM contacts WHERE id in (?)`, // gets people.id from the contacts table for a payload like: {payload.[update.]contacts: [1, 2, 3, 5, 25, ...]}
};
let domainObject = new domainObjectBuilder.DomainObject(
// for mysql, c.database contains the database name
c => `SELECT
t.id as _domain_id,
t.*,
'test' as myField
FROM people AS t
WHERE t.id IN (?)`
);
// if you have a one-to-many to join:
domainObject.hasMany("sub_object", c => `SELECT related_table.pk as _domain_id, related_table.* from related_table where pk IN (?)`); // ? will be filled with the object primary key as laid out in the tableIdTranslation
//Advanced Settings
// if we're creating a snapshot
if (process.env.snapshot) {
ls.pipe(
// read through primary keys
snapshotter.read(snapshotSettings),
// transform keys into domain objects
dol.domainObjectTransform(domainObject),
// load into S3, toLeo, and checkpoint
snapshotter.write(context.botId, destination),
// end
err => {
callback(err);
}
);
} else {
// regular domain object load
ls.pipe(
leo.read(context.botId, source),
dol.translateIds(tableIdTranslations),
dol.domainObjectTransform(domainObject),
leo.load(context.botId, destination),
(err) => {
callback(err);
}
);
}
//End Advanced Settings
});
The query in the examples above is what builds the domain object. Here is an example of how to build a domain object and what the various parts of the query do to build it. The empty '' AS prefix_Foo column acts as a marker: the columns that follow it are nested under a Foo sub-object in the resulting domain object, as the JSON after the query shows.
let query = `SELECT a.id
, a.name
, a.foo_id
, '' AS prefix_Foo
, b.id
, b.name
FROM tableOne a
JOIN tableTwo b ON (b.id = a.foo_id)
WHERE a.id IN (?)`;
{
"id": 1,
"name": "test",
"foo_id": 5,
"Foo": {
"id": 5,
"name": "bar"
}
}
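To make the prefix mechanics concrete, here is a toy illustration (not the connector's actual implementation) of how a flat row from that query becomes the nested object above:

```javascript
// Toy illustration only: columns that follow a `prefix_X` marker column
// are grouped under a sub-object named X.
function nestRow(columns, values) {
    const result = {};
    let target = result;
    columns.forEach((col, i) => {
        const m = /^prefix_(.+)$/.exec(col);
        if (m) {
            target = result[m[1]] = {}; // start the sub-object (e.g. Foo)
        } else {
            target[col] = values[i];
        }
    });
    return result;
}

// the columns and values produced by the example query above
const row = nestRow(
    ['id', 'name', 'foo_id', 'prefix_Foo', 'id', 'name'],
    [1, 'test', 5, '', 5, 'bar']
);
console.log(JSON.stringify(row));
// → {"id":1,"name":"test","foo_id":5,"Foo":{"id":5,"name":"bar"}}
```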
Want to hire an expert, or need technical support? Reach out to the Leo team: https://leoinsights.com/contact