Skip to content

Commit

Permalink
contrib/globalsign/mgo: Integration for the MGO MongoDB Client (#308)
Browse files Browse the repository at this point in the history
* contrib/globalsign/mgo: Initial commit of Mongo DB support

* contrib/globalsign/mgo: Added basic support for Find and Iter.

* contrib/globalsign/mgo: Adding comments to exported functions.

* contrib/globalsign/mgo: added support for Bulk.

* contrib/globalsign/mgo: Completed instrumenting the Collection API.

* contrib/globalsign/mgo: Removed unneeded wrappers for tracing mgo.Bulk

* contrib/globalsign/mgo: UpsertId wrapper called the wrong function.

* contrib/globalsign/mgo: prepended all types with `Wrap`.

* contrib/globalsign/mgo: Improving comments.

* contrib/globalsign/mgo: Added unit tests.

* contrib/globalsign/mgo: Additional unit tests.

* contrib/globalsign/mgo: Adding mongo to the circle ci configuration.

* .circleci: changed the mongo image to the circleci official image.

* contrib/globalsign/mgo: added linter supression rules for FindId, RemoveId, UpdateId, and UpsertId

* contrib/globalsign/mgo: removed unneeded string format.

* contrib/globalsign/mgo: Fixing names of types.

* contrib/globalsign/mgo: adding package comments

* contrib/globalsign/mgo: fixing resource name setting.

* contrib/globalsign/mgo: changing all op and resource names to `mongodb.query`

* contrib/globalsign/mgo: remove Wrap from test names

* contrib/globalsign/mgo: moving context into mongoConfig

* contrib/globalsign/mgo: fixed a but in the integration tests.

* contrib/globalsign/mgo: moved spantype to ext.

* contrib/globalsign/mgo: some code clean up and reformatting per PR comments.

* contrib/globalsign/mgo: Changed MongoOption to DialOption

* contrib/globalsign/mgo: Changed MongoOption to DialOption

* contrib/globalsign/mgo: Moved Collection to its own file.

* contrib/globalsign/mgo: removed DropAllIndexes to maintain compatibility with old MGO versions.

* contrib/globalsign/mgo: Added coverage for Query.

* contrib/globalsign/mgo: Added coverage for Pipe.

* contrib/globalsign/mgo: covering Collection.(Pipe|NewIter).

* contrib/globalsign/mgo: moved Query into a separate file.

* contrib/globalsign/mgo: removing DropAllIndexes for compatibility reasons.

* contrib/globalsign/mgo: removing query field from Query struct

* contrib/globalsign/mgo: added some metadata to the MongoDB spans.

* contrib/globalsign/mgo: because dropallindexes was removed, expected spans is 6

* contrib/globalsign/mgo: had the wrong name for the parent span.

* contrib/globalsign/mgo: had the wrong name for the parent span.
  • Loading branch information
Erich Ess authored Aug 15, 2018
1 parent 5a26e12 commit 1f24f8b
Show file tree
Hide file tree
Showing 8 changed files with 979 additions and 0 deletions.
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ jobs:
DD_APM_ENABLED: "true"
DD_BIND_HOST: "0.0.0.0"
DD_API_KEY: invalid_key_but_this_is_fine
- image: circleci/mongo:latest-ram
- image: memcached:1.5.9

steps:
Expand Down
202 changes: 202 additions & 0 deletions contrib/globalsign/mgo/collection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package mgo

import (
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
)

// Collection provides a mgo.Collection along with
// data used for APM Tracing.
type Collection struct {
*mgo.Collection
cfg mongoConfig
}

// Create invokes and traces Collection.Create
func (c *Collection) Create(info *mgo.CollectionInfo) error {
span := newChildSpanFromContext(c.cfg)
err := c.Collection.Create(info)
span.Finish(tracer.WithError(err))
return err
}

// DropCollection invokes and traces Collection.DropCollection
func (c *Collection) DropCollection() error {
span := newChildSpanFromContext(c.cfg)
err := c.Collection.DropCollection()
span.Finish(tracer.WithError(err))
return err
}

// EnsureIndexKey invokes and traces Collection.EnsureIndexKey
func (c *Collection) EnsureIndexKey(key ...string) error {
span := newChildSpanFromContext(c.cfg)
err := c.Collection.EnsureIndexKey(key...)
span.Finish(tracer.WithError(err))
return err
}

// EnsureIndex invokes and traces Collection.EnsureIndex
func (c *Collection) EnsureIndex(index mgo.Index) error {
span := newChildSpanFromContext(c.cfg)
err := c.Collection.EnsureIndex(index)
span.Finish(tracer.WithError(err))
return err
}

// DropIndex invokes and traces Collection.DropIndex
func (c *Collection) DropIndex(key ...string) error {
span := newChildSpanFromContext(c.cfg)
err := c.Collection.DropIndex(key...)
span.Finish(tracer.WithError(err))
return err
}

// DropIndexName invokes and traces Collection.DropIndexName
func (c *Collection) DropIndexName(name string) error {
span := newChildSpanFromContext(c.cfg)
err := c.Collection.DropIndexName(name)
span.Finish(tracer.WithError(err))
return err
}

// Indexes invokes and traces Collection.Indexes
func (c *Collection) Indexes() (indexes []mgo.Index, err error) {
span := newChildSpanFromContext(c.cfg)
indexes, err = c.Collection.Indexes()
span.Finish(tracer.WithError(err))
return indexes, err
}

// Insert invokes and traces Collectin.Insert
func (c *Collection) Insert(docs ...interface{}) error {
span := newChildSpanFromContext(c.cfg)
err := c.Collection.Insert(docs...)
span.Finish(tracer.WithError(err))
return err
}

// Find invokes and traces Collection.Find
func (c *Collection) Find(query interface{}) *Query {
return &Query{
Query: c.Collection.Find(query),
cfg: c.cfg,
}
}

// FindId invokes and traces Collection.FindId
func (c *Collection) FindId(id interface{}) *Query { // nolint
return &Query{
Query: c.Collection.FindId(id),
cfg: c.cfg,
}
}

// Count invokes and traces Collection.Count
func (c *Collection) Count() (n int, err error) {
span := newChildSpanFromContext(c.cfg)
n, err = c.Collection.Count()
span.Finish(tracer.WithError(err))
return n, err
}

// Bulk creates a trace ready wrapper around Collection.Bulk
func (c *Collection) Bulk() *Bulk {
return &Bulk{
Bulk: c.Collection.Bulk(),
cfg: c.cfg,
}
}

// NewIter invokes and traces Collection.Iter
func (c *Collection) NewIter(session *mgo.Session, firstBatch []bson.Raw, cursorId int64, err error) *Iter { // nolint
return &Iter{
Iter: c.Collection.NewIter(session, firstBatch, cursorId, err),
cfg: c.cfg,
}
}

// Pipe invokes and traces Collection.Pipe
func (c *Collection) Pipe(pipeline interface{}) *Pipe {
return &Pipe{
Pipe: c.Collection.Pipe(pipeline),
cfg: c.cfg,
}
}

// Update invokes and traces Collection.Update
func (c *Collection) Update(selector interface{}, update interface{}) error {
span := newChildSpanFromContext(c.cfg)
err := c.Collection.Update(selector, update)
span.Finish(tracer.WithError(err))
return err
}

// UpdateId invokes and traces Collection.UpdateId
func (c *Collection) UpdateId(id interface{}, update interface{}) error { // nolint
span := newChildSpanFromContext(c.cfg)
err := c.Collection.UpdateId(id, update)
span.Finish(tracer.WithError(err))
return err
}

// UpdateAll invokes and traces Collection.UpdateAll
func (c *Collection) UpdateAll(selector interface{}, update interface{}) (info *mgo.ChangeInfo, err error) {
span := newChildSpanFromContext(c.cfg)
info, err = c.Collection.UpdateAll(selector, update)
span.Finish(tracer.WithError(err))
return info, err
}

// Upsert invokes and traces Collection.Upsert
func (c *Collection) Upsert(selector interface{}, update interface{}) (info *mgo.ChangeInfo, err error) {
span := newChildSpanFromContext(c.cfg)
info, err = c.Collection.Upsert(selector, update)
span.Finish(tracer.WithError(err))
return info, err
}

// UpsertId invokes and traces Collection.UpsertId
func (c *Collection) UpsertId(id interface{}, update interface{}) (info *mgo.ChangeInfo, err error) { // nolint
span := newChildSpanFromContext(c.cfg)
info, err = c.Collection.UpsertId(id, update)
span.Finish(tracer.WithError(err))
return info, err
}

// Remove invokes and traces Collection.Remove
func (c *Collection) Remove(selector interface{}) error {
span := newChildSpanFromContext(c.cfg)
err := c.Collection.Remove(selector)
span.Finish(tracer.WithError(err))
return err
}

// RemoveId invokes and traces Collection.RemoveId
func (c *Collection) RemoveId(id interface{}) error { // nolint
span := newChildSpanFromContext(c.cfg)
err := c.Collection.RemoveId(id)
span.Finish(tracer.WithError(err))
return err
}

// RemoveAll invokes and traces Collection.RemoveAll
func (c *Collection) RemoveAll(selector interface{}) (info *mgo.ChangeInfo, err error) {
span := newChildSpanFromContext(c.cfg)
info, err = c.Collection.RemoveAll(selector)
span.Finish(tracer.WithError(err))
return info, err
}

// Repair invokes and traces Collection.Repair
func (c *Collection) Repair() *Iter {
span := newChildSpanFromContext(c.cfg)
iter := c.Collection.Repair()
span.Finish()
return &Iter{
Iter: iter,
cfg: c.cfg,
}
}
144 changes: 144 additions & 0 deletions contrib/globalsign/mgo/mgo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Package mgo provides functions and types which allow tracing of the MGO MongoDB client (https://github.com/globalsign/mgo)
package mgo // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/globalsign/mgo"

import (
"strings"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

"github.com/globalsign/mgo"
)

// Dial opens a connection to a MongoDB server and configures it
// for tracing.
func Dial(url string, opts ...DialOption) (*Session, error) {
session, err := mgo.Dial(url)
s := &Session{Session: session}

defaults(&s.cfg)
for _, fn := range opts {
fn(&s.cfg)
}

// Record metadata so that it can be added to recorded traces
s.cfg.tags["hosts"] = strings.Join(session.LiveServers(), ", ")
info, _ := session.BuildInfo()
s.cfg.tags["mgo_version"] = info.Version

return s, err
}

// Session is an mgo.Session instance that will be traced.
type Session struct {
*mgo.Session
cfg mongoConfig
}

func newChildSpanFromContext(config mongoConfig) ddtrace.Span {
span, _ := tracer.StartSpanFromContext(
config.ctx,
"mongodb.query",
tracer.SpanType(ext.SpanTypeMongoDB),
tracer.ServiceName(config.serviceName),
tracer.ResourceName("mongodb.query"))

for key, value := range config.tags {
span.SetTag(key, value)
}

return span
}

// Run invokes and traces Session.Run
func (s *Session) Run(cmd interface{}, result interface{}) (err error) {
span := newChildSpanFromContext(s.cfg)
err = s.Session.Run(cmd, result)
span.Finish(tracer.WithError(err))
return
}

// Database is an mgo.Database along with the data necessary for tracing.
type Database struct {
*mgo.Database
cfg mongoConfig
}

// DB returns a new database for this Session.
func (s *Session) DB(name string) *Database {
dbCfg := mongoConfig{
ctx: s.cfg.ctx,
serviceName: s.cfg.serviceName,
tags: s.cfg.tags,
}

dbCfg.tags["database"] = name
return &Database{
Database: s.Session.DB(name),
cfg: dbCfg,
}
}

// C returns a new Collection from this Database.
func (db *Database) C(name string) *Collection {
return &Collection{
Collection: db.Database.C(name),
cfg: db.cfg,
}
}

// Iter is an mgo.Iter instance that will be traced.
type Iter struct {
*mgo.Iter

cfg mongoConfig
}

// Next invokes and traces Iter.Next
func (iter *Iter) Next(result interface{}) bool {
span := newChildSpanFromContext(iter.cfg)
r := iter.Iter.Next(result)
span.Finish()
return r
}

// For invokes and traces Iter.For
func (iter *Iter) For(result interface{}, f func() error) (err error) {
span := newChildSpanFromContext(iter.cfg)
err = iter.Iter.For(result, f)
span.Finish(tracer.WithError(err))
return err
}

// All invokes and traces Iter.All
func (iter *Iter) All(result interface{}) (err error) {
span := newChildSpanFromContext(iter.cfg)
err = iter.Iter.All(result)
span.Finish(tracer.WithError(err))
return err
}

// Close invokes and traces Iter.Close
func (iter *Iter) Close() (err error) {
span := newChildSpanFromContext(iter.cfg)
err = iter.Iter.Close()
span.Finish(tracer.WithError(err))
return err
}

// Bulk is an mgo.Bulk instance that will be traced.
type Bulk struct {
*mgo.Bulk

cfg mongoConfig
}

// Run invokes and traces Bulk.Run
func (b *Bulk) Run() (result *mgo.BulkResult, err error) {
span := newChildSpanFromContext(b.cfg)
result, err = b.Bulk.Run()
span.Finish(tracer.WithError(err))

return result, err
}
Loading

0 comments on commit 1f24f8b

Please sign in to comment.