174 changes: 174 additions & 0 deletions pip/pip-418.md
@@ -0,0 +1,174 @@
# PIP-418: Determine the behaviors for components that rely on BookKeeper when BookKeeper is not used

# Background knowledge

`PulsarService` has a method for other classes to access the BookKeeper client (`BookKeeper`):

```java
public BookKeeper getBookKeeperClient() {
    ManagedLedgerStorageClass defaultStorageClass = getManagedLedgerStorage().getDefaultStorageClass();
    if (defaultStorageClass instanceof BookkeeperManagedLedgerStorageClass bkStorageClass) {
        return bkStorageClass.getBookKeeperClient();
    } else {
        // TODO: Refactor code to support other than default bookkeeper based storage class
        throw new UnsupportedOperationException("BookKeeper client is not available");
    }
}
```

The BookKeeper client is obtained from the `BookkeeperManagedLedgerStorageClass` interface, which extends the `ManagedLedgerStorageClass` interface. A `ManagedLedgerStorageClass` represents an actual storage layer of `ManagedLedgerStorage`, which exposes its storage classes through:

```java
Collection<ManagedLedgerStorageClass> getStorageClasses();
```

The following components share the same `BookKeeper` instance:
- `PulsarCompactionServiceFactory`: The default compaction service configured by `compactionServiceFactoryClassName`
- `EventTimeCompactionServiceFactory`: Another built-in compaction service introduced by PIP-215
- `StrategicTwoPhaseCompactor`: The compactor used only for the `loadbalancer-service-unit-state` topic (introduced by PIP-352)

This `BookKeeper` instance is also used in:
- `AdminResource#bookKeeper`: Handles the `/bookies/all` REST API
- `PersistentTopic#getInternalStats`: Uses the `BookKeeper` object to get a list of ledgers

`PulsarService` also has a method that exposes the factory class (`BookKeeperClientFactory`), which callers can use to create new BookKeeper clients:

```java
public BookKeeperClientFactory getBookKeeperClientFactory() {
    return bkClientFactory;
}
```

The following components leverage this factory to create their own `BookKeeper` instances:
- `BookkeeperSchemaStorageFactory`: The default schema registry storage configured by `schemaRegistryStorageClassName`
- `BookkeeperBucketSnapshotStorage`: A non-default built-in delayed delivery tracker configured by `delayedDeliveryTrackerFactoryClassName`

> **Review comment (Contributor), on lines +42 to +44:** I’m not entirely sure why separate BookKeeper instances are used for these two cases. While this doesn’t directly affect the goal of the proposal, should we consider addressing or revisiting this as part of a broader cleanup?

# Motivation

Pulsar's pluggable managed ledger interface (`ManagedLedgerStorage`) allows a customized storage layer, such as S3. In practice, however, implementing a custom `ManagedLedgerStorage` is not sufficient. At a minimum, the following must also be customized:
- Schema Registry: The default implementation relies on BookKeeper.
- Topic Policies Service: The default implementation relies on system topics, which may not function correctly when the compaction service is enabled.

Even if a storage layer that does not rely on BookKeeper is configured, Pulsar still sets a `BookKeeperClientFactory` for components like the schema registry. While `PulsarService#getBookKeeperClient` throws an exception if the default storage class is not a `BookkeeperManagedLedgerStorageClass`, the compaction service will not fail immediately. This is because the compaction service is only created when a topic is initialized (in `PulsarService#newTopicCompactionService`).

When BookKeeper is not the default storage, the behavior of these components is unclear without reading the code. This proposal aims to define that behavior explicitly.

# Goals

## In Scope

Define the behavior for components that rely on BookKeeper when BookKeeper is not the default storage.

# High Level Design

When BookKeeper is not the default storage, the components behave as follows:

| Component | Behavior |
| :- | :- |
| Schema Registry | Fail fast |
| Delayed Delivery Tracker | Fail fast if it's `BucketDelayedDeliveryTrackerFactory` |
| Compaction Service | Fall back to a dummy implementation |
| `/bookies/all` REST API | Return an empty bookie list |
| `PersistentTopic#getInternalStats` | Return an empty ledger list in stats |

> **Review comment (Contributor), on lines +68 to +70:** These three items refer to Pulsar’s default implementations, correct? If users have provided their own plugin implementations, they shouldn’t be affected. Is that understanding accurate?

The compaction service does not fail fast because whether compaction runs does not affect correctness. It only affects the storage size.

# Detailed Design

## Design & Implementation Details

Simplify the `ManagedLedgerStorage#initialize` method by removing the `BookKeeperClientFactory` and `EventLoopGroup` parameters:

```java
public interface ManagedLedgerStorage extends AutoCloseable {

    void initialize(ServiceConfiguration conf,
                    MetadataStoreExtended metadataStore,
                    OpenTelemetry openTelemetry) throws Exception;

    /* ... */
}
```

These parameters exist because `BookKeeperClientFactory` is maintained in `PulsarService` and creating a `BookKeeper` instance requires an `EventLoopGroup` object.

However, these BookKeeper-related parameters are not useful for a `ManagedLedgerStorage` implementation. Even if it is another BookKeeper-based implementation, it should create its own BookKeeper client objects according to the configuration (`conf`). In addition, the `EventLoopGroup` instance passed to the method represents the `pulsar-io` thread pool, which couples the implementation to Pulsar's internal details. A custom implementation should create its own thread pool.

Since the built-in implementation still requires an `EventLoopGroup` instance, we can skip calling `initialize` and pass the arguments directly to the constructor:

```java
if (config.getManagedLedgerStorageClassName().equals(ManagedLedgerClientFactory.class.getName())) {
    return new ManagedLedgerClientFactory(config, localMetadataStore, ioEventLoopGroup, openTelemetry);
} else {
    // create() will pass these arguments to initialize()
    return ManagedLedgerStorage.create(config, localMetadataStore, openTelemetry);
}
```
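
The selection logic above can be illustrated with a self-contained sketch. Everything in it is a hypothetical stand-in rather than Pulsar code: `Storage` plays the role of `ManagedLedgerStorage`, `BuiltIn` the role of `ManagedLedgerClientFactory`, `Custom` the role of a plugin, and a plain `String` and `Object` stand in for the configuration and the `pulsar-io` event loop group. It assumes that plugins are instantiated reflectively through a no-argument constructor, which the proposal does not spell out.

```java
final class StorageFactorySketch {

    /** Hypothetical stand-in for ManagedLedgerStorage with the simplified initialize(). */
    interface Storage {
        void initialize(String conf) throws Exception;

        String describe();
    }

    /** Stand-in for the built-in storage: receives the shared event loop via its constructor. */
    static final class BuiltIn implements Storage {
        private final String description;

        BuiltIn(String conf, Object ioEventLoopGroup) {
            this.description = "built-in(" + conf + ")";
        }

        @Override
        public void initialize(String conf) {
            // Not called in this sketch: the constructor already received everything.
        }

        @Override
        public String describe() {
            return description;
        }
    }

    /** Stand-in for a plugin: no-arg constructor, configured only through initialize(). */
    static final class Custom implements Storage {
        private String description = "uninitialized";

        @Override
        public void initialize(String conf) {
            // A real plugin would create its own clients and thread pool here.
            description = "custom(" + conf + ")";
        }

        @Override
        public String describe() {
            return description;
        }
    }

    /** Built-in class: direct construction. Any other class: reflection plus initialize(). */
    static Storage create(String className, String conf, Object ioEventLoopGroup) throws Exception {
        if (className.equals(BuiltIn.class.getName())) {
            return new BuiltIn(conf, ioEventLoopGroup);
        }
        Storage storage = (Storage) Class.forName(className).getDeclaredConstructor().newInstance();
        storage.initialize(conf);
        return storage;
    }

    public static void main(String[] args) throws Exception {
        Object sharedLoop = new Object(); // placeholder for the pulsar-io EventLoopGroup
        System.out.println(create(BuiltIn.class.getName(), "conf", sharedLoop).describe());
        System.out.println(create(Custom.class.getName(), "conf", sharedLoop).describe());
    }
}
```

In this sketch, the plugin path never sees the shared event loop, which mirrors the intent of removing the `EventLoopGroup` parameter from `initialize`.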

Add a new method to `BookkeeperManagedLedgerStorageClass`:

```java
public interface BookkeeperManagedLedgerStorageClass extends ManagedLedgerStorageClass {
    /* ... */
    BookKeeperClientFactory getBookKeeperClientFactory();
}
```

> **Review comment (Contributor):** Is it designed to be used by `BookkeeperSchemaStorageFactory` and `BookkeeperBucketSnapshotStorage`?
>
> I mean, `BookkeeperManagedLedgerStorageClass` already has `getBookKeeperClient`, and should all Pulsar's internal implementations use the same bookkeeper client? Maybe I missed something.

Then the `bkClientFactory` field will move from `PulsarService` to `ManagedLedgerClientFactory`.

`PulsarService` still provides the BookKeeper client getters, but they always go through the `ManagedLedgerStorage` object:

```java
public Optional<BookKeeper> getOptionalBookKeeperClient() {
    final var defaultStorage = managedLedgerStorage.getDefaultStorageClass();
    if (defaultStorage instanceof BookkeeperManagedLedgerStorageClass bkStorage) {
        return Optional.of(bkStorage.getBookKeeperClient());
    } else {
        return Optional.empty();
    }
}

public BookKeeper getBookKeeperClient() throws PulsarServerException {
    return getOptionalBookKeeperClient().orElseThrow(PulsarServerException.BookKeeperNotSupportedException::new);
}

public BookKeeperClientFactory getBookKeeperClientFactory() throws PulsarServerException {
    if (managedLedgerStorage.getDefaultStorageClass() instanceof BookkeeperManagedLedgerStorageClass bkStorage) {
        return bkStorage.getBookKeeperClientFactory();
    } else {
        throw new PulsarServerException.BookKeeperNotSupportedException();
    }
}
```
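
To show how callers might consume the two getter styles, here is a minimal, self-contained sketch. It does not use Pulsar's classes: `FakeBookKeeper`, `NotSupportedException`, and both consumer methods are hypothetical stand-ins. The lenient consumer models the `/bookies/all` behavior (empty list when BookKeeper is absent), and the strict consumer models the fail-fast behavior of the schema registry.

```java
import java.util.List;
import java.util.Optional;

final class OptionalClientSketch {

    /** Hypothetical stand-in for a BookKeeper client that can list bookies. */
    interface FakeBookKeeper {
        List<String> listBookies();
    }

    /** Hypothetical stand-in for PulsarServerException.BookKeeperNotSupportedException. */
    static final class NotSupportedException extends Exception {
        NotSupportedException() {
            super("BookKeeper is not the default storage");
        }
    }

    private final Optional<FakeBookKeeper> client;

    OptionalClientSketch(Optional<FakeBookKeeper> client) {
        this.client = client;
    }

    /** Lenient consumer: returns an empty list when no client is available. */
    List<String> listBookiesOrEmpty() {
        return client.map(FakeBookKeeper::listBookies).orElse(List.of());
    }

    /** Strict consumer: fails fast when no client is available. */
    FakeBookKeeper requireClient() throws NotSupportedException {
        return client.orElseThrow(NotSupportedException::new);
    }

    public static void main(String[] args) {
        OptionalClientSketch withoutBk = new OptionalClientSketch(Optional.empty());
        System.out.println("bookies: " + withoutBk.listBookiesOrEmpty());
        try {
            withoutBk.requireClient();
        } catch (NotSupportedException e) {
            System.out.println("fail fast: " + e.getMessage());
        }
    }
}
```

Keeping the `Optional` getter as the single source of truth means both behaviors derive from the same check, so a component only has to decide whether absence is tolerable.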

```java
public static class BookKeeperNotSupportedException extends PulsarServerException {
```

For fail-fast cases, this new exception will be thrown.

For the compaction service, this proposal provides a dummy implementation:

```java
public class DisabledTopicCompactionService implements CompactionServiceFactory {
```

When a built-in compaction service is configured but BookKeeper is not the storage, fall back to this implementation with a warning log:

```java
if (getOptionalBookKeeperClient().isEmpty()
        && PulsarCompactionServiceFactory.class.isAssignableFrom(compactionServiceFactory.getClass())) {
    LOG.warn("BookKeeper client is not available, fallback to a disabled compaction service");
    return new DisabledTopicCompactionService();
}
```
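
As a rough illustration of what a disabled compaction service could look like, the sketch below uses a trimmed-down, hypothetical `CompactionService` interface with two methods. The real Pulsar interface and the actual `DisabledTopicCompactionService` are not reproduced here, and completing `compact()` successfully as a no-op is an assumption of this sketch, not something the proposal specifies.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

final class DisabledCompactionSketch {

    /** Hypothetical, trimmed-down stand-in for a per-topic compaction service. */
    interface CompactionService {
        CompletableFuture<Void> compact();

        CompletableFuture<Optional<String>> readLastCompactedEntry();
    }

    /** No-op variant: compaction is skipped and no compacted data is ever reported. */
    static final class Disabled implements CompactionService {
        @Override
        public CompletableFuture<Void> compact() {
            // Assumption: a disabled service completes immediately instead of failing.
            return CompletableFuture.completedFuture(null);
        }

        @Override
        public CompletableFuture<Optional<String>> readLastCompactedEntry() {
            return CompletableFuture.completedFuture(Optional.empty());
        }
    }

    /** Mirrors the fallback condition: built-in service configured, but no BookKeeper. */
    static CompactionService select(boolean bookKeeperAvailable,
                                    boolean builtInFactoryConfigured,
                                    CompactionService configured) {
        if (!bookKeeperAvailable && builtInFactoryConfigured) {
            System.err.println("WARN: BookKeeper client is not available, "
                    + "fallback to a disabled compaction service");
            return new Disabled();
        }
        return configured;
    }

    public static void main(String[] args) {
        CompactionService configured = new Disabled(); // placeholder for the configured service
        CompactionService chosen = select(false, true, configured);
        chosen.compact().join();
        System.out.println("compacted entry present: "
                + chosen.readLastCompactedEntry().join().isPresent());
    }
}
```

Because the fallback only applies when a built-in service is configured, a custom compaction service that does not depend on BookKeeper is returned unchanged in this sketch.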

# Backward & Forward Compatibility

This proposal breaks only some unstable managed-ledger-related interfaces, which might affect plugin implementations.

If a custom `ManagedLedgerStorage` implementation already relies on the `pulsar-io` thread pool (the previous `eventLoopGroup` parameter), it has to create a separate `EventLoopGroup` instance instead. This should be a very rare case, and reusing the `pulsar-io` thread pool is not a good practice anyway.

# Alternatives

# Links

* Mailing List discussion thread: https://lists.apache.org/thread/vbgw7ntszkx36mvq5vhk80qlzvtl909r
* Mailing List voting thread: