Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

feat(security): add ReplicaSession interceptor #135

Merged
merged 21 commits into from
Sep 24, 2020
3 changes: 3 additions & 0 deletions configuration/pegasus.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ enable_perf_counter = false
perf_counter_tags = cluster=onebox,app=unit_test
push_counter_interval_secs = 10
meta_query_timeout = 5000
jaas_conf = configuration/pegasus_jaas.conf
service_name = pegasus_tst
service_fqdn = pegasus
10 changes: 10 additions & 0 deletions configuration/pegasus_jaas.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
client {
com.sun.security.auth.module.Krb5LoginModule required

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this config format support comment? Can you write a comment of description on top of each config item?
Furthermore, does it mean that every of our users should have this file in their project?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the pegasus_jaas.conf. So we can add it in later pull request. And I will refactor it in a better way at that time.

debug=true
useTicketCache=true
useKeyTab=true
keyTab=""
renewTGT=true
principal=""
storeKey=true;
};
102 changes: 99 additions & 3 deletions src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public class ClientOptions {
public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs";
public static final String PEGASUS_META_QUERY_TIMEOUT_KEY = "meta_query_timeout";
public static final String PEGASUS_ENABLE_AUTH_KEY = "enable_auth";
public static final String PEGASUS_SERVICE_NAME_KEY = "service_name";
public static final String PEGASUS_SERVICE_FQDN_KEY = "service_fqdn";
public static final String PEGASUS_JAAS_CONF_KEY = "jaas_conf";

public static final String DEFAULT_META_SERVERS =
"127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603";
Expand All @@ -57,6 +60,9 @@ public class ClientOptions {
public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true;
public static final Duration DEFAULT_META_QUERY_TIMEOUT = Duration.ofMillis(5000);
public static final boolean DEFAULT_ENABLE_AUTH = false;
public static final String DEFAULT_SERVICE_NAME = "";
public static final String DEFAULT_SERVICE_FQDN = "";
public static final String DEFAULT_JAAS_CONF = "configuration/pegasus_jaas.conf";

private final String metaServers;
private final Duration operationTimeout;
Expand All @@ -67,6 +73,9 @@ public class ClientOptions {
private final boolean enableWriteLimit;
private final Duration metaQueryTimeout;
private final boolean enableAuth;
private final String serviceName;
private final String serviceFQDN;
private final String jaasConf;

protected ClientOptions(Builder builder) {
this.metaServers = builder.metaServers;
Expand All @@ -78,6 +87,9 @@ protected ClientOptions(Builder builder) {
this.enableWriteLimit = builder.enableWriteLimit;
this.metaQueryTimeout = builder.metaQueryTimeout;
this.enableAuth = builder.enableAuth;
this.serviceName = builder.serviceName;
this.serviceFQDN = builder.serviceFQDN;
this.jaasConf = builder.jaasConf;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -89,7 +101,10 @@ protected ClientOptions(ClientOptions original) {
this.falconPushInterval = original.getFalconPushInterval();
this.enableWriteLimit = original.isWriteLimitEnabled();
this.metaQueryTimeout = original.getMetaQueryTimeout();
this.enableAuth = original.enableAuth;
this.enableAuth = original.isEnableAuth();
this.serviceName = original.getServiceName();
this.serviceFQDN = original.getServiceFQDN();
this.jaasConf = original.getJaasConf();
}

/**
Expand Down Expand Up @@ -150,6 +165,9 @@ public static ClientOptions create(String configPath) throws PException {
Duration.ofMillis(
config.getLong(PEGASUS_META_QUERY_TIMEOUT_KEY, DEFAULT_META_QUERY_TIMEOUT.toMillis()));
boolean enableAuth = config.getBoolean(PEGASUS_ENABLE_AUTH_KEY, DEFAULT_ENABLE_AUTH);
String serviceName = config.getString(PEGASUS_SERVICE_NAME_KEY, DEFAULT_SERVICE_NAME);
String serviceFQDN = config.getString(PEGASUS_SERVICE_FQDN_KEY, DEFAULT_SERVICE_FQDN);
String jaasConf = config.getString(PEGASUS_JAAS_CONF_KEY, DEFAULT_JAAS_CONF);

return ClientOptions.builder()
.metaServers(metaList)
Expand All @@ -160,6 +178,9 @@ public static ClientOptions create(String configPath) throws PException {
.falconPushInterval(pushIntervalSecs)
.metaQueryTimeout(metaQueryTimeout)
.enableAuth(enableAuth)
.serviceName(serviceName)
.serviceFQDN(serviceFQDN)
.jaasConf(jaasConf)
.build();
}

Expand All @@ -178,7 +199,10 @@ public boolean equals(Object options) {
&& this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis()
&& this.enableWriteLimit == clientOptions.enableWriteLimit
&& this.metaQueryTimeout.toMillis() == clientOptions.metaQueryTimeout.toMillis()
&& this.enableAuth == clientOptions.enableAuth;
&& this.enableAuth == clientOptions.enableAuth
&& this.serviceName == clientOptions.serviceName
&& this.serviceFQDN == clientOptions.serviceFQDN
&& this.jaasConf == clientOptions.jaasConf;
}
return false;
}
Expand Down Expand Up @@ -206,6 +230,12 @@ public String toString() {
+ metaQueryTimeout.toMillis()
+ ", enableAuth="
+ enableAuth
+ ", serviceName="
+ serviceName
+ ", serviceFQDN="
+ serviceFQDN
+ ", jaasConf="
+ jaasConf
+ '}';
}

Expand All @@ -220,6 +250,9 @@ public static class Builder {
private boolean enableWriteLimit = DEFAULT_ENABLE_WRITE_LIMIT;
private Duration metaQueryTimeout = DEFAULT_META_QUERY_TIMEOUT;
private boolean enableAuth = DEFAULT_ENABLE_AUTH;
private String serviceName = DEFAULT_SERVICE_NAME;
private String serviceFQDN = DEFAULT_SERVICE_FQDN;
private String jaasConf = DEFAULT_JAAS_CONF;

protected Builder() {}

Expand Down Expand Up @@ -334,6 +367,39 @@ public Builder enableAuth(boolean enableAuth) {
return this;
}

/**
* service name. Defaults to {@literal ""}, see {@link #DEFAULT_SERVICE_NAME}.
*
* @param serviceName
* @return {@code this}
*/
public Builder serviceName(String serviceName) {
this.serviceName = serviceName;
return this;
}

/**
* service FQDN. Defaults to {@literal ""}, see {@link #DEFAULT_SERVICE_FQDN}.
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
*
* @param serviceFQDN
* @return {@code this}
*/
public Builder serviceFQDN(String serviceFQDN) {
this.serviceFQDN = serviceFQDN;
return this;
}

/**
* jaas configuration file. Defaults to {@literal ""}, see {@link #DEFAULT_JAAS_CONF}.
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
*
* @param jaasConf
* @return {@code this}
*/
public Builder jaasConf(String jaasConf) {
this.jaasConf = jaasConf;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand Down Expand Up @@ -362,7 +428,10 @@ public ClientOptions.Builder mutate() {
.falconPushInterval(getFalconPushInterval())
.enableWriteLimit(isWriteLimitEnabled())
.metaQueryTimeout(getMetaQueryTimeout())
.enableAuth(isEnableAuth());
.enableAuth(isEnableAuth())
.serviceName(getServiceName())
.serviceFQDN(getServiceFQDN())
.jaasConf(getJaasConf());
return builder;
}

Expand Down Expand Up @@ -451,4 +520,31 @@ public Duration getMetaQueryTimeout() {
public boolean isEnableAuth() {
return enableAuth;
}

/**
* service name. Defaults to {@literal ""}.
*
* @return service name.
*/
public String getServiceName() {
return serviceName;
}

/**
* service fqdn. Defaults to {@literal ""}.
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
*
* @return service fqdn.
*/
public String getServiceFQDN() {
return serviceFQDN;
}

/**
* jaas configuration file. Defaults to {@literal "configuration/pegasus_jaas.conf"}.
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
*
* @return jaas configuration file.
*/
public String getJaasConf() {
return jaasConf;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public class ClusterManager extends Cluster {
private EventLoopGroup tableGroup; // group used for handle table logic
private String[] metaList;
private MetaSession metaSession;
private boolean enableAuth;

private static final String osName;

Expand All @@ -56,7 +55,12 @@ public ClusterManager(ClientOptions opts) throws IllegalArgumentException {
replicaGroup = getEventLoopGroupInstance(opts.getAsyncWorkers());
metaGroup = getEventLoopGroupInstance(1);
tableGroup = getEventLoopGroupInstance(1);
enableAuth = opts.isEnableAuth();
if (opts.isEnableAuth()) {
ReplicaSessionHook hook =
new SecurityReplicaSessionHook(
opts.getJaasConf(), opts.getServiceName(), opts.getServiceFQDN());
ReplicaSessionHookManager.instance().addHook(hook);
}

metaList = opts.getMetaServers().split(",");
// the constructor of meta session is depend on the replicaSessions,
Expand Down Expand Up @@ -84,10 +88,7 @@ public ReplicaSession getReplicaSession(rpc_address address) {
if (ss != null) return ss;
ss =
new ReplicaSession(
address,
replicaGroup,
max(operationTimeout, ClientOptions.MIN_SOCK_CONNECT_TIMEOUT),
enableAuth);
address, replicaGroup, max(operationTimeout, ClientOptions.MIN_SOCK_CONNECT_TIMEOUT));
replicaSessions.put(address, ss);
return ss;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,32 @@
import com.xiaomi.infra.pegasus.base.error_code;
import com.xiaomi.infra.pegasus.operator.negotiation_operator;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import javax.security.auth.Subject;
import javax.security.sasl.Sasl;
import org.slf4j.Logger;

public class Negotiation {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(Negotiation.class);
private negotiation_status status;
private ReplicaSession session;
private static final List<String> expectedMechanisms =
new ArrayList<String>(Collections.singletonList("GSSAPI"));
private String serviceName; // used for SASL authentication
private String serviceFqdn; // name used for SASL authentication
private final HashMap<String, Object> props = new HashMap<String, Object>();
private final Subject subject;

public Negotiation(ReplicaSession session) {
public Negotiation(
ReplicaSession session, Subject subject, String serviceName, String serviceFqdn) {
this.session = session;
this.subject = subject;
this.serviceName = serviceName;
this.serviceFqdn = serviceFqdn;
this.props.put(Sasl.QOP, "auth");
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,9 @@ public enum ConnState {
DISCONNECTED
}

public ReplicaSession(
rpc_address address, EventLoopGroup rpcGroup, int socketTimeout, boolean enableAuth) {
public ReplicaSession(rpc_address address, EventLoopGroup rpcGroup, int socketTimeout) {
this.address = address;
this.rpcGroup = rpcGroup;
this.enableAuth = enableAuth;

final ReplicaSession this_ = this;
boot = new Bootstrap();
Expand Down Expand Up @@ -74,7 +72,7 @@ public ReplicaSession(
EventLoopGroup rpcGroup,
int socketTimeout,
MessageResponseFilter filter) {
this(address, rpcGroup, socketTimeout, false);
this(address, rpcGroup, socketTimeout);
this.filter = filter;
}

Expand Down Expand Up @@ -209,20 +207,13 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
}
}

private void startNegotiation(Channel activeChannel) {
if (enableAuth) {
negotiation = new Negotiation(this);
negotiation.start();
} else {
markSessionConnected(activeChannel);
}
}

private void markSessionConnected(Channel activeChannel) {
VolatileFields newCache = new VolatileFields();
newCache.state = ConnState.CONNECTED;
newCache.nettyChannel = activeChannel;

ReplicaSessionHookManager.instance().onConnected(this);

synchronized (pendingSend) {
if (fields.state != ConnState.CONNECTING) {
// this session may have been closed or connected already
Expand Down Expand Up @@ -379,7 +370,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("Channel {} for session {} is active", ctx.channel().toString(), name());
startNegotiation(ctx.channel());
markSessionConnected(ctx.channel());
}

@Override
Expand Down Expand Up @@ -432,8 +423,6 @@ static final class VolatileFields {
private final rpc_address address;
private Bootstrap boot;
private EventLoopGroup rpcGroup;
private boolean enableAuth;
private Negotiation negotiation;

// Session will be actively closed if all the rpcs across `sessionResetTimeWindowMs`
// are timed out, in that case we suspect that the server is unavailable.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.xiaomi.infra.pegasus.rpc.async;

public interface ReplicaSessionHook {
// The behavior when a rpc session is connected.
void onConnected(ReplicaSession session);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.xiaomi.infra.pegasus.rpc.async;

import java.util.ArrayList;
import java.util.List;

public class ReplicaSessionHookManager {
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
private List<ReplicaSessionHook> hooks = new ArrayList<>();
private static ReplicaSessionHookManager instance = new ReplicaSessionHookManager();

public static ReplicaSessionHookManager instance() {
return instance;
}

void addHook(ReplicaSessionHook hook) {
hooks.add(hook);
}

public void onConnected(ReplicaSession session) {
for (ReplicaSessionHook hook : hooks) {
hook.onConnected(session);
}
}
}
Loading