Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: rename file and clean up #8996

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func (c *client) UpdateOption(option opt.DynamicOption, value any) error {
}
case opt.EnableTSOFollowerProxy:
if c.inner.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE {
return errors.New("[pd] tso follower proxy is only supported in PD mode")
return errors.New("[pd] tso follower proxy is only supported when PD provides TSO")
}
enable, ok := value.(bool)
if !ok {
Expand Down Expand Up @@ -526,7 +526,7 @@ func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logi

// GetMinTS implements the TSOClient interface.
func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) {
// Handle compatibility issue in case of PD/PD service doesn't support GetMinTS API.
// Handle compatibility issue in case of PD doesn't support GetMinTS API.
serviceMode := c.inner.getServiceMode()
switch serviceMode {
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
Expand Down
2 changes: 1 addition & 1 deletion client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (ci *clientInner) doRequest(
if readErr != nil {
logFields = append(logFields, zap.NamedError("read-body-error", err))
} else {
// PD service will return a JSON body containing the detailed error message
// PD will return a JSON body containing the detailed error message
// when the status code is not `http.StatusOK` 200.
bs = bytes.TrimSpace(bs)
logFields = append(logFields, zap.ByteString("body", bs))
Expand Down
33 changes: 22 additions & 11 deletions client/inner_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,31 @@
// If we are using TSO server proxy, we always use PD_SVC_MODE.
newMode = pdpb.ServiceMode_PD_SVC_MODE
}

if newMode == c.serviceMode {
return
}
log.Info("[pd] changing service mode",
zap.String("old-mode", c.serviceMode.String()),
zap.String("new-mode", newMode.String()))
log.Info("[pd] changing TSO provider",
zap.String("old", convertToString(c.serviceMode)),
zap.String("new", convertToString(newMode)))
c.resetTSOClientLocked(newMode)
oldMode := c.serviceMode
c.serviceMode = newMode
log.Info("[pd] service mode changed",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to keep one of the two logs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, but I don't want to expose the API mode. Any idea?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if newMode != pdpb.ServiceMode_PD_SVC_MODE {
log.Info("[pd] changing from non-microservice to microservice")
}else{
log.Info("[pd] changing from microservice to non-microservice")
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it similar to line 87 and line 91?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resetTSOClientLocked and setServiceMode may be called at different places?

zap.String("old-mode", oldMode.String()),
zap.String("new-mode", newMode.String()))
log.Info("[pd] TSO provider changed",
zap.String("old", convertToString(oldMode)),
zap.String("new", convertToString(newMode)))
}

func convertToString(mode pdpb.ServiceMode) string {
switch mode {
case pdpb.ServiceMode_PD_SVC_MODE:
return "pd"
case pdpb.ServiceMode_API_SVC_MODE:
return "tso server"
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
return "unknown"
default:
return "invalid"

Check warning on line 93 in client/inner_client.go

View check run for this annotation

Codecov / codecov/patch

client/inner_client.go#L92-L93

Added lines #L92 - L93 were not covered by tests
}
}

// Reset a new TSO client.
Expand All @@ -102,9 +114,8 @@
newTSOCli = tso.NewClient(c.ctx, c.option,
newTSOSvcDiscovery, &tso.MSStreamBuilderFactory{})
if err := newTSOSvcDiscovery.Init(); err != nil {
log.Error("[pd] failed to initialize tso service discovery. keep the current service mode",
log.Error("[pd] failed to initialize tso service discovery",
zap.Strings("svr-urls", c.svrUrls),
zap.String("current-mode", c.serviceMode.String()),
zap.Error(err))
return
}
Expand All @@ -119,12 +130,12 @@
oldTSOClient.Close()
// Replace the old TSO service discovery if needed.
oldTSOSvcDiscovery := c.tsoSvcDiscovery
// If newTSOSvcDiscovery is nil, that's expected, as it means we are switching to PD mode and
// If newTSOSvcDiscovery is nil, that's expected, as it means we are switching to non-microservice env and
// no tso microservice discovery is needed.
c.tsoSvcDiscovery = newTSOSvcDiscovery
// Close the old TSO service discovery safely after both the old client and service discovery are replaced.
if oldTSOSvcDiscovery != nil {
// We are switching from PD service mode to PD mode, so delete the old tso microservice discovery.
// We are switching from microservice env to non-microservice env, so delete the old tso microservice discovery.
oldTSOSvcDiscovery.Close()
}
}
Expand Down
6 changes: 3 additions & 3 deletions client/servicediscovery/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@
_ TSOEventSource = (*serviceDiscovery)(nil)
)

// serviceDiscovery is the service discovery client of PD/PD service which is quorum based
// serviceDiscovery is the service discovery client of PD which is quorum based
type serviceDiscovery struct {
isInitialized bool

Expand Down Expand Up @@ -502,7 +502,7 @@
log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID))

// We need to update the keyspace ID before we discover and update the service mode
// so that TSO in API mode can be initialized with the correct keyspace ID.
// so that TSO in PD can be initialized with the correct keyspace ID.
if c.keyspaceID == constants.NullKeyspaceID && c.updateKeyspaceIDFunc != nil {
if err := c.initRetry(c.updateKeyspaceIDFunc); err != nil {
return err
Expand Down Expand Up @@ -864,7 +864,7 @@
clusterInfo, err := c.getClusterInfo(c.ctx, leaderURL, c.option.Timeout)
if err != nil {
if strings.Contains(err.Error(), "Unimplemented") {
// If the method is not supported, we set it to pd mode.
// If the method is not supported, we fallback to non-microservice env.

Check warning on line 867 in client/servicediscovery/service_discovery.go

View check run for this annotation

Codecov / codecov/patch

client/servicediscovery/service_discovery.go#L867

Added line #L867 was not covered by tests
// TODO: it's a hack way to solve the compatibility issue.
// we need to remove this after all maintained version supports the method.
if c.serviceModeUpdateCb != nil {
Expand Down
3 changes: 0 additions & 3 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,6 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro

// GetKeyspaceConfigByKind returns the keyspace config for the given user kind.
func (m *GroupManager) GetKeyspaceConfigByKind(userKind endpoint.UserKind) (map[string]string, error) {
// when server is not in API mode, we don't need to return the keyspace config
if m == nil {
return map[string]string{}, nil
}
Expand Down Expand Up @@ -418,7 +417,6 @@ var failpointOnce sync.Once

// UpdateKeyspaceForGroup updates the keyspace field for the keyspace group.
func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupID string, keyspaceID uint32, mutation int) error {
// when server is not in API mode, we don't need to update the keyspace for keyspace group
if m == nil {
return nil
}
Expand Down Expand Up @@ -477,7 +475,6 @@ func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind,

// UpdateKeyspaceGroup updates the keyspace group.
func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUserKind, newUserKind endpoint.UserKind, keyspaceID uint32) error {
// when server is not in API mode, we don't need to update the keyspace group
if m == nil {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (gta *GlobalTSOAllocator) Reset() {
}

// primaryElectionLoop is used to maintain the TSO primary election and TSO's
// running allocator. It is only used in API mode.
// running allocator. It is only used in microservice env.
func (gta *GlobalTSOAllocator) primaryElectionLoop() {
defer logutil.LogPanic()
defer gta.wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/keypath/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func svcRootPath(svcName string) string {
return path.Join(constant.MicroserviceRootPath, c, svcName)
}

// LegacyRootPath returns the root path of legacy pd service.
// LegacyRootPath returns the root path of legacy PD.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to replace legacy with non-microservice env?

Copy link
Member Author

@rleungx rleungx Jan 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function name uses "legacy", I just keep using it.

// Path: /pd/{cluster_id}
func LegacyRootPath() string {
return path.Join(pdRootPath, strconv.FormatUint(ClusterID(), 10))
Expand Down
4 changes: 2 additions & 2 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
// region stats needs to be collected in API mode.
// region stats needs to be collected in microservice env.
// We need to think of a better way to reduce this part of the cost in the future.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.MiscRunner.RunTask(
Expand Down Expand Up @@ -1250,7 +1250,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
ratelimit.CollectRegionStatsAsync,
func(ctx context.Context) {
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
// region stats needs to be collected in API mode.
// region stats needs to be collected in microservice env.
// We need to think of a better way to reduce this part of the cost in the future.
cluster.Collect(ctx, c, region)
},
Expand Down
2 changes: 1 addition & 1 deletion server/config/service_middleware_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
defaultEnableGRPCRateLimitMiddleware = true
)

// ServiceMiddlewareConfig is the configuration for PD Service middleware.
// ServiceMiddlewareConfig is the configuration for PD middleware.
type ServiceMiddlewareConfig struct {
AuditConfig `json:"audit"`
RateLimitConfig `json:"rate-limit"`
Expand Down
2 changes: 1 addition & 1 deletion server/config/service_middleware_persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewServiceMiddlewarePersistOptions(cfg *ServiceMiddlewareConfig) *ServiceMi
return o
}

// GetAuditConfig returns pd service middleware configurations.
// GetAuditConfig returns PD middleware configurations.
func (o *ServiceMiddlewarePersistOptions) GetAuditConfig() *AuditConfig {
return o.audit.Load().(*AuditConfig)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestForwardOnlyTSONoScheduling(t *testing.T) {
testutil.StatusOK(re), testutil.StringContain(re, "Reset ts successfully"), testutil.WithHeader(re, apiutil.XForwardedToMicroserviceHeader, "true"))
re.NoError(err)

// If close tso server, it should try forward to tso server, but return error in pd service mode.
// If close tso server, it should try forward to tso server, but return error in non-serverless env.
ttc.Destroy()
err = testutil.CheckPostJSON(tests.TestDialClient, fmt.Sprintf("%s/%s", urlPrefix, "admin/reset-ts"), input,
testutil.Status(re, http.StatusInternalServerError), testutil.StringContain(re, "[PD:apiutil:ErrRedirect]redirect failed"))
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ func TestUpgradingPDAndTSOClusters(t *testing.T) {
pdLeader := pdCluster.GetServer(leaderName)
backendEndpoints := pdLeader.GetAddr()

// Create a pd client in PD mode to let the API leader to forward requests to the TSO cluster.
// Create a PD client in microservice env to let the PD leader to forward requests to the TSO cluster.
re.NoError(failpoint.Enable("github.com/tikv/pd/client/servicediscovery/usePDServiceMode", "return(true)"))
pdClient, err := pd.NewClientWithContext(context.Background(),
caller.TestComponent,
Expand Down
2 changes: 1 addition & 1 deletion tools/pd-ctl/pdctl/command/config_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const (
ruleBundlePrefix = "pd/api/v1/config/placement-rule"
pdServerPrefix = "pd/api/v1/config/pd-server"
serviceMiddlewareConfigPrefix = "pd/api/v1/service-middleware/config"
// flagFromPD has no influence for pd mode, but it is useful for us to debug in pd service mode.
// flagFromPD is useful for us to debug.
flagFromPD = "from_pd"
)

Expand Down
2 changes: 1 addition & 1 deletion tools/pd-ctl/pdctl/command/keyspace_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func showKeyspaceNameCommandFunc(cmd *cobra.Command, args []string) {
}
resp, err := doRequest(cmd, url, http.MethodGet, http.Header{})
// Retry without the force_refresh_group_id if the keyspace group manager is not initialized.
// This can happen when PD is not running in API mode.
// This can happen when PD is not running in microservice env.
if err != nil && refreshGroupID && strings.Contains(err.Error(), handlers.GroupManagerUninitializedErr) {
resp, err = doRequest(cmd, fmt.Sprintf("%s/%s", keyspacePrefix, args[0]), http.MethodGet, http.Header{})
}
Expand Down
2 changes: 1 addition & 1 deletion tools/pd-ctl/tests/keyspace/keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) {
re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop"))
}

func TestInPDMode(t *testing.T) {
func TestCmdWithoutKeyspaceGroupInitialized(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
Loading