diff --git a/configuration/pegasus.properties b/configuration/pegasus.properties index 37ad264f..29dbe772 100644 --- a/configuration/pegasus.properties +++ b/configuration/pegasus.properties @@ -5,3 +5,5 @@ enable_perf_counter = false perf_counter_tags = cluster=onebox,app=unit_test push_counter_interval_secs = 10 meta_query_timeout = 5000 +service_name = "" +service_fqdn = "" diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java index 949dfc29..be9e6b23 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java @@ -46,6 +46,8 @@ public class ClientOptions { public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs"; public static final String PEGASUS_META_QUERY_TIMEOUT_KEY = "meta_query_timeout"; public static final String PEGASUS_ENABLE_AUTH_KEY = "enable_auth"; + public static final String PEGASUS_SERVICE_NAME_KEY = "service_name"; + public static final String PEGASUS_SERVICE_FQDN_KEY = "service_fqdn"; public static final String DEFAULT_META_SERVERS = "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603"; @@ -57,6 +59,8 @@ public class ClientOptions { public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true; public static final Duration DEFAULT_META_QUERY_TIMEOUT = Duration.ofMillis(5000); public static final boolean DEFAULT_ENABLE_AUTH = false; + public static final String DEFAULT_SERVICE_NAME = ""; + public static final String DEFAULT_SERVICE_FQDN = ""; private final String metaServers; private final Duration operationTimeout; @@ -67,6 +71,8 @@ public class ClientOptions { private final boolean enableWriteLimit; private final Duration metaQueryTimeout; private final boolean enableAuth; + private final String serviceName; + private final String serviceFQDN; protected ClientOptions(Builder builder) { this.metaServers = builder.metaServers; @@ -78,6 +84,8 @@ protected ClientOptions(Builder builder) { this.enableWriteLimit = builder.enableWriteLimit; this.metaQueryTimeout = builder.metaQueryTimeout; this.enableAuth = builder.enableAuth; + this.serviceName = builder.serviceName; + this.serviceFQDN = builder.serviceFQDN; } protected ClientOptions(ClientOptions original) { @@ -89,7 +97,9 @@ protected ClientOptions(ClientOptions original) { this.falconPushInterval = original.getFalconPushInterval(); this.enableWriteLimit = original.isWriteLimitEnabled(); this.metaQueryTimeout = original.getMetaQueryTimeout(); - this.enableAuth = original.enableAuth; + this.enableAuth = original.isEnableAuth(); + this.serviceName = original.getServiceName(); + this.serviceFQDN = original.getServiceFQDN(); } /** @@ -150,6 +160,8 @@ public static ClientOptions create(String configPath) throws PException { Duration.ofMillis( config.getLong(PEGASUS_META_QUERY_TIMEOUT_KEY, DEFAULT_META_QUERY_TIMEOUT.toMillis())); boolean enableAuth = config.getBoolean(PEGASUS_ENABLE_AUTH_KEY, DEFAULT_ENABLE_AUTH); + String serviceName = config.getString(PEGASUS_SERVICE_NAME_KEY, DEFAULT_SERVICE_NAME); + String serviceFQDN = config.getString(PEGASUS_SERVICE_FQDN_KEY, DEFAULT_SERVICE_FQDN); return ClientOptions.builder() .metaServers(metaList) @@ -160,6 +172,8 @@ public static ClientOptions create(String configPath) throws PException { .falconPushInterval(pushIntervalSecs) .metaQueryTimeout(metaQueryTimeout) .enableAuth(enableAuth) + .serviceName(serviceName) + .serviceFQDN(serviceFQDN) .build(); } @@ -178,7 +192,9 @@ public boolean equals(Object options) { && this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis() && this.enableWriteLimit == clientOptions.enableWriteLimit && this.metaQueryTimeout.toMillis() == clientOptions.metaQueryTimeout.toMillis() - && this.enableAuth == clientOptions.enableAuth; + && this.enableAuth == clientOptions.enableAuth + && this.serviceName == clientOptions.serviceName + && this.serviceFQDN == clientOptions.serviceFQDN; } return false; } @@ -206,6 +222,10 @@ public String toString() { + metaQueryTimeout.toMillis() + ", enableAuth=" + enableAuth + + ", serviceName=" + + serviceName + + ", serviceFQDN=" + + serviceFQDN + '}'; } @@ -220,6 +240,8 @@ public static class Builder { private boolean enableWriteLimit = DEFAULT_ENABLE_WRITE_LIMIT; private Duration metaQueryTimeout = DEFAULT_META_QUERY_TIMEOUT; private boolean enableAuth = DEFAULT_ENABLE_AUTH; + private String serviceName = DEFAULT_SERVICE_NAME; + private String serviceFQDN = DEFAULT_SERVICE_FQDN; protected Builder() {} @@ -334,6 +356,29 @@ public Builder enableAuth(boolean enableAuth) { return this; } + /** + * service name. Defaults to {@literal ""}, see {@link #DEFAULT_SERVICE_NAME}. + * + * @param serviceName + * @return {@code this} + */ + public Builder serviceName(String serviceName) { + this.serviceName = serviceName; + return this; + } + + /** + * service full qualified domain name. Defaults to {@literal ""}, see {@link + * #DEFAULT_SERVICE_FQDN}. + * + * @param serviceFQDN + * @return {@code this} + */ + public Builder serviceFQDN(String serviceFQDN) { + this.serviceFQDN = serviceFQDN; + return this; + } + /** * Create a new instance of {@link ClientOptions}. * @@ -362,7 +407,9 @@ public ClientOptions.Builder mutate() { .falconPushInterval(getFalconPushInterval()) .enableWriteLimit(isWriteLimitEnabled()) .metaQueryTimeout(getMetaQueryTimeout()) - .enableAuth(isEnableAuth()); + .enableAuth(isEnableAuth()) + .serviceName(getServiceName()) + .serviceFQDN(getServiceFQDN()); return builder; } @@ -451,4 +498,22 @@ public Duration getMetaQueryTimeout() { public boolean isEnableAuth() { return enableAuth; } + + /** + * service name. Defaults to {@literal ""}. + * + * @return service name. + */ + public String getServiceName() { + return serviceName; + } + + /** + * service full qualified domain name. Defaults to {@literal ""}. + * + * @return service full qualified domain name. + */ + public String getServiceFQDN() { + return serviceFQDN; + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java index 5ccefb94..471eb8f7 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java @@ -11,6 +11,7 @@ import com.xiaomi.infra.pegasus.rpc.Cluster; import com.xiaomi.infra.pegasus.rpc.InternalTableOptions; import com.xiaomi.infra.pegasus.rpc.ReplicationException; +import com.xiaomi.infra.pegasus.rpc.interceptor.ReplicaSessionInterceptorManager; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; @@ -34,7 +35,7 @@ public class ClusterManager extends Cluster { private EventLoopGroup tableGroup; // group used for handle table logic private String[] metaList; private MetaSession metaSession; - private boolean enableAuth; + private ReplicaSessionInterceptorManager sessionInterceptorManager; private static final String osName; @@ -56,7 +57,7 @@ public ClusterManager(ClientOptions opts) throws IllegalArgumentException { replicaGroup = getEventLoopGroupInstance(opts.getAsyncWorkers()); metaGroup = getEventLoopGroupInstance(1); tableGroup = getEventLoopGroupInstance(1); - enableAuth = opts.isEnableAuth(); + sessionInterceptorManager = new ReplicaSessionInterceptorManager(opts); metaList = opts.getMetaServers().split(","); // the constructor of meta session is depend on the replicaSessions, @@ -87,7 +88,7 @@ public ReplicaSession getReplicaSession(rpc_address address) { address, replicaGroup, max(operationTimeout, ClientOptions.MIN_SOCK_CONNECT_TIMEOUT), - enableAuth); + sessionInterceptorManager); replicaSessions.put(address, ss); return ss; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java index 6f82f48c..83e4be48 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java @@ -6,15 +6,27 @@ import com.xiaomi.infra.pegasus.base.error_code; import com.xiaomi.infra.pegasus.operator.negotiation_operator; import com.xiaomi.infra.pegasus.rpc.ReplicationException; +import java.util.HashMap; +import javax.security.auth.Subject; +import javax.security.sasl.Sasl; import org.slf4j.Logger; public class Negotiation { private static final Logger logger = org.slf4j.LoggerFactory.getLogger(Negotiation.class); private negotiation_status status; private ReplicaSession session; + private String serviceName; // used for SASL authentication + private String serviceFqdn; // name used for SASL authentication + private final HashMap props = new HashMap(); + private final Subject subject; - public Negotiation(ReplicaSession session) { + public Negotiation( + ReplicaSession session, Subject subject, String serviceName, String serviceFqdn) { this.session = session; + this.subject = subject; + this.serviceName = serviceName; + this.serviceFqdn = serviceFqdn; + this.props.put(Sasl.QOP, "auth"); } public void start() { diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index 106f8bd5..775d8150 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -6,6 +6,7 @@ import com.xiaomi.infra.pegasus.base.error_code.error_types; import com.xiaomi.infra.pegasus.base.rpc_address; import com.xiaomi.infra.pegasus.operator.client_operator; +import com.xiaomi.infra.pegasus.rpc.interceptor.ReplicaSessionInterceptorManager; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; @@ -39,10 +40,13 @@ public enum ConnState { } public ReplicaSession( - rpc_address address, EventLoopGroup rpcGroup, int socketTimeout, boolean enableAuth) { + rpc_address address, + EventLoopGroup rpcGroup, + int socketTimeout, + ReplicaSessionInterceptorManager interceptorManager) { this.address = address; this.rpcGroup = rpcGroup; - this.enableAuth = enableAuth; + this.interceptorManager = interceptorManager; final ReplicaSession this_ = this; boot = new Bootstrap(); @@ -74,7 +78,7 @@ public ReplicaSession( EventLoopGroup rpcGroup, int socketTimeout, MessageResponseFilter filter) { - this(address, rpcGroup, socketTimeout, false); + this(address, rpcGroup, socketTimeout, (ReplicaSessionInterceptorManager) null); this.filter = filter; } @@ -209,20 +213,13 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception { } } - private void startNegotiation(Channel activeChannel) { - if (enableAuth) { - negotiation = new Negotiation(this); - negotiation.start(); - } else { - markSessionConnected(activeChannel); - } - } - private void markSessionConnected(Channel activeChannel) { VolatileFields newCache = new VolatileFields(); newCache.state = ConnState.CONNECTED; newCache.nettyChannel = activeChannel; + interceptorManager.onConnected(this); + synchronized (pendingSend) { if (fields.state != ConnState.CONNECTING) { // this session may have been closed or connected already @@ -379,7 +376,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("Channel {} for session {} is active", ctx.channel().toString(), name()); - startNegotiation(ctx.channel()); + markSessionConnected(ctx.channel()); } @Override @@ -432,8 +429,7 @@ static final class VolatileFields { private final rpc_address address; private Bootstrap boot; private EventLoopGroup rpcGroup; - private boolean enableAuth; - private Negotiation negotiation; + private ReplicaSessionInterceptorManager interceptorManager; // Session will be actively closed if all the rpcs across `sessionResetTimeWindowMs` // are timed out, in that case we suspect that the server is unavailable. diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index 0c348b0f..783045fb 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -16,7 +16,7 @@ import com.xiaomi.infra.pegasus.rpc.InternalTableOptions; import com.xiaomi.infra.pegasus.rpc.ReplicationException; import com.xiaomi.infra.pegasus.rpc.Table; -import com.xiaomi.infra.pegasus.rpc.interceptor.InterceptorManager; +import com.xiaomi.infra.pegasus.rpc.interceptor.TableInterceptorManager; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.EventExecutor; import java.util.ArrayList; @@ -51,7 +51,7 @@ static final class TableConfiguration { AtomicBoolean inQuerying_; long lastQueryTime_; int backupRequestDelayMs; - private InterceptorManager interceptorManager; + private TableInterceptorManager interceptorManager; public TableHandler(ClusterManager mgr, String name, InternalTableOptions internalTableOptions) throws ReplicationException { @@ -109,7 +109,7 @@ public TableHandler(ClusterManager mgr, String name, InternalTableOptions intern inQuerying_ = new AtomicBoolean(false); lastQueryTime_ = 0; - this.interceptorManager = new InterceptorManager(internalTableOptions.tableOptions()); + this.interceptorManager = new TableInterceptorManager(internalTableOptions.tableOptions()); } public ReplicaConfiguration getReplicaConfig(int index) { diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/ReplicaSessionInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/ReplicaSessionInterceptor.java new file mode 100644 index 00000000..8cdc74bd --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/ReplicaSessionInterceptor.java @@ -0,0 +1,8 @@ +package com.xiaomi.infra.pegasus.rpc.interceptor; + +import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession; + +public interface ReplicaSessionInterceptor { + // The behavior when a rpc session is connected. + void onConnected(ReplicaSession session); +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java new file mode 100644 index 00000000..c1dbce48 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java @@ -0,0 +1,24 @@ +package com.xiaomi.infra.pegasus.rpc.interceptor; + +import com.xiaomi.infra.pegasus.client.ClientOptions; +import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession; +import java.util.ArrayList; +import java.util.List; + +public class ReplicaSessionInterceptorManager { + private List interceptors = new ArrayList<>(); + + public ReplicaSessionInterceptorManager(ClientOptions options) { + if (options.isEnableAuth()) { + ReplicaSessionInterceptor securityInterceptor = + new SecurityReplicaSessionInterceptor(options.getServiceName(), options.getServiceFQDN()); + interceptors.add(securityInterceptor); + } + } + + public void onConnected(ReplicaSession session) { + for (ReplicaSessionInterceptor interceptor : interceptors) { + interceptor.onConnected(session); + } + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/SecurityReplicaSessionInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/SecurityReplicaSessionInterceptor.java new file mode 100644 index 00000000..a7a0a4c6 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/SecurityReplicaSessionInterceptor.java @@ -0,0 +1,44 @@ +package com.xiaomi.infra.pegasus.rpc.interceptor; + +import com.sun.security.auth.callback.TextCallbackHandler; +import com.xiaomi.infra.pegasus.rpc.async.Negotiation; +import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession; +import javax.security.auth.Subject; +import javax.security.auth.login.LoginContext; +import javax.security.auth.login.LoginException; +import org.slf4j.Logger; + +public class SecurityReplicaSessionInterceptor implements ReplicaSessionInterceptor { + private static final Logger logger = + org.slf4j.LoggerFactory.getLogger(SecurityReplicaSessionInterceptor.class); + + private String serviceName; + private String serviceFqdn; + private Subject subject; + private LoginContext loginContext; + + public SecurityReplicaSessionInterceptor(String serviceName, String serviceFqdn) + throws IllegalArgumentException { + this.serviceName = serviceName; + this.serviceFqdn = serviceFqdn; + + try { + loginContext = new LoginContext("client", new TextCallbackHandler()); + loginContext.login(); + + subject = loginContext.getSubject(); + if (subject == null) { + throw new LoginException("subject is null"); + } + } catch (LoginException le) { + throw new IllegalArgumentException("login failed", le); + } + + logger.info("login succeed, as user {}", subject.getPrincipals().toString()); + } + + public void onConnected(ReplicaSession session) { + Negotiation negotiation = new Negotiation(session, subject, serviceName, serviceFqdn); + negotiation.start(); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManager.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptorManager.java similarity index 92% rename from src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManager.java rename to src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptorManager.java index fd404f32..30bffcde 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManager.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptorManager.java @@ -7,11 +7,11 @@ import java.util.ArrayList; import java.util.List; -public class InterceptorManager { +public class TableInterceptorManager { private List interceptors = new ArrayList<>(); - public InterceptorManager(TableOptions options) { + public TableInterceptorManager(TableOptions options) { if (options.enableBackupRequest()) { interceptors.add(new BackupRequestInterceptor(options.backupRequestDelayMs())); } diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/NegotiationTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/NegotiationTest.java index b99de766..8564d171 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/NegotiationTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/NegotiationTest.java @@ -10,7 +10,7 @@ public class NegotiationTest { @Test public void testStart() { - Negotiation negotiation = new Negotiation(null); + Negotiation negotiation = new Negotiation(null, null, "", ""); Negotiation mockNegotiation = Mockito.spy(negotiation); Mockito.doNothing().when(mockNegotiation).send(any(), any()); diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java index 1b4c17f8..7ff682a2 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java @@ -15,6 +15,7 @@ import com.xiaomi.infra.pegasus.operator.rrdb_put_operator; import com.xiaomi.infra.pegasus.rpc.KeyHasher; import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession.ConnState; +import com.xiaomi.infra.pegasus.rpc.interceptor.ReplicaSessionInterceptorManager; import com.xiaomi.infra.pegasus.tools.Toollet; import com.xiaomi.infra.pegasus.tools.Tools; import io.netty.channel.EventLoopGroup; @@ -279,7 +280,8 @@ public void testSessionConnectTimeout() throws InterruptedException { long start = System.currentTimeMillis(); EventLoopGroup rpcGroup = new NioEventLoopGroup(4); - ReplicaSession rs = new ReplicaSession(addr, rpcGroup, 1000, false); + ReplicaSession rs = + new ReplicaSession(addr, rpcGroup, 1000, (ReplicaSessionInterceptorManager) null); rs.tryConnect().awaitUninterruptibly(); long end = System.currentTimeMillis(); Assert.assertEquals((end - start) / 1000, 1); // ensure connect failed within 1sec