Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregation Framework & Query Disk Use Support for mongoexport #531

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 59 additions & 9 deletions mongoexport/mongoexport.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,18 +184,26 @@ func (exp *MongoExport) validateSettings() error {
return fmt.Errorf("cannot use --forceTableScan when specifying --query")
}

if exp.InputOpts.Query != "" && exp.InputOpts.QueryFile != "" {
return fmt.Errorf("either --query or --queryFile can be specified as a query option")
if exp.InputOpts.Query != "" && exp.InputOpts.QueryFile != "" && exp.InputOpts.Pipeline != "" {
return fmt.Errorf("either --query, --queryFile, or --pipeline can be specified as a query option")
}

if exp.InputOpts != nil && exp.InputOpts.HasQuery() {
content, err := exp.InputOpts.GetQuery()
if err != nil {
return err
}
_, err2 := getObjectFromByteArg(content)
if err2 != nil {
return err2

if exp.InputOpts.Pipeline != "" {
_, err2 := getArrayFromByteArg(content)
if err2 != nil {
return err2
}
} else {
_, err2 := getObjectFromByteArg(content)
if err2 != nil {
return err2
}
}
}

Expand Down Expand Up @@ -256,6 +264,7 @@ func makeFieldSelector(fields string) bson.M {
// It always returns Limit if there is a limit, assuming that in general
// limits will be less than the total possible.
// If there is a query and no limit then it returns 0, because it's too expensive to count the query.
// If there is an aggregation pipeline and no limit then it returns 0, because it's too expensive to count the results.
// If the collection is a view then it returns 0, because it is too expensive to count the view.
// Otherwise it returns the count minus the skip
func (exp *MongoExport) getCount() (int64, error) {
Expand All @@ -269,6 +278,9 @@ func (exp *MongoExport) getCount() (int64, error) {
if exp.InputOpts != nil && exp.InputOpts.Query != "" {
return 0, nil
}
if exp.InputOpts != nil && exp.InputOpts.Pipeline != "" {
return 0, nil
}
coll := session.Database(exp.ToolOptions.Namespace.DB).Collection(exp.ToolOptions.Namespace.Collection)

if exp.collInfo.IsView() {
Expand Down Expand Up @@ -298,6 +310,7 @@ func (exp *MongoExport) getCount() (int64, error) {
// associated session, so that it can be closed once the cursor is used up.
func (exp *MongoExport) getCursor() (*mongo.Cursor, error) {
findOpts := mopt.Find()
aggregationOpts := mopt.Aggregate()

if exp.InputOpts != nil && exp.InputOpts.Sort != "" {
sortD, err := getSortFromArg(exp.InputOpts.Sort)
Expand All @@ -309,15 +322,23 @@ func (exp *MongoExport) getCursor() (*mongo.Cursor, error) {
}

query := bson.D{}
pipeline := bson.A{}
if exp.InputOpts != nil && exp.InputOpts.HasQuery() {
var err error
content, err := exp.InputOpts.GetQuery()
if err != nil {
return nil, err
}
err = bson.UnmarshalExtJSON(content, false, &query)
if err != nil {
return nil, fmt.Errorf("error parsing query as Extended JSON: %v", err)
if exp.InputOpts.Pipeline != "" {
err = bson.UnmarshalExtJSON(content, false, &pipeline)
if err != nil {
return nil, fmt.Errorf("error parsing query as Extended JSON: %v", err)
}
} else {
err = bson.UnmarshalExtJSON(content, false, &query)
if err != nil {
return nil, fmt.Errorf("error parsing query as Extended JSON: %v", err)
}
}
}

Expand Down Expand Up @@ -366,11 +387,20 @@ func (exp *MongoExport) getCursor() (*mongo.Cursor, error) {
findOpts.SetLimit(exp.InputOpts.Limit)
}

if exp.InputOpts != nil && exp.InputOpts.AllowDiskUse {
findOpts.SetAllowDiskUse(true)
aggregationOpts.SetAllowDiskUse(true)
}

if len(exp.OutputOpts.Fields) > 0 {
findOpts.SetProjection(makeFieldSelector(exp.OutputOpts.Fields))
}

return coll.Find(nil, query, findOpts)
if exp.InputOpts.Pipeline != "" {
return coll.Aggregate(nil, pipeline, aggregationOpts)
} else {
return coll.Find(nil, query, findOpts)
}
}

// verifyCollectionExists checks if the collection exists. If it does, a copy of the collection info will be cached
Expand Down Expand Up @@ -529,6 +559,26 @@ func getObjectFromByteArg(queryRaw []byte) (map[string]interface{}, error) {
return parsedJSON, nil
}

// getArrayFromByteArg takes an array of documents in extended JSON (for
// example, the stages of an aggregation pipeline) and parses it into a slice
// of maps, one per array element. Each element is then passed through
// bsonutil.ConvertLegacyExtJSONDocumentToBSON (presumably rewriting legacy
// extended JSON values in place; confirm against bsonutil).
//
// It returns an error if queryRaw is not a valid JSON array of objects, or if
// the conversion of any element fails.
func getArrayFromByteArg(queryRaw []byte) ([]map[string]interface{}, error) {
	parsedJSON := []map[string]interface{}{}
	if err := json.Unmarshal(queryRaw, &parsedJSON); err != nil {
		// Use %s for queryRaw: it is a []byte, and %v would render it as a
		// list of numeric byte values instead of the text the user supplied.
		return nil, fmt.Errorf("query '%s' is not valid JSON: %v", queryRaw, err)
	}

	for _, stage := range parsedJSON {
		if err := bsonutil.ConvertLegacyExtJSONDocumentToBSON(stage); err != nil {
			return nil, err
		}
	}

	return parsedJSON, nil
}

// getSortFromArg takes a sort specification in JSON and returns it as a bson.D
// object which preserves the ordering of the keys as they appear in the input.
func getSortFromArg(queryRaw string) (bson.D, error) {
Expand Down
8 changes: 6 additions & 2 deletions mongoexport/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ func (*OutputFormatOptions) Name() string {
type InputOptions struct {
Query string `long:"query" value-name:"<json>" short:"q" description:"query filter, as a JSON string, e.g., '{x:{$gt:1}}'"`
QueryFile string `long:"queryFile" value-name:"<filename>" description:"path to a file containing a query filter (JSON)"`
Pipeline string `long:"pipeline" value-name:"<json>" short:"p" description:"aggregation pipeline, as a JSON array, e.g., '[$match: {x:{$gt:1}}]'"`
SlaveOk bool `long:"slaveOk" short:"k" hidden:"true" description:"allow secondary reads if available" default-mask:"-"`
ReadPreference string `long:"readPreference" value-name:"<string>|<json>" description:"specify either a preference mode (e.g. 'nearest') or a preference json object (e.g. '{mode: \"nearest\", tagSets: [{a: \"b\"}], maxStalenessSeconds: 123}')"`
ForceTableScan bool `long:"forceTableScan" description:"force a table scan (do not use $snapshot or hint _id). Deprecated since this is default behavior on WiredTiger"`
Skip int64 `long:"skip" value-name:"<count>" description:"number of documents to skip"`
Limit int64 `long:"limit" value-name:"<count>" description:"limit the number of documents to export"`
Sort string `long:"sort" value-name:"<json>" description:"sort order, as a JSON string, e.g. '{x:1}'"`
AssertExists bool `long:"assertExists" description:"if specified, export fails if the collection does not exist"`
AllowDiskUse bool `long:"allowDiskUse" description:"if specified, allows server to use temporary files on disk to store data exceeding the 100MB system memory limit"`
}

// Name returns a human-readable group name for input options.
Expand All @@ -77,7 +79,7 @@ func (*InputOptions) Name() string {
}

// HasQuery reports whether any query-like input was supplied: an inline
// query (Query), a query file path (QueryFile), or an aggregation pipeline
// (Pipeline).
func (inputOptions *InputOptions) HasQuery() bool {
	// A single return covering all three sources; an earlier return that only
	// checked Query and QueryFile would leave the Pipeline check unreachable.
	return inputOptions.Query != "" || inputOptions.QueryFile != "" || inputOptions.Pipeline != ""
}

func (inputOptions *InputOptions) GetQuery() ([]byte, error) {
Expand All @@ -89,8 +91,10 @@ func (inputOptions *InputOptions) GetQuery() ([]byte, error) {
err = fmt.Errorf("error reading queryFile: %s", err)
}
return content, err
} else if inputOptions.Pipeline != "" {
return []byte(inputOptions.Pipeline), nil
}
panic("GetQuery can return valid values only for query or queryFile input")
panic("GetQuery can return valid values only for query, queryFile, or pipeline input")
}

// Options represents all possible options that can be used to configure mongoexport.
Expand Down