Skip to content

Commit

Permalink
Merge pull request #25 from uber/schema-fetch
Browse files Browse the repository at this point in the history
fetch schema job impl
  • Loading branch information
shz117 authored Nov 20, 2018
2 parents 8390653 + 8d5af78 commit c8a79e0
Show file tree
Hide file tree
Showing 19 changed files with 661 additions and 40 deletions.
2 changes: 1 addition & 1 deletion api/schema_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (handler *SchemaHandler) GetTable(w http.ResponseWriter, r *http.Request) {
// default: errorResponse
// 200: noContentResponse
func (handler *SchemaHandler) AddTable(w http.ResponseWriter, r *http.Request) {

var addTableRequest AddTableRequest
err := ReadRequest(r, &addTableRequest)
if err != nil {
Expand Down Expand Up @@ -184,7 +185,6 @@ func (handler *SchemaHandler) UpdateTableConfig(w http.ResponseWriter, r *http.R
// default: errorResponse
// 200: noContentResponse
func (handler *SchemaHandler) DeleteTable(w http.ResponseWriter, r *http.Request) {

var deleteTableRequest DeleteTableRequest
err := ReadRequest(r, &deleteTableRequest)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion api/schema_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ var _ = ginkgo.Describe("SchemaHandler", func() {

testMetaStore := &mocks.MetaStore{}
var testMemStore *memMocks.MemStore
var schemaHandler *SchemaHandler

ginkgo.BeforeEach(func() {
testMemStore = CreateMemStore(&testTableSchema, 0, nil, nil)
schemaHandler := NewSchemaHandler(testMetaStore)
schemaHandler = NewSchemaHandler(testMetaStore)
testRouter := mux.NewRouter()
schemaHandler.Register(testRouter.PathPrefix("/schema").Subrouter())
testServer = httptest.NewUnstartedServer(WithPanicHandling(testRouter))
Expand Down
13 changes: 13 additions & 0 deletions clients/clients_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package clients_test

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestClients(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Clients Suite")
}
100 changes: 100 additions & 0 deletions clients/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package clients

import (
"encoding/json"
"fmt"
"github.com/uber/aresdb/metastore/common"
"github.com/uber/aresdb/utils"
"io/ioutil"
"net/http"
)

// ControllerClient defines methods to communicate with ares-controller
type ControllerClient interface {
GetSchemaHash(namespace string) (string, error)
GetAllSchema(namespace string) ([]common.Table, error)
}

// ControllerHTTPClient implements ControllerClient over http
type ControllerHTTPClient struct {
c *http.Client
controllerHost string
controllerPort int
headers http.Header
}

// NewControllerHTTPClient returns new ControllerHTTPClient
func NewControllerHTTPClient(controllerHost string, controllerPort int, headers http.Header) *ControllerHTTPClient {
return &ControllerHTTPClient{
c: &http.Client{},
controllerHost: controllerHost,
controllerPort: controllerPort,
headers: headers,
}
}

func (c *ControllerHTTPClient) GetSchemaHash(namespace string) (hash string, err error) {
var req *http.Request
req, err = c.getRequest(namespace, true)
if err != nil {
return
}
var resp *http.Response
resp, err = c.c.Do(req)
if err != nil {
return
}
if resp.StatusCode != http.StatusOK {
err = utils.StackError(nil, fmt.Sprintf("controller client error fetching hash, status code %d", resp.StatusCode))
}

var b []byte
b, err = ioutil.ReadAll(resp.Body)
if err != nil {
return
}
hash = string(b)
return
}

func (c *ControllerHTTPClient) GetAllSchema(namespace string) (tables []common.Table, err error) {
var req *http.Request
req, err = c.getRequest(namespace, false)
if err != nil {
return
}
var resp *http.Response
resp, err = c.c.Do(req)
if err != nil {
return
}
if resp.StatusCode != http.StatusOK {
err = utils.StackError(nil, fmt.Sprintf("controller client error fetching schema, status code %d", resp.StatusCode))
}

var b []byte
b, err = ioutil.ReadAll(resp.Body)
if err != nil {
return
}
err = json.Unmarshal(b, &tables)
if err != nil {
return
}

return
}

func (c *ControllerHTTPClient) getRequest(namespace string, hash bool) (req *http.Request, err error) {
suffix := "tables"
if hash {
suffix = "hash"
}
url := fmt.Sprintf("http://%s:%d/schema/%s/%s", c.controllerHost, c.controllerPort, namespace, suffix)
req, err = http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return
}
req.Header = c.headers
return
}
72 changes: 72 additions & 0 deletions clients/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package clients

import (
"encoding/json"
mux "github.com/gorilla/mux"
"github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/uber/aresdb/metastore/common"
"net/http"
"net/http/httptest"
"strconv"
"strings"
)

var _ = ginkgo.Describe("Controller", func() {
var testServer *httptest.Server
var host string
var port int

headers := http.Header{
"Foo": []string{"bar"},
}

tables := []common.Table{
{
Version: 0,
Name: "test1",
Columns: []common.Column{
{
Name: "col1",
Type: "int32",
},
},
},
}

ginkgo.BeforeEach(func() {
testRouter := mux.NewRouter()
testServer = httptest.NewUnstartedServer(testRouter)
testRouter.HandleFunc("/schema/ns1/tables", func(w http.ResponseWriter, r *http.Request) {
b, _ := json.Marshal(tables)
w.Write(b)
})
testRouter.HandleFunc("/schema/ns1/hash", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("123"))
})
testServer.Start()
hostPort := testServer.Listener.Addr().String()
comps := strings.SplitN(hostPort, ":", 2)
host = comps[0]
port, _ = strconv.Atoi(comps[1])
})

ginkgo.AfterEach(func() {
testServer.Close()
})

ginkgo.It("NewControllerHTTPClient should work", func() {
c := NewControllerHTTPClient(host, port, headers)
Ω(c.controllerPort).Should(Equal(port))
Ω(c.controllerHost).Should(Equal(host))
Ω(c.headers).Should(Equal(headers))

hash, err := c.GetSchemaHash("ns1")
Ω(err).Should(BeNil())
Ω(hash).Should(Equal("123"))

tablesGot, err := c.GetAllSchema("ns1")
Ω(err).Should(BeNil())
Ω(tablesGot).Should(Equal(tables))
})
})
54 changes: 54 additions & 0 deletions clients/mocks/ControllerClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 19 additions & 1 deletion cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/uber-common/bark"
"github.com/uber/aresdb/clients"
"github.com/uber/aresdb/memutils"
)

Expand Down Expand Up @@ -131,7 +132,11 @@ func StartService(cfg common.AresServerConfig, logger bark.Logger, queryLogger b

httpWrappers = append([]utils.HTTPHandlerWrapper{utils.WithMetricsFunc}, httpWrappers...)

schemaHandler.Register(router.PathPrefix("/schema").Subrouter(), httpWrappers...)
schemaRouter := router.PathPrefix("/schema")
if cfg.Cluster.Enable {
schemaRouter = schemaRouter.Methods(http.MethodGet)
}
schemaHandler.Register(schemaRouter.Subrouter(), httpWrappers...)
enumHander.Register(router.PathPrefix("/schema").Subrouter(), httpWrappers...)
dataHandler.Register(router.PathPrefix("/data").Subrouter(), httpWrappers...)
queryHandler.Register(router.PathPrefix("/query").Subrouter(), httpWrappers...)
Expand All @@ -152,6 +157,19 @@ func StartService(cfg common.AresServerConfig, logger bark.Logger, queryLogger b
batchStatsReporter := memstore.NewBatchStatsReporter(5*60, memStore, metaStore)
go batchStatsReporter.Run()

if cfg.Cluster.Enable {
if cfg.Cluster.ClusterName == "" {
logger.Fatal("Missing cluster name")
}
controllerClientCfg := cfg.Clients.Controller
if controllerClientCfg == nil {
logger.Fatal("Missing controller client config", err)
}
controllerClient := clients.NewControllerHTTPClient(controllerClientCfg.Host, controllerClientCfg.Port, controllerClientCfg.Headers)
schemaFetchJob := metastore.NewSchemaFetchJob(5*60, metaStore, metastore.NewTableSchameValidator(), controllerClient, cfg.Cluster.ClusterName, "")
go schemaFetchJob.Run()
}

utils.GetLogger().Infof("Starting HTTP server on port %d with max connection %d", *port, cfg.HTTP.MaxConnections)
utils.LimitServe(*port, handlers.CORS(allowOrigins, allowHeaders, allowMethods)(router), cfg.HTTP)
batchStatsReporter.Stop()
Expand Down
23 changes: 23 additions & 0 deletions common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

package common

import (
"net/http"
)

// TimezoneConfig is the static config for timezone column support
type TimezoneConfig struct {
// table to lookup timezone columns
Expand Down Expand Up @@ -41,6 +45,23 @@ type HTTPConfig struct {
WriteTimeOutInSeconds int `yaml:"write_time_out_in_seconds"`
}

// ControllerConfig is the config for ares-controller client
type ControllerConfig struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Headers http.Header `yaml:"headers"`
}

// ClientsConfig is the config for all clients
type ClientsConfig struct {
Controller *ControllerConfig `yaml:"controller,omitempty"`
}

type ClusterConfig struct {
Enable bool `yaml:"enable"`
ClusterName string `yaml:"cluster_name"`
}

// AresServerConfig is config specific for ares server.
type AresServerConfig struct {
// HTTP port for serving.
Expand All @@ -64,4 +85,6 @@ type AresServerConfig struct {
Query QueryConfig `yaml:"query"`
DiskStore DiskStoreConfig `yaml:"disk_store"`
HTTP HTTPConfig `yaml:"http"`
Cluster ClusterConfig `yaml:"cluster"`
Clients ClientsConfig `yaml:"clients"`
}
14 changes: 14 additions & 0 deletions config/ares.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,18 @@ http:
read_time_out_in_seconds: 20
write_time_out_in_seconds: 300 # 5 minutes to write the result

clients:
# example controller client configs
controller:
host: localhost
port: 6708
headers:
RPC-Caller:
- aresdb
RPC-Service:
- ares-controller

cluster:
enable: false
cluster_name: ""

Loading

0 comments on commit c8a79e0

Please sign in to comment.