Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,61 @@ Kue based job queue for sails v1.1.0+. Its a wrapper around [**Kue**](https://au

[**Kue**](https://automattic.github.io/kue/) (it will install itself)

[**ioredis**](https://github.com/redis/ioredis) (required for TLS / `rediss://` connections — install as a peer dependency: `npm install ioredis`)

### Installation and Setup guide

Please check [this blog](https://www.logisticinfotech.com/blog/easiest-way-to-create-job-queue-in-sails-with-sails-hook-job-queue/) for step by step guide.

### Redis Configuration

Set `sails.config.redis_url` (env: `sails_redis_url`) to your Redis connection string:

```js
// config/env/production.js
module.exports = {
redis_url: 'rediss://user:password@host:port/0',
};
```

#### Connection Modes

| URL scheme | `redis_cluster_mode` | Behavior |
|------------|---------------------|----------|
| `redis://` | (ignored) | Plain Redis — URL passed directly to kue |
| `rediss://` | unset / `false` (default) | **Standalone ioredis with TLS** — correct for most managed providers |
| `rediss://` | `true` | **ioredis Cluster with TLS** — for clustered deployments only |

**Default behavior for `rediss://` is standalone mode.** This works with:

- DigitalOcean Managed Valkey / Redis
- Redis Cloud (Essentials & Pro non-cluster)
- Upstash
- AWS ElastiCache (cluster mode disabled)
- Azure Cache for Redis (Basic / Standard)

Only enable cluster mode if your provider explicitly runs Redis in cluster topology:

```js
// config/env/production.js
module.exports = {
redis_url: 'rediss://user:password@host:port/0',
redis_cluster_mode: true, // Only for clustered deployments (e.g., Azure Premium with clustering)
};
```

#### TLS Certificate Verification

By default, TLS connections verify server certificates. To disable verification (e.g., self-signed certs in development):

```js
module.exports = {
redis_url: 'rediss://...',
redis_tls_reject_unauthorized: false, // Default: true
};
```

### Breaking Changes in 0.1.0

- **`rediss://` URLs now default to standalone mode** instead of cluster mode. If you were relying on the previous behavior where all `rediss://` connections used `ioredis.Cluster`, set `sails.config.redis_cluster_mode = true` to restore it.
- **`rejectUnauthorized` now defaults to `true`** for TLS connections. Set `sails.config.redis_tls_reject_unauthorized = false` if you need the old behavior.
204 changes: 130 additions & 74 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,94 +35,150 @@ module.exports = function jobqueue(sails) {
}
};

/**
* Attach kue-specific methods that the kue library expects on the client.
* Required for both standalone and cluster Redis clients.
*/
function attachKueMethods(client) {
client.prefix = 'q';
client.getKey = function(key) {
return this.prefix + ':' + key;
};
client.createFIFO = function(id) {
var idLen = '' + id.toString().length;
var len = 2 - idLen.length;
while (len--) idLen = '0' + idLen;
return idLen + '|' + id;
};
client.stripFIFO = function(zid) {
if (typeof zid === 'string') {
return +zid.substr(zid.indexOf('|') + 1);
}
return zid;
};
return client;
}

/**
* Create a standalone ioredis client with TLS for rediss:// URLs.
* Works with: DO Managed Valkey, Redis Cloud, Upstash,
* AWS ElastiCache (non-cluster), Azure Redis Basic/Standard.
*/
function createStandaloneClient(redis_url) {
sails.log.info('Kue Redis: using ioredis standalone with TLS');

var client = new Redis(redis_url, {
tls: {
rejectUnauthorized: sails.config.redis_tls_reject_unauthorized !== false
},
maxRetriesPerRequest: 3,
enableOfflineQueue: true,
enableReadyCheck: true
});

client.on('ready', function() {
sails.log.info('Kue Redis standalone ready');
});
client.on('connect', function() {
sails.log.info('Kue Redis standalone connected');
});
client.on('error', function(err) {
sails.log.error('Kue Redis standalone error:', err);
});

return attachKueMethods(client);
}

/**
* Create an ioredis Cluster client for clustered Redis/Valkey deployments.
* Works with: Azure Redis Premium (clustering enabled),
* AWS ElastiCache (cluster mode enabled).
*/
function createClusterClient(redis_url) {
var urlObj = new URL(redis_url);

sails.log.info('Kue Redis: using ioredis Cluster with TLS');

var client = new Redis.Cluster([
{
host: urlObj.hostname,
port: parseInt(urlObj.port) || 6380
}
], {
redisOptions: {
password: urlObj.password ? decodeURIComponent(urlObj.password) : '',
tls: {
rejectUnauthorized: sails.config.redis_tls_reject_unauthorized !== false
}
},
enableOfflineQueue: true,
maxRetriesPerRequest: 3,
clusterRetryStrategy: function(times) {
if (times > 10) {
return null;
}
return Math.min(times * 50, 1000);
},
enableReadyCheck: true
});

client.on('ready', function() {
sails.log.info('Kue Redis Cluster ready');
});
client.on('connect', function() {
sails.log.info('Kue Redis Cluster connected');
});
client.on('error', function(err) {
if (err.message && !err.message.includes('MOVED') && !err.message.includes('ASK')) {
sails.log.error('Kue Redis Cluster error:', err);
}
});

return attachKueMethods(client);
}

function initJobQueue() {
// Create job queue on Jobs service
var processors = Jobs._processors;
var redis_url = sails.config.redis_url ? sails.config.redis_url : 'redis://127.0.0.1:6379';

// Handle SSL connections (rediss://)
// Determine Redis connection mode:
// sails.config.redis_cluster_mode = true → use ioredis.Cluster (opt-in)
// sails.config.redis_cluster_mode = false or unset → use standalone ioredis
//
// TLS is determined by the URL scheme (rediss:// vs redis://), NOT by topology.
// Cluster mode is orthogonal to TLS — most managed Redis/Valkey providers use
// TLS without clustering (DO Valkey, Redis Cloud Essentials, Upstash, Azure Basic/Standard).
var useCluster = sails.config.redis_cluster_mode === true || sails.config.redis_cluster_mode === 'true';
var redisConfig;

if (redis_url.startsWith('rediss://')) {
try {
if (!Redis || !Redis.Cluster) {
throw new Error('ioredis is required for SSL Redis connections. Please install: npm install ioredis');
if (!Redis) {
sails.log.error('Kue Redis: ioredis is required for rediss:// (TLS) connections. Install: npm install ioredis');
// Fallback to non-SSL (will likely fail but better than crashing without explanation)
redisConfig = redis_url.replace('rediss://', 'redis://');
} else if (useCluster) {
// Cluster mode — opt-in for clustered Redis deployments
try {
redisConfig = {
createClientFactory: function() {
return createClusterClient(redis_url);
}
};
} catch (e) {
sails.log.error('Kue Redis: Cluster client creation failed:', e.message);
redisConfig = redis_url.replace('rediss://', 'redis://');
}

} else {
// Standalone mode — default for non-clustered TLS providers
redisConfig = {
createClientFactory: function() {
var urlObj = new URL(redis_url);

sails.log.info('Using ioredis Cluster for Azure Redis Cache with SSL');

// Use Redis.Cluster - it handles MOVED redirects automatically and uses callbacks natively
var client = new Redis.Cluster([
{
host: urlObj.hostname,
port: parseInt(urlObj.port) || 6380
}
], {
redisOptions: {
password: urlObj.password ? decodeURIComponent(urlObj.password) : '',
tls: {
rejectUnauthorized: false // Set to true in production with proper certificates
}
},
// Cluster options
enableOfflineQueue: true, // Allow commands to queue while cluster is connecting
maxRetriesPerRequest: 3,
clusterRetryStrategy: function(times) {
if (times > 10) {
return null;
}
return Math.min(times * 50, 1000);
},
enableReadyCheck: true
});

// Set up event listeners
client.on('ready', function() {
sails.log.info('Kue Redis Cluster ready');
});
client.on('connect', function() {
sails.log.info('Kue Redis Cluster connected');
});
client.on('error', function(err) {
// Don't log MOVED/ASK errors as they're handled automatically
if (err.message && !err.message.includes('MOVED') && !err.message.includes('ASK')) {
sails.log.error('Kue Redis Cluster error:', err);
}
});

// Add kue-specific methods that it expects
client.prefix = 'q';
client.getKey = function(key) {
return this.prefix + ':' + key;
};
client.createFIFO = function(id) {
var idLen = '' + id.toString().length;
var len = 2 - idLen.length;
while (len--) idLen = '0' + idLen;
return idLen + '|' + id;
};
client.stripFIFO = function(zid) {
if (typeof zid === 'string') {
return +zid.substr(zid.indexOf('|') + 1);
}
return zid;
};

// Return the client directly - ioredis uses callbacks natively which is what kue expects
return client;
return createStandaloneClient(redis_url);
}
};

sails.log.info('Kue Redis SSL: Using ioredis Cluster (simplified)');
} catch (e) {
sails.log.error('Error setting up Redis SSL client:', e);
// Fallback to non-SSL (will likely fail but better than crashing)
redisConfig = redis_url.replace('rediss://', 'redis://');
}
} else {
// Plain redis:// — pass URL directly, no TLS
redisConfig = redis_url;
}

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@logisticinfotech/sails-hook-job-queue",
"version": "0.0.12",
"version": "0.1.0",
"description": "sails hook for job-queue with kue",
"main": "index.js",
"scripts": {
Expand Down