Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt schema agent #198

Merged
merged 18 commits into from
Feb 10, 2025
101 changes: 100 additions & 1 deletion kubernetes.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ const {
serviceTemplate,
ingressTemplate,
customIngressTemplate,
persistentVolumeClaimTemplate
persistentVolumeClaimTemplate,
mqttSchemaAgentPodTemplate,
mqttSchemaAgentServiceTemplate
} = require('./templates.js')

/**
Expand Down Expand Up @@ -506,6 +508,46 @@ const getStaticFileUrl = async (instance, filePath) => {
return `http://${prefix}${instance.safeName}.${this._namespace}:2880/flowforge/files/_/${encodeURIComponent(filePath)}`
}

const createMQTTTopicAgent = async (broker) => {
this._app.log.info(`[k8s] Starting MQTT Schema agent ${broker.hashid} for ${broker.Team.hashid}`)
const localPod = JSON.parse(JSON.stringify(mqttSchemaAgentPodTemplate))
const localService = JSON.parse(JSON.stringify(mqttSchemaAgentServiceTemplate))

const namespace = this._app.config.driver.options.projectNamespace || 'flowforge'

const { token } = await broker.refreshAuthTokens()
localPod.spec.containers[0].env.push({ name: 'FORGE_TEAM_TOKEN', value: token })
localPod.spec.containers[0].env.push({ name: 'FORGE_URL', value: this._app.config.api_url })
localPod.spec.containers[0].env.push({ name: 'FORGE_BROKER_ID', value: broker.hashid })
localPod.spec.containers[0].env.push({ name: 'FORGE_TEAM_ID', value: broker.Team.hashid })

localPod.metadata.name = `mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}`
localPod.metadata.labels = {
name: `mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}`,
team: broker.Team.hashid,
broker: broker.hashid
}
localService.metadata.name = `mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}`
localService.metadata.labels = {
team: broker.Team.hashid,
broker: broker.hashid
}
localService.spec.selector.name = `mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}`

// TODO remove registry entry
localPod.spec.containers[0].image = this._app.config.driver.options?.mqttSchemaContainer || `${this._app.config.driver.options.registry ? this._app.config.driver.options.registry + '/' : ''}flowfuse/mqtt-schema-agent`

// console.log(JSON.stringify(localPod,null,2))
// console.log(JSON.stringify(localService,null,2))
try {
await this._k8sApi.createNamespacedPod(namespace, localPod)
await this._k8sApi.createNamespacedService(namespace, localService)
} catch (err) {
this._app.log.error(`[k8s] Problem creating MQTT Agent ${broker.hashid} - ${err.toString()}`)
console.log(err)
}
}

module.exports = {
/**
* Initialises this driver
Expand Down Expand Up @@ -561,6 +603,11 @@ module.exports = {
}
})

// get list of all MQTTBrokers
const brokers = await app.db.models.BrokerCredentials.findAll({
include: [{ model: app.db.models.Team }]
})

this._initialCheckTimeout = setTimeout(() => {
this._app.log.debug('[k8s] Restarting projects')
const namespace = this._namespace
Expand Down Expand Up @@ -623,6 +670,22 @@ module.exports = {
this._app.log.error(`[k8s] Instance ${project.id} - error resuming project: ${err.stack}`)
}
})

// Check restarting MQTT-Schema-Agent
brokers.forEach(async (broker) => {
if (broker.Team) {
try {
this._app.log.info(`[k8s] Testing MQTT Agent ${broker.hashid} in ${namespace} pod exists`)
this._app.log.debug(`mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}`)
await this._k8sApi.readNamespacedPodStatus(`mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}`, namespace)
this._app.log.info(`[k8s] MQTT Agent pod ${broker.hashid} in ${namespace} found`)
} catch (err) {
this._app.log.debug(`[k8s] MQTT Agent ${broker.hashid} - failed ${err.toString()}`)
this._app.log.debug(`[k8s] MQTT Agent ${broker.hashid} - recreating pod`)
await createMQTTTopicAgent(broker)
}
}
})
}, 1000)

// need to work out what we can expose for K8s
Expand Down Expand Up @@ -1183,5 +1246,41 @@ module.exports = {
err.statusCode = err.response.statusCode
throw err
}
},

// Broker Agent
startBrokerAgent: async (broker) => {
createMQTTTopicAgent(broker)
},
stopBrokerAgent: async (broker) => {
try {
await this._k8sApi.deleteNamespacedService(`mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}`, this._namespace)
await this._k8sApi.deleteNamespacedPod(`mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}`, this._namespace)
} catch (err) {
this._app.log.error(`[k8s] Error deleting MQTT Agent ${broker.hashid}: ${err.toString()} ${err.statusCode}`)
}
},
getBrokerAgentState: async (broker) => {
try {
const status = got.get(`http://mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}.${this._namespace}:3500/api/v1/status`).json()
return status
} catch (err) {
return { error: 'error_getting_status', message: err.toString() }
}
},
sendBrokerAgentCommand: async (broker, command) => {
if (command === 'start' || command === 'restart') {
try {
await got.post(`http://mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}.${this._namespace}:3500/api/v1/commands/start`)
} catch (err) {

}
} else if (command === 'stop') {
try {
await got.post(`http://mqtt-schema-agent-${broker.Team.hashid.toLowerCase()}-${broker.hashid.toLowerCase()}.${this._namespace}:3500/api/v1/commands/stop`)
} catch (err) {

}
}
}
}
59 changes: 58 additions & 1 deletion templates.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,67 @@ const persistentVolumeClaimTemplate = {
}
}

const mqttSchemaAgentPodTemplate = {
apiVersion: 'v1',
kind: 'Pod',
metadata: {

},
spec: {
containers: [
{
name: 'mqtt-schema-agent',
// image: 'flowfuse/mqtt-schema-agent',
imagePullPolicy: 'Always',
securityContext: {
allowPrivilegeEscalation: false
},
env: [
{ name: 'TZ', value: 'Europe/London' }
],
ports: [
{ name: 'web', containerPort: 3500, protocol: 'TCP' }
],
resources: {
requests: {
// 10th of a core
cpu: '100m',
memory: '128Mi'
},
limits: {
cpu: '100m',
memory: '128Mi'
}
}
}
]
},
enableServiceLinks: false
}

const mqttSchemaAgentServiceTemplate = {
apiVersion: 'v1',
kind: 'Service',
metadata: {
// name: "k8s-client-test-service"
},
spec: {
type: 'ClusterIP',
selector: {
// name: "k8s-client-test"
},
ports: [
{ name: 'web', port: 3500, protocol: 'TCP' }
]
}
}

module.exports = {
deploymentTemplate,
serviceTemplate,
ingressTemplate,
customIngressTemplate,
persistentVolumeClaimTemplate
persistentVolumeClaimTemplate,
mqttSchemaAgentPodTemplate,
mqttSchemaAgentServiceTemplate
}