Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
feat(security): add ReplicaSession interceptor (#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
levy5307 authored Sep 24, 2020
1 parent a79ea8c commit 3615cfd
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 29 deletions.
2 changes: 2 additions & 0 deletions configuration/pegasus.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ enable_perf_counter = false
perf_counter_tags = cluster=onebox,app=unit_test
push_counter_interval_secs = 10
meta_query_timeout = 5000
service_name = ""
service_fqdn = ""
71 changes: 68 additions & 3 deletions src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public class ClientOptions {
public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs";
public static final String PEGASUS_META_QUERY_TIMEOUT_KEY = "meta_query_timeout";
public static final String PEGASUS_ENABLE_AUTH_KEY = "enable_auth";
public static final String PEGASUS_SERVICE_NAME_KEY = "service_name";
public static final String PEGASUS_SERVICE_FQDN_KEY = "service_fqdn";

public static final String DEFAULT_META_SERVERS =
"127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603";
Expand All @@ -57,6 +59,8 @@ public class ClientOptions {
public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true;
public static final Duration DEFAULT_META_QUERY_TIMEOUT = Duration.ofMillis(5000);
public static final boolean DEFAULT_ENABLE_AUTH = false;
public static final String DEFAULT_SERVICE_NAME = "";
public static final String DEFAULT_SERVICE_FQDN = "";

private final String metaServers;
private final Duration operationTimeout;
Expand All @@ -67,6 +71,8 @@ public class ClientOptions {
private final boolean enableWriteLimit;
private final Duration metaQueryTimeout;
private final boolean enableAuth;
private final String serviceName;
private final String serviceFQDN;

protected ClientOptions(Builder builder) {
this.metaServers = builder.metaServers;
Expand All @@ -78,6 +84,8 @@ protected ClientOptions(Builder builder) {
this.enableWriteLimit = builder.enableWriteLimit;
this.metaQueryTimeout = builder.metaQueryTimeout;
this.enableAuth = builder.enableAuth;
this.serviceName = builder.serviceName;
this.serviceFQDN = builder.serviceFQDN;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -89,7 +97,9 @@ protected ClientOptions(ClientOptions original) {
this.falconPushInterval = original.getFalconPushInterval();
this.enableWriteLimit = original.isWriteLimitEnabled();
this.metaQueryTimeout = original.getMetaQueryTimeout();
this.enableAuth = original.enableAuth;
this.enableAuth = original.isEnableAuth();
this.serviceName = original.getServiceName();
this.serviceFQDN = original.getServiceFQDN();
}

/**
Expand Down Expand Up @@ -150,6 +160,8 @@ public static ClientOptions create(String configPath) throws PException {
Duration.ofMillis(
config.getLong(PEGASUS_META_QUERY_TIMEOUT_KEY, DEFAULT_META_QUERY_TIMEOUT.toMillis()));
boolean enableAuth = config.getBoolean(PEGASUS_ENABLE_AUTH_KEY, DEFAULT_ENABLE_AUTH);
String serviceName = config.getString(PEGASUS_SERVICE_NAME_KEY, DEFAULT_SERVICE_NAME);
String serviceFQDN = config.getString(PEGASUS_SERVICE_FQDN_KEY, DEFAULT_SERVICE_FQDN);

return ClientOptions.builder()
.metaServers(metaList)
Expand All @@ -160,6 +172,8 @@ public static ClientOptions create(String configPath) throws PException {
.falconPushInterval(pushIntervalSecs)
.metaQueryTimeout(metaQueryTimeout)
.enableAuth(enableAuth)
.serviceName(serviceName)
.serviceFQDN(serviceFQDN)
.build();
}

Expand All @@ -178,7 +192,9 @@ public boolean equals(Object options) {
&& this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis()
&& this.enableWriteLimit == clientOptions.enableWriteLimit
&& this.metaQueryTimeout.toMillis() == clientOptions.metaQueryTimeout.toMillis()
&& this.enableAuth == clientOptions.enableAuth;
&& this.enableAuth == clientOptions.enableAuth
&& this.serviceName == clientOptions.serviceName
&& this.serviceFQDN == clientOptions.serviceFQDN;
}
return false;
}
Expand Down Expand Up @@ -206,6 +222,10 @@ public String toString() {
+ metaQueryTimeout.toMillis()
+ ", enableAuth="
+ enableAuth
+ ", serviceName="
+ serviceName
+ ", serviceFQDN="
+ serviceFQDN
+ '}';
}

Expand All @@ -220,6 +240,8 @@ public static class Builder {
private boolean enableWriteLimit = DEFAULT_ENABLE_WRITE_LIMIT;
private Duration metaQueryTimeout = DEFAULT_META_QUERY_TIMEOUT;
private boolean enableAuth = DEFAULT_ENABLE_AUTH;
private String serviceName = DEFAULT_SERVICE_NAME;
private String serviceFQDN = DEFAULT_SERVICE_FQDN;

protected Builder() {}

Expand Down Expand Up @@ -334,6 +356,29 @@ public Builder enableAuth(boolean enableAuth) {
return this;
}

/**
* service name. Defaults to {@literal ""}, see {@link #DEFAULT_SERVICE_NAME}.
*
* @param serviceName
* @return {@code this}
*/
public Builder serviceName(String serviceName) {
this.serviceName = serviceName;
return this;
}

/**
* service full qualified domain name. Defaults to {@literal ""}, see {@link
* #DEFAULT_SERVICE_FQDN}.
*
* @param serviceFQDN
* @return {@code this}
*/
public Builder serviceFQDN(String serviceFQDN) {
this.serviceFQDN = serviceFQDN;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand Down Expand Up @@ -362,7 +407,9 @@ public ClientOptions.Builder mutate() {
.falconPushInterval(getFalconPushInterval())
.enableWriteLimit(isWriteLimitEnabled())
.metaQueryTimeout(getMetaQueryTimeout())
.enableAuth(isEnableAuth());
.enableAuth(isEnableAuth())
.serviceName(getServiceName())
.serviceFQDN(getServiceFQDN());
return builder;
}

Expand Down Expand Up @@ -451,4 +498,22 @@ public Duration getMetaQueryTimeout() {
public boolean isEnableAuth() {
return enableAuth;
}

/**
* service name. Defaults to {@literal ""}.
*
* @return service name.
*/
public String getServiceName() {
return serviceName;
}

/**
* service full qualified domain name. Defaults to {@literal ""}.
*
* @return service full qualified domain name.
*/
public String getServiceFQDN() {
return serviceFQDN;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.xiaomi.infra.pegasus.rpc.Cluster;
import com.xiaomi.infra.pegasus.rpc.InternalTableOptions;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.interceptor.ReplicaSessionInterceptorManager;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
Expand All @@ -34,7 +35,7 @@ public class ClusterManager extends Cluster {
private EventLoopGroup tableGroup; // group used for handle table logic
private String[] metaList;
private MetaSession metaSession;
private boolean enableAuth;
private ReplicaSessionInterceptorManager sessionInterceptorManager;

private static final String osName;

Expand All @@ -56,7 +57,7 @@ public ClusterManager(ClientOptions opts) throws IllegalArgumentException {
replicaGroup = getEventLoopGroupInstance(opts.getAsyncWorkers());
metaGroup = getEventLoopGroupInstance(1);
tableGroup = getEventLoopGroupInstance(1);
enableAuth = opts.isEnableAuth();
sessionInterceptorManager = new ReplicaSessionInterceptorManager(opts);

metaList = opts.getMetaServers().split(",");
// the constructor of meta session is depend on the replicaSessions,
Expand Down Expand Up @@ -87,7 +88,7 @@ public ReplicaSession getReplicaSession(rpc_address address) {
address,
replicaGroup,
max(operationTimeout, ClientOptions.MIN_SOCK_CONNECT_TIMEOUT),
enableAuth);
sessionInterceptorManager);
replicaSessions.put(address, ss);
return ss;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,27 @@
import com.xiaomi.infra.pegasus.base.error_code;
import com.xiaomi.infra.pegasus.operator.negotiation_operator;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import java.util.HashMap;
import javax.security.auth.Subject;
import javax.security.sasl.Sasl;
import org.slf4j.Logger;

public class Negotiation {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(Negotiation.class);
private negotiation_status status;
private ReplicaSession session;
private String serviceName; // used for SASL authentication
private String serviceFqdn; // name used for SASL authentication
private final HashMap<String, Object> props = new HashMap<String, Object>();
private final Subject subject;

public Negotiation(ReplicaSession session) {
public Negotiation(
ReplicaSession session, Subject subject, String serviceName, String serviceFqdn) {
this.session = session;
this.subject = subject;
this.serviceName = serviceName;
this.serviceFqdn = serviceFqdn;
this.props.put(Sasl.QOP, "auth");
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.xiaomi.infra.pegasus.base.error_code.error_types;
import com.xiaomi.infra.pegasus.base.rpc_address;
import com.xiaomi.infra.pegasus.operator.client_operator;
import com.xiaomi.infra.pegasus.rpc.interceptor.ReplicaSessionInterceptorManager;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
Expand Down Expand Up @@ -39,10 +40,13 @@ public enum ConnState {
}

public ReplicaSession(
rpc_address address, EventLoopGroup rpcGroup, int socketTimeout, boolean enableAuth) {
rpc_address address,
EventLoopGroup rpcGroup,
int socketTimeout,
ReplicaSessionInterceptorManager interceptorManager) {
this.address = address;
this.rpcGroup = rpcGroup;
this.enableAuth = enableAuth;
this.interceptorManager = interceptorManager;

final ReplicaSession this_ = this;
boot = new Bootstrap();
Expand Down Expand Up @@ -74,7 +78,7 @@ public ReplicaSession(
EventLoopGroup rpcGroup,
int socketTimeout,
MessageResponseFilter filter) {
this(address, rpcGroup, socketTimeout, false);
this(address, rpcGroup, socketTimeout, (ReplicaSessionInterceptorManager) null);
this.filter = filter;
}

Expand Down Expand Up @@ -209,20 +213,13 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
}
}

private void startNegotiation(Channel activeChannel) {
if (enableAuth) {
negotiation = new Negotiation(this);
negotiation.start();
} else {
markSessionConnected(activeChannel);
}
}

private void markSessionConnected(Channel activeChannel) {
VolatileFields newCache = new VolatileFields();
newCache.state = ConnState.CONNECTED;
newCache.nettyChannel = activeChannel;

interceptorManager.onConnected(this);

synchronized (pendingSend) {
if (fields.state != ConnState.CONNECTING) {
// this session may have been closed or connected already
Expand Down Expand Up @@ -379,7 +376,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("Channel {} for session {} is active", ctx.channel().toString(), name());
startNegotiation(ctx.channel());
markSessionConnected(ctx.channel());
}

@Override
Expand Down Expand Up @@ -432,8 +429,7 @@ static final class VolatileFields {
private final rpc_address address;
private Bootstrap boot;
private EventLoopGroup rpcGroup;
private boolean enableAuth;
private Negotiation negotiation;
private ReplicaSessionInterceptorManager interceptorManager;

// Session will be actively closed if all the rpcs across `sessionResetTimeWindowMs`
// are timed out, in that case we suspect that the server is unavailable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.xiaomi.infra.pegasus.rpc.InternalTableOptions;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.Table;
import com.xiaomi.infra.pegasus.rpc.interceptor.InterceptorManager;
import com.xiaomi.infra.pegasus.rpc.interceptor.TableInterceptorManager;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.EventExecutor;
import java.util.ArrayList;
Expand Down Expand Up @@ -51,7 +51,7 @@ static final class TableConfiguration {
AtomicBoolean inQuerying_;
long lastQueryTime_;
int backupRequestDelayMs;
private InterceptorManager interceptorManager;
private TableInterceptorManager interceptorManager;

public TableHandler(ClusterManager mgr, String name, InternalTableOptions internalTableOptions)
throws ReplicationException {
Expand Down Expand Up @@ -109,7 +109,7 @@ public TableHandler(ClusterManager mgr, String name, InternalTableOptions intern
inQuerying_ = new AtomicBoolean(false);
lastQueryTime_ = 0;

this.interceptorManager = new InterceptorManager(internalTableOptions.tableOptions());
this.interceptorManager = new TableInterceptorManager(internalTableOptions.tableOptions());
}

public ReplicaConfiguration getReplicaConfig(int index) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.xiaomi.infra.pegasus.rpc.interceptor;

import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession;

public interface ReplicaSessionInterceptor {
// The behavior when a rpc session is connected.
void onConnected(ReplicaSession session);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.xiaomi.infra.pegasus.rpc.interceptor;

import com.xiaomi.infra.pegasus.client.ClientOptions;
import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession;
import java.util.ArrayList;
import java.util.List;

public class ReplicaSessionInterceptorManager {
private List<ReplicaSessionInterceptor> interceptors = new ArrayList<>();

public ReplicaSessionInterceptorManager(ClientOptions options) {
if (options.isEnableAuth()) {
ReplicaSessionInterceptor securityInterceptor =
new SecurityReplicaSessionInterceptor(options.getServiceName(), options.getServiceFQDN());
interceptors.add(securityInterceptor);
}
}

public void onConnected(ReplicaSession session) {
for (ReplicaSessionInterceptor interceptor : interceptors) {
interceptor.onConnected(session);
}
}
}
Loading

0 comments on commit 3615cfd

Please sign in to comment.