Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Merge pull request #23 from mkuculyma/issue-fix#7
Browse files Browse the repository at this point in the history
Small tests added
  • Loading branch information
Maria Rolbiecka authored Nov 4, 2016
2 parents 8c4c9d8 + 031eab0 commit 248fbc6
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 61 deletions.
4 changes: 1 addition & 3 deletions examples/tasks/pcm-file.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
"/intel/pcm/L3MISS": {},
"/intel/pcm/PhysIPC": {},
"/intel/pcm/PhysIPC%": {},
"/intel/pcm/Proc_Energy_Joules": {},
"/intel/pcm/READ": {},
"/intel/pcm/SKT0": {},
"/intel/pcm/TEMP": {},
"/intel/pcm/TIME_ticks": {},
"/intel/pcm/WRITE": {}
Expand All @@ -35,7 +33,7 @@
"process": null,
"publish": [
{
"plugin_name": "file",
"plugin_name": "mock-file",
"config": {
"file": "/tmp/published_pcm"
}
Expand Down
110 changes: 63 additions & 47 deletions pcm/pcm.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ import (
"sync"
"time"

"io"

"math"

"github.com/intelsdi-x/snap-plugin-utilities/ns"
"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/control/plugin/cpolicy"
Expand All @@ -40,7 +44,7 @@ const (
// Name of plugin
name = "pcm"
// Version of plugin
version = 8
version = 9
// Type of plugin
pluginType = plugin.CollectorPluginType
)
Expand All @@ -52,15 +56,15 @@ func Meta() *plugin.PluginMeta {
// PCM
type PCM struct {
keys []string
data map[string]interface{}
data map[string]float64
mutex *sync.RWMutex
}

func (p *PCM) Keys() []string {
return p.keys
}

func (p *PCM) Data() map[string]interface{} {
func (p *PCM) Data() map[string]float64 {
return p.data
}

Expand Down Expand Up @@ -96,7 +100,16 @@ func (p *PCM) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) {
}

func NewPCMCollector() (*PCM, error) {
pcm := &PCM{mutex: &sync.RWMutex{}, data: map[string]interface{}{}}
pcm := &PCM{mutex: &sync.RWMutex{}, data: map[string]float64{}}

err := pcm.run()
if err != nil {
return nil, err
}

return pcm, nil
}
func (pcm *PCM) run() error {
var cmd *exec.Cmd
if path := os.Getenv("SNAP_PCM_PATH"); path != "" {
cmd = exec.Command(filepath.Join(path, "pcm.x"), "/csv", "-nc", "-r", "1")
Expand All @@ -111,53 +124,16 @@ func NewPCMCollector() (*PCM, error) {

cmdReader, err := cmd.StdoutPipe()
if err != nil {
fmt.Fprintln(os.Stderr, "Error creating StdoutPipe", err)
return nil, err
return fmt.Errorf("Error creating StdoutPipe %v", err)
}
// read the data from stdout
scanner := bufio.NewScanner(cmdReader)
go func() {
first := true
for scanner.Scan() {
if first {
first = false
continue
}
if len(pcm.keys) == 0 {
pcm.mutex.Lock()
keys := strings.Split(strings.TrimSuffix(scanner.Text(), ";"), ";")
//skip the date and time fields
pcm.keys = make([]string, len(keys[2:]))
for i, k := range keys[2:] {
// removes all spaces from metric key
metricKey := ns.ReplaceNotAllowedCharsInNamespacePart(k)
pcm.keys[i] = fmt.Sprintf("/intel/pcm/%s", metricKey)
}
pcm.mutex.Unlock()
continue
}

pcm.mutex.Lock()
datal := strings.Split(strings.TrimSuffix(scanner.Text(), ";"), ";")
for i, d := range datal[2:] {
v, err := strconv.ParseFloat(strings.TrimSpace(d), 64)
if err == nil {
pcm.data[pcm.keys[i]] = v
} else {
fmt.Fprintln(os.Stderr, "Invalid metric value", err)
pcm.data[pcm.keys[i]] = nil
}
}
pcm.mutex.Unlock()
// fmt.Fprintf(os.Stderr, "data >>> %+v\n", pcm.data)
// fmt.Fprintf(os.Stdout, "data >>> %+v\n", pcm.data)
}
go func() {
pcm.parse(cmdReader)
}()

err = cmd.Start()
if err != nil {
fmt.Fprintln(os.Stderr, "Error starting pcm", err)
return nil, err
return fmt.Errorf("Error starting pcm %v", err)
}

// we need to wait until we have our metric types
Expand All @@ -170,7 +146,7 @@ func NewPCMCollector() (*PCM, error) {
break
}
if time.Since(st) > time.Second*2 {
return nil, fmt.Errorf("Timed out waiting for metrics from pcm")
return fmt.Errorf("Timed out waiting for metrics from pcm")
}
}

Expand All @@ -181,5 +157,45 @@ func NewPCMCollector() (*PCM, error) {
// return nil, err
// }

return pcm, nil
return nil
}

func (pcm *PCM) parse(reader io.Reader) {
// read the data from stdout
scanner := bufio.NewScanner(reader)
first := true
for scanner.Scan() {
if first {
first = false
continue
}
if len(pcm.keys) == 0 {
pcm.mutex.Lock()
keys := strings.Split(strings.TrimSuffix(scanner.Text(), ";"), ";")
//skip the date and time fields
pcm.keys = make([]string, len(keys[2:]))
for i, k := range keys[2:] {
// removes all spaces from metric key
metricKey := ns.ReplaceNotAllowedCharsInNamespacePart(k)
pcm.keys[i] = fmt.Sprintf("/intel/pcm/%s", metricKey)
}
pcm.mutex.Unlock()
continue
}

pcm.mutex.Lock()
datal := strings.Split(strings.TrimSuffix(scanner.Text(), ";"), ";")
for i, d := range datal[2:] {
v, err := strconv.ParseFloat(strings.TrimSpace(d), 64)
if err == nil {
pcm.data[pcm.keys[i]] = v
} else {
fmt.Fprintln(os.Stderr, "Invalid metric value", err)
pcm.data[pcm.keys[i]] = math.NaN()
}
}
pcm.mutex.Unlock()
// fmt.Fprintf(os.Stderr, "data >>> %+v\n", pcm.data)
// fmt.Fprintf(os.Stdout, "data >>> %+v\n", pcm.data)
}
}
88 changes: 77 additions & 11 deletions pcm/pcm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,48 @@ limitations under the License.
package pcm

import (
"strings"
"sync"
"testing"

"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/control/plugin/cpolicy"
"github.com/intelsdi-x/snap/core"
. "github.com/smartystreets/goconvey/convey"
)

var mockCmdOut = `System;;;;;;;;;;;;;;;;;;;;;System Core C-States;;;;System Pack C-States;;;Socket0;;;;;;;;;;;;;SKT0 Core C-State;;;;SKT0 Package C-State;;;
Date;Time;EXEC;IPC;FREQ;AFREQ;L3MISS;L2MISS;L3HIT;L2HIT;L3MPI;L2MPI;READ;WRITE;INST;ACYC;TIME(ticks);PhysIPC;PhysIPC%;INSTnom;INSTnom%;C0res%;C1res%;C3res%;C6res%;C3res%;C6res%;C7res%;EXEC;IPC;FREQ;AFREQ;L3MISS;L2MISS;L3HIT;L2HIT;L3MPI;L2MPI;READ;WRITE;TEMP;C0res%;C1res%;C3res%;C6res%;C3res%;C6res%;C7res%;
2016-11-02;14:10:27.600;0.0175;1.13;0.0155;0.84;0.0925;0.617;0.85;0.335;0.000237;0.00158;0.0361;0.0157;391;347;2.8e+03;2.26;56.4;0.0349;0.873;1.85;2.13;1.14;94.9;0.382;76.6;0;0.0175;1.13;0.0155;0.84;0.0925;0.617;0.85;0.335;0.000237;0.00158;0.0361;0.0157;N/A;1.85;2.13;1.14;94.9;0.382;76.6;0;
`

var refMap = map[string]float64{
"/intel/pcm/L3MPI": 0.000237,
"/intel/pcm/INST": 391,
"/intel/pcm/ACYC": 347,
"/intel/pcm/PhysIPC%": 56.4,
"/intel/pcm/C0res%": 1.85,
"/intel/pcm/C6res%": 76.6,
"/intel/pcm/EXEC": 0.0175,
"/intel/pcm/L2MISS": 0.617,
"/intel/pcm/C7res%": 0,
"/intel/pcm/AFREQ": 0.84,
"/intel/pcm/L3HIT": 0.85,
"/intel/pcm/L2MPI": 0.00158,
"/intel/pcm/READ": 0.0361,
"/intel/pcm/WRITE": 0.0157,
"/intel/pcm/TIME_ticks": 2800,
"/intel/pcm/INSTnom%": 0.873,
"/intel/pcm/IPC": 1.13,
"/intel/pcm/L3MISS": 0.0925,
"/intel/pcm/PhysIPC": 2.26,
"/intel/pcm/INSTnom": 0.0349,
"/intel/pcm/C1res%": 2.13,
"/intel/pcm/C3res%": 0.382,
"/intel/pcm/FREQ": 0.0155,
"/intel/pcm/L2HIT": 0.335,
}

func TestPCMPlugin(t *testing.T) {
Convey("Meta should return metadata for the plugin", t, func() {
meta := Meta()
Expand All @@ -38,17 +73,11 @@ func TestPCMPlugin(t *testing.T) {
})

Convey("Create PCM Collector", t, func() {
pcmCol, err := NewPCMCollector()
Convey("So pcmCol should not be nil", func() {
So(pcmCol, ShouldNotBeNil)
})
Convey("So err should be nil", func() {
So(err, ShouldBeNil)
})
Convey("So pcmCol should be of PCM type", func() {
So(pcmCol, ShouldHaveSameTypeAs, &PCM{})
})
configPolicy, err := pcmCol.GetConfigPolicy()
pcm := &PCM{mutex: &sync.RWMutex{}, data: map[string]float64{}}
reader := strings.NewReader(mockCmdOut)
pcm.parse(reader)

configPolicy, err := pcm.GetConfigPolicy()
Convey("pcmCol.GetConfigPolicy() should return a config policy", func() {
Convey("So config policy should not be nil", func() {
So(configPolicy, ShouldNotBeNil)
Expand All @@ -60,5 +89,42 @@ func TestPCMPlugin(t *testing.T) {
So(configPolicy, ShouldHaveSameTypeAs, &cpolicy.ConfigPolicy{})
})
})

Convey("Given valid static metric namespace collect metrics", func() {

var mockMts []plugin.MetricType

for key := range refMap {
mockMts = append(mockMts, plugin.MetricType{Namespace_: core.NewNamespace(strings.Split(strings.TrimPrefix(key, "/"), "/")...)})
}

So(func() { pcm.CollectMetrics(mockMts) }, ShouldNotPanic)
result, err := pcm.CollectMetrics(mockMts)
So(len(result), ShouldEqual, 24)
So(err, ShouldBeNil)

m := make(map[string]float64, len(result))

for _, r := range result {
m[r.Namespace().String()] = r.Data().(float64)
}

So(m, ShouldResemble, refMap)
})

Convey("Get metric types", func() {
mts, err := pcm.GetMetricTypes(plugin.ConfigType{})
So(err, ShouldBeNil)
So(len(mts), ShouldEqual, 46)

namespaces := []string{}
for _, m := range mts {
namespaces = append(namespaces, m.Namespace().String())
}

for k := range refMap {
So(namespaces, ShouldContain, k)
}
})
})
}

0 comments on commit 248fbc6

Please sign in to comment.