@@ -67,17 +67,16 @@ class CompactionCleaner extends TaskHandler {
67
67
private static final Logger LOG = LoggerFactory .getLogger (CompactionCleaner .class .getName ());
68
68
69
69
/**
 * Creates the cleaner task handler. All five arguments are forwarded, unchanged and in the
 * same order, to the {@code TaskHandler} superclass constructor; no additional state is set here.
 */
public CompactionCleaner (HiveConf conf , TxnStore txnHandler ,
70
- MetadataCache metadataCache , boolean metricsEnabled ,
71
- FSRemover fsRemover ) {
70
+ MetadataCache metadataCache , boolean metricsEnabled , FSRemover fsRemover ) {
72
71
super (conf , txnHandler , metadataCache , metricsEnabled , fsRemover );
73
72
}
74
73
75
74
/**
 * Builds one cleaning task per compaction that {@code txnHandler.findReadyToClean} reports.
 *
 * The retention time handed to {@code findReadyToClean} is the configured
 * HIVE_COMPACTOR_CLEANER_RETENTION_TIME in milliseconds when
 * HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED is set, and 0 otherwise.
 *
 * NOTE(review): a few lines between the computation of {@code minTxnIdSeenOpen} and the
 * {@code return} are elided by the diff hunk boundary; this description covers only the
 * statements that are visible.
 *
 * @return a list of runnables, each invoking {@code clean} for one compaction; an empty list
 *         when nothing is ready to clean
 * @throws MetaException declared by this method; presumably raised by the txnHandler lookups
 */
@ Override
76
75
public List <Runnable > getTasks () throws MetaException {
77
76
long minOpenTxnId = txnHandler .findMinOpenTxnIdForCleaner ();
78
- long retentionTime = HiveConf .getBoolVar (conf , HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED )
79
- ? HiveConf .getTimeVar (conf , HIVE_COMPACTOR_CLEANER_RETENTION_TIME , TimeUnit .MILLISECONDS )
80
- : 0 ;
77
+ long retentionTime = HiveConf .getBoolVar (getConf () , HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED )
78
+ ? HiveConf .getTimeVar (getConf () , HIVE_COMPACTOR_CLEANER_RETENTION_TIME , TimeUnit .MILLISECONDS )
79
+ : 0 ;
81
80
List <CompactionInfo > readyToClean = txnHandler .findReadyToClean (minOpenTxnId , retentionTime );
82
81
if (!readyToClean .isEmpty ()) {
83
82
// Lower of the current min open txn id and the min txn id recorded as seen open.
long minTxnIdSeenOpen = Math .min (minOpenTxnId , txnHandler .findMinTxnIdSeenOpen ());
@@ -89,18 +88,17 @@ public List<Runnable> getTasks() throws MetaException {
89
88
// and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
90
89
return readyToClean .stream ().map (ci -> {
91
90
// Per-compaction watermark: nextTxnId + 1 when the compaction carries a non-negative
// minOpenWriteId, otherwise the shared minTxnIdSeenOpen computed above.
long cleanerWaterMark = (ci .minOpenWriteId >= 0 ) ? ci .nextTxnId + 1 : minTxnIdSeenOpen ;
92
- LOG .info ("Cleaning based on min open txn id: {}" , cleanerWaterMark );
93
91
return ThrowingRunnable .unchecked (() -> clean (ci , cleanerWaterMark , metricsEnabled ));
94
92
}).collect (Collectors .toList ());
95
93
}
96
94
return Collections .emptyList ();
97
95
}
98
96
99
97
private void clean (CompactionInfo ci , long minOpenTxn , boolean metricsEnabled ) throws MetaException {
100
- LOG .info ("Starting cleaning for {}" , ci );
98
+ LOG .info ("Starting cleaning for {} based on min open txnId: {} " , ci , minOpenTxn );
101
99
PerfLogger perfLogger = PerfLogger .getPerfLogger (false );
102
100
String cleanerMetric = MetricsConstants .COMPACTION_CLEANER_CYCLE + "_" +
103
- (!isNull (ci .type ) ? ci .type .toString ().toLowerCase () : null );
101
+ (!isNull (ci .type ) ? ci .type .toString ().toLowerCase () : null );
104
102
try {
105
103
if (metricsEnabled ) {
106
104
perfLogger .perfLogBegin (CompactionCleaner .class .getName (), cleanerMetric );
@@ -111,11 +109,13 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
111
109
Partition p = null ;
112
110
113
111
if (isNull (location )) {
114
- t = metadataCache .computeIfAbsent (ci .getFullTableName (), () -> resolveTable (ci .dbname , ci .tableName ));
112
+ t = metadataCache .computeIfAbsent (ci .getFullTableName (),
113
+ () -> resolveTable (ci .dbname , ci .tableName ));
114
+
115
115
if (isNull (t )) {
116
116
// The table was dropped before we got around to cleaning it.
117
117
LOG .info ("Unable to find table {}, assuming it was dropped. {}" , ci .getFullTableName (),
118
- idWatermark (ci ));
118
+ idWatermark (ci ));
119
119
txnHandler .markCleaned (ci );
120
120
return ;
121
121
}
@@ -129,8 +129,8 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
129
129
p = resolvePartition (ci .dbname , ci .tableName , ci .partName );
130
130
if (isNull (p )) {
131
131
// The partition was dropped before we got around to cleaning it.
132
- LOG .info ("Unable to find partition {}, assuming it was dropped. {}" ,
133
- ci . getFullPartitionName (), idWatermark (ci ));
132
+ LOG .info ("Unable to find partition {}, assuming it was dropped. {}" , ci . getFullPartitionName (),
133
+ idWatermark (ci ));
134
134
txnHandler .markCleaned (ci );
135
135
return ;
136
136
}
@@ -146,8 +146,8 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
146
146
147
147
if (!isNull (t ) || !isNull (ci .partName )) {
148
148
String path = isNull (location )
149
- ? CompactorUtil .resolveStorageDescriptor (t , p ).getLocation ()
150
- : location ;
149
+ ? CompactorUtil .resolveStorageDescriptor (t , p ).getLocation ()
150
+ : location ;
151
151
boolean dropPartition = !isNull (ci .partName ) && isNull (p );
152
152
153
153
//check if partition wasn't re-created
@@ -161,7 +161,7 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
161
161
}
162
162
} catch (Exception e ) {
163
163
LOG .error ("Caught exception when cleaning, unable to complete cleaning of {} due to {}" , ci ,
164
- e .getMessage ());
164
+ e .getMessage ());
165
165
if (metricsEnabled ) {
166
166
Metrics .getOrCreateCounter (MetricsConstants .COMPACTION_CLEANER_FAILURE_COUNTER ).inc ();
167
167
}
@@ -205,9 +205,9 @@ private void cleanUsingLocation(CompactionInfo ci, String path, boolean requires
205
205
206
206
private void cleanUsingAcidDir (CompactionInfo ci , String location , long minOpenTxn ) throws Exception {
207
207
ValidTxnList validTxnList =
208
- TxnUtils .createValidTxnListForCleaner (txnHandler .getOpenTxns (), minOpenTxn , false );
208
+ TxnUtils .createValidTxnListForCleaner (txnHandler .getOpenTxns (), minOpenTxn , false );
209
209
//save it so that getAcidState() sees it
210
- conf .set (ValidTxnList .VALID_TXNS_KEY , validTxnList .writeToString ());
210
+ getConf () .set (ValidTxnList .VALID_TXNS_KEY , validTxnList .writeToString ());
211
211
/*
212
212
* {@code validTxnList} is capped by minOpenTxnGLB so if
213
213
* {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
@@ -241,7 +241,8 @@ private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenT
241
241
242
242
// Creating 'reader' list since we are interested in the set of 'obsolete' files
243
243
ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList (ci , validTxnList );
244
- Table table = metadataCache .computeIfAbsent (ci .getFullTableName (), () -> resolveTable (ci .dbname , ci .tableName ));
244
+ Table table = metadataCache .computeIfAbsent (ci .getFullTableName (),
245
+ () -> resolveTable (ci .dbname , ci .tableName ));
245
246
LOG .debug ("Cleaning based on writeIdList: {}" , validWriteIdList );
246
247
247
248
boolean success = cleanAndVerifyObsoleteDirectories (ci , location , validWriteIdList , table );
@@ -254,7 +255,8 @@ private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenT
254
255
}
255
256
256
257
/**
 * Builds the lock request for the given compaction by delegating to
 * {@code CompactorUtil.createLockRequest} with lock type EXCL_WRITE and data operation
 * type DELETE.
 *
 * NOTE(review): the meaning of the literal {@code 0} argument is not visible from this file;
 * check the signature of CompactorUtil.createLockRequest before changing it.
 */
private LockRequest createLockRequest (CompactionInfo ci ) {
257
- return CompactorUtil .createLockRequest (conf , ci , 0 , LockType .EXCL_WRITE , DataOperationType .DELETE );
258
+ return CompactorUtil .createLockRequest (
259
+ getConf (), ci , 0 , LockType .EXCL_WRITE , DataOperationType .DELETE );
258
260
}
259
261
260
262
private static String idWatermark (CompactionInfo ci ) {
@@ -263,7 +265,7 @@ private static String idWatermark(CompactionInfo ci) {
263
265
264
266
@ Override
265
267
protected ValidReaderWriteIdList getValidCleanerWriteIdList (CompactionInfo ci , ValidTxnList validTxnList )
266
- throws NoSuchTxnException , MetaException {
268
+ throws NoSuchTxnException , MetaException {
267
269
ValidReaderWriteIdList validWriteIdList = super .getValidCleanerWriteIdList (ci , validTxnList );
268
270
/*
269
271
* We need to filter the obsoletes dir list, to only remove directories that were made obsolete by this compaction
@@ -279,11 +281,14 @@ protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, V
279
281
/**
 * Builds a cleanup request in which {@code location} itself is the single obsolete directory.
 * The database name, full partition name and run-as user are taken from {@code ci}; the purge
 * flag is derived from the compaction's "ifPurge" property.
 */
private CleanupRequest getCleaningRequestBasedOnLocation (CompactionInfo ci , String location ) {
280
282
String strIfPurge = ci .getProperty ("ifPurge" );
281
283
// NOTE(review): with `||`, ifPurge is true whenever the "ifPurge" property is present,
// whatever its value; Boolean.parseBoolean(null) is false, so the right-hand operand never
// changes the result. If "ifPurge=false" is meant to disable purging, this needs `&&`
// (or just parseBoolean) - confirm the intended semantics before changing.
boolean ifPurge = strIfPurge != null || Boolean .parseBoolean (ci .getProperty ("ifPurge" ));
282
-
283
284
Path obsoletePath = new Path (location );
285
+
284
286
return new CleanupRequestBuilder ()
285
- .setLocation (location ).setDbName (ci .dbname ).setFullPartitionName (ci .getFullPartitionName ())
286
- .setRunAs (ci .runAs ).setPurge (ifPurge ).setObsoleteDirs (Collections .singletonList (obsoletePath ))
287
- .build ();
287
+ .setLocation (location ).setDbName (ci .dbname )
288
+ .setFullPartitionName (ci .getFullPartitionName ())
289
+ .setRunAs (ci .runAs )
290
+ .setPurge (ifPurge )
291
+ .setObsoleteDirs (Collections .singletonList (obsoletePath ))
292
+ .build ();
288
293
}
289
294
}
0 commit comments