Skip to content

Commit

Permalink
Merge branch 'master' into issues/amoro-3172
Browse files Browse the repository at this point in the history
  • Loading branch information
zhoujinsong authored Oct 10, 2024
2 parents ee5816b + a181281 commit a8b3183
Show file tree
Hide file tree
Showing 83 changed files with 1,811 additions and 671 deletions.
8 changes: 4 additions & 4 deletions amoro-ams/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,10 @@
<artifactId>url-connection-client</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3-transfer-manager</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3-transfer-manager</artifactId>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.amoro.config.ConfigHelpers;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.server.dashboard.DashboardServer;
import org.apache.amoro.server.dashboard.JavalinJsonMapper;
import org.apache.amoro.server.dashboard.response.ErrorResponse;
import org.apache.amoro.server.dashboard.utils.AmsUtil;
import org.apache.amoro.server.dashboard.utils.CommonUtil;
Expand Down Expand Up @@ -66,7 +67,6 @@
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand Down Expand Up @@ -215,7 +215,7 @@ public void dispose() {
MetricManager.dispose();
}

private void initConfig() throws IOException {
private void initConfig() throws Exception {
LOG.info("initializing configurations...");
new ConfigurationHelper().init();
}
Expand Down Expand Up @@ -243,6 +243,7 @@ private void initHttpService() {
config.addStaticFiles(dashboardServer.configStaticFiles());
config.sessionHandler(SessionHandler::new);
config.enableCorsForAllOrigins();
config.jsonMapper(JavalinJsonMapper.createDefaultJsonMapper());
config.showJavalinBanner = false;
});
httpServer.routes(
Expand Down Expand Up @@ -407,14 +408,14 @@ private class ConfigurationHelper {

private JsonNode yamlConfig;

public void init() throws IOException {
public void init() throws Exception {
Map<String, Object> envConfig = initEnvConfig();
initServiceConfig(envConfig);
setIcebergSystemProperties();
initContainerConfig();
}

private void initServiceConfig(Map<String, Object> envConfig) throws IOException {
private void initServiceConfig(Map<String, Object> envConfig) throws Exception {
LOG.info("initializing service configuration...");
String configPath = Environments.getConfigPath() + "/" + SERVER_CONFIG_FILENAME;
LOG.info("load config from path: {}", configPath);
Expand All @@ -438,6 +439,8 @@ private void initServiceConfig(Map<String, Object> envConfig) throws IOException

private Map<String, Object> initEnvConfig() {
LOG.info("initializing system env configuration...");
Map<String, String> envs = System.getenv();
envs.forEach((k, v) -> LOG.info("export {}={}", k, v));
String prefix = AmoroManagementConf.SYSTEM_CONFIG.toUpperCase();
return ConfigHelpers.convertConfigurationKeys(prefix, System.getenv());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.exception.ForbiddenException;
import org.apache.amoro.server.exception.IllegalTaskStateException;
import org.apache.amoro.server.exception.ObjectNotExistsException;
import org.apache.amoro.server.exception.PluginRetryAuthException;
import org.apache.amoro.server.exception.TaskNotFoundException;
Expand All @@ -49,7 +50,6 @@
import org.apache.amoro.server.table.DefaultTableService;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableRuntimeMeta;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -121,24 +121,24 @@ public RuntimeHandlerChain getTableRuntimeHandler() {
return tableHandlerChain;
}

private void loadOptimizingQueues(List<TableRuntimeMeta> tableRuntimeMetaList) {
private void loadOptimizingQueues(List<TableRuntime> tableRuntimeMetaList) {
List<ResourceGroup> optimizerGroups =
getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups);
List<OptimizerInstance> optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll);
Map<String, List<TableRuntimeMeta>> groupToTableRuntimes =
Map<String, List<TableRuntime>> groupToTableRuntimes =
tableRuntimeMetaList.stream()
.collect(Collectors.groupingBy(TableRuntimeMeta::getOptimizerGroup));
.collect(Collectors.groupingBy(TableRuntime::getOptimizerGroup));
optimizerGroups.forEach(
group -> {
String groupName = group.getName();
List<TableRuntimeMeta> tableRuntimeMetas = groupToTableRuntimes.remove(groupName);
List<TableRuntime> tableRuntimes = groupToTableRuntimes.remove(groupName);
OptimizingQueue optimizingQueue =
new OptimizingQueue(
tableService,
group,
this,
planExecutor,
Optional.ofNullable(tableRuntimeMetas).orElseGet(ArrayList::new),
Optional.ofNullable(tableRuntimes).orElseGet(ArrayList::new),
maxPlanningParallelism);
optimizingQueueByGroup.put(groupName, optimizingQueue);
});
Expand All @@ -148,8 +148,8 @@ private void loadOptimizingQueues(List<TableRuntimeMeta> tableRuntimeMetaList) {
.forEach(groupName -> LOG.warn("Unloaded task runtime in group {}", groupName));
}

private void registerOptimizer(OptimizerInstance optimizer, boolean needPersistency) {
if (needPersistency) {
private void registerOptimizer(OptimizerInstance optimizer, boolean needPersistent) {
if (needPersistent) {
doAs(OptimizerMapper.class, mapper -> mapper.insertOptimizer(optimizer));
}

Expand Down Expand Up @@ -456,9 +456,9 @@ public void handleTableRemoved(TableRuntime tableRuntime) {
}

@Override
protected void initHandler(List<TableRuntimeMeta> tableRuntimeMetaList) {
protected void initHandler(List<TableRuntime> tableRuntimeList) {
LOG.info("OptimizerManagementService begin initializing");
loadOptimizingQueues(tableRuntimeMetaList);
loadOptimizingQueues(tableRuntimeList);
optimizerKeeper.start();
LOG.info("SuspendingDetector for Optimizer has been started.");
LOG.info("OptimizerManagementService initializing has completed");
Expand Down Expand Up @@ -566,7 +566,14 @@ private void retryTask(TaskRuntime task, OptimizingQueue queue) {
task.getTaskId(),
task.getResourceDesc());
// optimizing task of suspending optimizer would not be counted for retrying
queue.retryTask(task);
try {
queue.retryTask(task);
} catch (IllegalTaskStateException e) {
LOG.error(
"Retry task {} failed due to {}, will check it in next round",
task.getTaskId(),
e.getMessage());
}
}

private Predicate<TaskRuntime> buildSuspendingPredication(Set<String> activeTokens) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.amoro.server.dashboard.controller.HealthCheckController;
import org.apache.amoro.server.dashboard.controller.LoginController;
import org.apache.amoro.server.dashboard.controller.OptimizerController;
import org.apache.amoro.server.dashboard.controller.OptimizerGroupController;
import org.apache.amoro.server.dashboard.controller.OverviewController;
import org.apache.amoro.server.dashboard.controller.PlatformFileInfoController;
import org.apache.amoro.server.dashboard.controller.SettingController;
Expand Down Expand Up @@ -77,6 +78,7 @@ public class DashboardServer {
private final CatalogController catalogController;
private final HealthCheckController healthCheckController;
private final LoginController loginController;
private final OptimizerGroupController optimizerGroupController;
private final OptimizerController optimizerController;
private final PlatformFileInfoController platformFileInfoController;
private final SettingController settingController;
Expand All @@ -98,7 +100,8 @@ public DashboardServer(
this.catalogController = new CatalogController(tableService, platformFileManager);
this.healthCheckController = new HealthCheckController();
this.loginController = new LoginController(serviceConfig);
this.optimizerController = new OptimizerController(tableService, optimizerManager);
this.optimizerGroupController = new OptimizerGroupController(tableService, optimizerManager);
this.optimizerController = new OptimizerController(optimizerManager);
this.platformFileInfoController = new PlatformFileInfoController(platformFileManager);
this.settingController = new SettingController(serviceConfig, optimizerManager);
ServerTableDescriptor tableDescriptor = new ServerTableDescriptor(tableService, serviceConfig);
Expand Down Expand Up @@ -221,6 +224,9 @@ private EndpointGroup apiGroup() {
get(
"/catalogs/{catalog}/dbs/{db}/tables/{table}/optimizing-processes",
tableController::getOptimizingProcesses);
get(
"/catalogs/{catalog}/dbs/{db}/tables/{table}/optimizing-types",
tableController::getOptimizingTypes);
get(
"/catalogs/{catalog}/dbs/{db}/tables/{table}/optimizing-processes/{processId}/tasks",
tableController::getOptimizingProcessTasks);
Expand Down Expand Up @@ -274,26 +280,29 @@ private EndpointGroup apiGroup() {
() -> {
get(
"/optimizerGroups/{optimizerGroup}/tables",
optimizerController::getOptimizerTables);
get("/optimizerGroups/{optimizerGroup}/optimizers", optimizerController::getOptimizers);
get("/optimizerGroups", optimizerController::getOptimizerGroups);
optimizerGroupController::getOptimizerTables);
get(
"/optimizerGroups/{optimizerGroup}/optimizers",
optimizerGroupController::getOptimizers);
get("/optimizerGroups", optimizerGroupController::getOptimizerGroups);
get(
"/optimizerGroups/{optimizerGroup}/info",
optimizerController::getOptimizerGroupInfo);
delete(
"/optimizerGroups/{optimizerGroup}/optimizers/{jobId}",
optimizerController::releaseOptimizer);
optimizerGroupController::getOptimizerGroupInfo);
post(
"/optimizerGroups/{optimizerGroup}/optimizers",
optimizerController::scaleOutOptimizer);
get("/resourceGroups", optimizerController::getResourceGroup);
post("/resourceGroups", optimizerController::createResourceGroup);
put("/resourceGroups", optimizerController::updateResourceGroup);
delete("/resourceGroups/{resourceGroupName}", optimizerController::deleteResourceGroup);
optimizerGroupController::scaleOutOptimizer);
post("/optimizers", optimizerController::createOptimizer);
delete("/optimizers/{jobId}", optimizerController::releaseOptimizer);
get("/resourceGroups", optimizerGroupController::getResourceGroup);
post("/resourceGroups", optimizerGroupController::createResourceGroup);
put("/resourceGroups", optimizerGroupController::updateResourceGroup);
delete(
"/resourceGroups/{resourceGroupName}",
optimizerGroupController::deleteResourceGroup);
get(
"/resourceGroups/{resourceGroupName}/delete/check",
optimizerController::deleteCheckResourceGroup);
get("/containers/get", optimizerController::getContainers);
optimizerGroupController::deleteCheckResourceGroup);
get("/containers/get", optimizerGroupController::getContainers);
});

// console apis
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.dashboard;

import io.javalin.plugin.json.JsonMapper;
import org.apache.amoro.TableFormat;
import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
import org.jetbrains.annotations.NotNull;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Json mapper to adapt shaded jackson. */
public class JavalinJsonMapper implements JsonMapper {

private final ObjectMapper objectMapper;

public static JavalinJsonMapper createDefaultJsonMapper() {
ObjectMapper om = new ObjectMapper();
SimpleModule module = new SimpleModule();
module.addSerializer(TableFormat.class, new TableFormat.JsonSerializer());
module.addDeserializer(TableFormat.class, new TableFormat.JsonDeserializer());
om.registerModule(module);
return new JavalinJsonMapper(om);
}

public JavalinJsonMapper(ObjectMapper shadedMapper) {
this.objectMapper = shadedMapper;
}

@NotNull
@Override
public String toJsonString(@NotNull Object obj) {
if (obj instanceof String) {
return (String) obj;
}
try {
return objectMapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

@NotNull
@Override
public InputStream toJsonStream(@NotNull Object obj) {
if (obj instanceof String) {
String result = (String) obj;
return new ByteArrayInputStream(result.getBytes());
} else {
byte[] string = new byte[0];
try {
string = objectMapper.writeValueAsBytes(obj);
return new ByteArrayInputStream(string);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}

@NotNull
@Override
public <T> T fromJsonString(@NotNull String json, @NotNull Class<T> targetClass) {
try {
return objectMapper.readValue(json, targetClass);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

@NotNull
@Override
public <T> T fromJsonStream(@NotNull InputStream json, @NotNull Class<T> targetClass) {
try {
return objectMapper.readValue(json, targetClass);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.amoro.server.optimizing.MetricsSummary;
import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
import org.apache.amoro.server.optimizing.OptimizingType;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
Expand Down Expand Up @@ -64,6 +65,7 @@
import org.apache.amoro.utils.MixedDataFiles;
import org.apache.amoro.utils.MixedTableUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.HasTableOperations;
Expand Down Expand Up @@ -501,7 +503,7 @@ public List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable) {

@Override
public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
AmoroTable<?> amoroTable, int limit, int offset) {
AmoroTable<?> amoroTable, String type, ProcessStatus status, int limit, int offset) {
TableIdentifier tableIdentifier = amoroTable.id();
List<OptimizingProcessMeta> processMetaList =
getAs(
Expand All @@ -511,6 +513,16 @@ public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
tableIdentifier.getCatalog(),
tableIdentifier.getDatabase(),
tableIdentifier.getTableName()));

processMetaList =
processMetaList.stream()
.filter(
p ->
StringUtils.isBlank(type)
|| type.equalsIgnoreCase(p.getOptimizingType().getStatus().displayValue()))
.filter(p -> status == null || status.name().equalsIgnoreCase(p.getStatus().name()))
.collect(Collectors.toList());

int total = processMetaList.size();
processMetaList =
processMetaList.stream().skip(offset).limit(limit).collect(Collectors.toList());
Expand All @@ -532,6 +544,15 @@ public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
total);
}

@Override
public Map<String, String> getTableOptimizingTypes(AmoroTable<?> amoroTable) {
Map<String, String> types = Maps.newHashMap();
for (OptimizingType type : OptimizingType.values()) {
types.put(type.name(), type.getStatus().displayValue());
}
return types;
}

@Override
public List<OptimizingTaskInfo> getOptimizingTaskInfos(
AmoroTable<?> amoroTable, String processId) {
Expand Down
Loading

0 comments on commit a8b3183

Please sign in to comment.