diff --git a/kubernetes.js b/kubernetes.js index d5e17e2..f38c7c4 100644 --- a/kubernetes.js +++ b/kubernetes.js @@ -9,7 +9,9 @@ const { serviceTemplate, ingressTemplate, customIngressTemplate, - persistentVolumeClaimTemplate + persistentVolumeClaimTemplate, + mqttSchemaAgentPodTemplate, + mqttSchemaAgentServiceTemplate } = require('./templates.js') /** @@ -506,6 +508,46 @@ const getStaticFileUrl = async (instance, filePath) => { return `http://${prefix}${instance.safeName}.${this._namespace}:2880/flowforge/files/_/${encodeURIComponent(filePath)}` } +const createMQTTTopicAgent = async (broker) => { + this._app.log.info(`[k8s] Starting MQTT Schema agent ${broker.hashid} for ${broker.Team.hashid}`) + const localPod = JSON.parse(JSON.stringify(mqttSchemaAgentPodTemplate)) + const localService = JSON.parse(JSON.stringify(mqttSchemaAgentServiceTemplate)) + + const namespace = this._app.config.driver.options.projectNamespace || 'flowforge' + + const { token } = await broker.refreshAuthTokens() + localPod.spec.containers[0].env.push({ name: 'FORGE_TEAM_TOKEN', value: token }) + localPod.spec.containers[0].env.push({ name: 'FORGE_URL', value: this._app.config.api_url }) + localPod.spec.containers[0].env.push({ name: 'FORGE_BROKER_ID', value: broker.hashid }) + localPod.spec.containers[0].env.push({ name: 'FORGE_TEAM_ID', value: broker.Team.hashid }) + + localPod.metadata.name = `mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}` + localPod.metadata.labels = { + name: `mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}`, + team: broker.Team.hashid, + broker: broker.hashid + } + localService.metadata.name = `mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}` + localService.metadata.labels = { + team: broker.Team.hashid, + broker: broker.hashid + } + localService.spec.selector.name = `mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}` + + // TODO remove registry entry + localPod.spec.containers[0].image = this._app.config.driver.options?.mqttSchemaContainer || `${this._app.config.driver.options.registry ? this._app.config.driver.options.registry + '/' : ''}flowfuse/mqtt-schema-agent` + + // console.log(JSON.stringify(localPod,null,2)) + // console.log(JSON.stringify(localService,null,2)) + try { + await this._k8sApi.createNamespacedPod(namespace, localPod) + await this._k8sApi.createNamespacedService(namespace, localService) + } catch (err) { + this._app.log.error(`[k8s] Problem creating MQTT Agent ${broker.hashid} - ${err.toString()}`) + console.log(err) + } +} + module.exports = { /** * Initialises this driver @@ -561,6 +603,11 @@ module.exports = { } }) + // get list of all MQTTBrokers + const brokers = await app.db.models.BrokerCredentials.findAll({ + include: [{ model: app.db.models.Team }] + }) + this._initialCheckTimeout = setTimeout(() => { this._app.log.debug('[k8s] Restarting projects') const namespace = this._namespace @@ -623,6 +670,22 @@ module.exports = { this._app.log.error(`[k8s] Instance ${project.id} - error resuming project: ${err.stack}`) } }) + + // Check restarting MQTT-Schema-Agent + brokers.forEach(async (broker) => { + if (broker.Team) { + try { + this._app.log.info(`[k8s] Testing MQTT Agent ${broker.hashid} in ${namespace} pod exists`) + this._app.log.debug(`mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}`) + await this._k8sApi.readNamespacedPodStatus(`mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}`, namespace) + this._app.log.info(`[k8s] MQTT Agent pod ${broker.hashid} in ${namespace} found`) + } catch (err) { + this._app.log.debug(`[k8s] MQTT Agent ${broker.hashid} - failed ${err.toString()}`) + this._app.log.debug(`[k8s] MQTT Agent ${broker.hashid} - recreating pod`) + await createMQTTTopicAgent(broker) + } + } + }) }, 1000) // need to work out what we can expose for K8s @@ -1183,5 +1246,41 @@ module.exports = { err.statusCode = err.response.statusCode throw err } + }, + + // Broker Agent + startBrokerAgent: async (broker) => { + createMQTTTopicAgent(broker) + }, + stopBrokerAgent: async (broker) => { + try { + await this._k8sApi.deleteNamespacedService(`mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}`, this._namespace) + await this._k8sApi.deleteNamespacedPod(`mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}`, this._namespace) + } catch (err) { + this._app.log.error(`[k8s] Error deleting MQTT Agent ${broker.hashid}: ${err.toString()} ${err.statusCode}`) + } + }, + getBrokerAgentState: async (broker) => { + try { + const status = got.get(`http://mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}.${this._namespace}:3500/api/v1/status`).json() + return status + } catch (err) { + return { error: 'error_getting_status', message: err.toString() } + } + }, + sendBrokerAgentCommand: async (broker, command) => { + if (command === 'start' || command === 'restart') { + try { + await got.post(`http://mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}.${this._namespace}:3500/api/v1/commands/start`) + } catch (err) { + + } + } else if (command === 'stop') { + try { + await got.post(`http://mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}.${this._namespace}:3500/api/v1/commands/stop`) + } catch (err) { + + } + } } } diff --git a/templates.js b/templates.js index d99fe1d..686dff3 100644 --- a/templates.js +++ b/templates.js @@ -162,10 +162,67 @@ const persistentVolumeClaimTemplate = { } } +const mqttSchemaAgentPodTemplate = { + apiVersion: 'v1', + kind: 'Pod', + metadata: { + + }, + spec: { + containers: [ + { + name: 'mqtt-schema-agent', + // image: 'flowfuse/mqtt-schema-agent', + imagePullPolicy: 'Always', + securityContext: { + allowPrivilegeEscalation: false + }, + env: [ + { name: 'TZ', value: 'Europe/London' } + ], + ports: [ + { name: 'web', containerPort: 3500, protocol: 'TCP' } + ], + resources: { + requests: { + // 10th of a core + cpu: '100m', + memory: '128Mi' + }, + limits: { + cpu: '100m', + memory: '128Mi' + } + } + } + ] + }, + enableServiceLinks: false +} + +const mqttSchemaAgentServiceTemplate = { + apiVersion: 'v1', + kind: 'Service', + metadata: { + // name: "k8s-client-test-service" + }, + spec: { + type: 'ClusterIP', + selector: { + // name: "k8s-client-test" + }, + ports: [ + { name: 'web', port: 3500, protocol: 'TCP' } + ] + } +} + module.exports = { deploymentTemplate, serviceTemplate, ingressTemplate, customIngressTemplate, - persistentVolumeClaimTemplate + persistentVolumeClaimTemplate, + mqttSchemaAgentPodTemplate, + mqttSchemaAgentServiceTemplate }