Skip to content

Commit

Permalink
Merge pull request vitessio#4385 from tinyspeck/vtctl-support-vschema…
Browse files Browse the repository at this point in the history
…-ddls

vtctl support for vschema ddls
  • Loading branch information
sougou authored Dec 6, 2018
2 parents fb8c46f + e47bed0 commit 3b2294b
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 180 deletions.
3 changes: 3 additions & 0 deletions doc/vtctlReference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1119,8 +1119,11 @@ Applies the VTGate routing schema to the provided keyspace. Shows the result aft
| :-------- | :--------- | :--------- |
| cells | string | If specified, limits the rebuild to the cells, after upload. Ignored if skipRebuild is set. |
| skip_rebuild | Boolean | If set, do no rebuild the SrvSchema objects. |
| dry-run | Boolean | Shows the proposed change without executing it |
| vschema | string | Identifies the VTGate routing schema |
| vschema_file | string | Identifies the VTGate routing schema file |
| sql | string | Identifies a VSchema DDL SQL statement |
| sql_file | string | Identifies a VSchema DDL SQL statement |


#### Arguments
Expand Down
162 changes: 162 additions & 0 deletions go/vt/topotools/vschema_ddl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
Copyright 2018 The Vitess Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package topotools

import (
"reflect"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"

vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// ApplyVSchemaDDL applies the given DDL statement to the vschema
// keyspace definition and returns the modified keyspace object.
func ApplyVSchemaDDL(ksName string, ks *vschemapb.Keyspace, ddl *sqlparser.DDL) (*vschemapb.Keyspace, error) {
if ks == nil {
ks = new(vschemapb.Keyspace)
}

if ks.Tables == nil {
ks.Tables = map[string]*vschemapb.Table{}
}

if ks.Vindexes == nil {
ks.Vindexes = map[string]*vschemapb.Vindex{}
}

var tableName string
var table *vschemapb.Table
if !ddl.Table.IsEmpty() {
tableName = ddl.Table.Name.String()
table, _ = ks.Tables[tableName]
}

switch ddl.Action {
case sqlparser.CreateVindexStr:
name := ddl.VindexSpec.Name.String()
if _, ok := ks.Vindexes[name]; ok {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s already exists in keyspace %s", name, ksName)
}

// Make sure the keyspace has the sharded bit set to true
// if this is the first vindex defined in the keyspace.
if len(ks.Vindexes) == 0 {
ks.Sharded = true
}

owner, params := ddl.VindexSpec.ParseParams()
ks.Vindexes[name] = &vschemapb.Vindex{
Type: ddl.VindexSpec.Type.String(),
Params: params,
Owner: owner,
}

return ks, nil

case sqlparser.AddColVindexStr:
// Support two cases:
//
// 1. The vindex type / params / owner are specified. If the
// named vindex doesn't exist, create it. If it does exist,
// require the parameters to match.
//
// 2. The vindex type is not specified. Make sure the vindex
// already exists.
spec := ddl.VindexSpec
name := spec.Name.String()
if !spec.Type.IsEmpty() {
owner, params := spec.ParseParams()
if vindex, ok := ks.Vindexes[name]; ok {
if vindex.Type != spec.Type.String() {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with type %s not %s", name, vindex.Type, spec.Type.String())
}
if vindex.Owner != owner {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with owner %s not %s", name, vindex.Owner, owner)
}
if (len(vindex.Params) != 0 || len(params) != 0) && !reflect.DeepEqual(vindex.Params, params) {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with different parameters", name)
}
} else {
// Make sure the keyspace has the sharded bit set to true
// if this is the first vindex defined in the keyspace.
if len(ks.Vindexes) == 0 {
ks.Sharded = true
}
ks.Vindexes[name] = &vschemapb.Vindex{
Type: spec.Type.String(),
Params: params,
Owner: owner,
}
}
} else {
if _, ok := ks.Vindexes[name]; !ok {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s does not exist in keyspace %s", name, ksName)
}
}

// If this is the first vindex being defined on the table, create
// the empty table record
if table == nil {
table = &vschemapb.Table{
ColumnVindexes: make([]*vschemapb.ColumnVindex, 0, 4),
}
}

// Make sure there isn't already a vindex with the same name on
// this table.
for _, vindex := range table.ColumnVindexes {
if vindex.Name == name {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s already defined on table %s", name, tableName)
}
}

columns := make([]string, len(ddl.VindexCols), len(ddl.VindexCols))
for i, col := range ddl.VindexCols {
columns[i] = col.String()
}
table.ColumnVindexes = append(table.ColumnVindexes, &vschemapb.ColumnVindex{
Name: name,
Columns: columns,
})
ks.Tables[tableName] = table

return ks, nil

case sqlparser.DropColVindexStr:
spec := ddl.VindexSpec
name := spec.Name.String()
if table == nil {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table %s.%s not defined in vschema", ksName, tableName)
}

for i, colVindex := range table.ColumnVindexes {
if colVindex.Name == name {
table.ColumnVindexes = append(table.ColumnVindexes[:i], table.ColumnVindexes[i+1:]...)
if len(table.ColumnVindexes) == 0 {
delete(ks.Tables, tableName)
}
return ks, nil
}
}
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s not defined in table %s.%s", name, ksName, tableName)
}

return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected vindex ddl operation %s", ddl.Action)
}
96 changes: 77 additions & 19 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/schemamanager"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
Expand Down Expand Up @@ -390,7 +391,7 @@ var commands = []commandGroup{
"<keyspace>",
"Displays the VTGate routing schema."},
{"ApplyVSchema", commandApplyVSchema,
"{-vschema=<vschema> || -vschema_file=<vschema file>} [-cells=c1,c2,...] [-skip_rebuild] <keyspace>",
"{-vschema=<vschema> || -vschema_file=<vschema file> || -sql=<sql> || -sql_file=<sql file>} [-cells=c1,c2,...] [-skip_rebuild] [-dry-run] <keyspace>",
"Applies the VTGate routing schema to the provided keyspace. Shows the result after application."},
{"RebuildVSchemaGraph", commandRebuildVSchemaGraph,
"[-cells=c1,c2,...]",
Expand Down Expand Up @@ -2122,6 +2123,9 @@ func commandRebuildVSchemaGraph(ctx context.Context, wr *wrangler.Wrangler, subF
func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
vschema := subFlags.String("vschema", "", "Identifies the VTGate routing schema")
vschemaFile := subFlags.String("vschema_file", "", "Identifies the VTGate routing schema file")
sql := subFlags.String("sql", "", "A vschema ddl SQL statement (e.g. `add vindex`, `alter table t add vindex hash(id)`, etc)")
sqlFile := subFlags.String("sql_file", "", "A vschema ddl SQL statement (e.g. `add vindex`, `alter table t add vindex hash(id)`, etc)")
dryRun := subFlags.Bool("dry-run", false, "If set, do not save the altered vschema, simply echo to console.")
skipRebuild := subFlags.Bool("skip_rebuild", false, "If set, do no rebuild the SrvSchema objects.")
var cells flagutil.StringListValue
subFlags.Var(&cells, "cells", "If specified, limits the rebuild to the cells, after upload. Ignored if skipRebuild is set.")
Expand All @@ -2132,34 +2136,88 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *f
if subFlags.NArg() != 1 {
return fmt.Errorf("the <keyspace> argument is required for the ApplyVSchema command")
}
if (*vschema == "") == (*vschemaFile == "") {
return fmt.Errorf("either the vschema or vschemaFile flag must be specified when calling the ApplyVSchema command")
keyspace := subFlags.Arg(0)

var vs *vschemapb.Keyspace
var err error

sqlMode := (*sql != "") != (*sqlFile != "")
jsonMode := (*vschema != "") != (*vschemaFile != "")

if sqlMode && jsonMode {
return fmt.Errorf("only one of the sql, sql_file, vschema, or vschema_file flags may be specified when calling the ApplyVSchema command")
}
var schema []byte
if *vschemaFile != "" {
var err error
schema, err = ioutil.ReadFile(*vschemaFile)

if !sqlMode && !jsonMode {
return fmt.Errorf("one of the sql, sql_file, vschema, or vschema_file flags must be specified when calling the ApplyVSchema command")
}

if sqlMode {
if *sqlFile != "" {
sqlBytes, err := ioutil.ReadFile(*sqlFile)
if err != nil {
return err
}
*sql = string(sqlBytes)
}

stmt, err := sqlparser.Parse(*sql)
if err != nil {
return fmt.Errorf("error parsing vschema statement `%s`: %v", *sql, err)
}
ddl, ok := stmt.(*sqlparser.DDL)
if !ok {
return fmt.Errorf("error parsing vschema statement `%s`: not a ddl statement", *sql)
}

vs, err = wr.TopoServer().GetVSchema(ctx, keyspace)
if err != nil {
if topo.IsErrType(err, topo.NoNode) {
vs = &vschemapb.Keyspace{}
} else {
return err
}
}

vs, err = topotools.ApplyVSchemaDDL(keyspace, vs, ddl)
if err != nil {
return err
}

} else {
schema = []byte(*vschema)
}
var vs vschemapb.Keyspace
err := json2.Unmarshal(schema, &vs)
if err != nil {
return err
}
keyspace := subFlags.Arg(0)
if err := wr.TopoServer().SaveVSchema(ctx, keyspace, &vs); err != nil {
return err
// json mode
var schema []byte
if *vschemaFile != "" {
var err error
schema, err = ioutil.ReadFile(*vschemaFile)
if err != nil {
return err
}
} else {
schema = []byte(*vschema)
}

vs = &vschemapb.Keyspace{}
err := json2.Unmarshal(schema, vs)
if err != nil {
return err
}
}

b, err := json2.MarshalIndentPB(&vs, " ")
b, err := json2.MarshalIndentPB(vs, " ")
if err != nil {
wr.Logger().Errorf("Failed to marshal VSchema for display: %v", err)
} else {
wr.Logger().Printf("Uploaded VSchema object:\n%s\nIf this is not what you expected, check the input data (as JSON parsing will skip unexpected fields).\n", b)
wr.Logger().Printf("New VSchema object:\n%s\nIf this is not what you expected, check the input data (as JSON parsing will skip unexpected fields).\n", b)
}

if *dryRun {
wr.Logger().Printf("Dry run: Skipping update of VSchema\n")
return nil
}

if err := wr.TopoServer().SaveVSchema(ctx, keyspace, vs); err != nil {
return err
}

if *skipRebuild {
Expand Down
Loading

0 comments on commit 3b2294b

Please sign in to comment.