From 7de80a3c8ecc86f416173e78a0883aafa825170c Mon Sep 17 00:00:00 2001 From: Andrei Paduroiu Date: Thu, 1 Mar 2018 14:16:47 -0800 Subject: [PATCH] Issue 2228: (Segment Store) Admin Tools (#2256) * Builds a shell for a command-line interface that can support an arbitrary set of commands a user wants to run. * Is able to load Pravega config from a config file and set custom configs on it * BookKeeper commands: . List all BookKeeperLog summaries . List details for a BookKeeperLog . Cleanup orphan ledgers (issue #1165) . Enabling a BookKeeperLog that is currently disabled. . Disabling a BookKeeperLog that is currently enabled. * Container commands: . Executing a non-invasive DurableLog recovery for a specified container * Updates the SelfTester to pause for user input towards the end of the test - this allows a quick debugging of the admin tools, by pointing them to a local cluster that already has data and/or is active Signed-off-by: Andrei Paduroiu --- .codecov.yml | 1 + build.gradle | 8 + .../server/host/admin/AdminRunner.java | 159 ++++++++++ .../server/host/admin/Parser.java | 62 ++++ .../admin/commands/AdminCommandState.java | 46 +++ .../commands/BookKeeperCleanupCommand.java | 151 ++++++++++ .../admin/commands/BookKeeperCommand.java | 94 ++++++ .../commands/BookKeeperDetailsCommand.java | 84 ++++++ .../commands/BookKeeperDisableCommand.java | 95 ++++++ .../commands/BookKeeperEnableCommand.java | 72 +++++ .../admin/commands/BookKeeperListCommand.java | 46 +++ .../server/host/admin/commands/Command.java | 271 ++++++++++++++++++ .../host/admin/commands/CommandArgs.java | 34 +++ .../host/admin/commands/ConfigCommand.java | 21 ++ .../admin/commands/ConfigListCommand.java | 36 +++ .../host/admin/commands/ConfigSetCommand.java | 49 ++++ .../host/admin/commands/ContainerCommand.java | 71 +++++ .../commands/ContainerRecoverCommand.java | 165 +++++++++++ .../server/DataCorruptionException.java | 30 +- .../segmentstore/server/logs/DataFrame.java | 4 +- .../server/logs/DataFrameReader.java | 66 +++-- .../server/logs/DebugRecoveryProcessor.java | 192 +++++++++++++ .../segmentstore/server/logs/DurableLog.java | 199 +++---------- .../server/logs/RecoveryProcessor.java | 211 ++++++++++++++ .../impl/bookkeeper/BookKeeperLogFactory.java | 23 +- .../impl/bookkeeper/DebugLogWrapper.java | 200 +++++++++++++ .../impl/bookkeeper/LedgerMetadata.java | 2 +- .../storage/impl/bookkeeper/Ledgers.java | 2 +- .../storage/impl/bookkeeper/LogMetadata.java | 5 +- .../impl/bookkeeper/ReadOnlyLogMetadata.java | 55 ++++ .../integration/selftest/SelfTestRunner.java | 7 +- .../test/integration/selftest/TestConfig.java | 4 + .../adapters/OutOfProcessAdapter.java | 4 + 33 files changed, 2269 insertions(+), 200 deletions(-) create mode 100644 segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/AdminRunner.java create mode 100644 segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/Parser.java create mode 100644 segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/AdminCommandState.java create mode 100644 segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperCleanupCommand.java create mode 100644 segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperCommand.java create mode 100644 segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperDetailsCommand.java create mode 100644 
segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperDisableCommand.java create mode 100644 segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperEnableCommand.java create mode 100644 segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperListCommand.java create mode 100644 segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/Command.java create mode 100644 segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/CommandArgs.java create mode 100644 segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ConfigCommand.java create mode 100644 segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ConfigListCommand.java create mode 100644 segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ConfigSetCommand.java create mode 100644 segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ContainerCommand.java create mode 100644 segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ContainerRecoverCommand.java create mode 100644 segmentstore/server/src/main/java/io/pravega/segmentstore/server/logs/DebugRecoveryProcessor.java create mode 100644 segmentstore/server/src/main/java/io/pravega/segmentstore/server/logs/RecoveryProcessor.java create mode 100644 segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/DebugLogWrapper.java create mode 100644 segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/ReadOnlyLogMetadata.java diff --git a/.codecov.yml b/.codecov.yml index d57f3245c4f..03600bdb1ca 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -20,3 +20,4 @@ coverage: - "**/generated/**" - "standalone" - "test" + - "**/host/admin" diff --git a/build.gradle b/build.gradle index 00e81735d61..c6d8827e585 100644 --- a/build.gradle +++ b/build.gradle @@ -274,6 +274,13 @@ project('segmentstore:server:host') { outputDir = startScripts.outputDir } + task admin(type: JavaExec) { + main = "io.pravega.segmentstore.server.host.admin.AdminRunner" + classpath = sourceSets.main.runtimeClasspath + standardInput = System.in + systemProperties System.getProperties() + } + applicationDistribution.into("bin") { from(createAppWithGCLogging) } @@ -370,6 +377,7 @@ project('test:integration') { task selftest(type: JavaExec) { main = "io.pravega.test.integration.selftest.SelfTestRunner" classpath = sourceSets.main.runtimeClasspath + standardInput = System.in systemProperties System.getProperties() } } diff --git a/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/AdminRunner.java b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/AdminRunner.java new file mode 100644 index 00000000000..84d3501af95 --- /dev/null +++ b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/AdminRunner.java @@ -0,0 +1,159 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
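The build.gradle change above adds a JavaExec task named admin that wires standard input and the current JVM system properties into AdminRunner. As a usage sketch (the Gradle task path follows from the project layout shown above), the tool can be started with:

    ./gradlew segmentstore:server:host:admin -Dpravega.configurationFile=config.properties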
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.segmentstore.server.host.admin; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.LoggerContext; +import com.google.common.base.Strings; +import io.pravega.segmentstore.server.host.admin.commands.AdminCommandState; +import io.pravega.segmentstore.server.host.admin.commands.Command; +import io.pravega.segmentstore.server.host.admin.commands.CommandArgs; +import io.pravega.segmentstore.server.host.admin.commands.ConfigListCommand; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Scanner; +import java.util.stream.Collectors; +import lombok.Cleanup; +import lombok.val; +import org.slf4j.LoggerFactory; + +/** + * Main entry point for the Admin tools. + */ +public final class AdminRunner { + private static final String CMD_HELP = "help"; + private static final String CMD_EXIT = "exit"; + + /** + * Main entry point for the Admin Tools Runner. + *

+ * To speed up setup, create a config.properties file and put the following properties (at a minimum):
+ *
+ * pravegaservice.containerCount={number of containers}
+ * pravegaservice.zkURL={host:port for ZooKeeper}
+ * bookkeeper.bkLedgerPath={path in ZooKeeper where BookKeeper stores Ledger metadata}
+ * bookkeeper.zkMetadataPath={path in ZooKeeper where Pravega stores BookKeeperLog metadata}
+ *
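For illustration, a minimal config.properties covering the properties listed above might contain the following (all values are hypothetical placeholders for a local cluster, not shipped defaults):

    pravegaservice.containerCount=4
    pravegaservice.zkURL=localhost:2181
    bookkeeper.bkLedgerPath=/pravega/bookkeeper/ledgers
    bookkeeper.zkMetadataPath=/pravega/segmentstore/containers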
+ * Then invoke this program with: + * -Dpravega.configurationFile=config.properties + * + * @param args Arguments. + * @throws Exception If one occurred. + */ + public static void main(String[] args) throws Exception { + LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); + context.getLoggerList().get(0).setLevel(Level.ERROR); + + System.out.println("Pravega Admin Tools.\n"); + @Cleanup + AdminCommandState state = new AdminCommandState(); + + // Output loaded config. + System.out.println("Initial configuration:"); + val initialConfigCmd = new ConfigListCommand(new CommandArgs(Collections.emptyList(), state)); + initialConfigCmd.execute(); + + // Continuously accept new commands as long as the user entered one. + System.out.println(String.format("%nType \"%s\" for list of commands, or \"%s\" to exit.", CMD_HELP, CMD_EXIT)); + Scanner input = new Scanner(System.in); + while (true) { + System.out.print(System.lineSeparator() + "> "); + String line = input.nextLine(); + if (Strings.isNullOrEmpty(line.trim())) { + continue; + } + + Parser.Command pc = Parser.parse(line); + switch (pc.getComponent()) { + case CMD_HELP: + printHelp(null); + break; + case CMD_EXIT: + System.exit(0); + break; + default: + execCommand(pc, state); + break; + } + } + } + + private static void execCommand(Parser.Command pc, AdminCommandState state) { + CommandArgs cmdArgs = new CommandArgs(pc.getArgs(), state); + try { + Command cmd = Command.Factory.get(pc.getComponent(), pc.getName(), cmdArgs); + if (cmd == null) { + // No command was found. + printHelp(pc); + } else { + cmd.execute(); + } + } catch (IllegalArgumentException ex) { + // We found a command, but had the wrong arguments to it. + System.out.println("Bad command syntax: " + ex.getMessage()); + printCommandDetails(pc); + } catch (Exception ex) { + ex.printStackTrace(System.out); + } + } + + private static void printCommandSummary(Command.CommandDescriptor d) { + System.out.println(String.format("\t%s %s %s: %s", + d.getComponent(), + d.getName(), + Arrays.stream(d.getArgs()).map(AdminRunner::formatArgName).collect(Collectors.joining(" ")), + d.getDescription())); + } + + private static void printCommandDetails(Parser.Command command) { + Command.CommandDescriptor d = Command.Factory.getDescriptor(command.getComponent(), command.getName()); + if (d == null) { + printHelp(command); + return; + } + + printCommandSummary(d); + for (Command.ArgDescriptor ad : d.getArgs()) { + System.out.println(String.format("\t\t%s: %s", formatArgName(ad), ad.getDescription())); + } + } + + private static void printHelp(Parser.Command command) { + Collection commands; + if (command == null) { + // All commands. + commands = Command.Factory.getDescriptors(); + System.out.println("All available commands:"); + } else { + // Commands specific to a component. 
+ commands = Command.Factory.getDescriptors(command.getComponent()); + if (commands.isEmpty()) { + System.out.println(String.format("No commands are available for component '%s'.", command.getComponent())); + } else { + System.out.println(String.format("All commands for component '%s':", command.getComponent())); + } + } + + commands.stream() + .sorted((d1, d2) -> { + int c = d1.getComponent().compareTo(d2.getComponent()); + if (c == 0) { + c = d1.getName().compareTo(d2.getName()); + } + return c; + }) + .forEach(AdminRunner::printCommandSummary); + } + + private static String formatArgName(Command.ArgDescriptor ad) { + return String.format("<%s>", ad.getName()); + } +} diff --git a/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/Parser.java b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/Parser.java new file mode 100644 index 00000000000..2aa265069d0 --- /dev/null +++ b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/Parser.java @@ -0,0 +1,62 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.segmentstore.server.host.admin; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Scanner; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * Helps parse Strings into Commands. + */ +final class Parser { + private static final String SCANNER_PATTERN = "[^\"\\s]+|\"(\\\\.|[^\\\\\"])*\""; + + /** + * Parses the given String into a Command, separating elements by spaces, and treating characters between double quotes(") + * as a single element. The first element is the Command Component, the second is the Command Name and the rest will + * be gathered as an ordered list of arguments. + * + * @param s The string to parse. + * @return A new instance of the Command class. + */ + static Command parse(String s) { + Scanner scanner = new Scanner(s); + String component = scanner.findInLine(SCANNER_PATTERN); + String command = scanner.findInLine(SCANNER_PATTERN); + ArrayList args = new ArrayList<>(); + String arg; + while ((arg = scanner.findInLine(SCANNER_PATTERN)) != null) { + args.add(arg); + } + + return new Command(component, command, Collections.unmodifiableList(args)); + } + + /** + * Represents a parsed Command. + */ + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + @Getter + static class Command { + private final String component; + private final String name; + private final List args; + + @Override + public String toString() { + return String.format("%s %s (%s)", this.component, this.name, String.join(", ", this.args)); + } + } +} diff --git a/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/AdminCommandState.java b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/AdminCommandState.java new file mode 100644 index 00000000000..72b7e6bdac6 --- /dev/null +++ b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/AdminCommandState.java @@ -0,0 +1,46 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. 
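To make the parsing rules of Parser.parse concrete, here is a hypothetical sketch (not part of this patch); the input text is an arbitrary example, and the class must live in the same package because Parser is package-private:

    package io.pravega.segmentstore.server.host.admin;

    // Illustrates how a line is split into component, command name and arguments.
    final class ParserExample {
        public static void main(String[] args) {
            // "bk" is the component, "details" the command name, "2" the only argument.
            Parser.Command pc = Parser.parse("bk details 2");
            System.out.println(pc); // prints: bk details (2)
            // Elements enclosed in double quotes are kept together as a single argument.
        }
    }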
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.segmentstore.server.host.admin.commands; + +import io.pravega.common.concurrent.ExecutorServiceHelpers; +import io.pravega.segmentstore.server.store.ServiceBuilderConfig; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.concurrent.ScheduledExecutorService; +import lombok.Getter; + +/** + * Keeps state between commands. + */ +public class AdminCommandState implements AutoCloseable { + @Getter + private final ServiceBuilderConfig.Builder configBuilder; + @Getter + private final ScheduledExecutorService executor = ExecutorServiceHelpers.newScheduledThreadPool(3, "admin-tools"); + + /** + * Creates a new instance of the AdminCommandState class. + * + * @throws IOException If unable to read specified config properties file (assuming it exists). + */ + public AdminCommandState() throws IOException { + this.configBuilder = ServiceBuilderConfig.builder(); + try { + this.configBuilder.include(System.getProperty(ServiceBuilderConfig.CONFIG_FILE_PROPERTY_NAME, "config.properties")); + } catch (FileNotFoundException ex) { + // Nothing to do here. + } + } + + @Override + public void close() { + this.executor.shutdown(); + } +} diff --git a/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperCleanupCommand.java b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperCleanupCommand.java new file mode 100644 index 00000000000..6d95310fafe --- /dev/null +++ b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperCleanupCommand.java @@ -0,0 +1,151 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.segmentstore.server.host.admin.commands; + +import io.pravega.common.Exceptions; +import io.pravega.segmentstore.storage.impl.bookkeeper.DebugLogWrapper; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import lombok.Cleanup; +import lombok.val; +import org.apache.bookkeeper.client.BookKeeperAdmin; + +/** + * Identifies and deletes orphaned BookKeeper ledgers. + */ +public class BookKeeperCleanupCommand extends BookKeeperCommand { + /** + * Creates a new instance of the BookKeeperCleanupCommand. + * + * @param args The arguments for the command. + */ + BookKeeperCleanupCommand(CommandArgs args) { + super(args); + } + + @Override + public void execute() throws Exception { + ensureArgCount(0); + + @Cleanup + val context = createContext(); + + // Get all BK ledger ids. 
+ output("Searching for all the ledgers ..."); + val bkAdmin = new BookKeeperAdmin(context.logFactory.getBookKeeperClient()); + val allLedgerIds = new ArrayList(); + bkAdmin.listLedgers().forEach(allLedgerIds::add); + + output("Searching for all referenced ledgers ..."); + + // We will not be deleting any ledger id above the highest referenced Ledger Id since we may be inadvertently + // deleting freshly created Ledgers that have not yet been added to a Ledger Metadata yet (BookKeeperLog.initialize + // first creates a new Ledger and then updates the Metadata in ZK with its existence). + AtomicLong highestReferencedLedgerId = new AtomicLong(); + val referencedLedgerIds = new HashSet(); + collectAllReferencedLedgerIds(referencedLedgerIds, context); + highestReferencedLedgerId.set(referencedLedgerIds.stream().max(Long::compareTo).orElse(-1L)); + + // We want to do our due diligence and verify there are no more other BookKeeperLogs that the user hasn't told us about. + output("Searching for possible other BookKeeperLogs ..."); + checkForExtraLogs(context); + + // Determine deletion candidates. + val deletionCandidates = allLedgerIds.stream() + .filter(id -> id < highestReferencedLedgerId.get() && !referencedLedgerIds.contains(id)) + .collect(Collectors.toList()); + output("\nTotal Count: %d, Referenced Count: %d, Highest Referenced Id: %s, To Delete Count: %d.", + allLedgerIds.size(), referencedLedgerIds.size(), highestReferencedLedgerId, deletionCandidates.size()); + if (deletionCandidates.isEmpty()) { + output("There are no Ledgers eligible for deletion at this time."); + return; + } + + output("\nDeletion candidates:"); + listCandidates(deletionCandidates, context); + + if (!confirmContinue()) { + output("Not deleting anything at this time."); + return; + } + + // Search again for referenced ledger ids, in case any new ones were just referenced. 
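// A candidate is only deleted if, after this second scan, it is still absent from referencedLedgerIds:
// deleteCandidates() re-checks every id and skips any ledger that has become referenced in the meantime.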
+ collectAllReferencedLedgerIds(referencedLedgerIds, context); + highestReferencedLedgerId.set(referencedLedgerIds.stream().max(Long::compareTo).orElse(-1L)); + deleteCandidates(deletionCandidates, referencedLedgerIds, context); + } + + private void deleteCandidates(List deletionCandidates, Collection referencedLedgerIds, Context context) { + for (long ledgerId : deletionCandidates) { + if (referencedLedgerIds.contains(ledgerId)) { + output("Not deleting Ledger %d because is is now referenced.", ledgerId); + continue; + } + + try { + Exceptions.handleInterrupted(() -> context.logFactory.getBookKeeperClient().deleteLedger(ledgerId)); + output("Deleted Ledger %d.", ledgerId); + } catch (Exception ex) { + output("FAILED to delete Ledger %d: %s.", ledgerId, ex.getMessage()); + } + } + } + + private void listCandidates(List deletionCandidates, Context context) { + for (long ledgerId : deletionCandidates) { + try { + val lh = context.bkAdmin.openLedgerNoRecovery(ledgerId); + output("\tLedger %d: LAC=%d, Length=%d, Bookies=%d, Frags=%d.", + ledgerId, lh.getLastAddConfirmed(), lh.getLength(), lh.getNumBookies(), lh.getNumFragments()); + } catch (Exception ex) { + output("Ledger %d: %s.", ledgerId, ex.getMessage()); + } + } + } + + private void collectAllReferencedLedgerIds(Collection referencedLedgerIds, Context context) throws Exception { + referencedLedgerIds.clear(); + for (int logId = 0; logId < context.serviceConfig.getContainerCount(); logId++) { + @Cleanup + DebugLogWrapper log = context.logFactory.createDebugLogWrapper(logId); + val m = log.fetchMetadata(); + if (m == null) { + continue; + } + + for (val lm : m.getLedgers()) { + referencedLedgerIds.add(lm.getLedgerId()); + } + } + } + + private void checkForExtraLogs(Context context) throws Exception { + val maxLogId = context.serviceConfig.getContainerCount() * 10; + for (int logId = context.serviceConfig.getContainerCount(); logId < maxLogId; logId++) { + @Cleanup + DebugLogWrapper log = context.logFactory.createDebugLogWrapper(logId); + val m = log.fetchMetadata(); + if (m != null) { + throw new Exception(String.format("Discovered BookKeeperLog %d which is beyond the maximum log id (%d) as specified in the configuration.", + logId, context.serviceConfig.getContainerCount() - 1)); + } + } + } + + static CommandDescriptor descriptor() { + return new CommandDescriptor(BookKeeperCommand.COMPONENT, + "cleanup", + "Removes orphan BookKeeper Ledgers that are not used by any BookKeeperLog."); + } +} diff --git a/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperCommand.java b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperCommand.java new file mode 100644 index 00000000000..15b0c89910f --- /dev/null +++ b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperCommand.java @@ -0,0 +1,94 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.segmentstore.server.host.admin.commands; + +import io.pravega.common.Exceptions; +import io.pravega.segmentstore.server.store.ServiceConfig; +import io.pravega.segmentstore.storage.DurableDataLogException; +import io.pravega.segmentstore.storage.impl.bookkeeper.BookKeeperConfig; +import io.pravega.segmentstore.storage.impl.bookkeeper.BookKeeperLogFactory; +import io.pravega.segmentstore.storage.impl.bookkeeper.ReadOnlyLogMetadata; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.val; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeperAdmin; +import org.apache.curator.framework.CuratorFramework; + +/** + * Base for any BookKeeper-related commands. + */ +abstract class BookKeeperCommand extends Command { + static final String COMPONENT = "bk"; + + BookKeeperCommand(CommandArgs args) { + super(args); + } + + /** + * Outputs a summary for the given Log. + * + * @param logId The Log Id. + * @param m The Log Metadata for the given Log Id. + */ + protected void outputLogSummary(int logId, ReadOnlyLogMetadata m) { + if (m == null) { + output("Log %d: No metadata.", logId); + } else { + output("Log %d: Epoch=%d, Version=%d, Enabled=%s, Ledgers=%d, Truncation={%s}", logId, + m.getEpoch(), m.getUpdateVersion(), m.isEnabled(), m.getLedgers().size(), m.getTruncationAddress()); + } + } + + /** + * Creates a new Context to be used by the BookKeeper command. + * + * @return A new Context. + * @throws DurableDataLogException If the BookKeeperLogFactory could not be initialized. + */ + protected Context createContext() throws DurableDataLogException { + val serviceConfig = getServiceConfig(); + val bkConfig = getCommandArgs().getState().getConfigBuilder() + .include(BookKeeperConfig.builder().with(BookKeeperConfig.ZK_ADDRESS, serviceConfig.getZkURL())) + .build().getConfig(BookKeeperConfig::builder); + val zkClient = createZKClient(); + val factory = new BookKeeperLogFactory(bkConfig, zkClient, getCommandArgs().getState().getExecutor()); + try { + factory.initialize(); + } catch (DurableDataLogException ex) { + zkClient.close(); + throw ex; + } + + val bkAdmin = new BookKeeperAdmin(factory.getBookKeeperClient()); + return new Context(serviceConfig, bkConfig, zkClient, factory, bkAdmin); + } + + @RequiredArgsConstructor(access = AccessLevel.PROTECTED) + protected static class Context implements AutoCloseable { + final ServiceConfig serviceConfig; + final BookKeeperConfig bookKeeperConfig; + final CuratorFramework zkClient; + final BookKeeperLogFactory logFactory; + final BookKeeperAdmin bkAdmin; + + @Override + @SneakyThrows(BKException.class) + public void close() { + this.logFactory.close(); + this.zkClient.close(); + + // There is no need to close the BK Admin object since it doesn't own anything; however it does have a close() + // method and it's a good idea to invoke it. 
+ Exceptions.handleInterrupted(this.bkAdmin::close); + } + } +} diff --git a/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperDetailsCommand.java b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperDetailsCommand.java new file mode 100644 index 00000000000..cc36bfc96f8 --- /dev/null +++ b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperDetailsCommand.java @@ -0,0 +1,84 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.segmentstore.server.host.admin.commands; + +import io.pravega.segmentstore.storage.impl.bookkeeper.LedgerMetadata; +import java.util.stream.Collectors; +import lombok.Cleanup; +import lombok.val; +import org.apache.bookkeeper.client.LedgerHandle; + +/** + * Fetches details about a BookKeeperLog. + */ +public class BookKeeperDetailsCommand extends BookKeeperCommand { + + /** + * Creates a new instance of the BookKeeperDetailsCommand. + * + * @param args The arguments for the command. + */ + BookKeeperDetailsCommand(CommandArgs args) { + super(args); + } + + @Override + public void execute() throws Exception { + ensureArgCount(1); + int logId = getIntArg(0); + + @Cleanup + val context = createContext(); + @Cleanup + val log = context.logFactory.createDebugLogWrapper(logId); + val m = log.fetchMetadata(); + outputLogSummary(logId, m); + if (m == null) { + // Nothing else to do. + return; + } + + if (m.getLedgers().size() == 0) { + output("There are no ledgers for Log %s.", logId); + return; + } + + for (LedgerMetadata lm : m.getLedgers()) { + LedgerHandle lh = null; + try { + lh = log.openLedgerNoFencing(lm); + val bkLm = context.bkAdmin.getLedgerMetadata(lh); + output("\tLedger %d: Seq=%d, Status=%s, LAC=%d, Length=%d, Bookies=%d, Frags=%d, E/W/A=%d/%d/%d, Ensembles=%s.", + lm.getLedgerId(), lm.getSequence(), lm.getStatus(), + lh.getLastAddConfirmed(), lh.getLength(), lh.getNumBookies(), lh.getNumFragments(), + bkLm.getEnsembleSize(), bkLm.getWriteQuorumSize(), bkLm.getAckQuorumSize(), getEnsembleDescription(bkLm)); + } catch (Exception ex) { + output("\tLedger %d: Seq = %d, Status = %s. 
BK: %s", + lm.getLedgerId(), lm.getSequence(), lm.getStatus(), ex.getMessage()); + } finally { + if (lh != null) { + lh.close(); + } + } + } + } + + private String getEnsembleDescription(org.apache.bookkeeper.client.LedgerMetadata bkLm) { + return bkLm.getEnsembles().entrySet().stream() + .map(e -> String.format("%d:[%s]", e.getKey(), e.getValue().stream().map(Object::toString).collect(Collectors.joining(",")))) + .collect(Collectors.joining(",")); + } + + static CommandDescriptor descriptor() { + return new CommandDescriptor(BookKeeperCommand.COMPONENT, "details", + "Lists metadata details about a BookKeeperLog, including BK Ledger information.", + new ArgDescriptor("log-id", "Id of the log to get details for.")); + } +} diff --git a/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperDisableCommand.java b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperDisableCommand.java new file mode 100644 index 00000000000..482ed32f4ce --- /dev/null +++ b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperDisableCommand.java @@ -0,0 +1,95 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.segmentstore.server.host.admin.commands; + +import io.pravega.common.util.RetriesExhaustedException; +import io.pravega.common.util.Retry; +import io.pravega.segmentstore.storage.DataLogWriterNotPrimaryException; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; +import lombok.val; + +/** + * Disables a BookKeeperLog. + */ +public class BookKeeperDisableCommand extends BookKeeperCommand { + private static final int MAX_RETRIES = 10; + private static final Retry.RetryAndThrowBase DISABLE_RETRY = Retry + .withExpBackoff(100, 2, MAX_RETRIES, 1000) + .retryWhen(ex -> ex instanceof DataLogWriterNotPrimaryException) + .throwingOn(Exception.class); + + /** + * Creates a new instance of the BookKeeperDisableCommand. + * + * @param args The arguments for the command. + */ + BookKeeperDisableCommand(CommandArgs args) { + super(args); + } + + @Override + public void execute() throws Exception { + ensureArgCount(1); + int logId = getIntArg(0); + + @Cleanup + val context = createContext(); + @Cleanup + val log = context.logFactory.createDebugLogWrapper(logId); + + // Display a summary of the BookKeeperLog. + val m = log.fetchMetadata(); + outputLogSummary(logId, m); + if (m == null) { + // Nothing else to do. + return; + } else if (!m.isEnabled()) { + output("BookKeeperLog '%s' is already disabled.", logId); + return; + } + + output("BookKeeperLog '%s' is about to be DISABLED.", logId); + output("\tIts SegmentContainer will shut down and it will not be able to restart until re-enabled."); + output("\tNo request on this SegmentContainer can be processed until that time (OUTAGE ALERT)."); + if (!confirmContinue()) { + output("Not disabling anything at this time."); + return; + } + + try { + AtomicInteger count = new AtomicInteger(0); + // We may be competing with a rather active Log which updates its metadata quite frequently, so try a few + // times to acquire the ownership. 
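// Only DataLogWriterNotPrimaryException causes another attempt (see DISABLE_RETRY above), since it
// indicates that this client does not yet hold exclusive ownership of the log; any other failure is
// surfaced immediately through the catch block below.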
+ DISABLE_RETRY.run(() -> { + output("Acquiring ownership (attempt %d/%d) ...", count.incrementAndGet(), MAX_RETRIES); + log.disable(); + output("BookKeeperLog '%s' has been disabled.", logId); + return null; + }); + } catch (Exception ex) { + Throwable cause = ex; + if (cause instanceof RetriesExhaustedException && cause.getCause() != null) { + cause = cause.getCause(); + } + output("Disable failed: %s.", cause.getMessage()); + } + + output("Current metadata:"); + val m2 = log.fetchMetadata(); + outputLogSummary(logId, m2); + } + + static CommandDescriptor descriptor() { + return new CommandDescriptor(BookKeeperCommand.COMPONENT, "disable", + "Disables a BookKeeperLog by open-fencing it and updating its metadata in ZooKeeper (with the Enabled flag set to 'false').", + new ArgDescriptor("log-id", "Id of the log to disable.")); + } +} diff --git a/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperEnableCommand.java b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperEnableCommand.java new file mode 100644 index 00000000000..ffd67789d13 --- /dev/null +++ b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperEnableCommand.java @@ -0,0 +1,72 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.segmentstore.server.host.admin.commands; + +import lombok.Cleanup; +import lombok.val; + +/** + * Enables a previously disabled BookKeeperLog. + */ +public class BookKeeperEnableCommand extends BookKeeperCommand { + + /** + * Creates a new instance of the BookKeeperEnableCommand class. + * @param args The arguments for the command. + */ + BookKeeperEnableCommand(CommandArgs args) { + super(args); + } + + @Override + public void execute() throws Exception { + ensureArgCount(1); + int logId = getIntArg(0); + + @Cleanup + val context = createContext(); + @Cleanup + val log = context.logFactory.createDebugLogWrapper(logId); + + // Display a summary of the BookKeeperLog. + val m = log.fetchMetadata(); + outputLogSummary(logId, m); + if (m == null) { + // Nothing else to do. + return; + } else if (m.isEnabled()) { + output("BookKeeperLog '%s' is already enabled.", logId); + return; + } + + output("BookKeeperLog '%s' is about to be ENABLED.", logId); + if (!confirmContinue()) { + output("Not enabling anything at this time."); + return; + } + + try { + log.enable(); + output("BookKeeperLog '%s' has been enabled. 
It may take a few minutes for its SegmentContainer to resume operations.", logId); + } catch (Exception ex) { + output("Enable failed: " + ex.getMessage()); + } + + output("Current metadata:"); + val m2 = log.fetchMetadata(); + outputLogSummary(logId, m2); + } + + static CommandDescriptor descriptor() { + return new CommandDescriptor(BookKeeperCommand.COMPONENT, "enable", + "Enables a BookKeeperLog by updating its metadata in ZooKeeper (with the Enabled flag set to 'true').", + new ArgDescriptor("log-id", "Id of the log to enable.")); + } +} diff --git a/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperListCommand.java b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperListCommand.java new file mode 100644 index 00000000000..07485107391 --- /dev/null +++ b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/BookKeeperListCommand.java @@ -0,0 +1,46 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.segmentstore.server.host.admin.commands; + +import io.pravega.segmentstore.storage.impl.bookkeeper.DebugLogWrapper; +import lombok.Cleanup; +import lombok.val; + +/** + * Lists all BookKeeper Logs. + */ +class BookKeeperListCommand extends BookKeeperCommand { + /** + * Creates a new instance of the BookKeeperListCommand. + * @param args The arguments for the command. + */ + BookKeeperListCommand(CommandArgs args) { + super(args); + } + + @Override + public void execute() throws Exception { + ensureArgCount(0); + + // Loop through all known log ids and fetch their metadata. + @Cleanup + val context = createContext(); + for (int logId = 0; logId < context.serviceConfig.getContainerCount(); logId++) { + @Cleanup + DebugLogWrapper log = context.logFactory.createDebugLogWrapper(logId); + val m = log.fetchMetadata(); + outputLogSummary(logId, m); + } + } + + static CommandDescriptor descriptor() { + return new CommandDescriptor(BookKeeperCommand.COMPONENT, "list", "Lists all BookKeeper Logs."); + } +} diff --git a/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/Command.java b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/Command.java new file mode 100644 index 00000000000..d8755a992c0 --- /dev/null +++ b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/Command.java @@ -0,0 +1,271 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.segmentstore.server.host.admin.commands; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import io.pravega.common.Exceptions; +import io.pravega.segmentstore.server.store.ServiceConfig; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Scanner; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import lombok.AccessLevel; +import lombok.Data; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.val; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; + +/** + * Base class for any command to execute from the Admin tool. + */ +public abstract class Command { + //region Private + + @Getter(AccessLevel.PROTECTED) + private final CommandArgs commandArgs; + + @Getter(AccessLevel.PROTECTED) + private final PrintStream out = System.out; + + //endregion + + //region Constructor + + /** + * Creates a new instance of the Command class. + * + * @param args The arguments for the command. + */ + Command(CommandArgs args) { + this.commandArgs = Preconditions.checkNotNull(args, "commandArgs"); + } + + //endregion + + //region Command Implementation + + /** + * Executes the command with the arguments passed in via the Constructor. The command will allocate whatever resources + * it needs to execute and will clean up after its execution completes (successful or not). The only expected side + * effect may be the modification of the shared AdminCommandState that is passed in via the Constructor. + * + * @throws IllegalArgumentException If the arguments passed in via the Constructor are invalid. + * @throws Exception If the command failed to execute. + */ + public abstract void execute() throws Exception; + + /** + * Creates a new instance of the ServiceConfig class from the shared AdminCommandState passed in via the Constructor. + */ + protected ServiceConfig getServiceConfig() { + return getCommandArgs().getState().getConfigBuilder().build().getConfig(ServiceConfig::builder); + } + + /** + * Creates a new instance of the CuratorFramework class using configuration from the shared AdminCommandState. + */ + protected CuratorFramework createZKClient() { + val serviceConfig = getServiceConfig(); + CuratorFramework zkClient = CuratorFrameworkFactory + .builder() + .connectString(serviceConfig.getZkURL()) + .namespace("pravega/" + serviceConfig.getClusterName()) + .retryPolicy(new ExponentialBackoffRetry(serviceConfig.getZkRetrySleepMs(), serviceConfig.getZkRetryCount())) + .sessionTimeoutMs(serviceConfig.getZkSessionTimeoutMs()) + .build(); + zkClient.start(); + return zkClient; + } + + protected void output(String messageTemplate, Object... 
args) { + this.out.println(String.format(messageTemplate, args)); + } + + protected boolean confirmContinue() { + output("Do you want to continue?[yes|no]"); + Scanner s = new Scanner(System.in); + String input = s.nextLine(); + return input.equals("yes"); + } + + //endregion + + //region Arguments + + protected void ensureArgCount(int expectedCount) { + Preconditions.checkArgument(this.commandArgs.getArgs().size() == expectedCount, "Incorrect argument count."); + } + + protected int getIntArg(int index) { + return getArg(index, Integer::parseInt); + } + + protected long getLongArg(int index) { + return getArg(index, Long::parseLong); + } + + protected boolean getBooleanArg(int index) { + return getArg(index, Boolean::parseBoolean); + } + + private T getArg(int index, Function converter) { + String s = null; + try { + s = this.commandArgs.getArgs().get(index); + return converter.apply(s); + } catch (Exception ex) { + throw new IllegalArgumentException(String.format("Unexpected argument '%s' at position %d: %s.", s, index, ex.getMessage())); + } + } + + //endregion + + //region Descriptors + + /** + * Describes an argument. + */ + @RequiredArgsConstructor(access = AccessLevel.PROTECTED) + @Getter + public static class ArgDescriptor { + private final String name; + private final String description; + } + + /** + * Describes a Command. + */ + @Getter + public static class CommandDescriptor { + private final String component; + private final String name; + private final String description; + private final ArgDescriptor[] args; + CommandDescriptor(String component, String name, String description, ArgDescriptor... args) { + this.component = Exceptions.checkNotNullOrEmpty(component, "component"); + this.name = Exceptions.checkNotNullOrEmpty(name, "name"); + this.description = Exceptions.checkNotNullOrEmpty(description, "description"); + this.args = args; + } + } + + //endregion + + //region Factory + + /** + * Helps create new Command instances. + */ + public static class Factory { + private static final Map> COMMANDS = registerAll( + ImmutableMap., CommandCreator>builder() + .put(ConfigListCommand::descriptor, ConfigListCommand::new) + .put(ConfigSetCommand::descriptor, ConfigSetCommand::new) + .put(BookKeeperCleanupCommand::descriptor, BookKeeperCleanupCommand::new) + .put(BookKeeperListCommand::descriptor, BookKeeperListCommand::new) + .put(BookKeeperDetailsCommand::descriptor, BookKeeperDetailsCommand::new) + .put(BookKeeperEnableCommand::descriptor, BookKeeperEnableCommand::new) + .put(BookKeeperDisableCommand::descriptor, BookKeeperDisableCommand::new) + .put(ContainerRecoverCommand::descriptor, ContainerRecoverCommand::new) + .build()); + + /** + * Gets a Collection of CommandDescriptors for all registered commands. + * + * @return A new Collection. + */ + public static Collection getDescriptors() { + ArrayList result = new ArrayList<>(); + COMMANDS.values().forEach(g -> g.values().forEach(c -> result.add(c.getDescriptor()))); + return result; + } + + /** + * Gets a Collection of CommandDescriptors for all registered commands for the given component. + * + * @param component The component to query. + * @return A new Collection. + */ + public static Collection getDescriptors(String component) { + Map componentCommands = COMMANDS.getOrDefault(component, null); + return componentCommands == null + ? Collections.emptyList() + : componentCommands.values().stream().map(CommandInfo::getDescriptor).collect(Collectors.toList()); + } + + /** + * Gets a CommandDescriptor for the given commandArgs. 
+ * + * @param component The name of the Component to get the descriptor for. + * @param command The name of the Command (within the Component) to get. + * @return The CommandDescriptor for the given argument, or null if no such command is registered. + */ + public static CommandDescriptor getDescriptor(String component, String command) { + CommandInfo ci = getCommand(component, command); + return ci == null ? null : ci.getDescriptor(); + } + + /** + * Gets a new instance of a Command for the given commandArgs. + * + * @param component The name of the Component to get the Command for. + * @param command The name of the Command (within the Component) to get. + * @param args CommandArgs for the command. + * @return A new instance of a Command base, already initialized with the command's commandArgs. + */ + public static Command get(String component, String command, CommandArgs args) { + CommandInfo ci = getCommand(component, command); + return ci == null ? null : ci.getCreator().apply(args); + } + + private static CommandInfo getCommand(String component, String command) { + Map componentCommands = COMMANDS.getOrDefault(component, null); + return componentCommands == null ? null : componentCommands.getOrDefault(command, null); + } + + private static Map> registerAll(Map, CommandCreator> items) { + val result = new HashMap>(); + for (val e : items.entrySet()) { + Command.CommandDescriptor d = e.getKey().get(); + Map componentCommands = result.getOrDefault(d.getComponent(), null); + if (componentCommands == null) { + componentCommands = new HashMap<>(); + result.put(d.getComponent(), componentCommands); + } + + if (componentCommands.putIfAbsent(d.getName(), new CommandInfo(d, e.getValue())) != null) { + throw new IllegalArgumentException(String.format("A command is already registered for '%s'-'%s'.", d.getComponent(), d.getName())); + } + } + return Collections.unmodifiableMap(result); + } + + @Data + private static class CommandInfo { + private final CommandDescriptor descriptor; + private final CommandCreator creator; + } + + @FunctionalInterface + private interface CommandCreator extends Function { + } + } + //endregion +} diff --git a/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/CommandArgs.java b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/CommandArgs.java new file mode 100644 index 00000000000..ea735a0c695 --- /dev/null +++ b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/CommandArgs.java @@ -0,0 +1,34 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.segmentstore.server.host.admin.commands; + +import com.google.common.base.Preconditions; +import java.util.List; +import lombok.Getter; + +/** + * Command arguments. + */ +@Getter +public class CommandArgs { + private final List args; + private final AdminCommandState state; + + /** + * Creates a new instance of the CommandArgs class. + * + * @param args The actual arguments to the command. + * @param state The shared AdminCommandState to pass to the command. 
+ */ + public CommandArgs(List args, AdminCommandState state) { + this.args = Preconditions.checkNotNull(args, "args"); + this.state = Preconditions.checkNotNull(state, "state"); + } +} diff --git a/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ConfigCommand.java b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ConfigCommand.java new file mode 100644 index 00000000000..20ca9bbc27a --- /dev/null +++ b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ConfigCommand.java @@ -0,0 +1,21 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.segmentstore.server.host.admin.commands; + +/** + * Base for all Config-related commands. + */ +abstract class ConfigCommand extends Command { + static final String COMPONENT = "config"; + + ConfigCommand(CommandArgs args) { + super(args); + } +} diff --git a/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ConfigListCommand.java b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ConfigListCommand.java new file mode 100644 index 00000000000..af142646c28 --- /dev/null +++ b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ConfigListCommand.java @@ -0,0 +1,36 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.segmentstore.server.host.admin.commands; + +import com.google.common.base.Preconditions; + +/** + * Lists the contents of the shared Configuration. + */ +public class ConfigListCommand extends ConfigCommand { + /** + * Creates a new instance of the Command class. + * + * @param args The arguments for the command. + */ + public ConfigListCommand(CommandArgs args) { + super(args); + } + + @Override + public void execute() { + Preconditions.checkArgument(getCommandArgs().getArgs().size() == 0, "Not expecting any arguments."); + getCommandArgs().getState().getConfigBuilder().build().forEach((name, value) -> output("\t%s=%s", name, value)); + } + + static CommandDescriptor descriptor() { + return new CommandDescriptor(ConfigCommand.COMPONENT, "list", "Lists all configuration set during this session."); + } +} diff --git a/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ConfigSetCommand.java b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ConfigSetCommand.java new file mode 100644 index 00000000000..7c4bd2f9240 --- /dev/null +++ b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ConfigSetCommand.java @@ -0,0 +1,49 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
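Putting together the Parser grammar, the Factory registrations in Command.java, and the Config commands above, a hypothetical interactive session in the AdminRunner shell might look roughly like this (all values are invented for illustration):

    > config set pravegaservice.containerCount=4 pravegaservice.zkURL=localhost:2181
    > config list
            pravegaservice.containerCount=4
            pravegaservice.zkURL=localhost:2181
    > bk list
    > bk details 2
    > container recover 2
    > exit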
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.segmentstore.server.host.admin.commands; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import java.util.Properties; + +/** + * Updates the shared AdminCommandState with new config values. + */ +class ConfigSetCommand extends Command { + /** + * Creates a new instance of the ConfigSetCommand class. + * + * @param args The arguments for the command. + */ + ConfigSetCommand(CommandArgs args) { + super(args); + } + + @Override + public void execute() { + Properties newValues = new Properties(); + getCommandArgs().getArgs().forEach(s -> { + String[] items = s.split("="); + Preconditions.checkArgument(items.length == 2, "Invalid name=value pair: '%s'.", s); + Preconditions.checkArgument(!Strings.isNullOrEmpty(items[0]) && !Strings.isNullOrEmpty(items[1]), + "Invalid name=value pair: '%s'.", s); + newValues.setProperty(items[0], items[1]); + }); + + Preconditions.checkArgument(newValues.size() > 0, "Expecting at least one argument."); + getCommandArgs().getState().getConfigBuilder().include(newValues); + } + + static CommandDescriptor descriptor() { + return new CommandDescriptor(ConfigCommand.COMPONENT, "set", + "Sets one or more config values for use during this session.", + new ArgDescriptor("name=value list", "Space-separated name=value pairs.")); + } +} diff --git a/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ContainerCommand.java b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ContainerCommand.java new file mode 100644 index 00000000000..cbf2dc63613 --- /dev/null +++ b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ContainerCommand.java @@ -0,0 +1,71 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.segmentstore.server.host.admin.commands; + +import io.pravega.segmentstore.server.containers.ContainerConfig; +import io.pravega.segmentstore.server.store.ServiceConfig; +import io.pravega.segmentstore.storage.DurableDataLogException; +import io.pravega.segmentstore.storage.impl.bookkeeper.BookKeeperConfig; +import io.pravega.segmentstore.storage.impl.bookkeeper.BookKeeperLogFactory; +import lombok.val; +import org.apache.bookkeeper.client.BookKeeperAdmin; +import org.apache.curator.framework.CuratorFramework; + +/** + * Base for Container admin commands. + */ +abstract class ContainerCommand extends BookKeeperCommand { + static final String COMPONENT = "container"; + + ContainerCommand(CommandArgs args) { + super(args); + } + + /** + * Creates a new Context to be used by the BookKeeper command. + * + * @return A new Context. + * @throws DurableDataLogException If the BookKeeperLogFactory could not be initialized. 
+ */ + @Override + protected Context createContext() throws DurableDataLogException { + val serviceConfig = getServiceConfig(); + val containerConfig = getCommandArgs().getState().getConfigBuilder().build().getConfig(ContainerConfig::builder); + val bkConfig = getCommandArgs().getState().getConfigBuilder() + .include(BookKeeperConfig.builder().with(BookKeeperConfig.ZK_ADDRESS, serviceConfig.getZkURL())) + .build().getConfig(BookKeeperConfig::builder); + val zkClient = createZKClient(); + val factory = new BookKeeperLogFactory(bkConfig, zkClient, getCommandArgs().getState().getExecutor()); + try { + factory.initialize(); + } catch (DurableDataLogException ex) { + zkClient.close(); + throw ex; + } + + val bkAdmin = new BookKeeperAdmin(factory.getBookKeeperClient()); + return new Context(serviceConfig, containerConfig, bkConfig, zkClient, factory, bkAdmin); + } + + protected static class Context extends BookKeeperCommand.Context { + final ContainerConfig containerConfig; + + protected Context(ServiceConfig serviceConfig, ContainerConfig containerConfig, BookKeeperConfig bookKeeperConfig, + CuratorFramework zkClient, BookKeeperLogFactory logFactory, BookKeeperAdmin bkAdmin) { + super(serviceConfig, bookKeeperConfig, zkClient, logFactory, bkAdmin); + this.containerConfig = containerConfig; + } + + @Override + public void close() { + super.close(); + } + } +} diff --git a/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ContainerRecoverCommand.java b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ContainerRecoverCommand.java new file mode 100644 index 00000000000..e53829306c5 --- /dev/null +++ b/segmentstore/server/host/src/main/java/io/pravega/segmentstore/server/host/admin/commands/ContainerRecoverCommand.java @@ -0,0 +1,165 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.segmentstore.server.host.admin.commands; + +import com.google.common.base.Strings; +import io.pravega.common.Exceptions; +import io.pravega.common.util.ByteArraySegment; +import io.pravega.segmentstore.server.DataCorruptionException; +import io.pravega.segmentstore.server.logs.DataFrame; +import io.pravega.segmentstore.server.logs.DebugRecoveryProcessor; +import io.pravega.segmentstore.server.logs.operations.Operation; +import io.pravega.segmentstore.server.reading.ReadIndexConfig; +import io.pravega.segmentstore.storage.DurableDataLog; +import java.util.Collection; +import java.util.List; +import lombok.Cleanup; +import lombok.val; + +/** + * Executes the recovery process for a particular container, using a no-op Cache and Storage. + */ +public class ContainerRecoverCommand extends ContainerCommand { + /** + * Creates a new instance of the ContainerRecoverCommand. + * + * @param args The arguments for the command. 
+ */
+    ContainerRecoverCommand(CommandArgs args) {
+        super(args);
+    }
+
+    @Override
+    public void execute() throws Exception {
+        ensureArgCount(1);
+        int containerId = getIntArg(0);
+        @Cleanup
+        val context = createContext();
+        val readIndexConfig = getCommandArgs().getState().getConfigBuilder().build().getConfig(ReadIndexConfig::builder);
+
+        // We create a special "read-only" BK log that will not do any fencing or otherwise interfere with an active
+        // container. As a result, due to the nature of BK, it is possible that it may not contain all the latest writes
+        // since the Bookies may not have yet synchronized the LAC on the last (active) ledger.
+        @Cleanup
+        val log = context.logFactory.createDebugLogWrapper(containerId);
+        val bkLog = log.asReadOnly();
+
+        val recoveryState = new RecoveryState();
+        val callbacks = new DebugRecoveryProcessor.OperationCallbacks(
+                recoveryState::newOperation,
+                op -> recoveryState.operationComplete(op, null),
+                recoveryState::operationComplete);
+
+        @Cleanup
+        val rp = DebugRecoveryProcessor.create(containerId, bkLog, context.containerConfig, readIndexConfig, getCommandArgs().getState().getExecutor(), callbacks);
+        try {
+            rp.performRecovery();
+            output("Recovery complete: %d DataFrame(s) containing %d Operation(s).",
+                    recoveryState.dataFrameCount, recoveryState.operationCount);
+        } catch (Exception ex) {
+            output("Recovery FAILED: %d DataFrame(s) containing %d Operation(s) could be recovered before the failure.",
+                    recoveryState.dataFrameCount, recoveryState.operationCount);
+            ex.printStackTrace(getOut());
+            Throwable cause = Exceptions.unwrap(ex);
+            if (cause instanceof DataCorruptionException) {
+                unwrapDataCorruptionException((DataCorruptionException) cause);
+            }
+        }
+    }
+
+    private void unwrapDataCorruptionException(DataCorruptionException dce) {
+        Object[] context = dce.getAdditionalData();
+        if (context == null || context.length == 0) {
+            return;
+        }
+        for (int i = 0; i < context.length; i++) {
+            Object c = context[i];
+            output("Debug Info %d/%d:", i + 1, context.length);
+            outputDebugObject(c, 1);
+        }
+    }
+
+    private void outputDebugObject(Object c, int indent) {
+        String prefix = Strings.repeat("\t", indent) + c.getClass().getSimpleName();
+        if (c instanceof Collection) {
+            Collection items = (Collection) c;
+            output("%s(%d):", prefix, items.size());
+            for (Object o : items) {
+                outputDebugObject(o, indent + 1);
+            }
+        } else if (c instanceof DataFrame.DataFrameEntry) {
+            val dfe = (DataFrame.DataFrameEntry) c;
+            output("%s: Address={%s}, First/LastRecordEntry=%s/%s, LastInDF=%s, DF.Offset/Length=%d/%d.",
+                    prefix, dfe.getFrameAddress(), dfe.isFirstRecordEntry(), dfe.isLastRecordEntry(),
+                    dfe.isLastEntryInDataFrame(), dfe.getData().arrayOffset(), dfe.getData().getLength());
+        } else if (c instanceof DataFrame) {
+            val df = (DataFrame) c;
+            output("%s: Address={%s}, Length=%s.", prefix, df.getAddress(), df.getLength());
+        } else if (c instanceof DurableDataLog.ReadItem) {
+            val ri = (DurableDataLog.ReadItem) c;
+            output("%s: Address={%s}, Length=%s.", prefix, ri.getAddress(), ri.getLength());
+        } else {
+            output("%s: %s.", prefix, c);
+        }
+    }
+
+    static CommandDescriptor descriptor() {
+        return new CommandDescriptor(ContainerCommand.COMPONENT, "recover",
+                "Executes a local, non-invasive recovery for a SegmentContainer.",
+                new ArgDescriptor("container-id", "Id of the SegmentContainer to recover."));
+    }
+
+    //region RecoveryState
+
+    private class RecoveryState {
+        private Operation currentOperation;
+        private int dataFrameCount = 0;
+        private int
operationCount = 0; + private int currentFrameUsedLength = 0; + + private void newOperation(Operation op, List frameEntries) { + for (int i = 0; i < frameEntries.size(); i++) { + DataFrame.DataFrameEntry e = frameEntries.get(i); + if (this.currentFrameUsedLength == 0) { + output("Begin DataFrame: %s.", e.getFrameAddress()); + this.dataFrameCount++; + } + + ByteArraySegment data = e.getData(); + this.currentFrameUsedLength += data.getLength(); + String split = frameEntries.size() <= 1 ? "" : String.format(",#%d/%d", i + 1, frameEntries.size()); + output("\t@[%s,%s%s]: %s.", data.arrayOffset(), data.getLength(), split, op); + + if (e.isLastEntryInDataFrame()) { + int totalLength = data.arrayOffset() + data.getLength(); + output("End DataFrame: Length=%d/%d.\n", this.currentFrameUsedLength, totalLength); + this.currentFrameUsedLength = 0; + } + } + + this.currentOperation = op; + this.operationCount++; + } + + private void operationComplete(Operation op, Throwable failure) { + if (this.currentOperation == null || this.currentOperation.getSequenceNumber() != op.getSequenceNumber()) { + output("Operation completion mismatch. Expected '%s', found '%s'.", this.currentOperation, op); + } + + // We don't output anything for non-failed operations. + if (failure != null) { + output("\tOperation '%s' FAILED recovery.", this.currentOperation); + failure.printStackTrace(getOut()); + } + } + } + + //endregion +} diff --git a/segmentstore/server/src/main/java/io/pravega/segmentstore/server/DataCorruptionException.java b/segmentstore/server/src/main/java/io/pravega/segmentstore/server/DataCorruptionException.java index 1539b854ed1..40eb7ff7c03 100644 --- a/segmentstore/server/src/main/java/io/pravega/segmentstore/server/DataCorruptionException.java +++ b/segmentstore/server/src/main/java/io/pravega/segmentstore/server/DataCorruptionException.java @@ -10,6 +10,7 @@ package io.pravega.segmentstore.server; import io.pravega.segmentstore.contracts.StreamingException; +import lombok.Getter; /** * Exception that is thrown whenever we detect an unrecoverable data corruption. @@ -17,19 +18,40 @@ */ public class DataCorruptionException extends StreamingException { /** - * + * */ private static final long serialVersionUID = 1L; - public DataCorruptionException(String message) { - super(message); + /** + * Gets an array of objects that may contain additional context-related information. + */ + @Getter + private final Object[] additionalData; + + /** + * Creates a new instance of the DataCorruptionException class. + * + * @param message The message for the exception. + * @param additionalData An array of objects that contain additional debugging information. + */ + public DataCorruptionException(String message, Object... additionalData) { + this(message, null, additionalData); } - public DataCorruptionException(String message, Throwable cause) { + /** + * Creates a new instance of the DataCorruptionException class. + * + * @param message The message for the exception. + * @param cause The causing exception. + * @param additionalData An array of objects that contain additional debugging information. + */ + public DataCorruptionException(String message, Throwable cause, Object... 
additionalData) { super(message, cause); + this.additionalData = additionalData; } public DataCorruptionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); + this.additionalData = null; } } diff --git a/segmentstore/server/src/main/java/io/pravega/segmentstore/server/logs/DataFrame.java b/segmentstore/server/src/main/java/io/pravega/segmentstore/server/logs/DataFrame.java index 41a01c481d6..cda2a3c0756 100644 --- a/segmentstore/server/src/main/java/io/pravega/segmentstore/server/logs/DataFrame.java +++ b/segmentstore/server/src/main/java/io/pravega/segmentstore/server/logs/DataFrame.java @@ -125,7 +125,7 @@ static DataFrame wrap(ByteArraySegment target) { * When reading frames from a source (read mode), this value may be less than the size of the source. * This value is serialized with the frame. */ - int getLength() { + public int getLength() { return this.header.getSerializationLength() + this.header.getContentLength(); } @@ -559,7 +559,7 @@ private byte encodeFlags() { /** * Represents an Entry in the DataFrame. */ - static class DataFrameEntry { + public static class DataFrameEntry { /** * whether this is the first entry for a record. */ diff --git a/segmentstore/server/src/main/java/io/pravega/segmentstore/server/logs/DataFrameReader.java b/segmentstore/server/src/main/java/io/pravega/segmentstore/server/logs/DataFrameReader.java index b6ae5ee160e..2e4237a31ae 100644 --- a/segmentstore/server/src/main/java/io/pravega/segmentstore/server/logs/DataFrameReader.java +++ b/segmentstore/server/src/main/java/io/pravega/segmentstore/server/logs/DataFrameReader.java @@ -13,7 +13,6 @@ import com.google.common.collect.Iterators; import io.pravega.common.Exceptions; import io.pravega.common.io.SerializationException; -import io.pravega.common.util.ByteArraySegment; import io.pravega.common.util.CloseableIterator; import io.pravega.segmentstore.server.DataCorruptionException; import io.pravega.segmentstore.server.LogItem; @@ -26,6 +25,7 @@ import java.io.InputStream; import java.io.SequenceInputStream; import java.util.LinkedList; +import java.util.List; import java.util.stream.Stream; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -113,7 +113,7 @@ public ReadResult getNext() throws Exception { this.lastReadSequenceNumber = seqNo; return new ReadResult<>(logItem, segments); } catch (IOException ex) { - throw new DataCorruptionException("Deserialization failed.", ex); + throw new DataCorruptionException("Deserialization failed.", ex, segments.segments); } // Any other exceptions are considered to be non-DataCorruption. } @@ -164,13 +164,13 @@ private SegmentCollection getNextOperationSegments() throws Exception { // this in the middle of a log, we very likely have some sort of corruption. throw new DataCorruptionException(String.format("Found a DataFrameEntry which is not marked as " + "'First Record Entry', but no active record is being read. DataFrameAddress = %s", - nextEntry.getFrameAddress())); + nextEntry.getFrameAddress()), result.segments, nextEntry); } continue; } - // Add the current entry's contents to the result. - result.add(nextEntry.getData(), nextEntry.getFrameAddress(), nextEntry.isLastEntryInDataFrame()); + // Add the current entry to the result. + result.add(nextEntry); if (nextEntry.isLastRecordEntry()) { // We are done. We found the last entry for a record. 
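The new varargs constructors added to DataCorruptionException above let the thrower attach arbitrary debug objects to the exception, which DataFrameReader now uses to attach the partially-read entries and which ContainerRecoverCommand dumps via getAdditionalData(). A minimal sketch of producing and consuming that context (the class and argument names here are illustrative only):

    import io.pravega.segmentstore.server.DataCorruptionException;

    class DataCorruptionContextExample {
        // 'frame' and 'entry' stand in for whatever objects describe the corruption site.
        void rejectEntry(Object frame, Object entry) throws DataCorruptionException {
            // The trailing varargs are kept on the exception and exposed via getAdditionalData().
            throw new DataCorruptionException("Unexpected entry ordering.", frame, entry);
        }

        void dump(DataCorruptionException dce) {
            Object[] debugInfo = dce.getAdditionalData();   // Generated by Lombok's @Getter on 'additionalData'.
            if (debugInfo != null) {
                for (Object item : debugInfo) {
                    System.out.println(item);
                }
            }
        }
    }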
@@ -195,6 +195,12 @@ public static class ReadResult { @Getter private final T item; + /** + * An ordered list of DataFrame.DataFrameEntry objects representing the actual serialization of this ReadResult item. + */ + @Getter + private final List frameEntries; + /** * The Address of the Last Data Frame containing the LogItem. If the LogItem fits on exactly one DataFrame, this * will contain the Address for that Data Frame; if it spans multiple data frames, this stores the last data frame address. @@ -225,6 +231,7 @@ public static class ReadResult { */ protected ReadResult(T logItem, SegmentCollection segmentCollection) { this.item = logItem; + this.frameEntries = segmentCollection.segments; this.lastUsedDataFrameAddress = segmentCollection.getLastUsedDataFrameAddress(); this.lastFullDataFrameAddress = segmentCollection.getLastFullDataFrameAddress(); this.lastFrameEntry = segmentCollection.isLastFrameEntry(); @@ -238,13 +245,17 @@ public String toString() { //endregion + //region + + //endregion + //region SegmentCollection /** * A collection of ByteArraySegments that, together, make up the serialization for a Log Operation. */ private static class SegmentCollection { - private final LinkedList segments; + private final LinkedList segments; private LogAddress lastUsedDataFrameAddress; private LogAddress lastFullDataFrameAddress; private boolean lastFrameEntry; @@ -260,28 +271,28 @@ private static class SegmentCollection { /** * Adds a new segment to the collection. * - * @param segment The segment to append. - * @param dataFrameAddress The Address for the Data Frame containing the segment. - * @param lastFrameEntry Whether this segment is the last entry in the Data Frame. + * @param entry The DataFrameEntry whose data to append.. * @throws NullPointerException If segment is null. * @throws IllegalArgumentException If lastUsedDataFrameSequence is invalid. */ - public void add(ByteArraySegment segment, LogAddress dataFrameAddress, boolean lastFrameEntry) throws DataCorruptionException { - Preconditions.checkNotNull(segment, "segment"); + public void add(DataFrame.DataFrameEntry entry) throws DataCorruptionException { + Preconditions.checkNotNull(entry, "entry"); + LogAddress dataFrameAddress = entry.getFrameAddress(); long dataFrameSequence = dataFrameAddress.getSequence(); if (this.lastUsedDataFrameAddress != null && dataFrameSequence < this.lastUsedDataFrameAddress.getSequence()) { - throw new DataCorruptionException(String.format("Invalid DataFrameSequence. Expected at least '%d', found '%d'.", this.lastUsedDataFrameAddress.getSequence(), dataFrameSequence)); + throw new DataCorruptionException(String.format("Invalid DataFrameSequence. Expected at least '%d', found '%d'.", + this.lastUsedDataFrameAddress.getSequence(), dataFrameSequence), this.segments, entry); } - if (lastFrameEntry) { + if (entry.isLastEntryInDataFrame()) { // This is the last segment in this DataFrame, so we need to set the lastFullDataFrameAddress to the right value. this.lastFullDataFrameAddress = dataFrameAddress; } this.lastUsedDataFrameAddress = dataFrameAddress; - this.lastFrameEntry = lastFrameEntry; - this.segments.add(segment); + this.lastFrameEntry = entry.isLastEntryInDataFrame(); + this.segments.add(entry); } /** @@ -305,7 +316,7 @@ void clear() { * Returns an InputStream that reads from all ByteArraySegments making up this collection. 
*/ InputStream getInputStream() { - Stream ss = this.segments.stream().map(ByteArraySegment::getReader); + Stream ss = this.segments.stream().map(e -> e.getData().getReader()); return new SequenceInputStream(Iterators.asEnumeration(ss.iterator())); } @@ -334,6 +345,13 @@ boolean isLastFrameEntry() { return this.lastFrameEntry; } + /** + * Gets a collection of DataFrame.DataFrameEntry objects that make up this Segment Collection. + */ + List getSegments() { + return this.segments; + } + @Override public String toString() { return String.format("Count = %d, LastUsedDataFrameSeq = %d, LastFullDataFrameSequence = %d, LastFrameEntry = %s", @@ -420,7 +438,7 @@ public DataFrame.DataFrameEntry getNext() throws Exception { } else { // The DataFrameEnumerator should not return empty frames. We can either go in a loop and try to get next, // or throw (which is correct, since we rely on DataFrameEnumerator to behave correctly. - throw new DataCorruptionException("Found empty DataFrame when non-empty was expected."); + throw new DataCorruptionException("Found empty DataFrame when non-empty was expected.", dataFrame); } } } @@ -439,9 +457,8 @@ private static class DataFrameEnumerator implements CloseableIterator reader; + private final CloseableIterator reader; //endregion @@ -456,13 +473,8 @@ private static class DataFrameEnumerator implements CloseableIterator(), readIndexFactory.createReadIndex(metadata, storage))); + this.readIndexFactory = readIndexFactory; + this.storage = storage; + this.callbacks = callbacks; + } + + @Override + public void close() { + this.readIndexFactory.close(); + this.storage.close(); + } + + /** + * Creates a new instance of the DebugRecoveryProcessor class with the given arguments. + * + * @param containerId The Id of the Container to recover. + * @param durableDataLog A DurableDataLog to recover from. + * @param config A ContainerConfig to use during recovery. + * @param readIndexConfig A ReadIndexConfig to use during recovery. + * @param executor An Executor to use for background tasks. + * @param callbacks Callbacks to invoke during recovery. + * @return A new instance of the DebugRecoveryProcessor. 
+ */
+    public static DebugRecoveryProcessor create(int containerId, DurableDataLog durableDataLog, ContainerConfig config, ReadIndexConfig readIndexConfig,
+                                                ScheduledExecutorService executor, OperationCallbacks callbacks) {
+        Preconditions.checkNotNull(durableDataLog, "durableDataLog");
+        Preconditions.checkNotNull(config, "config");
+        Preconditions.checkNotNull(readIndexConfig, "readIndexConfig");
+        Preconditions.checkNotNull(executor, "executor");
+        Preconditions.checkNotNull(callbacks, "callbacks");
+
+        StreamSegmentContainerMetadata metadata = new StreamSegmentContainerMetadata(containerId, config.getMaxActiveSegmentCount());
+        ContainerReadIndexFactory rf = new ContainerReadIndexFactory(readIndexConfig, new NoOpCacheFactory(), executor);
+        Storage s = new InMemoryStorageFactory(executor).createStorageAdapter();
+        return new DebugRecoveryProcessor(metadata, durableDataLog, rf, s, callbacks);
+    }
+
+    //endregion
+
+    //region RecoveryProcessor Overrides
+
+    @Override
+    protected void recoverOperation(DataFrameReader.ReadResult readResult, OperationMetadataUpdater metadataUpdater) throws DataCorruptionException {
+        if (this.callbacks.beginRecoverOperation != null) {
+            Callbacks.invokeSafely(this.callbacks.beginRecoverOperation, readResult.getItem(), readResult.getFrameEntries(), null);
+        }
+
+        try {
+            super.recoverOperation(readResult, metadataUpdater);
+        } catch (Throwable ex) {
+            if (this.callbacks.operationFailed != null) {
+                Callbacks.invokeSafely(this.callbacks.operationFailed, readResult.getItem(), ex, null);
+            }
+
+            throw ex;
+        }
+
+        if (this.callbacks.operationSuccess != null) {
+            Callbacks.invokeSafely(this.callbacks.operationSuccess, readResult.getItem(), null);
+        }
+    }
+
+    //endregion
+
+    //region OperationCallbacks
+
+    /**
+     * Callbacks to pass in to the DebugRecoveryProcessor that will be invoked during recovery.
+     */
+    @RequiredArgsConstructor
+    public static class OperationCallbacks {
+        /**
+         * Invoked before attempting to recover an operation. Args: Operation, DataFrameEntries making up that operation.
+         */
+        private final BiConsumer<Operation, List<DataFrame.DataFrameEntry>> beginRecoverOperation;
+
+        /**
+         * Invoked when an operation was successfully recovered.
+         */
+        private final Consumer<Operation> operationSuccess;
+
+        /**
+         * Invoked when an operation failed to recover.
+         */
+        private final BiConsumer<Operation, Throwable> operationFailed;
+    }
+
+    //endregion
+
+    //region NoOpCache
+
+    private static class NoOpCacheFactory implements CacheFactory {
+        @Override
+        public Cache getCache(String id) {
+            return new NoOpCache();
+        }
+
+        @Override
+        public void close() {
+            // Nothing to do.
+        }
+    }
+
+    private static class NoOpCache implements Cache {
+        @Override
+        public String getId() {
+            return null;
+        }
+
+        @Override
+        public void insert(Key key, byte[] data) {
+            // Nothing to do.
+        }
+
+        @Override
+        public void insert(Key key, ByteArraySegment data) {
+            // Nothing to do.
+        }
+
+        @Override
+        public byte[] get(Key key) {
+            // This should not be invoked from within a DebugRecoveryProcessor.
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void remove(Key key) {
+            // Nothing to do.
+        }
+
+        @Override
+        public void close() {
+            // Nothing to do.
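This processor is what backs the "container recover <container-id>" admin command shown earlier: it replays the log against a no-op Cache and an in-memory Storage and reports progress through the three callbacks. A minimal sketch of driving it directly, assuming the caller already has a read-only DurableDataLog (e.g. from DebugLogWrapper.asReadOnly()), the two configs and an executor; the class and counter names below are invented for illustration:

    import io.pravega.segmentstore.server.containers.ContainerConfig;
    import io.pravega.segmentstore.server.logs.DebugRecoveryProcessor;
    import io.pravega.segmentstore.server.reading.ReadIndexConfig;
    import io.pravega.segmentstore.storage.DurableDataLog;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.atomic.AtomicInteger;
    import lombok.val;

    class DebugRecoveryExample {
        void runDebugRecovery(int containerId, DurableDataLog readOnlyLog, ContainerConfig containerConfig,
                              ReadIndexConfig readIndexConfig, ScheduledExecutorService executor) throws Exception {
            AtomicInteger recovered = new AtomicInteger();
            val callbacks = new DebugRecoveryProcessor.OperationCallbacks(
                    (op, frameEntries) -> System.out.printf("Reading %s (%d frame entries).%n", op, frameEntries.size()),
                    op -> recovered.incrementAndGet(),
                    (op, ex) -> System.err.printf("Operation %s FAILED: %s%n", op, ex));
            val rp = DebugRecoveryProcessor.create(containerId, readOnlyLog, containerConfig, readIndexConfig, executor, callbacks);
            try {
                rp.performRecovery();
                System.out.printf("Recovered %d Operation(s).%n", recovered.get());
            } finally {
                rp.close();
            }
        }
    }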
+ } + } + + //endregion +} diff --git a/segmentstore/server/src/main/java/io/pravega/segmentstore/server/logs/DurableLog.java b/segmentstore/server/src/main/java/io/pravega/segmentstore/server/logs/DurableLog.java index 24a5a5b7b7e..5b3754714c6 100644 --- a/segmentstore/server/src/main/java/io/pravega/segmentstore/server/logs/DurableLog.java +++ b/segmentstore/server/src/main/java/io/pravega/segmentstore/server/logs/DurableLog.java @@ -21,8 +21,6 @@ import io.pravega.common.concurrent.Services; import io.pravega.common.util.Retry; import io.pravega.common.util.SequencedItemList; -import io.pravega.segmentstore.contracts.ContainerException; -import io.pravega.segmentstore.contracts.StreamSegmentException; import io.pravega.segmentstore.contracts.StreamingException; import io.pravega.segmentstore.server.ContainerOfflineException; import io.pravega.segmentstore.server.DataCorruptionException; @@ -55,6 +53,7 @@ import java.util.stream.Collectors; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; /** @@ -207,6 +206,45 @@ private CompletableFuture tryStartOnce() { .thenComposeAsync(v -> anyItemsRecovered ? CompletableFuture.completedFuture(null) : queueMetadataCheckpoint(), this.executor)); } + @SneakyThrows(Exception.class) + private boolean performRecovery() { + // Make sure we are in the correct state. We do not want to do recovery while we are in full swing. + Preconditions.checkState(state() == State.STARTING || (state() == State.RUNNING && isOffline()), "Invalid State for recovery."); + + this.operationProcessor.getMetrics().operationLogInit(); + Timer timer = new Timer(); + try { + // Initialize the DurableDataLog, which will acquire its lock and ensure we are the only active users of it. + this.durableDataLog.initialize(RECOVERY_TIMEOUT); + + // Initiate the recovery. + RecoveryProcessor p = new RecoveryProcessor(this.metadata, this.durableDataLog, this.operationFactory, this.memoryStateUpdater); + int recoveredItemCount = p.performRecovery(); + this.operationProcessor.getMetrics().operationsCompleted(recoveredItemCount, timer.getElapsed()); + + // Verify that the Recovery Processor has left the metadata in a non-recovery mode. + Preconditions.checkState(!this.metadata.isRecoveryMode(), "Recovery completed but Metadata is still in Recovery Mode."); + return recoveredItemCount > 0; + } catch (Exception ex) { + log.error("{} Recovery FAILED.", this.traceObjectId, ex); + if (Exceptions.unwrap(ex) instanceof DataCorruptionException) { + // DataCorruptionException during recovery means we will be unable to execute the recovery successfully + // regardless how many times we try. We need to disable the log so that future instances of this class + // will not attempt to do so indefinitely (which could wipe away useful debugging information before + // someone can manually fix the problem). 
+ try { + this.durableDataLog.disable(); + log.info("{} Log disabled due to DataCorruptionException during recovery.", this.traceObjectId); + } catch (Exception disableEx) { + log.warn("{}: Unable to disable log after DataCorruptionException during recovery.", this.traceObjectId, disableEx); + ex.addSuppressed(disableEx); + } + } + + throw ex; + } + } + @Override protected void doStop() { long traceId = LoggerHelpers.traceEnterWithContext(log, traceObjectId, "doStop"); @@ -361,163 +399,6 @@ public CompletableFuture awaitOnline() { //endregion - //region Recovery - - private boolean performRecovery() { - // Make sure we are in the correct state. We do not want to do recovery while we are in full swing. - Preconditions.checkState(state() == State.STARTING || (state() == State.RUNNING && isOffline()), - "Invalid State for recovery."); - - long traceId = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "performRecovery"); - Timer timer = new Timer(); - log.info("{} Recovery started.", this.traceObjectId); - - // Put metadata (and entire container) into 'Recovery Mode'. - this.metadata.enterRecoveryMode(); - - // Reset metadata. - this.metadata.reset(); - this.operationProcessor.getMetrics().operationLogInit(); - - OperationMetadataUpdater metadataUpdater = new OperationMetadataUpdater(this.metadata); - this.memoryStateUpdater.enterRecoveryMode(metadataUpdater); - - boolean successfulRecovery = false; - int recoveredItemCount; - try { - try { - this.durableDataLog.initialize(RECOVERY_TIMEOUT); - recoveredItemCount = recoverFromDataFrameLog(metadataUpdater); - this.metadata.setContainerEpoch(this.durableDataLog.getEpoch()); - log.info("{} Recovery completed. Epoch = {}, Items Recovered = {}, Time = {}ms.", this.traceObjectId, - this.metadata.getContainerEpoch(), recoveredItemCount, timer.getElapsedMillis()); - successfulRecovery = true; - } finally { - // We must exit recovery mode when done, regardless of outcome. - this.metadata.exitRecoveryMode(); - this.memoryStateUpdater.exitRecoveryMode(successfulRecovery); - } - } catch (Exception ex) { - // Both the inner try and finally blocks above can throw, so we need to catch both of those cases here. - log.error("{} Recovery FAILED.", this.traceObjectId, ex); - if (Exceptions.unwrap(ex) instanceof DataCorruptionException) { - // DataCorruptionException during recovery means we will be unable to execute the recovery successfully - // regardless how many times we try. We need to disable the log so that future instances of this class - // will not attempt to do so indefinitely (which could wipe away useful debugging information before - // someone can manually fix the problem). - try { - this.durableDataLog.disable(); - log.info("{} Log disabled due to DataCorruptionException during recovery.", this.traceObjectId); - } catch (Exception disableEx) { - log.warn("{}: Unable to disable log after DataCorruptionException during recovery.", this.traceObjectId, disableEx); - ex.addSuppressed(disableEx); - } - } - - throw new CompletionException(ex); - } - - this.operationProcessor.getMetrics().operationsCompleted(recoveredItemCount, timer.getElapsed()); - LoggerHelpers.traceLeave(log, this.traceObjectId, "performRecovery", traceId); - return recoveredItemCount > 0; - } - - /** - * Recovers the Operations from the DurableLog using the given OperationMetadataUpdater. Searches the DurableDataLog - * until the first MetadataCheckpointOperation is encountered. All Operations prior to this one are skipped over. 
- * Recovery starts with the first MetadataCheckpointOperation and runs until the end of the DurableDataLog is reached. - * Subsequent MetadataCheckpointOperations are ignored (as they contain redundant information - which has already - * been built up using the Operations up to them). - * - * @param metadataUpdater The OperationMetadataUpdater to use for updates. - * @return The number of Operations recovered. - */ - private int recoverFromDataFrameLog(OperationMetadataUpdater metadataUpdater) throws Exception { - long traceId = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "recoverFromDataFrameLog"); - int skippedOperationCount = 0; - int skippedDataFramesCount = 0; - int recoveredItemCount = 0; - - // Read all entries from the DataFrameLog and append them to the InMemoryOperationLog. - // Also update metadata along the way. - try (DataFrameReader reader = new DataFrameReader<>(this.durableDataLog, this.operationFactory, getId())) { - DataFrameReader.ReadResult readResult; - - // We can only recover starting from a MetadataCheckpointOperation; find the first one. - while (true) { - // Fetch the next operation. - readResult = reader.getNext(); - if (readResult == null) { - // We have reached the end and have not found any MetadataCheckpointOperations. - log.warn("{}: Reached the end of the DataFrameLog and could not find any MetadataCheckpointOperations after reading {} Operations and {} Data Frames.", this.traceObjectId, skippedOperationCount, skippedDataFramesCount); - break; - } else if (readResult.getItem() instanceof MetadataCheckpointOperation) { - // We found a checkpoint. Start recovering from here. - log.info("{}: Starting recovery from Sequence Number {} (skipped {} Operations and {} Data Frames).", this.traceObjectId, readResult.getItem().getSequenceNumber(), skippedOperationCount, skippedDataFramesCount); - break; - } else if (readResult.isLastFrameEntry()) { - skippedDataFramesCount++; - } - - skippedOperationCount++; - log.debug("{}: Not recovering operation because no MetadataCheckpointOperation encountered so far ({}).", this.traceObjectId, readResult.getItem()); - } - - // Now continue with the recovery from here. - while (readResult != null) { - recordTruncationMarker(readResult); - recoverOperation(readResult.getItem(), metadataUpdater); - recoveredItemCount++; - - // Fetch the next operation. - readResult = reader.getNext(); - } - } - - // Commit whatever changes we have in the metadata updater to the Container Metadata. - // This code will only be invoked if we haven't encountered any exceptions during recovery. - metadataUpdater.commitAll(); - LoggerHelpers.traceLeave(log, this.traceObjectId, "recoverFromDataFrameLog", traceId, recoveredItemCount); - return recoveredItemCount; - } - - private void recoverOperation(Operation operation, OperationMetadataUpdater metadataUpdater) throws DataCorruptionException { - // Update Metadata Sequence Number. - metadataUpdater.setOperationSequenceNumber(operation.getSequenceNumber()); - - // Update the metadata with the information from the Operation. - try { - log.debug("{} Recovering {}.", this.traceObjectId, operation); - metadataUpdater.preProcessOperation(operation); - metadataUpdater.acceptOperation(operation); - } catch (StreamSegmentException | ContainerException ex) { - // Metadata updates failures should not happen during recovery. - throw new DataCorruptionException(String.format("Unable to update metadata for Log Operation %s", operation), ex); - } - - // Update in-memory structures. 
- this.memoryStateUpdater.process(operation); - } - - private void recordTruncationMarker(DataFrameReader.ReadResult readResult) { - // Truncation Markers are stored directly in the ContainerMetadata. There is no need for an OperationMetadataUpdater - // to do this. - // Determine and record Truncation Markers, but only if the current operation spans multiple DataFrames - // or it's the last entry in a DataFrame. - LogAddress lastFullAddress = readResult.getLastFullDataFrameAddress(); - LogAddress lastUsedAddress = readResult.getLastUsedDataFrameAddress(); - if (lastFullAddress != null && lastFullAddress.getSequence() != lastUsedAddress.getSequence()) { - // This operation spans multiple DataFrames. The TruncationMarker should be set on the last DataFrame - // that ends with a part of it. - this.metadata.recordTruncationMarker(readResult.getItem().getSequenceNumber(), lastFullAddress); - } else if (readResult.isLastFrameEntry()) { - // The operation was the last one in the frame. This is a Truncation Marker. - this.metadata.recordTruncationMarker(readResult.getItem().getSequenceNumber(), lastUsedAddress); - } - } - - //endregion - //region Helpers private void ensureRunning() { diff --git a/segmentstore/server/src/main/java/io/pravega/segmentstore/server/logs/RecoveryProcessor.java b/segmentstore/server/src/main/java/io/pravega/segmentstore/server/logs/RecoveryProcessor.java new file mode 100644 index 00000000000..3d211d87495 --- /dev/null +++ b/segmentstore/server/src/main/java/io/pravega/segmentstore/server/logs/RecoveryProcessor.java @@ -0,0 +1,211 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.segmentstore.server.logs; + +import com.google.common.base.Preconditions; +import io.pravega.common.LoggerHelpers; +import io.pravega.common.Timer; +import io.pravega.segmentstore.contracts.ContainerException; +import io.pravega.segmentstore.contracts.StreamSegmentException; +import io.pravega.segmentstore.server.DataCorruptionException; +import io.pravega.segmentstore.server.LogItemFactory; +import io.pravega.segmentstore.server.UpdateableContainerMetadata; +import io.pravega.segmentstore.server.logs.operations.MetadataCheckpointOperation; +import io.pravega.segmentstore.server.logs.operations.Operation; +import io.pravega.segmentstore.storage.DurableDataLog; +import io.pravega.segmentstore.storage.LogAddress; +import lombok.extern.slf4j.Slf4j; + +/** + * Helper class (for the DurableLog) that is used to execute the recovery process. + */ +@Slf4j +class RecoveryProcessor { + //region Members + + private final UpdateableContainerMetadata metadata; + private final DurableDataLog durableDataLog; + private final LogItemFactory operationFactory; + private final MemoryStateUpdater stateUpdater; + private final String traceObjectId; + + //endregion + + //region Constructor + + /** + * Creates a new instance of the RecoveryProcessor class. + * + * @param metadata The UpdateableContainerMetadata to use for recovery. + * @param durableDataLog The (uninitialized) DurableDataLog to read data from for recovery. + * @param operationFactory A LogItemFactory that can create Operations. + * @param stateUpdater A MemoryStateUpdater that can be used to apply the recovered operations. 
+ */ + RecoveryProcessor(UpdateableContainerMetadata metadata, DurableDataLog durableDataLog, LogItemFactory operationFactory, + MemoryStateUpdater stateUpdater) { + this.metadata = Preconditions.checkNotNull(metadata, "metadata"); + this.durableDataLog = Preconditions.checkNotNull(durableDataLog, "durableDataLog"); + this.operationFactory = Preconditions.checkNotNull(operationFactory, "operationFactory"); + this.stateUpdater = Preconditions.checkNotNull(stateUpdater, "stateUpdater"); + this.traceObjectId = String.format("RecoveryProcessor[%s]", this.metadata.getContainerId()); + } + + //endregion + + //region Operations + + /** + * Executes a DurableLog recovery using data from DurableDataLog. During this process, the following will happen: + * 1. Metadata will be reset and put into recovery mode. + * 2. DurableDataLog will be initialized. This will fail if the DurableDataLog has already been initialized. + * 3. Reads the entire contents of the DurableDataLog, extracts Operations, and updates the Metadata and other + * components (via MemoryStateUpdater) based on their contents. + * 4. Metadata is taken out of recovery mode. + * + * @return The number of Operations recovered. + * @throws Exception If an exception occurred. This could be one of the following: + * * DataLogWriterNotPrimaryException: If unable to acquire DurableDataLog ownership or the ownership + * has been lost in the process. + * * DataCorruptionException: If an unrecoverable corruption has been detected with the recovered data. + * * SerializationException: If a DataFrame or Operation was unable to be deserialized. + * * IOException: If a general IO exception occurred. + */ + public int performRecovery() throws Exception { + long traceId = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "performRecovery"); + Timer timer = new Timer(); + log.info("{} Recovery started.", this.traceObjectId); + + // Put metadata (and entire container) into 'Recovery Mode'. + this.metadata.enterRecoveryMode(); + + // Reset metadata. + this.metadata.reset(); + + OperationMetadataUpdater metadataUpdater = new OperationMetadataUpdater(this.metadata); + this.stateUpdater.enterRecoveryMode(metadataUpdater); + + boolean successfulRecovery = false; + int recoveredItemCount; + try { + recoveredItemCount = recoverAllOperations(metadataUpdater); + this.metadata.setContainerEpoch(this.durableDataLog.getEpoch()); + log.info("{} Recovery completed. Epoch = {}, Items Recovered = {}, Time = {}ms.", this.traceObjectId, + this.metadata.getContainerEpoch(), recoveredItemCount, timer.getElapsedMillis()); + successfulRecovery = true; + } finally { + // We must exit recovery mode when done, regardless of outcome. + this.metadata.exitRecoveryMode(); + this.stateUpdater.exitRecoveryMode(successfulRecovery); + } + + LoggerHelpers.traceLeave(log, this.traceObjectId, "performRecovery", traceId); + return recoveredItemCount; + } + + /** + * Recovers the Operations from the DurableLog using the given OperationMetadataUpdater. Searches the DurableDataLog + * until the first MetadataCheckpointOperation is encountered. All Operations prior to this one are skipped over. + * Recovery starts with the first MetadataCheckpointOperation and runs until the end of the DurableDataLog is reached. + * Subsequent MetadataCheckpointOperations are ignored (as they contain redundant information - which has already + * been built up using the Operations up to them). + * + * @param metadataUpdater The OperationMetadataUpdater to use for updates. 
+ * @return The number of Operations recovered. + */ + private int recoverAllOperations(OperationMetadataUpdater metadataUpdater) throws Exception { + long traceId = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "recoverAllOperations"); + int skippedOperationCount = 0; + int skippedDataFramesCount = 0; + int recoveredItemCount = 0; + + // Read all entries from the DataFrameLog and append them to the InMemoryOperationLog. + // Also update metadata along the way. + try (DataFrameReader reader = new DataFrameReader<>(this.durableDataLog, this.operationFactory, this.metadata.getContainerId())) { + DataFrameReader.ReadResult readResult; + + // We can only recover starting from a MetadataCheckpointOperation; find the first one. + while (true) { + // Fetch the next operation. + readResult = reader.getNext(); + if (readResult == null) { + // We have reached the end and have not found any MetadataCheckpointOperations. + log.warn("{}: Reached the end of the DataFrameLog and could not find any MetadataCheckpointOperations after reading {} Operations and {} Data Frames.", + this.traceObjectId, skippedOperationCount, skippedDataFramesCount); + break; + } else if (readResult.getItem() instanceof MetadataCheckpointOperation) { + // We found a checkpoint. Start recovering from here. + log.info("{}: Starting recovery from Sequence Number {} (skipped {} Operations and {} Data Frames).", + this.traceObjectId, readResult.getItem().getSequenceNumber(), skippedOperationCount, skippedDataFramesCount); + break; + } else if (readResult.isLastFrameEntry()) { + skippedDataFramesCount++; + } + + skippedOperationCount++; + log.debug("{}: Not recovering operation because no MetadataCheckpointOperation encountered so far ({}).", + this.traceObjectId, readResult.getItem()); + } + + // Now continue with the recovery from here. + while (readResult != null) { + recordTruncationMarker(readResult); + recoverOperation(readResult, metadataUpdater); + recoveredItemCount++; + + // Fetch the next operation. + readResult = reader.getNext(); + } + } + + // Commit whatever changes we have in the metadata updater to the Container Metadata. + // This code will only be invoked if we haven't encountered any exceptions during recovery. + metadataUpdater.commitAll(); + LoggerHelpers.traceLeave(log, this.traceObjectId, "recoverAllOperations", traceId, recoveredItemCount); + return recoveredItemCount; + } + + protected void recoverOperation(DataFrameReader.ReadResult readResult, OperationMetadataUpdater metadataUpdater) throws DataCorruptionException { + // Update Metadata Sequence Number. + Operation operation = readResult.getItem(); + metadataUpdater.setOperationSequenceNumber(operation.getSequenceNumber()); + + // Update the metadata with the information from the Operation. + try { + log.debug("{} Recovering {}.", this.traceObjectId, operation); + metadataUpdater.preProcessOperation(operation); + metadataUpdater.acceptOperation(operation); + } catch (StreamSegmentException | ContainerException ex) { + // Metadata update failures should not happen during recovery. + throw new DataCorruptionException(String.format("Unable to update metadata for Log Operation '%s'.", operation), ex); + } + + // Update in-memory structures. + this.stateUpdater.process(operation); + } + + private void recordTruncationMarker(DataFrameReader.ReadResult readResult) { + // Truncation Markers are stored directly in the ContainerMetadata. There is no need for an OperationMetadataUpdater + // to do this. 
+ // Determine and record Truncation Markers, but only if the current operation spans multiple DataFrames + // or it's the last entry in a DataFrame. + LogAddress lastFullAddress = readResult.getLastFullDataFrameAddress(); + LogAddress lastUsedAddress = readResult.getLastUsedDataFrameAddress(); + if (lastFullAddress != null && lastFullAddress.getSequence() != lastUsedAddress.getSequence()) { + // This operation spans multiple DataFrames. The TruncationMarker should be set on the last DataFrame + // that ends with a part of it. + this.metadata.recordTruncationMarker(readResult.getItem().getSequenceNumber(), lastFullAddress); + } else if (readResult.isLastFrameEntry()) { + // The operation was the last one in the frame. This is a Truncation Marker. + this.metadata.recordTruncationMarker(readResult.getItem().getSequenceNumber(), lastUsedAddress); + } + } + + //endregion +} diff --git a/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/BookKeeperLogFactory.java b/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/BookKeeperLogFactory.java index 83b92f77ff2..0748587a4c0 100644 --- a/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/BookKeeperLogFactory.java +++ b/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/BookKeeperLogFactory.java @@ -98,13 +98,30 @@ public void initialize() throws DurableDataLogException { } @Override - public DurableDataLog createDurableDataLog(int containerId) { + public DurableDataLog createDurableDataLog(int logId) { Preconditions.checkState(this.bookKeeper.get() != null, "BookKeeperLogFactory is not initialized."); - return new BookKeeperLog(containerId, this.zkClient, this.bookKeeper.get(), this.config, this.executor); + return new BookKeeperLog(logId, this.zkClient, this.bookKeeper.get(), this.config, this.executor); } + /** + * Creates a new DebugLogWrapper that can be used for debugging purposes. This should not be used for regular operations. + * + * @param logId Id of the Log to create a wrapper for. + * @return A new instance of the DebugLogWrapper class. + */ + public DebugLogWrapper createDebugLogWrapper(int logId) { + Preconditions.checkState(this.bookKeeper.get() != null, "BookKeeperLogFactory is not initialized."); + return new DebugLogWrapper(logId, this.zkClient, this.bookKeeper.get(), this.config, this.executor); + } + + /** + * Gets a pointer to the BookKeeper client used by this BookKeeperLogFactory. This should only be used for testing or + * admin tool purposes only. It should not be used for regular operations. + * + * @return The BookKeeper client. + */ @VisibleForTesting - BookKeeper getBookKeeperClient() { + public BookKeeper getBookKeeperClient() { return this.bookKeeper.get(); } diff --git a/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/DebugLogWrapper.java b/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/DebugLogWrapper.java new file mode 100644 index 00000000000..fffbf02c76c --- /dev/null +++ b/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/DebugLogWrapper.java @@ -0,0 +1,200 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.segmentstore.storage.impl.bookkeeper;
+
+import io.pravega.common.util.ArrayView;
+import io.pravega.common.util.CloseableIterator;
+import io.pravega.segmentstore.storage.DataLogInitializationException;
+import io.pravega.segmentstore.storage.DurableDataLog;
+import io.pravega.segmentstore.storage.DurableDataLogException;
+import io.pravega.segmentstore.storage.LogAddress;
+import io.pravega.segmentstore.storage.QueueStats;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.AccessLevel;
+import lombok.RequiredArgsConstructor;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.curator.framework.CuratorFramework;
+
+/**
+ * Wrapper for a BookKeeperLog which only exposes methods that should be used for debugging/admin tools.
+ * NOTE: this class is not meant to be used for regular, production code. It exposes operations that should only be executed
+ * from the admin tools.
+ */
+public class DebugLogWrapper implements AutoCloseable {
+    //region Members
+
+    private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);
+    private final BookKeeperLog log;
+    private final BookKeeper bkClient;
+    private final BookKeeperConfig config;
+    private final AtomicBoolean initialized;
+
+    //endregion
+
+    //region Constructor
+
+    /**
+     * Creates a new instance of the DebugLogWrapper class.
+     *
+     * @param logId The Id of the BookKeeperLog to wrap.
+     * @param zkClient A pointer to the CuratorFramework client to use.
+     * @param bookKeeper A pointer to the BookKeeper client to use.
+     * @param config BookKeeperConfig to use.
+     * @param executor An Executor to use for async operations.
+     */
+    DebugLogWrapper(int logId, CuratorFramework zkClient, BookKeeper bookKeeper, BookKeeperConfig config, ScheduledExecutorService executor) {
+        this.log = new BookKeeperLog(logId, zkClient, bookKeeper, config, executor);
+        this.bkClient = bookKeeper;
+        this.config = config;
+        this.initialized = new AtomicBoolean();
+    }
+
+    //endregion
+
+    //region AutoCloseable Implementation
+
+    @Override
+    public void close() {
+        this.log.close();
+    }
+
+    //endregion
+
+    //region Operations
+
+    /**
+     * Creates a special DurableDataLog wrapping the BookKeeperLog that only supports reading from the log. It does
+     * not support initialization or other modifications to the log. Accessing this log will not interfere with other
+     * active writes to this log (i.e., it will not fence anyone out or close Ledgers that shouldn't be closed).
+     *
+     * @return A new DurableDataLog instance.
+     * @throws DataLogInitializationException If an exception occurred fetching metadata from ZooKeeper.
+     */
+    public DurableDataLog asReadOnly() throws DataLogInitializationException {
+        return new ReadOnlyBooKeeperLog(this.log.loadMetadata());
+    }
+
+    /**
+     * Loads a fresh copy of the BookKeeperLog Metadata from ZooKeeper, without doing any sort of fencing or otherwise
+     * modifying it.
+     *
+     * @return A new instance of the LogMetadata class, or null if no such metadata exists (most likely due to this being
+     * the first time accessing this log).
+     * @throws DataLogInitializationException If an Exception occurred.
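An admin tool typically obtains one of these wrappers from BookKeeperLogFactory.createDebugLogWrapper() (added above) and inspects the log without fencing out live writers. A minimal sketch, assuming an already-initialized factory such as the one built in ContainerCommand.createContext(); the class and method names of the sketch itself are illustrative:

    import io.pravega.segmentstore.storage.DurableDataLog;
    import io.pravega.segmentstore.storage.impl.bookkeeper.BookKeeperLogFactory;
    import io.pravega.segmentstore.storage.impl.bookkeeper.DebugLogWrapper;
    import io.pravega.segmentstore.storage.impl.bookkeeper.ReadOnlyLogMetadata;

    class DebugLogInspectionExample {
        void inspectLog(BookKeeperLogFactory factory, int logId) throws Exception {
            try (DebugLogWrapper wrapper = factory.createDebugLogWrapper(logId)) {
                ReadOnlyLogMetadata metadata = wrapper.fetchMetadata();
                if (metadata == null) {
                    System.out.printf("Log %d has no metadata yet.%n", logId);
                    return;
                }

                // A non-fencing, read-only view that can be fed to DataFrameReader or DebugRecoveryProcessor.
                DurableDataLog readOnly = wrapper.asReadOnly();
                System.out.printf("Log %d: Epoch=%d, Enabled=%s.%n", logId, readOnly.getEpoch(), metadata.isEnabled());
            }
        }
    }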
+ */ + public ReadOnlyLogMetadata fetchMetadata() throws DataLogInitializationException { + return this.log.loadMetadata(); + } + + /** + * Opens a ledger for reading purposes (does not fence it). + * + * @param ledgerMetadata LedgerMetadata for the ledger to open. + * @return A BookKeeper LedgerHandle representing the ledger. + * @throws DurableDataLogException If an exception occurred. + */ + public LedgerHandle openLedgerNoFencing(LedgerMetadata ledgerMetadata) throws DurableDataLogException { + return Ledgers.openRead(ledgerMetadata.getLedgerId(), this.bkClient, this.config); + } + + /** + * Updates the Metadata for this BookKeeperLog in ZooKeeper by setting its Enabled flag to true. + * @throws DurableDataLogException If an exception occurred. + */ + public void enable() throws DurableDataLogException { + this.log.enable(); + } + + /** + * Open-Fences the BookKeeperLog (initializes it), then updates the Metadata for it in ZooKeeper by setting its + * Enabled flag to false. + * @throws DurableDataLogException If an exception occurred. + */ + public void disable() throws DurableDataLogException { + initialize(); + this.log.disable(); + } + + private void initialize() throws DurableDataLogException { + if (this.initialized.compareAndSet(false, true)) { + try { + this.log.initialize(DEFAULT_TIMEOUT); + } catch (Exception ex) { + this.initialized.set(false); + throw ex; + } + } + } + + //endregion + + //region ReadOnlyBookKeeperLog + + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + private class ReadOnlyBooKeeperLog implements DurableDataLog { + private final LogMetadata logMetadata; + + @Override + public void close() { + // Nothing to do. + } + + @Override + public CloseableIterator getReader() throws DurableDataLogException { + return new LogReader(this.logMetadata, DebugLogWrapper.this.bkClient, DebugLogWrapper.this.config); + } + + @Override + public int getMaxAppendLength() { + return BookKeeperConfig.MAX_APPEND_LENGTH; + } + + @Override + public long getEpoch() { + return this.logMetadata.getEpoch(); + } + + @Override + public QueueStats getQueueStatistics() { + return null; + } + + @Override + public void initialize(Duration timeout) throws DurableDataLogException { + throw new UnsupportedOperationException(); + } + + @Override + public void enable() throws DurableDataLogException { + throw new UnsupportedOperationException(); + } + + @Override + public void disable() throws DurableDataLogException { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture append(ArrayView data, Duration timeout) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture truncate(LogAddress upToAddress, Duration timeout) { + throw new UnsupportedOperationException(); + } + } + + //endregion +} diff --git a/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/LedgerMetadata.java b/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/LedgerMetadata.java index 0b17abb4b91..9b02cd67dfc 100644 --- a/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/LedgerMetadata.java +++ b/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/LedgerMetadata.java @@ -18,7 +18,7 @@ */ @RequiredArgsConstructor @Getter -class LedgerMetadata { +public class LedgerMetadata { /** * The BookKeeper-assigned Ledger Id. 
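DurableLog.performRecovery() (earlier in this patch) disables a log when recovery hits a DataCorruptionException; once the underlying problem has been investigated, the same wrapper can flip the flag back. A minimal sketch under the same assumptions as above (initialized factory, known log id):

    import io.pravega.segmentstore.storage.impl.bookkeeper.BookKeeperLogFactory;
    import io.pravega.segmentstore.storage.impl.bookkeeper.DebugLogWrapper;
    import io.pravega.segmentstore.storage.impl.bookkeeper.ReadOnlyLogMetadata;

    class LogEnableExample {
        void reEnableIfDisabled(BookKeeperLogFactory factory, int logId) throws Exception {
            try (DebugLogWrapper wrapper = factory.createDebugLogWrapper(logId)) {
                ReadOnlyLogMetadata metadata = wrapper.fetchMetadata();
                if (metadata != null && !metadata.isEnabled()) {
                    // Only updates the Enabled flag in ZooKeeper; no ledger data is touched.
                    wrapper.enable();
                }
            }
        }
    }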
*/ diff --git a/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/Ledgers.java b/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/Ledgers.java index 93bc215f7ac..a53db8667e8 100644 --- a/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/Ledgers.java +++ b/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/Ledgers.java @@ -127,7 +127,7 @@ static void close(LedgerHandle handle) throws DurableDataLogException { try { Exceptions.handleInterrupted(handle::close); } catch (BKException bkEx) { - throw new DurableDataLogException(String.format("Unable to open-fence ledger %d.", handle.getId()), bkEx); + throw new DurableDataLogException(String.format("Unable to close ledger %d.", handle.getId()), bkEx); } } diff --git a/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/LogMetadata.java b/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/LogMetadata.java index 7c06372d62a..ca2a9146dbf 100644 --- a/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/LogMetadata.java +++ b/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/LogMetadata.java @@ -26,7 +26,7 @@ * Metadata for a Ledger-based log. */ @NotThreadSafe -class LogMetadata { +class LogMetadata implements ReadOnlyLogMetadata { //region Members /** @@ -219,7 +219,8 @@ LogMetadata updateLedgerStatus(Map lastAddConfirmed) { * * @return The current version. */ - int getUpdateVersion() { + @Override + public int getUpdateVersion() { return this.updateVersion.get(); } diff --git a/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/ReadOnlyLogMetadata.java b/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/ReadOnlyLogMetadata.java new file mode 100644 index 00000000000..04dd9838edb --- /dev/null +++ b/segmentstore/storage/impl/src/main/java/io/pravega/segmentstore/storage/impl/bookkeeper/ReadOnlyLogMetadata.java @@ -0,0 +1,55 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.segmentstore.storage.impl.bookkeeper; + +import java.util.List; + +/** + * Defines a read-only view of the BookKeeper Log Metadata. + */ +public interface ReadOnlyLogMetadata { + /** + * Gets a value indicating the current epoch of this LogMetadata. This changes upon every successful log initialization, + * immediately after it was fenced. + * + * @return The current epoch. + */ + long getEpoch(); + + /** + * Gets a value indicating the current version of the Metadata (this changes upon every successful metadata persist). + * Note: this is different from getEpoch() - which gets incremented with every successful recovery. + * + * @return The current version. + */ + int getUpdateVersion(); + + /** + * Gets a value indicating whether this log is enabled or not. + * + * @return True if enabled, false otherwise. + */ + boolean isEnabled(); + + /** + * Gets a read-only ordered list of LedgerMetadata instances representing the Ledgers that currently make up this + * Log Metadata. + * + * @return A new read-only list. 
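Consumers of this interface only need the getters declared here. For example, a per-ledger summary of a log (metadata fetched elsewhere, e.g. via DebugLogWrapper.fetchMetadata()) could be rendered as in the sketch below, which assumes getLedgers() is typed to LedgerMetadata as its javadoc states:

    import io.pravega.segmentstore.storage.impl.bookkeeper.LedgerMetadata;
    import io.pravega.segmentstore.storage.impl.bookkeeper.ReadOnlyLogMetadata;

    class LogMetadataSummaryExample {
        void print(ReadOnlyLogMetadata metadata) {
            System.out.printf("Epoch=%d, UpdateVersion=%d, Enabled=%s, TruncationAddress={%s}.%n",
                    metadata.getEpoch(), metadata.getUpdateVersion(), metadata.isEnabled(), metadata.getTruncationAddress());
            for (LedgerMetadata ledger : metadata.getLedgers()) {
                System.out.printf("\tLedgerId=%d%n", ledger.getLedgerId());
            }
        }
    }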
+ */ + List getLedgers(); + + /** + * Gets a LedgerAddress representing the first location in the log that is accessible for reads. + * + * @return The Truncation Address. + */ + LedgerAddress getTruncationAddress(); +} diff --git a/test/integration/src/main/java/io/pravega/test/integration/selftest/SelfTestRunner.java b/test/integration/src/main/java/io/pravega/test/integration/selftest/SelfTestRunner.java index 5c6410034e6..783b6db7cfd 100644 --- a/test/integration/src/main/java/io/pravega/test/integration/selftest/SelfTestRunner.java +++ b/test/integration/src/main/java/io/pravega/test/integration/selftest/SelfTestRunner.java @@ -65,6 +65,10 @@ public static void main(String[] args) throws Exception { // Wait for the test to finish. test.awaitFinished().join(); + if (testConfig.isPauseBeforeExit()) { + System.out.println("Services are still running. Press any key to exit ..."); + System.in.read(); + } // Make sure the test is stopped. test.stopAsync().awaitTerminated(); @@ -204,7 +208,8 @@ private static class Shortcuts { new Shortcut("controller", TestConfig.CONTROLLER_HOST), new Shortcut("controllerport", TestConfig.CONTROLLER_BASE_PORT), new Shortcut("metrics", TestConfig.METRICS_ENABLED), - new Shortcut("reads", TestConfig.READS_ENABLED))); + new Shortcut("reads", TestConfig.READS_ENABLED), + new Shortcut("pause", TestConfig.PAUSE_BEFORE_EXIT))); SHORTCUTS = Collections.unmodifiableMap(s); } diff --git a/test/integration/src/main/java/io/pravega/test/integration/selftest/TestConfig.java b/test/integration/src/main/java/io/pravega/test/integration/selftest/TestConfig.java index 6e1264a8d14..f3d91d8b71d 100644 --- a/test/integration/src/main/java/io/pravega/test/integration/selftest/TestConfig.java +++ b/test/integration/src/main/java/io/pravega/test/integration/selftest/TestConfig.java @@ -51,6 +51,7 @@ public class TestConfig { static final Property SEGMENT_STORE_COUNT = Property.named("segmentStoreCount", 1); static final Property CONTROLLER_HOST = Property.named("controllerHost", LOCALHOST); static final Property CONTROLLER_BASE_PORT = Property.named("controllerPort", 9200); + static final Property PAUSE_BEFORE_EXIT = Property.named("pauseBeforeExit", false); private static final Property ZK_PORT = Property.named("zkPort", 9000); private static final Property BK_BASE_PORT = Property.named("bkBasePort", 9100); private static final Property SEGMENT_STORE_BASE_PORT = Property.named("segmentStorePort", 9300); @@ -110,6 +111,8 @@ public class TestConfig { @Getter private final boolean metricsEnabled; @Getter + private final boolean pauseBeforeExit; + @Getter private final String testId = Long.toHexString(System.currentTimeMillis()); //endregion @@ -154,6 +157,7 @@ private TestConfig(TypedProperties properties) throws ConfigurationException { this.testType = TestType.valueOf(properties.get(TEST_TYPE)); this.readsEnabled = properties.getBoolean(READS_ENABLED); this.metricsEnabled = properties.getBoolean(METRICS_ENABLED); + this.pauseBeforeExit = properties.getBoolean(PAUSE_BEFORE_EXIT); checkOverlappingPorts(); } diff --git a/test/integration/src/main/java/io/pravega/test/integration/selftest/adapters/OutOfProcessAdapter.java b/test/integration/src/main/java/io/pravega/test/integration/selftest/adapters/OutOfProcessAdapter.java index 219923b71a7..1b3f02b0ae2 100644 --- a/test/integration/src/main/java/io/pravega/test/integration/selftest/adapters/OutOfProcessAdapter.java +++ b/test/integration/src/main/java/io/pravega/test/integration/selftest/adapters/OutOfProcessAdapter.java @@ -215,6 
+215,10 @@ private Process startSegmentStore(int segmentStoreId) throws IOException { .sysProp(configProperty(AutoScalerConfig.COMPONENT_CODE, AutoScalerConfig.CONTROLLER_URI), getControllerUrl()) .stdOut(ProcessBuilder.Redirect.to(new File(this.testConfig.getComponentOutLogPath("segmentStore", segmentStoreId)))) .stdErr(ProcessBuilder.Redirect.to(new File(this.testConfig.getComponentErrLogPath("segmentStore", segmentStoreId)))); + if (this.testConfig.getBookieCount() > 0) { + ps = ps.sysProp(configProperty(ServiceConfig.COMPONENT_CODE, ServiceConfig.DATALOG_IMPLEMENTATION), ServiceConfig.DataLogType.BOOKKEEPER); + } + if (this.testConfig.isMetricsEnabled()) { ps.sysProp(configProperty(MetricsConfig.COMPONENT_CODE, MetricsConfig.ENABLE_STATISTICS), true); ps.sysProp(configProperty(MetricsConfig.COMPONENT_CODE, MetricsConfig.ENABLE_CSV_REPORTER), true);
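The new pauseBeforeExit option (surfaced as the "pause" shortcut above) keeps ZooKeeper, the Bookies and the SegmentStore processes alive after the self test completes and waits for input on stdin, so external tools can be pointed at the still-running cluster before it is torn down. Only the property matters; the exact launcher command depends on the local setup, for example:

    -Dpause=true     # shortcut for pauseBeforeExit; services stay up until a key is pressed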