2929import org .apache .logging .log4j .LogManager ;
3030import org .apache .logging .log4j .Logger ;
3131
32+ import java .util .Iterator ;
3233import java .util .List ;
3334import java .util .Map ;
3435import java .util .NavigableSet ;
@@ -56,16 +57,18 @@ public class MemoryIndex implements SinglePointIndex
5657 private final AtomicBoolean removed = new AtomicBoolean (false );
5758
5859 /**
59- * For unique indexes: baseKey -> (timestamp -> rowId)
60+ * For unique indexes: baseKey -> (timestamp -> rowId).
6061 */
6162 private final ConcurrentHashMap <CompositeKey , ConcurrentSkipListMap <Long , Long >> uniqueIndex ;
6263 /**
63- * For non-unique indexes: baseKey -> (rowId -> timestamps)
64+ * For non-unique indexes: baseKey -> (rowId -> timestamps).
6465 */
6566 private final ConcurrentHashMap <CompositeKey , ConcurrentHashMap <Long , ConcurrentSkipListSet <Long >>> nonUniqueIndex ;
6667
67- // Tombstones: baseKey -> tombstone timestamp, the transactions with timestamp >= tombstone timestamp can not see the deleted versions
68- private final ConcurrentHashMap <CompositeKey , Long > tombstones ;
68+ /**
69+ * Tombstones: baseKey -> tombstone timestamps.
70+ */
71+ private final ConcurrentHashMap <CompositeKey , ConcurrentSkipListSet <Long >> tombstones ;
6972
7073 public MemoryIndex (long tableId , long indexId , boolean unique )
7174 {
@@ -153,7 +156,6 @@ public boolean putPrimaryEntries(List<IndexProto.PrimaryIndexEntry> entries) thr
153156 this .uniqueIndex .computeIfAbsent (baseKey , k -> new ConcurrentSkipListMap <>());
154157 versions .put (timestamp , entry .getRowId ());
155158 mainIndex .putEntry (entry .getRowId (), entry .getRowLocation ());
156- tombstones .remove (baseKey );
157159 }
158160 return true ;
159161 }
@@ -183,7 +185,6 @@ private void internalPutEntry(long rowId, CompositeKey baseKey, long timestamp)
183185 ConcurrentSkipListMap <Long , Long > versions =
184186 this .uniqueIndex .computeIfAbsent (baseKey , k -> new ConcurrentSkipListMap <>());
185187 versions .put (timestamp , rowId );
186- tombstones .remove (baseKey );
187188 }
188189 else
189190 {
@@ -317,35 +318,42 @@ public long deleteUniqueEntry(IndexProto.IndexKey key) throws SinglePointIndexEx
317318 return -1L ;
318319 }
319320 // Add tombstone instead of removing the entry
320- this .tombstones .put (baseKey , key .getTimestamp ());
321-
321+ ConcurrentSkipListSet <Long > existingTombstones =
322+ this .tombstones .computeIfAbsent (baseKey , k -> new ConcurrentSkipListSet <>());
323+ existingTombstones .add (key .getTimestamp ());
322324 return rowId ;
323325 }
324326
325327 @ Override
326328 public List <Long > deleteEntry (IndexProto .IndexKey key ) throws SinglePointIndexException
327329 {
328330 checkClosed ();
329- CompositeKey baseKey = extractBaseKey (key );
330- if (unique )
331+ try
331332 {
332- long rowId = getUniqueRowId (key );
333- if (rowId < 0 )
333+ if (unique )
334334 {
335- return ImmutableList .of ();
335+ long rowId = getUniqueRowId (key );
336+ if (rowId < 0 )
337+ {
338+ return ImmutableList .of ();
339+ }
340+ return ImmutableList .of (rowId );
341+ } else
342+ {
343+ List <Long > rowIds = getRowIds (key );
344+ if (rowIds .isEmpty ())
345+ {
346+ return ImmutableList .of ();
347+ }
348+ return rowIds ;
336349 }
337- this .tombstones .put (baseKey , key .getTimestamp ());
338- return ImmutableList .of (rowId );
339350 }
340- else
351+ finally
341352 {
342- List <Long > rowIds = getRowIds (key );
343- if (rowIds .isEmpty ())
344- {
345- return ImmutableList .of ();
346- }
347- tombstones .put (baseKey , key .getTimestamp ());
348- return rowIds ;
353+ CompositeKey baseKey = extractBaseKey (key );
354+ ConcurrentSkipListSet <Long > existingTombstones =
355+ this .tombstones .computeIfAbsent (baseKey , k -> new ConcurrentSkipListSet <>());
356+ existingTombstones .add (key .getTimestamp ());
349357 }
350358 }
351359
@@ -369,18 +377,31 @@ public List<Long> purgeEntries(List<IndexProto.IndexKey> indexKeys) throws Singl
369377 for (IndexProto .IndexKey key : indexKeys )
370378 {
371379 CompositeKey baseKey = extractBaseKey (key );
380+ ConcurrentSkipListSet <Long > existingTombstones = this .tombstones .get (baseKey );
381+ if (existingTombstones == null )
382+ {
383+ continue ;
384+ }
373385 // Remove all versions with timestamp <= the latest tombstone that is <= the purge timestamp (key.getTimestamp())
373- Long tombstone = this . tombstones . get ( baseKey );
386+ Long tombstone = existingTombstones . floor ( key . getTimestamp () );
374387 if (tombstone != null && tombstone <= key .getTimestamp ())
375388 {
376389 if (unique )
377390 {
378391 // timestamp -> row id
379- ConcurrentNavigableMap <Long , Long > versions = this .uniqueIndex .get (baseKey ). headMap ( key . getTimestamp (), true );
380- if (! versions . isEmpty () )
392+ ConcurrentSkipListMap <Long , Long > versions = this .uniqueIndex .get (baseKey );
393+ if (versions != null )
381394 {
382- builder .addAll (versions .values ());
383- versions .clear ();
395+ ConcurrentNavigableMap <Long , Long > purging = versions .headMap (tombstone , true );
396+ if (!purging .isEmpty ())
397+ {
398+ builder .addAll (purging .values ());
399+ purging .clear ();
400+ }
401+ if (versions .isEmpty ())
402+ {
403+ this .uniqueIndex .remove (baseKey );
404+ }
384405 }
385406 }
386407 else
@@ -389,20 +410,33 @@ public List<Long> purgeEntries(List<IndexProto.IndexKey> indexKeys) throws Singl
389410 ConcurrentHashMap <Long , ConcurrentSkipListSet <Long >> rowIds = this .nonUniqueIndex .get (baseKey );
390411 if (rowIds != null )
391412 {
392- for (Map .Entry <Long , ConcurrentSkipListSet <Long >> entry : rowIds .entrySet ())
413+ for (Iterator < Map .Entry <Long , ConcurrentSkipListSet <Long >>> it = rowIds .entrySet (). iterator (); it . hasNext (); )
393414 {
394- NavigableSet <Long > versions = entry .getValue ().headSet (key .getTimestamp (), true );
415+ Map .Entry <Long , ConcurrentSkipListSet <Long >> entry = it .next ();
416+ NavigableSet <Long > versions = entry .getValue ().headSet (tombstone , true );
395417 if (!versions .isEmpty ())
396418 {
397419 builder .add (entry .getKey ());
398420 versions .clear ();
399421 }
422+ if (entry .getValue ().isEmpty ())
423+ {
424+ it .remove ();
425+ }
400426 }
401427 builder .addAll (rowIds .keySet ());
428+ if (rowIds .isEmpty ())
429+ {
430+ this .nonUniqueIndex .remove (baseKey );
431+ }
402432 }
403433 }
434+ existingTombstones .headSet (key .getTimestamp (), true ).clear ();
435+ if (existingTombstones .isEmpty ())
436+ {
437+ this .tombstones .remove (baseKey );
438+ }
404439 }
405- tombstones .remove (baseKey );
406440 }
407441 return builder .build ();
408442 }
@@ -478,16 +512,11 @@ private CompositeKey extractBaseKey(IndexProto.IndexKey key)
478512 /**
479513 * Find the latest visible version for a unique index.
480514 * @param baseKey the base key of the index entry
481- * @param snapshotTimestamp the snapshot timestamp
515+ * @param snapshotTimestamp the snapshot timestamp of the transaction
482516 * @return the latest visible row id, or -1 if no row id is visible
483517 */
484518 private long findUniqueRowId (CompositeKey baseKey , long snapshotTimestamp )
485519 {
486- // Check if this version is visible (not deleted by a tombstone)
487- if (!isVersionVisible (baseKey , snapshotTimestamp ))
488- {
489- return -1 ;
490- }
491520 ConcurrentSkipListMap <Long , Long > versions = this .uniqueIndex .get (baseKey );
492521 if (versions == null )
493522 {
@@ -499,38 +528,59 @@ private long findUniqueRowId(CompositeKey baseKey, long snapshotTimestamp)
499528 {
500529 return -1 ;
501530 }
531+ // Check if this version is visible (not deleted by a tombstone)
532+ if (!isVersionVisible (baseKey , versionEntry .getKey (), snapshotTimestamp ))
533+ {
534+ return -1 ;
535+ }
502536 return versionEntry .getValue ();
503537 }
504538
505539 /**
506540 * Find the latest visible version for a non-unique index.
541+ * @param baseKey the base key of the index entry
542+ * @param snapshotTimestamp the snapshot timestamp of the transaction
543+ * @return the latest visible row ids, or empty if no row id is visible
507544 */
508545 private List <Long > findNonUniqueRowIds (CompositeKey baseKey , long snapshotTimestamp )
509546 {
510547 ConcurrentHashMap <Long , ConcurrentSkipListSet <Long >> rowIds = this .nonUniqueIndex .get (baseKey );
511- if (rowIds == null || ! isVersionVisible ( baseKey , snapshotTimestamp ) )
548+ if (rowIds == null )
512549 {
513550 return ImmutableList .of ();
514551 }
515552 ImmutableList .Builder <Long > builder = ImmutableList .builder ();
516553 for (Map .Entry <Long , ConcurrentSkipListSet <Long >> entry : rowIds .entrySet ())
517554 {
518555 Long version = entry .getValue ().floor (snapshotTimestamp );
519- if (version != null )
556+ if (version != null && isVersionVisible ( baseKey , version , snapshotTimestamp ) )
520557 {
521558 builder .add (entry .getKey ());
522559 }
523560 }
524561 return builder .build ();
525562 }
526563
527- private boolean isVersionVisible (CompositeKey baseKey , long snapshotTimestamp )
564+ /**
565+ * Check if this version of the index record is visible (i.e., not marked deleted by a tombstone
566+ * with timestamp >= the version's timestamp and <= the snapshot timestamp).
567+ * @param baseKey the key of the index record
568+ * @param versionTimestamp the version of the index record
569+ * @param snapshotTimestamp the snapshot timestamp of the transaction
570+ * @return true if this index record version is visible
571+ */
572+ private boolean isVersionVisible (CompositeKey baseKey , long versionTimestamp , long snapshotTimestamp )
528573 {
529- Long tombstone = tombstones .get (baseKey );
574+ ConcurrentSkipListSet <Long > existingTombstones = tombstones .get (baseKey );
575+ if (existingTombstones == null )
576+ {
577+ return true ;
578+ }
579+ Long tombstone = existingTombstones .floor (snapshotTimestamp );
530580 if (tombstone == null )
531581 {
532582 return true ;
533583 }
534- return tombstone > snapshotTimestamp ;
584+ return tombstone < versionTimestamp || snapshotTimestamp < tombstone ;
535585 }
536586}
0 commit comments