Skip to content

Commit

Permalink
update mapper
Browse files Browse the repository at this point in the history
Signed-off-by: wbc6080 <[email protected]>
  • Loading branch information
wbc6080 committed Apr 25, 2024
1 parent 73b6e63 commit dec698e
Show file tree
Hide file tree
Showing 16 changed files with 324 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ ENV GO111MODULE=on \

COPY . .

RUN CGO_ENABLED=0 GOOS=linux go build -tags Nostream -o main cmd/main.go # If you want to enable stream data processing, you need to change the tag to stream.
RUN CGO_ENABLED=0 GOOS=linux go build -o main cmd/main.go


FROM ubuntu:18.04
Expand All @@ -17,4 +17,4 @@ RUN mkdir -p kubeedge
COPY --from=builder /build/main kubeedge/
COPY ./config.yaml kubeedge/

WORKDIR kubeedge
WORKDIR kubeedge
35 changes: 35 additions & 0 deletions mappers/kubeedge-v1.17.0/onvif-mapper/Dockerfile_stream
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
FROM golang:1.20.10-bullseye AS builder

WORKDIR /build

ENV GO111MODULE=on \
GOPROXY=https://goproxy.cn,direct

COPY . .

RUN apt-get update && \
apt-get install -y bzip2 curl upx-ucl gcc-aarch64-linux-gnu libc6-dev-arm64-cross gcc-arm-linux-gnueabi libc6-dev-armel-cross libva-dev libva-drm2 libx11-dev libvdpau-dev libxext-dev libsdl1.2-dev libxcb1-dev libxau-dev libxdmcp-dev yasm

RUN curl -sLO https://ffmpeg.org/releases/ffmpeg-4.1.6.tar.bz2 && \
tar -jx --strip-components=1 -f ffmpeg-4.1.6.tar.bz2 && \
./configure && make && \
make install

RUN GOOS=linux go build -o main cmd/main.go

FROM ubuntu:18.04

RUN mkdir -p kubeedge

RUN apt-get update && \
apt-get install -y bzip2 curl upx-ucl gcc-aarch64-linux-gnu libc6-dev-arm64-cross gcc-arm-linux-gnueabi libc6-dev-armel-cross libva-dev libva-drm2 libx11-dev libvdpau-dev libxext-dev libsdl1.2-dev libxcb1-dev libxau-dev libxdmcp-dev yasm

RUN curl -sLO https://ffmpeg.org/releases/ffmpeg-4.1.6.tar.bz2 && \
tar -jx --strip-components=1 -f ffmpeg-4.1.6.tar.bz2 && \
./configure && make && \
make install

COPY --from=builder /build/main kubeedge/
COPY ./config.yaml kubeedge/

WORKDIR kubeedge
48 changes: 30 additions & 18 deletions mappers/kubeedge-v1.17.0/onvif-mapper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ Supported functions:

steps:

1. Run onvif mapper

There are two ways to run onvif mapper:

a). Start locally
- Install the dependences:
```
sudo apt-get update &&
Expand All @@ -18,27 +23,34 @@ steps:
./configure && make &&
sudo make install
```
This may take about 5 minutes to download and build all dependencies
- Compile mapper project:
There are currently two ways to compile and run the mapper project, namely building the mapper image or locally compile.

1. Build onvif mapper image
This may take about 5 minutes to download and build all dependencies.
- Locally compile

You can compile and run the mapper code directly:
```
docker build -t [YOUR MAPPER IMAGE NAME] .
go run cmd/main.go --v <log level,like 3> --config-file <path to config yaml>
```
After successfully building the onvif mapper image, you can deploy the mapper in the cluster through deployment or other methods.
A sample configuration file for mapper deployment is provided in the **resource** directory.
2. locally compile

If you want to debug locally first, you can also compile and run the mapper code directly:
b). Start using a container image
- Build onvif mapper image:
```
go run -tags stream cmd/main.go --v <log level,like 3> --config-file <path to config yaml>
docker build -f Dockerfile_stream -t [YOUR MAPPER IMAGE NAME] .
```
- Build and submit the device yaml file:
It may take about 8 minutes to build the docker image

- Deploy onvif mapper container:

After successfully building the onvif mapper image, you can deploy the mapper in the cluster through deployment or other methods.
A sample configuration file for mapper deployment is provided in the **resource** directory.

2. Build and submit the device yaml file:

After successfully deploying onvif mapper, users can build the device-instance and device-model configuration files according to the
characteristics of the user edge onvif device, and execute the following commands to submit to the kubeedge cluster:
```
kubectl apply -f <path to device model or device instance yaml>
```

Build the device-instance and device-model configuration files according to the characteristics of the user
edge onvif device, and execute the following commands to submit to the kubeedge cluster:
```
kubectl apply -f <path to device model or device instance yaml>
```
An example device-model and device-instance configuration file for onvif device is provided in the resource directory.
3. View log:

Users can view the logs of the mapper container to determine whether the edge device is managed correctly.
2 changes: 1 addition & 1 deletion mappers/kubeedge-v1.17.0/onvif-mapper/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (

"k8s.io/klog/v2"

"github.com/kubeedge/onvif/device"
"github.com/kubeedge/mapper-framework/pkg/common"
"github.com/kubeedge/mapper-framework/pkg/config"
"github.com/kubeedge/mapper-framework/pkg/grpcclient"
"github.com/kubeedge/mapper-framework/pkg/grpcserver"
"github.com/kubeedge/mapper-framework/pkg/httpserver"
"github.com/kubeedge/onvif/device"
)

func main() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (

"k8s.io/klog/v2"

"github.com/kubeedge/mapper-framework/pkg/common"
"github.com/kubeedge/onvif/driver"
"github.com/kubeedge/mapper-framework/pkg/common"
)

func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) {
Expand Down
108 changes: 108 additions & 0 deletions mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/mysql/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
Copyright 2024 The KubeEdge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mysql

import (
"database/sql"
"encoding/json"
"fmt"
"os"
"time"

_ "github.com/go-sql-driver/mysql"
"k8s.io/klog/v2"

"github.com/kubeedge/mapper-framework/pkg/common"
)

var (
DB *sql.DB
)

type DataBaseConfig struct {
MySQLClientConfig *MySQLClientConfig `json:"mysqlClientConfig"`
}

type MySQLClientConfig struct {
Addr string `json:"addr,omitempty"`
Database string `json:"database,omitempty"`
UserName string `json:"userName,omitempty"`
}

func NewDataBaseClient(config json.RawMessage) (*DataBaseConfig, error) {
configdata := new(MySQLClientConfig)
err := json.Unmarshal(config, configdata)
if err != nil {
return nil, err
}
return &DataBaseConfig{
MySQLClientConfig: configdata,
}, nil
}

func (d *DataBaseConfig) InitDbClient() error {
password := os.Getenv("PASSWORD")
usrName := d.MySQLClientConfig.UserName
addr := d.MySQLClientConfig.Addr
dataBase := d.MySQLClientConfig.Database
dataSourceName := fmt.Sprintf("%s:%s@tcp(%s)/%s", usrName, password, addr, dataBase)
var err error
DB, err = sql.Open("mysql", dataSourceName)
if err != nil {
return fmt.Errorf("connection to %s of mysql faild with err:%v", dataBase, err)
}

return nil
}

func (d *DataBaseConfig) CloseSession() {
err := DB.Close()
if err != nil {
klog.Errorf("close mysql failed with err:%v", err)
}
}

func (d *DataBaseConfig) AddData(data *common.DataModel) error {
deviceName := data.DeviceName
propertyName := data.PropertyName
namespace := data.Namespace
tableName := namespace + "/" + deviceName + "/" + propertyName
datatime := time.Unix(data.TimeStamp/1e3, 0).Format("2006-01-02 15:04:05")

createTable := fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` (id INT AUTO_INCREMENT PRIMARY KEY, ts DATETIME NOT NULL,field TEXT)", tableName)
_, err := DB.Exec(createTable)
if err != nil {
return fmt.Errorf("create tabe into mysql failed with err:%v", err)
}

stmt, err := DB.Prepare(fmt.Sprintf("INSERT INTO `%s` (ts,field) VALUES (?,?)", tableName))
if err != nil {
return fmt.Errorf("prepare parament failed with err:%v", err)
}
defer func(stmt *sql.Stmt) {
err := stmt.Close()
if err != nil {
klog.Errorf("close mysql's statement failed with err:%v", err)
}
}(stmt)
_, err = stmt.Exec(datatime, data.Value)
if err != nil {
return fmt.Errorf("insert data into msyql failed with err:%v", err)
}

return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
Copyright 2024 The KubeEdge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mysql

import (
"context"
"time"

"k8s.io/klog/v2"

"github.com/kubeedge/onvif/driver"
"github.com/kubeedge/mapper-framework/pkg/common"
)

func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) {
dbConfig, err := NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.MySQLClientConfig)
if err != nil {
klog.Errorf("new database client error: %v", err)
return
}
err = dbConfig.InitDbClient()
if err != nil {
klog.Errorf("init redis database client err: %v", err)
return
}
reportCycle := time.Duration(twin.Property.ReportCycle)
if reportCycle == 0 {
reportCycle = common.DefaultReportCycle
}
ticker := time.NewTicker(reportCycle)
go func() {
for {
select {
case <-ticker.C:
deviceData, err := client.GetDeviceData(visitorConfig)
if err != nil {
klog.Errorf("publish error: %v", err)
continue
}
sData, err := common.ConvertToString(deviceData)
if err != nil {
klog.Errorf("Failed to convert publish method data : %v", err)
continue
}
dataModel.SetValue(sData)
dataModel.SetTimeStamp()

err = dbConfig.AddData(dataModel)
if err != nil {
klog.Errorf("mysql database add data error: %v", err)
return
}
case <-ctx.Done():
dbConfig.CloseSession()
return
}
}
}()
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (

"k8s.io/klog/v2"

"github.com/kubeedge/mapper-framework/pkg/common"
"github.com/kubeedge/onvif/driver"
"github.com/kubeedge/mapper-framework/pkg/common"
)

func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (

"k8s.io/klog/v2"

"github.com/kubeedge/mapper-framework/pkg/common"
"github.com/kubeedge/onvif/driver"
"github.com/kubeedge/mapper-framework/pkg/common"
)

func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) {
Expand Down
17 changes: 7 additions & 10 deletions mappers/kubeedge-v1.17.0/onvif-mapper/data/stream/handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
//go:build stream
// +build stream

/*
Copyright 2024 The KubeEdge Authors.
Expand Down Expand Up @@ -49,22 +46,22 @@ func StreamHandler(twin *common.Twin, client *driver.CustomizedClient, visitorCo
visitorConfigData, err := json.Marshal(visitorConfig.VisitorConfigData)
err = json.Unmarshal(visitorConfigData, &streamConfig)
if err != nil {
klog.Errorf("Unmarshal streamConfigs error: %v", err)
return err
return fmt.Errorf("Unmarshal streamConfigs error: %v", err)
}

switch twin.PropertyName {
// Currently, the function of saving frames and saving videos is built-in according to the configuration.
// Other functions can be expanded here.
case "saveFrame":
case common.SaveFrame:
err = SaveFrame(streamURI.(string), streamConfig.OutputDir, streamConfig.Format, streamConfig.FrameCount, streamConfig.FrameInterval)
case "saveVideo":
case common.SaveVideo:
err = SaveVideo(streamURI.(string), streamConfig.OutputDir, streamConfig.Format, streamConfig.FrameCount, streamConfig.VideoNum)
default:
err = fmt.Errorf("cannot find the processing method for the corresponding Property %s of the stream data", twin.PropertyName)
}
if err == nil {
klog.V(2).Infof("Successfully processed streaming data by %s", twin.PropertyName)
if err != nil {
return err
}
return err
klog.V(2).Infof("Successfully processed streaming data by %s", twin.PropertyName)
return nil
}
Loading

0 comments on commit dec698e

Please sign in to comment.