Skip to content

Commit

Permalink
Merge branch 'master' into backoff-setting
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Jan 22, 2025
2 parents 0c2b5a2 + 6cadbe7 commit b117d30
Show file tree
Hide file tree
Showing 16 changed files with 39 additions and 31 deletions.
4 changes: 2 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func (c *client) UpdateOption(option opt.DynamicOption, value any) error {
}
case opt.EnableTSOFollowerProxy:
if c.inner.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE {
return errors.New("[pd] tso follower proxy is only supported in PD mode")
return errors.New("[pd] tso follower proxy is only supported when PD provides TSO")
}
enable, ok := value.(bool)
if !ok {
Expand Down Expand Up @@ -526,7 +526,7 @@ func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logi

// GetMinTS implements the TSOClient interface.
func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) {
// Handle compatibility issue in case of PD/PD service doesn't support GetMinTS API.
// Handle compatibility issue in case of PD doesn't support GetMinTS API.
serviceMode := c.inner.getServiceMode()
switch serviceMode {
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
Expand Down
2 changes: 1 addition & 1 deletion client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (ci *clientInner) doRequest(
if readErr != nil {
logFields = append(logFields, zap.NamedError("read-body-error", err))
} else {
// PD service will return a JSON body containing the detailed error message
// PD will return a JSON body containing the detailed error message
// when the status code is not `http.StatusOK` 200.
bs = bytes.TrimSpace(bs)
logFields = append(logFields, zap.ByteString("body", bs))
Expand Down
33 changes: 22 additions & 11 deletions client/inner_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,31 @@ func (c *innerClient) setServiceMode(newMode pdpb.ServiceMode) {
// If we are using TSO server proxy, we always use PD_SVC_MODE.
newMode = pdpb.ServiceMode_PD_SVC_MODE
}

if newMode == c.serviceMode {
return
}
log.Info("[pd] changing service mode",
zap.String("old-mode", c.serviceMode.String()),
zap.String("new-mode", newMode.String()))
log.Info("[pd] changing TSO provider",
zap.String("old", convertToString(c.serviceMode)),
zap.String("new", convertToString(newMode)))
c.resetTSOClientLocked(newMode)
oldMode := c.serviceMode
c.serviceMode = newMode
log.Info("[pd] service mode changed",
zap.String("old-mode", oldMode.String()),
zap.String("new-mode", newMode.String()))
log.Info("[pd] TSO provider changed",
zap.String("old", convertToString(oldMode)),
zap.String("new", convertToString(newMode)))
}

func convertToString(mode pdpb.ServiceMode) string {
switch mode {
case pdpb.ServiceMode_PD_SVC_MODE:
return "pd"
case pdpb.ServiceMode_API_SVC_MODE:
return "tso server"
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
return "unknown"
default:
return "invalid"
}
}

// Reset a new TSO client.
Expand All @@ -116,9 +128,8 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
newTSOCli = tso.NewClient(c.ctx, c.option,
newTSOSvcDiscovery, &tso.MSStreamBuilderFactory{})
if err := newTSOSvcDiscovery.Init(); err != nil {
log.Error("[pd] failed to initialize tso service discovery. keep the current service mode",
log.Error("[pd] failed to initialize tso service discovery",
zap.Strings("svr-urls", c.svrUrls),
zap.String("current-mode", c.serviceMode.String()),
zap.Error(err))
return
}
Expand All @@ -133,12 +144,12 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
oldTSOClient.Close()
// Replace the old TSO service discovery if needed.
oldTSOSvcDiscovery := c.tsoSvcDiscovery
// If newTSOSvcDiscovery is nil, that's expected, as it means we are switching to PD mode and
// If newTSOSvcDiscovery is nil, that's expected, as it means we are switching to non-microservice env and
// no tso microservice discovery is needed.
c.tsoSvcDiscovery = newTSOSvcDiscovery
// Close the old TSO service discovery safely after both the old client and service discovery are replaced.
if oldTSOSvcDiscovery != nil {
// We are switching from PD service mode to PD mode, so delete the old tso microservice discovery.
// We are switching from microservice env to non-microservice env, so delete the old tso microservice discovery.
oldTSOSvcDiscovery.Close()
}
}
Expand Down
6 changes: 3 additions & 3 deletions client/servicediscovery/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ type UpdateKeyspaceIDFunc func() error

var _ ServiceDiscovery = (*serviceDiscovery)(nil)

// serviceDiscovery is the service discovery client of PD/PD service which is quorum based
// serviceDiscovery is the service discovery client of PD which is quorum based
type serviceDiscovery struct {
isInitialized bool

Expand Down Expand Up @@ -490,7 +490,7 @@ func (c *serviceDiscovery) Init() error {
log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID))

// We need to update the keyspace ID before we discover and update the service mode
// so that TSO in API mode can be initialized with the correct keyspace ID.
// so that TSO in PD can be initialized with the correct keyspace ID.
if c.keyspaceID == constants.NullKeyspaceID && c.updateKeyspaceIDFunc != nil {
if err := c.initRetry(c.updateKeyspaceIDFunc); err != nil {
return err
Expand Down Expand Up @@ -854,7 +854,7 @@ func (c *serviceDiscovery) checkServiceModeChanged() error {
clusterInfo, err := c.getClusterInfo(c.ctx, leaderURL, c.option.Timeout)
if err != nil {
if strings.Contains(err.Error(), "Unimplemented") {
// If the method is not supported, we set it to pd mode.
// If the method is not supported, we fallback to non-microservice env.
// TODO: it's a hack way to solve the compatibility issue.
// we need to remove this after all maintained version supports the method.
c.callbacks.onServiceModeUpdate(pdpb.ServiceMode_PD_SVC_MODE)
Expand Down
3 changes: 0 additions & 3 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,6 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro

// GetKeyspaceConfigByKind returns the keyspace config for the given user kind.
func (m *GroupManager) GetKeyspaceConfigByKind(userKind endpoint.UserKind) (map[string]string, error) {
// when server is not in API mode, we don't need to return the keyspace config
if m == nil {
return map[string]string{}, nil
}
Expand Down Expand Up @@ -418,7 +417,6 @@ var failpointOnce sync.Once

// UpdateKeyspaceForGroup updates the keyspace field for the keyspace group.
func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupID string, keyspaceID uint32, mutation int) error {
// when server is not in API mode, we don't need to update the keyspace for keyspace group
if m == nil {
return nil
}
Expand Down Expand Up @@ -477,7 +475,6 @@ func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind,

// UpdateKeyspaceGroup updates the keyspace group.
func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUserKind, newUserKind endpoint.UserKind, keyspaceID uint32) error {
// when server is not in API mode, we don't need to update the keyspace group
if m == nil {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (gta *GlobalTSOAllocator) Reset() {
}

// primaryElectionLoop is used to maintain the TSO primary election and TSO's
// running allocator. It is only used in API mode.
// running allocator. It is only used in microservice env.
func (gta *GlobalTSOAllocator) primaryElectionLoop() {
defer logutil.LogPanic()
defer gta.wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/keypath/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func svcRootPath(svcName string) string {
return path.Join(constant.MicroserviceRootPath, c, svcName)
}

// LegacyRootPath returns the root path of legacy pd service.
// LegacyRootPath returns the root path of legacy PD.
// Path: /pd/{cluster_id}
func LegacyRootPath() string {
return path.Join(pdRootPath, strconv.FormatUint(ClusterID(), 10))
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
// region stats needs to be collected in API mode.
// region stats needs to be collected in microservice env.
// We need to think of a better way to reduce this part of the cost in the future.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.MiscRunner.RunTask(
Expand Down Expand Up @@ -1251,7 +1251,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
ratelimit.CollectRegionStatsAsync,
func(ctx context.Context) {
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
// region stats needs to be collected in API mode.
// region stats needs to be collected in microservice env.
// We need to think of a better way to reduce this part of the cost in the future.
cluster.Collect(ctx, c, region)
},
Expand Down
2 changes: 1 addition & 1 deletion server/config/service_middleware_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
defaultEnableGRPCRateLimitMiddleware = true
)

// ServiceMiddlewareConfig is the configuration for PD Service middleware.
// ServiceMiddlewareConfig is the configuration for PD middleware.
type ServiceMiddlewareConfig struct {
AuditConfig `json:"audit"`
RateLimitConfig `json:"rate-limit"`
Expand Down
2 changes: 1 addition & 1 deletion server/config/service_middleware_persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewServiceMiddlewarePersistOptions(cfg *ServiceMiddlewareConfig) *ServiceMi
return o
}

// GetAuditConfig returns pd service middleware configurations.
// GetAuditConfig returns PD middleware configurations.
func (o *ServiceMiddlewarePersistOptions) GetAuditConfig() *AuditConfig {
return o.audit.Load().(*AuditConfig)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestForwardOnlyTSONoScheduling(t *testing.T) {
testutil.StatusOK(re), testutil.StringContain(re, "Reset ts successfully"), testutil.WithHeader(re, apiutil.XForwardedToMicroserviceHeader, "true"))
re.NoError(err)

// If close tso server, it should try forward to tso server, but return error in pd service mode.
// If close tso server, it should try forward to tso server, but return error in non-serverless env.
ttc.Destroy()
err = testutil.CheckPostJSON(tests.TestDialClient, fmt.Sprintf("%s/%s", urlPrefix, "admin/reset-ts"), input,
testutil.Status(re, http.StatusInternalServerError), testutil.StringContain(re, "[PD:apiutil:ErrRedirect]redirect failed"))
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func TestUpgradingPDAndTSOClusters(t *testing.T) {
pdLeader := pdCluster.GetServer(leaderName)
backendEndpoints := pdLeader.GetAddr()

// Create a pd client in PD mode to let the API leader to forward requests to the TSO cluster.
// Create a PD client in microservice env to let the PD leader to forward requests to the TSO cluster.
re.NoError(failpoint.Enable("github.com/tikv/pd/client/servicediscovery/usePDServiceMode", "return(true)"))
pdClient, err := pd.NewClientWithContext(context.Background(),
caller.TestComponent,
Expand Down
2 changes: 1 addition & 1 deletion tools/pd-ctl/pdctl/command/config_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const (
ruleBundlePrefix = "pd/api/v1/config/placement-rule"
pdServerPrefix = "pd/api/v1/config/pd-server"
serviceMiddlewareConfigPrefix = "pd/api/v1/service-middleware/config"
// flagFromPD has no influence for pd mode, but it is useful for us to debug in pd service mode.
// flagFromPD is useful for us to debug.
flagFromPD = "from_pd"
)

Expand Down
2 changes: 1 addition & 1 deletion tools/pd-ctl/pdctl/command/keyspace_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func showKeyspaceNameCommandFunc(cmd *cobra.Command, args []string) {
}
resp, err := doRequest(cmd, url, http.MethodGet, http.Header{})
// Retry without the force_refresh_group_id if the keyspace group manager is not initialized.
// This can happen when PD is not running in API mode.
// This can happen when PD is not running in microservice env.
if err != nil && refreshGroupID && strings.Contains(err.Error(), handlers.GroupManagerUninitializedErr) {
resp, err = doRequest(cmd, fmt.Sprintf("%s/%s", keyspacePrefix, args[0]), http.MethodGet, http.Header{})
}
Expand Down
2 changes: 1 addition & 1 deletion tools/pd-ctl/tests/keyspace/keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) {
re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop"))
}

func TestInPDMode(t *testing.T) {
func TestCmdWithoutKeyspaceGroupInitialized(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down

0 comments on commit b117d30

Please sign in to comment.