Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

feat(security): add ReplicaSession interceptor #135

Merged
merged 21 commits into from
Sep 24, 2020
2 changes: 2 additions & 0 deletions configuration/pegasus.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ enable_perf_counter = false
perf_counter_tags = cluster=onebox,app=unit_test
push_counter_interval_secs = 10
meta_query_timeout = 5000
service_name = ""
service_fqdn = ""
71 changes: 68 additions & 3 deletions src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public class ClientOptions {
public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs";
public static final String PEGASUS_META_QUERY_TIMEOUT_KEY = "meta_query_timeout";
public static final String PEGASUS_ENABLE_AUTH_KEY = "enable_auth";
public static final String PEGASUS_SERVICE_NAME_KEY = "service_name";
public static final String PEGASUS_SERVICE_FQDN_KEY = "service_fqdn";

public static final String DEFAULT_META_SERVERS =
"127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603";
Expand All @@ -57,6 +59,8 @@ public class ClientOptions {
public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true;
public static final Duration DEFAULT_META_QUERY_TIMEOUT = Duration.ofMillis(5000);
public static final boolean DEFAULT_ENABLE_AUTH = false;
public static final String DEFAULT_SERVICE_NAME = "";
public static final String DEFAULT_SERVICE_FQDN = "";

private final String metaServers;
private final Duration operationTimeout;
Expand All @@ -67,6 +71,8 @@ public class ClientOptions {
private final boolean enableWriteLimit;
private final Duration metaQueryTimeout;
private final boolean enableAuth;
private final String serviceName;
private final String serviceFQDN;

protected ClientOptions(Builder builder) {
this.metaServers = builder.metaServers;
Expand All @@ -78,6 +84,8 @@ protected ClientOptions(Builder builder) {
this.enableWriteLimit = builder.enableWriteLimit;
this.metaQueryTimeout = builder.metaQueryTimeout;
this.enableAuth = builder.enableAuth;
this.serviceName = builder.serviceName;
this.serviceFQDN = builder.serviceFQDN;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -89,7 +97,9 @@ protected ClientOptions(ClientOptions original) {
this.falconPushInterval = original.getFalconPushInterval();
this.enableWriteLimit = original.isWriteLimitEnabled();
this.metaQueryTimeout = original.getMetaQueryTimeout();
this.enableAuth = original.enableAuth;
this.enableAuth = original.isEnableAuth();
this.serviceName = original.getServiceName();
this.serviceFQDN = original.getServiceFQDN();
}

/**
Expand Down Expand Up @@ -150,6 +160,8 @@ public static ClientOptions create(String configPath) throws PException {
Duration.ofMillis(
config.getLong(PEGASUS_META_QUERY_TIMEOUT_KEY, DEFAULT_META_QUERY_TIMEOUT.toMillis()));
boolean enableAuth = config.getBoolean(PEGASUS_ENABLE_AUTH_KEY, DEFAULT_ENABLE_AUTH);
String serviceName = config.getString(PEGASUS_SERVICE_NAME_KEY, DEFAULT_SERVICE_NAME);
String serviceFQDN = config.getString(PEGASUS_SERVICE_FQDN_KEY, DEFAULT_SERVICE_FQDN);

return ClientOptions.builder()
.metaServers(metaList)
Expand All @@ -160,6 +172,8 @@ public static ClientOptions create(String configPath) throws PException {
.falconPushInterval(pushIntervalSecs)
.metaQueryTimeout(metaQueryTimeout)
.enableAuth(enableAuth)
.serviceName(serviceName)
.serviceFQDN(serviceFQDN)
.build();
}

Expand All @@ -178,7 +192,9 @@ public boolean equals(Object options) {
&& this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis()
&& this.enableWriteLimit == clientOptions.enableWriteLimit
&& this.metaQueryTimeout.toMillis() == clientOptions.metaQueryTimeout.toMillis()
&& this.enableAuth == clientOptions.enableAuth;
&& this.enableAuth == clientOptions.enableAuth
&& this.serviceName == clientOptions.serviceName
&& this.serviceFQDN == clientOptions.serviceFQDN;
}
return false;
}
Expand Down Expand Up @@ -206,6 +222,10 @@ public String toString() {
+ metaQueryTimeout.toMillis()
+ ", enableAuth="
+ enableAuth
+ ", serviceName="
+ serviceName
+ ", serviceFQDN="
+ serviceFQDN
+ '}';
}

Expand All @@ -220,6 +240,8 @@ public static class Builder {
private boolean enableWriteLimit = DEFAULT_ENABLE_WRITE_LIMIT;
private Duration metaQueryTimeout = DEFAULT_META_QUERY_TIMEOUT;
private boolean enableAuth = DEFAULT_ENABLE_AUTH;
private String serviceName = DEFAULT_SERVICE_NAME;
private String serviceFQDN = DEFAULT_SERVICE_FQDN;

protected Builder() {}

Expand Down Expand Up @@ -334,6 +356,29 @@ public Builder enableAuth(boolean enableAuth) {
return this;
}

/**
* service name. Defaults to {@literal ""}, see {@link #DEFAULT_SERVICE_NAME}.
*
* @param serviceName
* @return {@code this}
*/
public Builder serviceName(String serviceName) {
this.serviceName = serviceName;
return this;
}

/**
* service full qualified domain name. Defaults to {@literal ""}, see {@link
* #DEFAULT_SERVICE_FQDN}.
*
* @param serviceFQDN
* @return {@code this}
*/
public Builder serviceFQDN(String serviceFQDN) {
this.serviceFQDN = serviceFQDN;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand Down Expand Up @@ -362,7 +407,9 @@ public ClientOptions.Builder mutate() {
.falconPushInterval(getFalconPushInterval())
.enableWriteLimit(isWriteLimitEnabled())
.metaQueryTimeout(getMetaQueryTimeout())
.enableAuth(isEnableAuth());
.enableAuth(isEnableAuth())
.serviceName(getServiceName())
.serviceFQDN(getServiceFQDN());
return builder;
}

Expand Down Expand Up @@ -451,4 +498,22 @@ public Duration getMetaQueryTimeout() {
public boolean isEnableAuth() {
return enableAuth;
}

/**
* service name. Defaults to {@literal ""}.
*
* @return service name.
*/
public String getServiceName() {
return serviceName;
}

/**
* service full qualified domain name. Defaults to {@literal ""}.
*
* @return service full qualified domain name.
*/
public String getServiceFQDN() {
return serviceFQDN;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.xiaomi.infra.pegasus.rpc.Cluster;
import com.xiaomi.infra.pegasus.rpc.InternalTableOptions;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.interceptor.ReplicaSessionInterceptorManager;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
Expand All @@ -34,7 +35,7 @@ public class ClusterManager extends Cluster {
private EventLoopGroup tableGroup; // group used for handle table logic
private String[] metaList;
private MetaSession metaSession;
private boolean enableAuth;
private ReplicaSessionInterceptorManager sessionInterceptorManager;

private static final String osName;

Expand All @@ -56,7 +57,7 @@ public ClusterManager(ClientOptions opts) throws IllegalArgumentException {
replicaGroup = getEventLoopGroupInstance(opts.getAsyncWorkers());
metaGroup = getEventLoopGroupInstance(1);
tableGroup = getEventLoopGroupInstance(1);
enableAuth = opts.isEnableAuth();
sessionInterceptorManager = new ReplicaSessionInterceptorManager(opts);

metaList = opts.getMetaServers().split(",");
// the constructor of meta session is depend on the replicaSessions,
Expand Down Expand Up @@ -87,7 +88,7 @@ public ReplicaSession getReplicaSession(rpc_address address) {
address,
replicaGroup,
max(operationTimeout, ClientOptions.MIN_SOCK_CONNECT_TIMEOUT),
enableAuth);
sessionInterceptorManager);
replicaSessions.put(address, ss);
return ss;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,27 @@
import com.xiaomi.infra.pegasus.base.error_code;
import com.xiaomi.infra.pegasus.operator.negotiation_operator;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import java.util.HashMap;
import javax.security.auth.Subject;
import javax.security.sasl.Sasl;
import org.slf4j.Logger;

public class Negotiation {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(Negotiation.class);
private negotiation_status status;
private ReplicaSession session;
private String serviceName; // used for SASL authentication
private String serviceFqdn; // name used for SASL authentication
private final HashMap<String, Object> props = new HashMap<String, Object>();
private final Subject subject;

public Negotiation(ReplicaSession session) {
public Negotiation(
ReplicaSession session, Subject subject, String serviceName, String serviceFqdn) {
this.session = session;
this.subject = subject;
this.serviceName = serviceName;
this.serviceFqdn = serviceFqdn;
this.props.put(Sasl.QOP, "auth");
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.xiaomi.infra.pegasus.base.error_code.error_types;
import com.xiaomi.infra.pegasus.base.rpc_address;
import com.xiaomi.infra.pegasus.operator.client_operator;
import com.xiaomi.infra.pegasus.rpc.interceptor.ReplicaSessionInterceptorManager;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
Expand Down Expand Up @@ -39,10 +40,13 @@ public enum ConnState {
}

public ReplicaSession(
rpc_address address, EventLoopGroup rpcGroup, int socketTimeout, boolean enableAuth) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can pass ClientOptions here. Each ReplicaSession creates its ReplicaSessionInterceptorManager separately.

  public ReplicaSession(rpc_address address, EventLoopGroup rpcGroup, int socketTimeout, ClientOptions opts) {
    interceptors = new ReplicaSessionInterceptorManager(opts);
    enableAuth = opts.enableAuth();
  }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should create a ReplicaSessionInterceptorManager for each ReplicationSession. Each ClusterManager has a ReplicaSessionInterceptorManager is enough

rpc_address address,
EventLoopGroup rpcGroup,
int socketTimeout,
ReplicaSessionInterceptorManager interceptorManager) {
this.address = address;
this.rpcGroup = rpcGroup;
this.enableAuth = enableAuth;
this.interceptorManager = interceptorManager;

final ReplicaSession this_ = this;
boot = new Bootstrap();
Expand Down Expand Up @@ -74,7 +78,7 @@ public ReplicaSession(
EventLoopGroup rpcGroup,
int socketTimeout,
MessageResponseFilter filter) {
this(address, rpcGroup, socketTimeout, false);
this(address, rpcGroup, socketTimeout, (ReplicaSessionInterceptorManager) null);
this.filter = filter;
}

Expand Down Expand Up @@ -209,20 +213,13 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
}
}

private void startNegotiation(Channel activeChannel) {
if (enableAuth) {
negotiation = new Negotiation(this);
negotiation.start();
} else {
markSessionConnected(activeChannel);
}
}

private void markSessionConnected(Channel activeChannel) {
VolatileFields newCache = new VolatileFields();
newCache.state = ConnState.CONNECTED;
newCache.nettyChannel = activeChannel;

interceptorManager.onConnected(this);

synchronized (pendingSend) {
if (fields.state != ConnState.CONNECTING) {
// this session may have been closed or connected already
Expand Down Expand Up @@ -379,7 +376,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("Channel {} for session {} is active", ctx.channel().toString(), name());
startNegotiation(ctx.channel());
markSessionConnected(ctx.channel());
}

@Override
Expand Down Expand Up @@ -432,8 +429,7 @@ static final class VolatileFields {
private final rpc_address address;
private Bootstrap boot;
private EventLoopGroup rpcGroup;
private boolean enableAuth;
private Negotiation negotiation;
private ReplicaSessionInterceptorManager interceptorManager;

// Session will be actively closed if all the rpcs across `sessionResetTimeWindowMs`
// are timed out, in that case we suspect that the server is unavailable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.xiaomi.infra.pegasus.rpc.InternalTableOptions;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.Table;
import com.xiaomi.infra.pegasus.rpc.interceptor.InterceptorManager;
import com.xiaomi.infra.pegasus.rpc.interceptor.TableInterceptorManager;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.EventExecutor;
import java.util.ArrayList;
Expand Down Expand Up @@ -51,7 +51,7 @@ static final class TableConfiguration {
AtomicBoolean inQuerying_;
long lastQueryTime_;
int backupRequestDelayMs;
private InterceptorManager interceptorManager;
private TableInterceptorManager interceptorManager;

public TableHandler(ClusterManager mgr, String name, InternalTableOptions internalTableOptions)
throws ReplicationException {
Expand Down Expand Up @@ -109,7 +109,7 @@ public TableHandler(ClusterManager mgr, String name, InternalTableOptions intern
inQuerying_ = new AtomicBoolean(false);
lastQueryTime_ = 0;

this.interceptorManager = new InterceptorManager(internalTableOptions.tableOptions());
this.interceptorManager = new TableInterceptorManager(internalTableOptions.tableOptions());
}

public ReplicaConfiguration getReplicaConfig(int index) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.xiaomi.infra.pegasus.rpc.interceptor;

import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession;

public interface ReplicaSessionInterceptor {
// The behavior when a rpc session is connected.
void onConnected(ReplicaSession session);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.xiaomi.infra.pegasus.rpc.interceptor;

import com.xiaomi.infra.pegasus.client.ClientOptions;
import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession;
import java.util.ArrayList;
import java.util.List;

public class ReplicaSessionInterceptorManager {
private List<ReplicaSessionInterceptor> interceptors = new ArrayList<>();

public ReplicaSessionInterceptorManager(ClientOptions options) {
if (options.isEnableAuth()) {
ReplicaSessionInterceptor securityInterceptor =
new SecurityReplicaSessionInterceptor(options.getServiceName(), options.getServiceFQDN());
interceptors.add(securityInterceptor);
}
}

public void onConnected(ReplicaSession session) {
for (ReplicaSessionInterceptor interceptor : interceptors) {
interceptor.onConnected(session);
}
}
}
Loading