Skip to content
This repository was archived by the owner on Mar 31, 2023. It is now read-only.

Commit

Permalink
Improve caching performance for port/ip/vpc manager (#690)
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidLiu506 authored Oct 22, 2021
1 parent 842b9da commit d040632
Show file tree
Hide file tree
Showing 33 changed files with 223 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public class CacheFactory {

@Autowired
private ICacheFactory iCacheFactory;

public final String KEY_DELIMITER = "/";

public <K, V> ICache<K, V> getCache(Class<V> v) {
return iCacheFactory.getCache(v);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,12 @@ public ICacheFactory igniteClientFactoryInstance(){
private IgniteClient getThinIgniteClient() {
ClientConfiguration cfg = new ClientConfiguration();

cfg.setAddresses(host + ":" + port);
/***
* With partition awareness in place, the thin client can directly route queries and operations to the primary nodes that own the data required for the queries.
* This eliminates the bottleneck, allowing the application to scale more easily.
*/
cfg.setAddresses(host + ":" + port)
.setPartitionAwarenessEnabled(true);

if (keyStorePath != null && keyStorePassword != null &&
trustStorePath != null && trustStorePassword != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ free of charge, to any person obtaining a copy of this software and associated d

public class ThreadPoolExecutorConfig {
//Core thread pool size
public static int corePoolSize = 32;
public static int corePoolSize = 64;

//Maximum thread pool size
public static int maximumPoolSize = 128;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,16 @@ free of charge, to any person obtaining a copy of this software and associated d

import java.util.Set;

public class SubnetPortIds {
public class PortIdSubnet {
@JsonProperty("subnet_id")
private String subnetId;

@JsonProperty("port_ids")
private Set<String> portIds;

public SubnetPortIds() {
public PortIdSubnet() {

}

public SubnetPortIds(String subnetId, Set<String> portIds) {
public PortIdSubnet(String subnetId) {
this.subnetId = subnetId;
this.portIds = portIds;
}

public String getSubnetId() {
Expand All @@ -42,12 +38,4 @@ public String getSubnetId() {
public void setSubnetId(String subnetId) {
this.subnetId = subnetId;
}

public Set<String> getPortIds() {
return portIds;
}

public void setPortIds(Set<String> portIds) {
this.portIds = portIds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ free of charge, to any person obtaining a copy of this software and associated d
*/
package com.futurewei.alcor.portmanager.processor;

import com.futurewei.alcor.common.db.CacheException;
import com.futurewei.alcor.portmanager.entity.SubnetRoute;
import com.futurewei.alcor.portmanager.exception.AllocateIpAddrException;
import com.futurewei.alcor.portmanager.exception.UpdatePortIpException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ free of charge, to any person obtaining a copy of this software and associated d
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -38,33 +37,36 @@ public class NeighborRepository {
private static final String NEIGHBOR_CACHE_NAME_PREFIX = "neighborCache-";

private ICache<String, PortNeighbors> neighborCache;
private ICache<String, String> vpcIdProjectId;
private CacheFactory cacheFactory;

public NeighborRepository(CacheFactory cacheFactory) {
this.cacheFactory = cacheFactory;
this.vpcIdProjectId = cacheFactory.getCache(String.class);
this.neighborCache= cacheFactory.getCache(PortNeighbors.class);
}

private String getNeighborCacheName(String suffix) {
return NEIGHBOR_CACHE_NAME_PREFIX + suffix;
}

public void createNeighbors(Map<String, List<NeighborInfo>> neighbors) throws Exception {
public void createNeighbors(String projectId, Map<String, List<NeighborInfo>> neighbors) throws Exception {
if (neighbors != null) {
for (Map.Entry<String, List<NeighborInfo>> entry : neighbors.entrySet()) {
Map<String, NeighborInfo> neighborMap = entry.getValue()
.stream()
.collect(Collectors.toMap(NeighborInfo::getPortIp, Function.identity()));

CacheConfiguration cfg = CommonUtil.getCacheConfiguration(getNeighborCacheName(entry.getKey()));
.collect(Collectors.toMap(neighbor -> neighbor.getVpcId() + cacheFactory.KEY_DELIMITER + neighbor.getPortIp(), Function.identity()));
CacheConfiguration cfg = CommonUtil.getCacheConfiguration(getNeighborCacheName(projectId));
ICache<String, NeighborInfo> neighborCache = cacheFactory.getCache(NeighborInfo.class, cfg);
neighborCache.putAll(neighborMap);

}
}
}

public void updateNeighbors(PortEntity oldPortEntity, List<NeighborInfo> newNeighbors) throws Exception {
CacheConfiguration cfg = CommonUtil.getCacheConfiguration(getNeighborCacheName(oldPortEntity.getVpcId()));
String vpcId = oldPortEntity.getVpcId();
CacheConfiguration cfg = CommonUtil.getCacheConfiguration(getNeighborCacheName(oldPortEntity.getProjectId()));
ICache<String, NeighborInfo> neighborCache = this.cacheFactory.getCache(
NeighborInfo.class, cfg);

Expand All @@ -75,43 +77,56 @@ public void updateNeighbors(PortEntity oldPortEntity, List<NeighborInfo> newNeig
.collect(Collectors.toList());

for (String oldPortIp: oldPortIps) {
neighborCache.remove(oldPortIp);
neighborCache.remove(vpcId + cacheFactory.KEY_DELIMITER + oldPortIp);
}
}

//Add new neighborInfos
if (newNeighbors != null) {
Map<String, NeighborInfo> neighborMap = newNeighbors
.stream()
.collect(Collectors.toMap(NeighborInfo::getPortIp, Function.identity()));
.collect(Collectors.toMap(neighbor -> neighbor.getVpcId() + cacheFactory.KEY_DELIMITER + neighbor.getPortIp(), Function.identity()));
neighborCache.putAll(neighborMap);
}
}

public void deleteNeighbors(PortEntity portEntity) throws Exception {
if (portEntity.getFixedIps() != null) {
String vpcId = portEntity.getVpcId();
List<String> oldPortIps = portEntity.getFixedIps().stream()
.map(PortEntity.FixedIp::getIpAddress)
.collect(Collectors.toList());

CacheConfiguration cfg = new CacheConfiguration();
cfg.setName(getNeighborCacheName(portEntity.getVpcId()));
cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
CacheConfiguration cfg = CommonUtil.getCacheConfiguration(getNeighborCacheName(portEntity.getProjectId()));
ICache<String, NeighborInfo> neighborCache = this.cacheFactory.getCache(
NeighborInfo.class, cfg);

//Delete old neighborInfos
for (String oldPortIp: oldPortIps) {
neighborCache.remove(oldPortIp);
neighborCache.remove(vpcId + cacheFactory.KEY_DELIMITER + oldPortIp);
}
}
}

@DurationStatistics
public void addProject(String projectId, String vpcId) throws CacheException {
vpcIdProjectId.put(vpcId, projectId);
}

@DurationStatistics
public Map<String, NeighborInfo> getNeighbors(String vpcId) throws CacheException {
CacheConfiguration cfg = CommonUtil.getCacheConfiguration(getNeighborCacheName(vpcId));
if (!vpcIdProjectId.containsKey(vpcId)) {
return new HashMap<>();
}
String projectId = vpcIdProjectId.get(vpcId);
CacheConfiguration cfg = CommonUtil.getCacheConfiguration(getNeighborCacheName(projectId));
ICache<String, NeighborInfo> neighborCache = this.cacheFactory.getCache(
NeighborInfo.class, cfg);
return neighborCache.getAll();
Map<String, Object[]> queryParams = new HashMap<>();
Object[] value = new Object[1];
value[0] = vpcId;
queryParams.put("vpcId", value);
Collection<NeighborInfo> neiborinfo = neighborCache.getAll(queryParams).values();
return neiborinfo.stream().collect(Collectors.toMap(neighbor -> neighbor.getPortIp(), Function.identity()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ free of charge, to any person obtaining a copy of this software and associated d

import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -262,12 +263,19 @@ public PortNeighbors getPortNeighbors(Object arg) throws CacheException {

@DurationStatistics
public synchronized void createPortBulk(List<PortEntity> portEntities, Map<String, List<NeighborInfo>> neighbors) throws Exception {
portEntities.forEach(item -> {
try {
neighborRepository.addProject(item.getProjectId(), item.getVpcId());
} catch (CacheException e) {
e.printStackTrace();
}
});
try (Transaction tx = portCache.getTransaction().start()) {
Map<String, PortEntity> portEntityMap = portEntities
.stream()
.collect(Collectors.toMap(PortEntity::getId, Function.identity()));
portCache.putAll(portEntityMap);
neighborRepository.createNeighbors(neighbors);
neighborRepository.createNeighbors(portEntities.get(0).getProjectId(), neighbors);
subnetPortsRepository.addSubnetPortIds(portEntities);
tx.commit();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ free of charge, to any person obtaining a copy of this software and associated d
import com.futurewei.alcor.common.db.CacheFactory;
import com.futurewei.alcor.common.db.ICache;
import com.futurewei.alcor.common.stats.DurationStatistics;
import com.futurewei.alcor.portmanager.entity.SubnetPortIds;
import com.futurewei.alcor.portmanager.entity.PortIdSubnet;
import com.futurewei.alcor.portmanager.exception.FixedIpsInvalid;
import com.futurewei.alcor.web.entity.port.PortEntity;
import org.slf4j.Logger;
Expand All @@ -33,15 +33,15 @@ public class SubnetPortsRepository {
private static final String GATEWAY_PORT_DEVICE_OWNER = "network:router_interface";

private CacheFactory cacheFactory;
private ICache<String, SubnetPortIds> subnetPortIdsCache;
private ICache<String, PortIdSubnet> subnetPortIdsCache;

public SubnetPortsRepository(CacheFactory cacheFactory) {
this.cacheFactory = cacheFactory;
this.subnetPortIdsCache= cacheFactory.getCache(SubnetPortIds.class);
this.subnetPortIdsCache= cacheFactory.getCache(PortIdSubnet.class);
}

private List<SubnetPortIds> getSubnetPortIds(List<PortEntity> portEntities) {
Map<String, SubnetPortIds> subnetPortIdsMap = new HashMap<>();
public void addSubnetPortIds(List<PortEntity> portEntities) throws Exception {
//Store the mapping between subnet id and port id
for (PortEntity portEntity: portEntities) {
if (GATEWAY_PORT_DEVICE_OWNER.equals(portEntity.getDeviceOwner())) {
continue;
Expand All @@ -54,37 +54,9 @@ private List<SubnetPortIds> getSubnetPortIds(List<PortEntity> portEntities) {
}

for (PortEntity.FixedIp fixedIp: fixedIps) {
String subnetId = fixedIp.getSubnetId();
if (!subnetPortIdsMap.containsKey(subnetId)) {
SubnetPortIds subnetPortIds = new SubnetPortIds(subnetId, new HashSet<>());
subnetPortIdsMap.put(subnetId, subnetPortIds);
}

subnetPortIdsMap.get(subnetId).getPortIds().add(portEntity.getId());
subnetPortIdsCache.put(fixedIp.getSubnetId() + cacheFactory.KEY_DELIMITER + portEntity.getId(), new PortIdSubnet(fixedIp.getSubnetId()));
}
}

return new ArrayList<>(subnetPortIdsMap.values());
}


public void addSubnetPortIds(List<PortEntity> portEntities) throws Exception {
//Store the mapping between subnet id and port id
List<SubnetPortIds> subnetPortIdsList = getSubnetPortIds(portEntities);

for (SubnetPortIds item: subnetPortIdsList) {
String subnetId = item.getSubnetId();
Set<String> portIds = item.getPortIds();

SubnetPortIds subnetPortIds = subnetPortIdsCache.get(subnetId);
if (subnetPortIds == null) {
subnetPortIds = new SubnetPortIds(subnetId, new HashSet<>(portIds));
} else {
subnetPortIds.getPortIds().addAll(portIds);
}

subnetPortIdsCache.put(subnetId, subnetPortIds);
}
}

public void updateSubnetPortIds(PortEntity oldPortEntity, PortEntity newPortEntity) throws Exception {
Expand All @@ -94,37 +66,29 @@ public void updateSubnetPortIds(PortEntity oldPortEntity, PortEntity newPortEnti
throw new FixedIpsInvalid();
}

List<String> oldSubnetIds = oldPortEntity.getFixedIps().stream()
.map(PortEntity.FixedIp::getSubnetId)
.collect(Collectors.toList());

List<String> newSubnetIds = oldPortEntity.getFixedIps().stream()
.map(PortEntity.FixedIp::getSubnetId)
.collect(Collectors.toList());

if (!oldSubnetIds.equals(newSubnetIds)) {
//Delete port ids from subnetPortIdsCache
for (String subnetId: oldSubnetIds) {
SubnetPortIds subnetPortIds = subnetPortIdsCache.get(subnetId);
if (subnetPortIds != null) {
subnetPortIds.getPortIds().remove(oldPortEntity.getId());
subnetPortIdsCache.put(subnetId, subnetPortIds);
}
}
if (GATEWAY_PORT_DEVICE_OWNER.equals(newPortEntity.getDeviceOwner())) {
return;
}

//Add new port ids to subnetPortIdsCache
for (String subnetId: newSubnetIds) {
SubnetPortIds subnetPortIds = subnetPortIdsCache.get(subnetId);
if (subnetPortIds != null) {
subnetPortIds.getPortIds().add(newPortEntity.getId());
} else {
Set<String> portIds = new HashSet<>();
portIds.add(newPortEntity.getId());
subnetPortIds = new SubnetPortIds(subnetId, portIds);
oldPortEntity.getFixedIps().forEach( item ->
{
try {
subnetPortIdsCache.remove(item.getSubnetId() + cacheFactory.KEY_DELIMITER + oldPortEntity.getId());
} catch (CacheException e) {
e.printStackTrace();
}
}
subnetPortIdsCache.put(subnetId, subnetPortIds);
}
}
);

newPortEntity.getFixedIps().forEach( item ->
{
try {
subnetPortIdsCache.put(item.getSubnetId() + cacheFactory.KEY_DELIMITER + newPortEntity.getId(), new PortIdSubnet(item.getSubnetId()));
} catch (CacheException e) {
e.printStackTrace();
}
}
);
}

public void deleteSubnetPortIds(PortEntity portEntity) throws Exception {
Expand All @@ -133,27 +97,25 @@ public void deleteSubnetPortIds(PortEntity portEntity) throws Exception {
throw new FixedIpsInvalid();
}

List<String> subnetIds = portEntity.getFixedIps().stream()
.map(PortEntity.FixedIp::getSubnetId)
.collect(Collectors.toList());
portEntity.getFixedIps().forEach( item ->
{
try {
subnetPortIdsCache.remove(item.getSubnetId() + cacheFactory.KEY_DELIMITER + portEntity.getId());
} catch (CacheException e) {
e.printStackTrace();
}
}
);

//Delete port ids from subnetPortIdsCache
for (String subnetId: subnetIds) {
SubnetPortIds subnetPortIds = subnetPortIdsCache.get(subnetId);
if (subnetPortIds != null) {
subnetPortIds.getPortIds().remove(portEntity.getId());
subnetPortIdsCache.put(subnetId, subnetPortIds);
}
}
}

@DurationStatistics
public int getSubnetPortNumber(String subnetId) throws CacheException {
SubnetPortIds subnetPortIds = subnetPortIdsCache.get(subnetId);
if (subnetPortIds == null) {
return 0;
}
Map<String, Object[]> queryParams = new HashMap<>();
Object[] values = new Object[1];
values[0] = subnetId;
queryParams.put("subnetId", values);

return subnetPortIds.getPortIds().size();
return subnetPortIdsCache.getAll(queryParams).size();
}
}
}
Loading

0 comments on commit d040632

Please sign in to comment.