Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[cleanup][ml] ManagedCursor clean up. (apache#22246)
Browse files Browse the repository at this point in the history
  • Loading branch information
dao-jun authored Mar 12, 2024
1 parent 9f63e24 commit 532b0d9
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
private long timestamp;
private long ledgerId;
private long entryId;
private PositionImpl position;
ByteBuf data;

private Runnable onDeallocate;
Expand Down Expand Up @@ -151,7 +152,10 @@ public int getLength() {

@Override
public PositionImpl getPosition() {
return new PositionImpl(ledgerId, entryId);
if (position == null) {
position = PositionImpl.get(ledgerId, entryId);
}
return position;
}

@Override
Expand Down Expand Up @@ -197,6 +201,7 @@ protected void deallocate() {
timestamp = -1;
ledgerId = -1;
entryId = -1;
position = null;
recyclerHandle.recycle(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1502,10 +1502,7 @@ public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positi
Set<Position> alreadyAcknowledgedPositions = new HashSet<>();
lock.readLock().lock();
try {
positions.stream()
.filter(position -> ((PositionImpl) position).compareTo(markDeletePosition) <= 0
|| individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId()))
.forEach(alreadyAcknowledgedPositions::add);
positions.stream().filter(this::isMessageDeleted).forEach(alreadyAcknowledgedPositions::add);
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -2278,8 +2275,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
return;
}

if (position.compareTo(markDeletePosition) <= 0
|| individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())) {
if (isMessageDeleted(position)) {
if (config.isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
Expand Down Expand Up @@ -3504,8 +3500,7 @@ public Range<PositionImpl> getLastIndividualDeletedRange() {
@Override
public void trimDeletedEntries(List<Entry> entries) {
entries.removeIf(entry -> {
boolean isDeleted = markDeletePosition.compareTo(entry.getLedgerId(), entry.getEntryId()) >= 0
|| individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId());
boolean isDeleted = isMessageDeleted(entry.getPosition());
if (isDeleted) {
entry.release();
}
Expand Down

0 comments on commit 532b0d9

Please sign in to comment.