Skip to content

Commit

Permalink
add usbcamera-dmi
Browse files Browse the repository at this point in the history
Signed-off-by: luomengY <[email protected]>
  • Loading branch information
luomengY committed Dec 25, 2023
1 parent 47e60b6 commit 5865e78
Show file tree
Hide file tree
Showing 62 changed files with 8,867 additions and 4 deletions.
20 changes: 20 additions & 0 deletions mappers/usbcamera-dmi/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM golang:1.17-alpine AS builder

WORKDIR /build

ENV GO111MODULE=on \
GOPROXY=https://goproxy.cn,direct

COPY . .

RUN CGO_ENABLED=0 GOOS=linux go build -gcflags "all=-N -l" -o main cmd/main.go


FROM ubuntu:16.04

RUN mkdir -p kubeedge

COPY --from=builder /build/main kubeedge/
COPY ./config.yaml kubeedge/

WORKDIR kubeedge
34 changes: 34 additions & 0 deletions mappers/usbcamera-dmi/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
SHELL := /bin/bash

curr_dir := $(patsubst %/,%,$(dir $(abspath $(lastword $(MAKEFILE_LIST)))))
rest_args := $(wordlist 2, $(words $(MAKECMDGOALS)), $(MAKECMDGOALS))
$(eval $(rest_args):;@:)

help:
#
# Usage:
# make generate : generate a mapper based on a template.
# make mapper {mapper-name} <action> <parameter>: execute mapper building process.
#
# Actions:
# - mod, m : download code dependencies.
# - lint, l : verify code via go fmt and `golangci-lint`.
# - build, b : compile code.
# - package, p : package docker image.
# - clean, c : clean output binary.
#
# Parameters:
# ARM : true or undefined
# ARM64 : true or undefined
#
# Example:
# - make mapper modbus ARM64=true : execute `build` "modbus" mapper for ARM64.
# - make mapper modbus test : execute `test` "modbus" mapper.
@echo

make_rules := $(shell ls $(curr_dir)/hack/make-rules | sed 's/.sh//g')
$(make_rules):
@$(curr_dir)/hack/make-rules/$@.sh $(rest_args)

.DEFAULT_GOAL := help
.PHONY: $(make_rules) build test package
67 changes: 67 additions & 0 deletions mappers/usbcamera-dmi/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package main

import (
"errors"
"os"

"k8s.io/klog/v2"

"github.com/kubeedge/usb/device"

Check failure on line 9 in mappers/usbcamera-dmi/cmd/main.go

View workflow job for this annotation

GitHub Actions / Multiple build

no required module provides package github.com/kubeedge/usb/device; to add it:
"github.com/kubeedge/usb/pkg/common"

Check failure on line 10 in mappers/usbcamera-dmi/cmd/main.go

View workflow job for this annotation

GitHub Actions / Multiple build

no required module provides package github.com/kubeedge/usb/pkg/common; to add it:
"github.com/kubeedge/usb/pkg/config"

Check failure on line 11 in mappers/usbcamera-dmi/cmd/main.go

View workflow job for this annotation

GitHub Actions / Multiple build

no required module provides package github.com/kubeedge/usb/pkg/config; to add it:
"github.com/kubeedge/usb/pkg/grpcserver"

Check failure on line 12 in mappers/usbcamera-dmi/cmd/main.go

View workflow job for this annotation

GitHub Actions / Multiple build

no required module provides package github.com/kubeedge/usb/pkg/grpcserver; to add it:
"github.com/kubeedge/usb/pkg/httpserver"

Check failure on line 13 in mappers/usbcamera-dmi/cmd/main.go

View workflow job for this annotation

GitHub Actions / Multiple build

no required module provides package github.com/kubeedge/usb/pkg/httpserver; to add it:
"github.com/kubeedge/usb/pkg/util/grpcclient"

Check failure on line 14 in mappers/usbcamera-dmi/cmd/main.go

View workflow job for this annotation

GitHub Actions / Multiple build

no required module provides package github.com/kubeedge/usb/pkg/util/grpcclient; to add it:
"github.com/kubeedge/usb/pkg/util/parse"

Check failure on line 15 in mappers/usbcamera-dmi/cmd/main.go

View workflow job for this annotation

GitHub Actions / Multiple build

no required module provides package github.com/kubeedge/usb/pkg/util/parse; to add it:
)

func main() {
var err error
var c config.Config

klog.InitFlags(nil)
defer klog.Flush()

if err = c.Parse(); err != nil {
klog.Fatal(err)
os.Exit(1)
}
klog.Infof("config: %+v", c)

grpcclient.Init(&c)

// start grpc server
grpcServer := grpcserver.NewServer(
grpcserver.Config{
SockPath: c.GrpcServer.SocketPath,
Protocol: common.ProtocolCustomized,
},
device.NewDevPanel(),
)

panel := device.NewDevPanel()
err = panel.DevInit(&c)
if err != nil && !errors.Is(err, parse.ErrEmptyData) {
klog.Fatal(err)
}
klog.Infoln("devInit finished")

// register to edgecore
// if dev init mode is register, mapper's dev will init when registry to edgecore
if c.DevInit.Mode != common.DevInitModeRegister {
klog.Infoln("======dev init mode is not register, will register to edgecore")
if _, _, err = grpcclient.RegisterMapper(&c, false); err != nil {
klog.Fatal(err)
}
klog.Infoln("registerMapper finished")
}
go panel.DevStart()

httpServer := httpserver.NewRestServer(panel)
go httpServer.StartServer()

defer grpcServer.Stop()
if err = grpcServer.Start(); err != nil {
klog.Fatal(err)
}
}
12 changes: 12 additions & 0 deletions mappers/usbcamera-dmi/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
grpc_server:
socket_path: /etc/kubeedge/usb.sock
common:
name: Usb-mapper
version: v1.13.0
api_version: v1.0.0
protocol: camera-usb # TODO add your protocol name
address: 127.0.0.1
edgecore_sock: /etc/kubeedge/dmi.sock
dev_init:
mode: register

115 changes: 115 additions & 0 deletions mappers/usbcamera-dmi/data/dbmethod/influxdb2/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package influxdb2

import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"time"

"k8s.io/klog/v2"

influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/kubeedge/usb/pkg/common"
)

type DataBaseConfig struct {
Influxdb2ClientConfig *Influxdb2ClientConfig `json:"influxdb2ClientConfig,omitempty"`
Influxdb2DataConfig *Influxdb2DataConfig `json:"influxdb2DataConfig,omitempty"`
}

type Influxdb2ClientConfig struct {
Url string `json:"url,omitempty"`
Org string `json:"org,omitempty"`
Bucket string `json:"bucket,omitempty"`
}

type Influxdb2DataConfig struct {
Measurement string `json:"measurement,omitempty"`
Tag map[string]string `json:"tag,omitempty"`
FieldKey string `json:"fieldKey,omitempty"`
}

func NewDataBaseClient(clientConfig json.RawMessage, dataConfig json.RawMessage) (*DataBaseConfig, error) {
// parse influx database config data
influxdb2ClientConfig := new(Influxdb2ClientConfig)
influxdb2DataConfig := new(Influxdb2DataConfig)
err := json.Unmarshal(clientConfig, influxdb2ClientConfig)
if err != nil {
return nil, err
}
err = json.Unmarshal(dataConfig, influxdb2DataConfig)
if err != nil {
return nil, err
}
return &DataBaseConfig{
Influxdb2ClientConfig: influxdb2ClientConfig,
Influxdb2DataConfig: influxdb2DataConfig,
}, nil
}

func (d *DataBaseConfig) InitDbClient() influxdb2.Client {
var usrtoken string
usrtoken = os.Getenv("TOKEN")
client := influxdb2.NewClient(d.Influxdb2ClientConfig.Url, usrtoken)

return client
}

func (d *DataBaseConfig) CloseSession(client influxdb2.Client) {
client.Close()
}

func (d *DataBaseConfig) AddData(data *common.DataModel, client influxdb2.Client) error {
// write device data to influx database
orgName := d.Influxdb2ClientConfig.Org
bucketName := d.Influxdb2ClientConfig.Bucket
ctx := context.Background()
errMark := "not found"
org, err := client.OrganizationsAPI().FindOrganizationByName(ctx, orgName)
if err != nil {
if strings.Contains(err.Error(), errMark) {
org, err := client.OrganizationsAPI().CreateOrganizationWithName(ctx, orgName)
if err != nil {
return fmt.Errorf("create organization '%s' faild with err:%v", orgName, err)
}
_, err = client.BucketsAPI().CreateBucketWithName(ctx, org, bucketName)
if err != nil {
return fmt.Errorf("create bucket '%s' faild with err:%v", bucketName, err)
}
} else {
return fmt.Errorf("find organization '%s' faild with err:%v", orgName, err)
}
} else {
buckets, err := client.BucketsAPI().FindBucketsByOrgName(ctx, orgName)
if err != nil {
return fmt.Errorf("find buckets faild with err:%v", err)
}
var flag = false
for _, bucket := range *buckets {
if bucket.Name == bucketName {
flag = true
break
}
}
if !flag {
_, err = client.BucketsAPI().CreateBucketWithName(ctx, org, bucketName)
if err != nil {
return fmt.Errorf("create bucket '%s' faild with err:%v", bucketName, err)
}
}
}
writeAPI := client.WriteAPIBlocking(d.Influxdb2ClientConfig.Org, d.Influxdb2ClientConfig.Bucket)
p := influxdb2.NewPoint(d.Influxdb2DataConfig.Measurement,
d.Influxdb2DataConfig.Tag,
map[string]interface{}{d.Influxdb2DataConfig.FieldKey: data.Value},
time.Now())
// write point immediately
err = writeAPI.WritePoint(context.Background(), p)
if err != nil {
klog.V(4).Info("Exit AddData")
return err
}
return nil
}
48 changes: 48 additions & 0 deletions mappers/usbcamera-dmi/data/dbmethod/redis/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package redis

import (
"github.com/kubeedge/usb/pkg/common"
"github.com/kubeedge/usb/pkg/global"
)

type DataBaseConfig struct {
}

func NewDataBaseClient() (global.DataBaseClient, error) {
return &DataBaseConfig{}, nil
}

func (d *DataBaseConfig) InitDbClient() error {
//TODO implement me
panic("implement me")
}

func (d *DataBaseConfig) CloseSession() {
//TODO implement me
panic("implement me")
}

func (d *DataBaseConfig) AddData(data *common.DataModel) {
//TODO implement me
panic("implement me")
}

func (d *DataBaseConfig) GetDataByDeviceName(deviceName string) ([]*common.DataModel, error) {
//TODO implement me
panic("implement me")
}

func (d *DataBaseConfig) GetPropertyDataByDeviceName(deviceName string, propertyData string) ([]*common.DataModel, error) {
//TODO implement me
panic("implement me")
}

func (d *DataBaseConfig) GetDataByTimeRange(start int64, end int64) ([]*common.DataModel, error) {
//TODO implement me
panic("implement me")
}

func (d *DataBaseConfig) DeleteDataByTimeRange(start int64, end int64) ([]*common.DataModel, error) {
//TODO implement me
panic("implement me")
}
73 changes: 73 additions & 0 deletions mappers/usbcamera-dmi/data/publish/http/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package http

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"

"k8s.io/klog/v2"

"github.com/kubeedge/usb/pkg/common"
"github.com/kubeedge/usb/pkg/global"
)

type PushMethod struct {
HTTP *HTTPConfig `json:"http"`
}

type HTTPConfig struct {
HostName string `json:"hostName,omitempty"`
Port int `json:"port,omitempty"`
RequestPath string `json:"requestPath,omitempty"`
Timeout int `json:"timeout,omitempty"`
}

func NewDataPanel(config json.RawMessage) (global.DataPanel, error) {
httpConfig := new(HTTPConfig)
err := json.Unmarshal(config, httpConfig)
if err != nil {
return nil, err
}
return &PushMethod{
HTTP: httpConfig,
}, nil
}

func (pm *PushMethod) InitPushMethod() error {
klog.V(1).Info("Init HTTP")
return nil
}

func (pm *PushMethod) Push(data *common.DataModel) {
klog.V(2).Info("Publish device data by HTTP")

targetUrl := pm.HTTP.HostName + ":" + strconv.Itoa(pm.HTTP.Port) + pm.HTTP.RequestPath
payload := data.PropertyName + "=" + data.Value
formatTimeStr := time.Unix(data.TimeStamp/1e3, 0).Format("2006-01-02 15:04:05")
currentTime := "&time" + "=" + formatTimeStr
payload += currentTime

klog.V(3).Infof("Publish %v to %s", payload, targetUrl)

resp, err := http.Post(targetUrl,
"application/x-www-form-urlencoded",
strings.NewReader(payload))

if err != nil {
fmt.Println(err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
// handle error
klog.Errorf("Publish device data by HTTP failed, err = %v", err)
return
}
klog.V(1).Info("############### Message published. ###############")
klog.V(3).Infof("HTTP reviced %s", string(body))

}
Loading

0 comments on commit 5865e78

Please sign in to comment.