Skip to content

Commit 2541b53

Browse files
committed
feat: add __shardingkey to 'ReplacingMergeTree' tables, generate by sorting columns
1 parent 2481213 commit 2541b53

File tree

1 file changed

+53
-19
lines changed

1 file changed

+53
-19
lines changed

output/clickhouse.go

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,25 @@ func (c *ClickHouse) initSchema() (err error) {
447447
if conn, _, err = sc.NextGoodReplica(c.cfg.Clickhouse.Ctx, 0); err != nil {
448448
return
449449
}
450+
// Check distributed metric table
451+
if chCfg := &c.cfg.Clickhouse; chCfg.Cluster != "" {
452+
withDistTable := false
453+
info, e := c.getDistTbls(c.TableName, chCfg.Cluster)
454+
if e != nil {
455+
return e
456+
}
457+
c.distMetricTbls = make([]string, 0)
458+
for _, i := range info {
459+
c.distMetricTbls = append(c.distMetricTbls, i.name)
460+
if i.cluster == c.cfg.Clickhouse.Cluster {
461+
withDistTable = true
462+
}
463+
}
464+
if !withDistTable {
465+
err = errors.Newf("Please create distributed table for %s in cluster '%s'.", c.TableName, c.cfg.Clickhouse.Cluster)
466+
return
467+
}
468+
}
450469
if err = c.ensureShardingkey(conn, c.TableName, c.taskCfg.Parser); err != nil {
451470
return
452471
}
@@ -503,25 +522,25 @@ func (c *ClickHouse) initSchema() (err error) {
503522
}
504523
util.Logger.Info(fmt.Sprintf("Prepare sql=> %s", c.prepareSQL), zap.String("task", c.taskCfg.Name))
505524

506-
// Check distributed metric table
507-
if chCfg := &c.cfg.Clickhouse; chCfg.Cluster != "" {
508-
withDistTable := false
509-
info, e := c.getDistTbls(c.TableName, chCfg.Cluster)
510-
if e != nil {
511-
return e
512-
}
513-
c.distMetricTbls = make([]string, 0)
514-
for _, i := range info {
515-
c.distMetricTbls = append(c.distMetricTbls, i.name)
516-
if i.cluster == c.cfg.Clickhouse.Cluster {
517-
withDistTable = true
518-
}
519-
}
520-
if !withDistTable {
521-
err = errors.Newf("Please create distributed table for %s in cluster '%s'.", c.TableName, c.cfg.Clickhouse.Cluster)
522-
return
523-
}
524-
}
525+
// // Check distributed metric table
526+
// if chCfg := &c.cfg.Clickhouse; chCfg.Cluster != "" {
527+
// withDistTable := false
528+
// info, e := c.getDistTbls(c.TableName, chCfg.Cluster)
529+
// if e != nil {
530+
// return e
531+
// }
532+
// c.distMetricTbls = make([]string, 0)
533+
// for _, i := range info {
534+
// c.distMetricTbls = append(c.distMetricTbls, i.name)
535+
// if i.cluster == c.cfg.Clickhouse.Cluster {
536+
// withDistTable = true
537+
// }
538+
// }
539+
// if !withDistTable {
540+
// err = errors.Newf("Please create distributed table for %s in cluster '%s'.", c.TableName, c.cfg.Clickhouse.Cluster)
541+
// return
542+
// }
543+
// }
525544
return nil
526545
}
527546

@@ -757,6 +776,21 @@ func (c *ClickHouse) ensureShardingkey(conn *pool.Conn, tblName string, parser s
757776
if err = conn.Exec(query); err != nil {
758777
return
759778
}
779+
780+
// var distTables []DistTblInfo
781+
// distTables, err = c.getDistTbls(tblName, c.cfg.Clickhouse.Cluster)
782+
// if err != nil {
783+
// return
784+
// }
785+
786+
for _, distTbl := range c.distMetricTbls {
787+
query := fmt.Sprintf("ALTER TABLE `%s`.`%s` %s ADD COLUMN IF NOT EXISTS `__shardingkey` Int64",
788+
c.dbName, distTbl, onCluster)
789+
util.Logger.Info(fmt.Sprintf("executing sql=> %s", query), zap.String("task", c.taskCfg.Name))
790+
if err = conn.Exec(query); err != nil {
791+
return
792+
}
793+
}
760794
}
761795
return
762796
}

0 commit comments

Comments
 (0)