Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const createVideoRoutes = require('./routes/video');
const { buildAuditLogCsv } = require('./src/utils/export/auditLogCsv');
const { buildAuditLogPdf } = require('./src/utils/export/auditLogPdf');
const { getRequestIp } = require('./src/utils/requestIp');
const { getRedisClient, closeRedisClient } = require('./src/config/redis');
const { createRateLimiter } = require('./middleware/rateLimiter');

/**
* Create the Express application with injectable services for testing.
Expand Down Expand Up @@ -54,6 +56,19 @@ function createApp(dependencies = {}) {
next();
});

// Leaky-bucket rate limiting per wallet address (requires Redis).
if (dependencies.rateLimiter) {
app.use('/api', dependencies.rateLimiter);
} else if (process.env.REDIS_URL || process.env.REDIS_HOST) {
app.use('/api', createRateLimiter({
redis: getRedisClient(),
bucketCapacity: Number(process.env.RATE_LIMIT_CAPACITY || 60),
leakRatePerSecond: Number(process.env.RATE_LIMIT_LEAK_RATE || 1),
blockDurationSeconds: Number(process.env.RATE_LIMIT_BLOCK_SECONDS || 300),
sybilThreshold: Number(process.env.SYBIL_THRESHOLD || 3),
}));
}

app.get('/', (req, res) => {
res.json({
project: 'SubStream Protocol',
Expand Down Expand Up @@ -353,6 +368,112 @@ const port = Number(process.env.PORT || 3000);

if (require.main === module) {
app.listen(port, () => console.log(`SubStream API running on port ${port}`));
const cors = require('cors');
const dotenv = require('dotenv');

// Load environment variables
dotenv.config();

const app = express();
const port = process.env.PORT || 3000;

// Middleware
app.use(cors());
app.use(express.json({ limit: "10mb" }));
app.use(express.urlencoded({ extended: true }));

// Leaky-bucket rate limiting per wallet address
if (process.env.REDIS_URL || process.env.REDIS_HOST) {
const { createRateLimiter: createRL } = require('./middleware/rateLimiter');
const { getRedisClient: getRC } = require('./src/config/redis');
app.use(createRL({
redis: getRC(),
bucketCapacity: Number(process.env.RATE_LIMIT_CAPACITY || 60),
leakRatePerSecond: Number(process.env.RATE_LIMIT_LEAK_RATE || 1),
blockDurationSeconds: Number(process.env.RATE_LIMIT_BLOCK_SECONDS || 300),
sybilThreshold: Number(process.env.SYBIL_THRESHOLD || 3),
}));
}

// Routes
app.use('/auth', require('./routes/auth'));
app.use('/content', require('./routes/content'));
app.use('/analytics', require('./routes/analytics'));
app.use('/storage', require('./routes/storage'));
app.use('/posts', require('./routes/posts'));
app.use("/auth", require("./routes/auth"));
app.use("/auth", require("./routes/stellarAuth"));
app.use("/content", require("./routes/content"));
app.use("/analytics", require("./routes/analytics"));
app.use("/storage", require("./routes/storage"));

// Health check endpoint
app.get("/health", (req, res) => {
res.json({
status: "healthy",
timestamp: new Date().toISOString(),
version: "1.0.0",
services: {
auth: 'active',
content: 'active',
analytics: 'active',
storage: 'active',
posts: 'active'
}
auth: "active",
content: "active",
analytics: "active",
storage: "active",
},
});
});

// Root endpoint
app.get("/", (req, res) => {
res.json({
project: "SubStream Protocol",
status: "Active",
contract: "CAOUX2FZ65IDC4F2X7LJJ2SVF23A35CCTZB7KVVN475JCLKTTU4CEY6L",
version: "1.0.0",
endpoints: {
auth: '/auth',
content: '/content',
analytics: '/analytics',
storage: '/storage',
posts: '/posts',
health: '/health'
}
auth: "/auth",
content: "/content",
analytics: "/analytics",
storage: "/storage",
health: "/health",
},
});
});

// Error handling middleware
app.use((err, req, res, next) => {
console.error("Unhandled error:", err);
res.status(500).json({
success: false,
error: "Internal server error",
});
});

// 404 handler
app.use('*', (req, res) => {
res.status(404).json({
success: false,
error: "Endpoint not found",
});
});

if (require.main === module) {
app.listen(port, () => {
console.log(`SubStream API running on port ${port}`);
console.log(`Health check: http://localhost:${port}/health`);
});
}

module.exports = app;
Expand Down
121 changes: 121 additions & 0 deletions middleware/rateLimiter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/**
* Express middleware – per-wallet Leaky Bucket rate limiting with Sybil flagging.
*
* Usage:
* const { createRateLimiter } = require('./middleware/rateLimiter');
* app.use('/api', createRateLimiter({ redis, bucketCapacity: 60 }));
*
* The middleware extracts the wallet address from (in priority order):
* 1. req.user.address (set by auth middleware)
* 2. req.user.publicKey (Stellar auth)
* 3. req.body.walletAddress
* 4. req.query.walletAddress || req.query.publicKey
*
* When a wallet cannot be determined the request falls through to the
* next middleware so unauthenticated routes still work.
*/

const {
LeakyBucketRateLimiter,
} = require("../src/services/leakyBucketRateLimiter");
const {
SybilAnalysisService,
} = require("../src/services/sybilAnalysisService");
const { getRequestIp } = require("../src/utils/requestIp");

/**
* Extract the wallet / public-key identifier from the request.
*
* @param {import('express').Request} req
* @returns {string|null}
*/
function extractWallet(req) {
if (req.user?.address) return req.user.address;
if (req.user?.publicKey) return req.user.publicKey;
if (req.body?.walletAddress) return req.body.walletAddress;
if (req.query?.walletAddress) return req.query.walletAddress;
if (req.query?.publicKey) return req.query.publicKey;
return null;
}

/**
* Create the rate-limiting middleware.
*
* @param {object} options
* @param {import('ioredis').Redis} options.redis Redis client instance.
* @param {number} [options.bucketCapacity=60] Max burst size.
* @param {number} [options.leakRatePerSecond=1] Tokens drained per second.
* @param {number} [options.blockDurationSeconds=300] Temp-block length after overflow.
* @param {number} [options.sybilThreshold=3] Violations before Sybil flag.
* @param {boolean} [options.skipIfNoWallet=true] Pass through when wallet is unknown.
* @returns {import('express').RequestHandler}
*/
function createRateLimiter(options = {}) {
const { redis, skipIfNoWallet = true } = options;

if (!redis) {
throw new Error("createRateLimiter requires a redis client instance");
}

const limiter = new LeakyBucketRateLimiter(redis, {
bucketCapacity: options.bucketCapacity,
leakRatePerSecond: options.leakRatePerSecond,
blockDurationSeconds: options.blockDurationSeconds,
sybilThreshold: options.sybilThreshold,
});

const sybil = new SybilAnalysisService(redis, {
flagThreshold: options.sybilThreshold,
});

return async function rateLimiterMiddleware(req, res, next) {
const wallet = extractWallet(req);

if (!wallet) {
if (skipIfNoWallet) return next();
return res.status(400).json({
success: false,
error: "Wallet address required for rate-limited endpoints",
});
}

try {
const result = await limiter.consume(wallet);

// Always attach rate-limit headers so clients can self-throttle.
res.set("X-RateLimit-Limit", String(result.capacity));
res.set(
"X-RateLimit-Remaining",
String(Math.max(0, Math.floor(result.capacity - result.currentLevel))),
);

if (result.allowed) {
return next();
}

// --- Request denied ---

// Flag for Sybil analysis when violations cross the threshold.
if (result.violations !== undefined) {
await sybil.evaluate(wallet, result.violations, {
endpoint: req.originalUrl,
ip: getRequestIp(req),
});
}

res.set("Retry-After", String(result.retryAfterSeconds));

return res.status(429).json({
success: false,
error: "Rate limit exceeded. You have been temporarily blocked.",
retryAfterSeconds: result.retryAfterSeconds,
});
} catch (err) {
// If Redis is unavailable, fail open so the API stays usable.
console.error("[RateLimiter] Redis error – failing open:", err.message);
return next();
}
};
}

module.exports = { createRateLimiter, extractWallet };
Loading
Loading