[improve][pip] PIP-418: Determine the behaviors for components that rely on BookKeeper when BookKeeper is not used #24296
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,174 @@ | ||
| # PIP-418: Determine the behaviors for components that rely on BookKeeper when BookKeeper is not used | ||
|
|
||
| # Background knowledge | ||
|
|
||
| `PulsarService` has a method for other classes to access the BookKeeper client (`BookKeeper`): | ||
|
|
||
| ```java | ||
| public BookKeeper getBookKeeperClient() { | ||
| ManagedLedgerStorageClass defaultStorageClass = getManagedLedgerStorage().getDefaultStorageClass(); | ||
| if (defaultStorageClass instanceof BookkeeperManagedLedgerStorageClass bkStorageClass) { | ||
| return bkStorageClass.getBookKeeperClient(); | ||
| } else { | ||
| // TODO: Refactor code to support other than default bookkeeper based storage class | ||
| throw new UnsupportedOperationException("BookKeeper client is not available"); | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| The BookKeeper client is obtained from the `BookkeeperManagedLedgerStorageClass` interface, which extends `ManagedLedgerStorageClass`. A `ManagedLedgerStorageClass` represents an actual storage layer of `ManagedLedgerStorage`, which exposes its storage classes through the following method: | ||
|
|
||
| ```java | ||
| Collection<ManagedLedgerStorageClass> getStorageClasses(); | ||
| ``` | ||
|
|
||
| The following components share the same `BookKeeper` instance: | ||
| - `PulsarCompactionServiceFactory`: The default compaction service configured by `compactionServiceFactoryClassName` | ||
| - `EventTimeCompactionServiceFactory`: Another built-in compaction service introduced by PIP-215 | ||
| - `StrategicTwoPhaseCompactor`: The compactor used only for the `loadbalancer-service-unit-state` topic (introduced by PIP-352) | ||
|
|
||
| This `BookKeeper` instance is also used in: | ||
| - `AdminResource#bookKeeper`: Handles the `/bookies/all` REST API | ||
| - `PersistentTopic#getInternalStats`: Uses the `BookKeeper` object to get a list of ledgers | ||
|
|
||
| `PulsarService` also has another method to create a new BookKeeper client from the factory class (`BookKeeperClientFactory`): | ||
|
|
||
| ```java | ||
| public BookKeeperClientFactory getBookKeeperClientFactory() { | ||
| return bkClientFactory; | ||
| } | ||
| ``` | ||
|
|
||
| The following components leverage this factory to create their own `BookKeeper` instances: | ||
| - `BookkeeperSchemaStorageFactory`: The default schema registry storage configured by `schemaRegistryStorageClassName` | ||
| - `BookkeeperBucketSnapshotStorage`: A non-default built-in delayed delivery tracker configured by `delayedDeliveryTrackerFactoryClassName` | ||
|
|
||
| # Motivation | ||
|
|
||
| Pulsar's pluggable managed ledger interface (`ManagedLedgerStorage`) allows a customized storage layer, such as S3. However, in practice, implementing a custom `ManagedLedgerStorage` is insufficient. At a minimum, we also have to customize: | ||
| - Schema Registry: The default implementation relies on BookKeeper. | ||
| - Topic Policies Service: The default implementation relies on system topics, which may not function correctly when the compaction service is enabled. | ||
|
|
||
| Even if a storage layer that does not rely on BookKeeper is configured, Pulsar still sets a `BookKeeperClientFactory` for components like the schema registry. While `PulsarService#getBookKeeperClient` throws an exception if the default storage class is not a `BookkeeperManagedLedgerStorageClass`, the compaction service will not fail immediately. This is because the compaction service is only created when a topic is initialized (in `PulsarService#newTopicCompactionService`). | ||
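The deferred failure described above can be illustrated in isolation. This is a minimal sketch, not Pulsar code: `LazyFailureSketch`, `start`, `initTopic`, and `getClient` are hypothetical stand-ins. It only shows that a getter that throws does no harm until something actually calls it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a broker whose default storage is not BookKeeper.
final class LazyFailureSketch {
    private final List<String> events = new ArrayList<>();

    // Mirrors the shape of a getter that throws when the client is unavailable.
    Object getClient() {
        throw new UnsupportedOperationException("BookKeeper client is not available");
    }

    // Startup never asks for the client, so it succeeds.
    void start() {
        events.add("started");
    }

    // The client is requested only when a topic is initialized,
    // so the failure surfaces here, long after startup.
    void initTopic(String topic) {
        events.add("init " + topic);
        getClient();
    }

    List<String> events() {
        return events;
    }
}
```

With this shape, a misconfiguration is reported per topic at load time rather than once at startup, which is the ambiguity the proposal sets out to remove.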
|
|
||
| When BookKeeper is not the default storage, the behavior of these components is unclear without reading the code. This proposal aims to define that behavior explicitly. | ||
|
|
||
| # Goals | ||
|
|
||
| ## In Scope | ||
|
|
||
| Define the behavior for components that rely on BookKeeper when BookKeeper is not the default storage. | ||
|
|
||
| # High Level Design | ||
|
|
||
| When BookKeeper is not the default storage, the components behave as follows: | ||
|
|
||
| | Component | Behavior | | ||
| | :- | :- | | ||
| | Schema Registry | Fail fast | | ||
| | Delayed Delivery Tracker | Fail fast if it's `BucketDelayedDeliveryTrackerFactory` | | ||
| | Compaction Service | Fall back to a dummy implementation | | ||
|
Comment on lines +68 to +70
Contributor
These three items refer to Pulsar’s default implementations, correct? If users have provided their own plugin implementations, they shouldn’t be affected. Is that understanding accurate? |
||
| | `/bookies/all` REST API | Return an empty bookie list | | ||
| | `PersistentTopic#getInternalStats` | Return an empty ledger list in stats | | ||
|
|
||
| The compaction service does not fail fast because skipping compaction does not affect correctness. It only affects the storage size. | ||
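The "return an empty list" rows can be sketched as follows. This is an illustration under stated assumptions: `LedgerClient` and `allBookies` are hypothetical names standing in for the BookKeeper client and the REST handler, and the real handler may be structured differently.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

final class EmptyResultSketch {
    // Hypothetical stand-in for the BookKeeper client; not a Pulsar or BookKeeper type.
    interface LedgerClient {
        List<String> listBookies();
    }

    // Returns the bookie list when a client exists, otherwise an empty list.
    // No exception is thrown when BookKeeper is not the default storage.
    static List<String> allBookies(Optional<LedgerClient> client) {
        return client.map(LedgerClient::listBookies).orElse(Collections.emptyList());
    }
}
```

Returning an empty result keeps read-only endpoints usable on non-BookKeeper deployments, while components whose correctness depends on BookKeeper still fail fast.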
|
|
||
| # Detailed Design | ||
|
|
||
| ## Design & Implementation Details | ||
|
|
||
| Simplify the `ManagedLedgerStorage#initialize` method by removing the `BookKeeperClientFactory` and `EventLoopGroup` parameters: | ||
|
|
||
| ```java | ||
| public interface ManagedLedgerStorage extends AutoCloseable { | ||
|
|
||
| void initialize(ServiceConfiguration conf, | ||
| MetadataStoreExtended metadataStore, | ||
| OpenTelemetry openTelemetry) throws Exception; | ||
| ``` | ||
|
|
||
| These parameters exist because `BookKeeperClientFactory` is maintained in `PulsarService` and creating a `BookKeeper` instance requires an `EventLoopGroup` object. | ||
|
|
||
| However, these BookKeeper-related parameters are not useful for a `ManagedLedgerStorage` implementation. Even if it is another BookKeeper-based implementation, it should create its own BookKeeper client objects according to the configuration (`conf`). In addition, the `EventLoopGroup` instance passed to the method represents the `pulsar-io` thread pool, which is coupled with Pulsar's internal details. A custom implementation should create its own thread pool. | ||
|
||
|
|
||
| Since the built-in implementation still requires an `EventLoopGroup` instance, we can skip calling `initialize` and pass the arguments directly to the constructor: | ||
|
|
||
| ```java | ||
| if (config.getManagedLedgerStorageClassName().equals(ManagedLedgerClientFactory.class.getName())) { | ||
| return new ManagedLedgerClientFactory(config, localMetadataStore, ioEventLoopGroup, openTelemetry); | ||
| } else { | ||
| // create() will pass these arguments to initialize() | ||
| return ManagedLedgerStorage.create(config, localMetadataStore, openTelemetry); | ||
| } | ||
| ``` | ||
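The split between the built-in path (constructor) and the plugin path (no-arg constructor plus `initialize`) can be sketched in isolation. Everything here is a hypothetical stand-in: `Storage`, `BuiltInStorage`, and `CustomStorage` are not Pulsar types, the `String` parameters replace the real configuration and event loop objects, and the reflective instantiation is an assumption about how a `create()` helper might work.

```java
final class StorageFactorySketch {
    // Hypothetical stand-in for ManagedLedgerStorage; only the shape matters.
    interface Storage {
        void initialize(String conf);

        String describe();
    }

    // Built-in implementation: receives everything through its constructor.
    static final class BuiltInStorage implements Storage {
        private final String description;

        BuiltInStorage(String conf, String eventLoopGroup) {
            this.description = "built-in(" + conf + ", " + eventLoopGroup + ")";
        }

        @Override
        public void initialize(String conf) {
            // Intentionally a no-op: the built-in path never calls initialize().
        }

        @Override
        public String describe() {
            return description;
        }
    }

    // Plugin implementation: no-arg constructor, configured through initialize().
    static final class CustomStorage implements Storage {
        private String description = "uninitialized";

        CustomStorage() {
        }

        @Override
        public void initialize(String conf) {
            description = "custom(" + conf + ")";
        }

        @Override
        public String describe() {
            return description;
        }
    }

    static Storage create(String className, String conf, String eventLoopGroup) {
        if (className.equals(BuiltInStorage.class.getName())) {
            // The event loop group is handed over only on the built-in path.
            return new BuiltInStorage(conf, eventLoopGroup);
        }
        try {
            // Assumed plugin path: instantiate reflectively, then initialize.
            Storage storage = (Storage) Class.forName(className).getDeclaredConstructor().newInstance();
            storage.initialize(conf);
            return storage;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create " + className, e);
        }
    }
}
```

The plugin never sees the broker's event loop group, so it cannot accidentally couple itself to the `pulsar-io` thread pool.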
|
|
||
| Add a new method to `BookkeeperManagedLedgerStorageClass`: | ||
|
|
||
| ```java | ||
| public interface BookkeeperManagedLedgerStorageClass extends ManagedLedgerStorageClass { | ||
| /* ... */ | ||
| BookKeeperClientFactory getBookKeeperClientFactory(); | ||
|
Contributor
Is it designed to be used by I mean, BookkeeperManagedLedgerStorageClass already has |
||
| } | ||
| ``` | ||
|
|
||
| Then the `bkClientFactory` field will move from `PulsarService` to `ManagedLedgerClientFactory`. | ||
|
|
||
| `PulsarService` still maintains the BookKeeper client getters, but they should always go through the `ManagedLedgerStorage` object: | ||
|
|
||
| ```java | ||
| public Optional<BookKeeper> getOptionalBookKeeperClient() { | ||
| final var defaultStorage = managedLedgerStorage.getDefaultStorageClass(); | ||
| if (defaultStorage instanceof BookkeeperManagedLedgerStorageClass bkStorage) { | ||
| return Optional.of(bkStorage.getBookKeeperClient()); | ||
| } else { | ||
| return Optional.empty(); | ||
| } | ||
| } | ||
|
|
||
| public BookKeeper getBookKeeperClient() throws PulsarServerException { | ||
| return getOptionalBookKeeperClient().orElseThrow(PulsarServerException.BookKeeperNotSupportedException::new); | ||
| } | ||
|
|
||
| public BookKeeperClientFactory getBookKeeperClientFactory() throws PulsarServerException { | ||
| if (managedLedgerStorage.getDefaultStorageClass() instanceof BookkeeperManagedLedgerStorageClass bkStorage) { | ||
| return bkStorage.getBookKeeperClientFactory(); | ||
| } else { | ||
| throw new PulsarServerException.BookKeeperNotSupportedException(); | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| ```java | ||
| public static class BookKeeperNotSupportedException extends PulsarServerException { | ||
| ``` | ||
|
|
||
| For fail-fast cases, this new exception will be thrown. | ||
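The relationship between the optional getter and the throwing getter can be sketched with stand-in types. `StorageClass`, `BkStorageClass`, and `NotSupportedException` are hypothetical placeholders for the Pulsar types named above, and a `String` stands in for the client object. The sketch uses a classic `instanceof` plus cast so that it also compiles on older Java versions.

```java
import java.util.Optional;

final class OptionalClientSketch {
    // Hypothetical stand-in for PulsarServerException.BookKeeperNotSupportedException.
    static final class NotSupportedException extends Exception {
        NotSupportedException() {
            super("BookKeeper is not supported by the default storage");
        }
    }

    // Hypothetical stand-ins for the storage class interfaces.
    interface StorageClass {
    }

    interface BkStorageClass extends StorageClass {
        String client();
    }

    private final StorageClass defaultStorage;

    OptionalClientSketch(StorageClass defaultStorage) {
        this.defaultStorage = defaultStorage;
    }

    // Non-throwing variant: callers decide how to handle absence.
    Optional<String> optionalClient() {
        if (defaultStorage instanceof BkStorageClass) {
            return Optional.of(((BkStorageClass) defaultStorage).client());
        }
        return Optional.empty();
    }

    // Fail-fast variant: absence becomes a dedicated checked exception.
    String client() throws NotSupportedException {
        return optionalClient().orElseThrow(NotSupportedException::new);
    }
}
```

Callers that can degrade gracefully use the optional variant, while fail-fast components call the throwing variant and let the dedicated exception propagate.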
|
|
||
| Specifically, this proposal provides a dummy compaction service implementation: | ||
|
|
||
| ```java | ||
| public class DisabledTopicCompactionService implements CompactionServiceFactory { | ||
| ``` | ||
|
|
||
| When a built-in compaction service is configured but BookKeeper is not the default storage, fall back to this implementation with a warning log: | ||
|
|
||
| ```java | ||
| if (getOptionalBookKeeperClient().isEmpty() | ||
| && PulsarCompactionServiceFactory.class.isAssignableFrom(compactionServiceFactory.getClass())) { | ||
| LOG.warn("BookKeeper client is not available, fallback to a disabled compaction service"); | ||
| return new DisabledTopicCompactionService(); | ||
| } | ||
| ``` | ||
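The `isAssignableFrom` check matches the named class and any subclass of it, so one condition can cover several built-in factories while leaving unrelated plugin factories untouched. The sketch below uses hypothetical stand-in types. In particular, modelling the event-time factory as a subclass of the default factory is an assumption made for illustration and should be verified against the code base.

```java
final class CompactionFallbackSketch {
    // Hypothetical stand-in; the real factory interface has more methods.
    interface Factory {
        String name();
    }

    static class BuiltInFactory implements Factory {
        @Override
        public String name() {
            return "built-in";
        }
    }

    // Assumption: the second built-in factory extends the first.
    static class EventTimeFactory extends BuiltInFactory {
        @Override
        public String name() {
            return "event-time";
        }
    }

    static class CustomFactory implements Factory {
        @Override
        public String name() {
            return "custom";
        }
    }

    static class DisabledFactory implements Factory {
        @Override
        public String name() {
            return "disabled";
        }
    }

    // Falls back only when BookKeeper is unavailable AND the configured
    // factory is a built-in one (the class itself or a subclass).
    static Factory select(Factory configured, boolean bookKeeperAvailable) {
        if (!bookKeeperAvailable && BuiltInFactory.class.isAssignableFrom(configured.getClass())) {
            return new DisabledFactory();
        }
        return configured;
    }
}
```

A user-provided factory is returned unchanged even without BookKeeper, so the fallback applies only to the built-in implementations.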
|
|
||
| # Backward & Forward Compatibility | ||
|
|
||
| This proposal only breaks some unstable managed-ledger-related interfaces, which might affect plugin implementations. | ||
|
|
||
| If a custom `ManagedLedgerStorage` implementation already relies on the `pulsar-io` thread pool (the previous `eventLoopGroup` parameter), it has to create a separate `EventLoopGroup` instance instead. This should be very rare, and reusing the `pulsar-io` thread pool is not good practice anyway. | ||
|
|
||
| # Alternatives | ||
|
|
||
| # Links | ||
|
|
||
| * Mailing List discussion thread: https://lists.apache.org/thread/vbgw7ntszkx36mvq5vhk80qlzvtl909r | ||
| * Mailing List voting thread: | ||
I’m not entirely sure why separate BookKeeper instances are used for these two cases. While this doesn’t directly affect the goal of the proposal, should we consider addressing or revisiting this as part of a broader cleanup?