Commit 94cccad

ivoson authored and ueshin committed
[SPARK-53755][CORE] Add log support in BlockManager
### What changes were proposed in this pull request?

Add log support in `BlockManager` to help collect and analyze logs for live debugging. A few new components have been introduced to do that:

1. `LogBlockId` to represent log data, which can be used to filter log data, and `LogBlockType` to identify different kinds of logs.
2. `LogLine` to define the log structure.
3. `LogBlockWriter` to help write a single log block.
4. `RollingLogWriter` to help write logs in a rolling manner, splitting blocks by size (approximately).

Usage example:

```
// Define a blockId generator which can help to generate unique block ids.
val logBlockIdGenerator = new LogBlockIdGenerator {
  override def logBlockType: LogBlockType = LogBlockType.TEST
  override protected def genUniqueBlockId(
      lastLogTime: Long,
      executorId: String): LogBlockId = {
    TestLogBlockId(lastLogTime, executorId)
  }
}

// Get a log writer and write logs.
val logBlockWriter = store.getRollingLogWriter[LogLine](logBlockIdGenerator)
logBlockWriter.writeLog(LogLine(0L, 1, "Log message 1"))
logBlockWriter.writeLog(LogLine(1L, 2, "Log message 2"))
logBlockWriter.writeLog(LogLine(2L, 3, "Log message 3"))
logBlockWriter.writeLog(LogLine(3L, 4, "Log message 4"))

// Close writer after writing all logs.
logBlockWriter.close()
```

### Why are the changes needed?

Collecting and analyzing logs alongside Spark jobs makes live debugging easier.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52643 from ivoson/SPARK-53755.

Authored-by: Tengfei Huang <[email protected]>
Signed-off-by: Takuya Ueshin <[email protected]>
1 parent 37ee992 commit 94cccad

9 files changed: +691 −6 lines changed


core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala

Lines changed: 12 additions & 3 deletions
```diff
@@ -172,9 +172,8 @@ private[spark] class SerializerManager(
       outputStream: OutputStream,
       values: Iterator[T]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val autoPick = !blockId.isInstanceOf[StreamBlockId]
-    val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
-    ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
+    blockSerializationStream[T](blockId, byteStream)(implicitly[ClassTag[T]])
+      .writeAll(values).close()
   }
 
   /** Serializes into a chunked byte buffer. */
@@ -212,4 +211,14 @@ private[spark] class SerializerManager(
       .deserializeStream(wrapForCompression(blockId, stream))
       .asIterator.asInstanceOf[Iterator[T]]
   }
+
+  /** Generate a `SerializationStream` for a block. */
+  private[spark] def blockSerializationStream[T](
+      blockId: BlockId,
+      outputStream: OutputStream)
+      (classTag: ClassTag[T]): SerializationStream = {
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    val ser = getSerializer(classTag, autoPick).newInstance()
+    ser.serializeStream(wrapForCompression(blockId, outputStream))
+  }
 }
```
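This refactor extracts the stream setup that `dataSerializeStream` previously did inline into `blockSerializationStream`, so the new `LogBlockWriter` can reuse it. A minimal sketch of how Spark-internal code might call the helper, assuming a `SerializerManager` instance and a `BlockId` are already in scope; the `records` iterator and the `ByteArrayOutputStream` are purely illustrative:

```scala
import java.io.ByteArrayOutputStream
import scala.reflect.ClassTag

// Hypothetical caller: serialize an iterator of records for a given block.
// `serializerManager` and `blockId` are assumed to exist in the surrounding code.
val out = new ByteArrayOutputStream()
val records: Iterator[String] = Iterator("a", "b", "c")
serializerManager
  .blockSerializationStream[String](blockId, out)(implicitly[ClassTag[String]])
  .writeAll(records)
  .close()
```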

core/src/main/scala/org/apache/spark/storage/BlockId.scala

Lines changed: 40 additions & 0 deletions
```diff
@@ -23,6 +23,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.network.shuffle.RemoteBlockPushResolver
+import org.apache.spark.storage.LogBlockType.LogBlockType
 
 /**
  * :: DeveloperApi ::
@@ -175,6 +176,42 @@ case class PythonStreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
   override def name: String = "python-stream-" + streamId + "-" + uniqueId
 }
 
+object LogBlockType extends Enumeration {
+  type LogBlockType = Value
+  val TEST = Value
+}
+
+/**
+ * Identifies a block of log data.
+ *
+ * @param lastLogTime the timestamp of the last log entry in this block, used for filtering
+ *                    and log management.
+ * @param executorId the ID of the executor that produced this log block.
+ */
+abstract sealed class LogBlockId(
+    val lastLogTime: Long,
+    val executorId: String) extends BlockId {
+  def logBlockType: LogBlockType
+}
+
+object LogBlockId {
+  def empty(logBlockType: LogBlockType): LogBlockId = {
+    logBlockType match {
+      case LogBlockType.TEST => TestLogBlockId(0L, "")
+      case _ => throw new SparkException(s"Unsupported log block type: $logBlockType")
+    }
+  }
+}
+
+// Used for test purpose only.
+case class TestLogBlockId(override val lastLogTime: Long, override val executorId: String)
+  extends LogBlockId(lastLogTime, executorId) {
+  override def name: String =
+    "test_log_" + lastLogTime + "_" + executorId
+
+  override def logBlockType: LogBlockType = LogBlockType.TEST
+}
+
 /** Id associated with temporary local data managed as blocks. Not serializable. */
 private[spark] case class TempLocalBlockId(id: UUID) extends BlockId {
   override def name: String = "temp_local_" + id
@@ -222,6 +259,7 @@ object BlockId {
   val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
   val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
   val TEST = "test_(.*)".r
+  val TEST_LOG_BLOCK = "test_log_([0-9]+)_(.*)".r
 
   def apply(name: String): BlockId = name match {
     case RDD(rddId, splitIndex) =>
@@ -262,6 +300,8 @@
       TempLocalBlockId(UUID.fromString(uuid))
     case TEMP_SHUFFLE(uuid) =>
       TempShuffleBlockId(UUID.fromString(uuid))
+    case TEST_LOG_BLOCK(lastLogTime, executorId) =>
+      TestLogBlockId(lastLogTime.toLong, executorId)
     case TEST(value) =>
       TestBlockId(value)
     case _ => throw SparkCoreErrors.unrecognizedBlockIdError(name)
```
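The new `TEST_LOG_BLOCK` case is matched before the broader `TEST` pattern, so `test_log_*` names parse back into `TestLogBlockId` rather than `TestBlockId`. A small sketch of the resulting name/parse round-trip, based only on the definitions in this diff:

```scala
val id = TestLogBlockId(lastLogTime = 42L, executorId = "exec-1")
assert(id.name == "test_log_42_exec-1")        // name built from the two fields
assert(BlockId("test_log_42_exec-1") == id)    // parsed back via TEST_LOG_BLOCK, not TEST
assert(id.logBlockType == LogBlockType.TEST)   // type used to filter log blocks
```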

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 29 additions & 3 deletions
```diff
@@ -58,6 +58,7 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, MigratableResolver, ShuffleManager, ShuffleWriteMetricsReporter}
 import org.apache.spark.storage.BlockManagerMessages.{DecommissionBlockManager, ReplicateBlock}
+import org.apache.spark.storage.LogBlockType.LogBlockType
 import org.apache.spark.storage.memory._
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util._
@@ -294,12 +295,17 @@
     decommissioner.isDefined
   }
 
-  @inline final private def checkShouldStore(blockId: BlockId) = {
+  @inline final private def checkShouldStore(blockId: BlockId, level: StorageLevel) = {
     // Don't reject broadcast blocks since they may be stored during task exec and
     // don't need to be migrated.
     if (isDecommissioning() && !blockId.isBroadcast) {
       throw SparkCoreErrors.cannotSaveBlockOnDecommissionedExecutorError(blockId)
     }
+    if (blockId.isInstanceOf[LogBlockId] && level != StorageLevel.DISK_ONLY) {
+      throw SparkException.internalError(
+        s"Cannot store log block $blockId with storage level $level. " +
+          "Log blocks must be stored with DISK_ONLY.")
+    }
   }
 
   // This is a lazy val so someone can migrating RDDs even if they don't have a MigratableResolver
@@ -763,7 +769,7 @@
       level: StorageLevel,
       classTag: ClassTag[_]): StreamCallbackWithID = {
 
-    checkShouldStore(blockId)
+    checkShouldStore(blockId, level)
 
     if (blockId.isShuffle) {
       logDebug(s"Putting shuffle block ${blockId}")
@@ -1483,6 +1489,26 @@
       syncWrites, writeMetrics, blockId)
   }
 
+  /**
+   * To get a log block writer that can write logs directly to a disk block. Either `save` or
+   * `close` should be called to finish the writing and release opened resources.
+   * `save` would write the block to the block manager, while `close` would just close the writer.
+   */
+  def getLogBlockWriter(
+      logBlockType: LogBlockType): LogBlockWriter = {
+    new LogBlockWriter(this, logBlockType, conf)
+  }
+
+  /**
+   * To get a rolling log writer that can write logs to block manager and split the logs
+   * to multiple blocks if the log size exceeds the threshold.
+   */
+  def getRollingLogWriter(
+      blockIdGenerator: LogBlockIdGenerator,
+      rollingSize: Long = 33554432L): RollingLogWriter = {
+    new RollingLogWriter(this, blockIdGenerator, rollingSize)
+  }
+
 
   /**
    * Put a new block of serialized bytes to the block manager.
   *
@@ -1540,7 +1566,7 @@
 
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
-    checkShouldStore(blockId)
+    checkShouldStore(blockId, level)
 
     val putBlockInfo = {
       val newInfo = new BlockInfo(level, classTag, tellMaster)
```
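`getLogBlockWriter` hands out a single-block writer, while `getRollingLogWriter` wraps it with size-based rolling (33554432 bytes, i.e. 32 MiB, by default), and `checkShouldStore` now rejects any log block that is not `DISK_ONLY`. A hedged sketch of the rolling path, adapted from the commit message example; `bm`, the `logBlockIdGenerator` from that example, and the three-argument `LogLine` shape are all assumptions carried over from that example rather than APIs shown in this diff:

```scala
// Sketch only: `bm` is a BlockManager and `logBlockIdGenerator` is the TEST-type
// generator from the commit message example. Logs are split into DISK_ONLY blocks
// once the accumulated size reaches the rolling threshold (32 MiB unless overridden).
val rollingWriter = bm.getRollingLogWriter(logBlockIdGenerator, rollingSize = 1024L * 1024L)
rollingWriter.writeLog(LogLine(0L, 1, "Log message 1"))
rollingWriter.writeLog(LogLine(1L, 2, "Log message 2"))
// Close the writer after writing all logs, per the commit message example.
rollingWriter.close()
```
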
core/src/main/scala/org/apache/spark/storage/LogBlockIdGenerator.scala

Lines changed: 49 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.storage

import org.apache.spark.SparkException
import org.apache.spark.storage.LogBlockType.LogBlockType

/**
 * LogBlockIdGenerator is responsible for generating unique LogBlockIds for log blocks.
 */
trait LogBlockIdGenerator {
  // The log block type that this generator supports.
  def logBlockType: LogBlockType

  // Generates a unique LogBlockId based on the last log time and executor ID.
  protected def genUniqueBlockId(lastLogTime: Long, executorId: String): LogBlockId

  /**
   * Generates a new LogBlockId based on the last log time and executor ID. Make sure that
   * the generated LogBlockId has the same log block type as this generator.
   *
   * @param lastLogTime The timestamp of the last log entry.
   * @param executorId The ID of the executor generating the log block.
   */
  final def nextBlockId(lastLogTime: Long, executorId: String): LogBlockId = {
    val blockId = genUniqueBlockId(lastLogTime, executorId)
    if (blockId.logBlockType != this.logBlockType) {
      throw SparkException.internalError(
        "BlockId generated by LogBlockIdGenerator does not match " +
        s"the expected log block type: ${blockId.logBlockType} != ${this.logBlockType}")
    }
    blockId
  }
}
```
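The commit message's usage example implements this trait anonymously; restated here as a sketch, together with the type check that `nextBlockId` enforces:

```scala
// Sketch: a TEST-type generator, mirroring the commit message example.
val generator = new LogBlockIdGenerator {
  override def logBlockType: LogBlockType = LogBlockType.TEST
  override protected def genUniqueBlockId(
      lastLogTime: Long,
      executorId: String): LogBlockId = {
    TestLogBlockId(lastLogTime, executorId)
  }
}

// nextBlockId delegates to genUniqueBlockId and throws an internal error if the
// returned id's logBlockType does not match the generator's own type.
val blockId = generator.nextBlockId(lastLogTime = 1700000000000L, executorId = "exec-1")
assert(blockId == TestLogBlockId(1700000000000L, "exec-1"))
```
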
core/src/main/scala/org/apache/spark/storage/LogBlockWriter.scala

Lines changed: 182 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.storage

import java.io.BufferedOutputStream
import java.io.File
import java.io.FileOutputStream

import org.apache.commons.io.output.CountingOutputStream

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.serializer.SerializationStream
import org.apache.spark.storage.LogBlockType.LogBlockType
import org.apache.spark.util.Utils

/**
 * A class for writing logs directly to a file on disk and save as a block in BlockManager if
 * there are any logs written.
 * `save` or `close` must be called to ensure resources are released properly. `save` will add
 * the log block to BlockManager, while `close` will just release the resources without saving
 * the log block.
 *
 * Notes:
 * - This class does not support concurrent writes.
 * - The writer will be automatically closed when failed to write logs or failed to save the
 *   log block.
 * - Write operations after closing will throw exceptions.
 */
private[spark] class LogBlockWriter(
    blockManager: BlockManager,
    logBlockType: LogBlockType,
    sparkConf: SparkConf,
    bufferSize: Int = 32 * 1024) extends Logging {

  private[storage] var tmpFile: File = null

  private var cos: CountingOutputStream = null
  private var objOut: SerializationStream = null
  private var hasBeenClosed = false
  private var recordsWritten = false
  private var totalBytesWritten = 0

  initialize()

  private def initialize(): Unit = {
    try {
      val dir = new File(Utils.getLocalDir(sparkConf))
      tmpFile = File.createTempFile(s"spark_log_$logBlockType", "", dir)
      val fos = new FileOutputStream(tmpFile, false)
      val bos = new BufferedOutputStream(fos, bufferSize)
      cos = new CountingOutputStream(bos)
      val emptyBlockId = LogBlockId.empty(logBlockType)
      objOut = blockManager
        .serializerManager
        .blockSerializationStream(emptyBlockId, cos)(LogLine.getClassTag(logBlockType))
    } catch {
      case e: Exception =>
        logError(log"Failed to initialize LogBlockWriter.", e)
        close()
        throw e
    }
  }

  def bytesWritten(): Int = {
    Option(cos)
      .map(_.getCount)
      .getOrElse(totalBytesWritten)
  }

  /**
   * Write a log entry to the log block. Exception will be thrown if the writer has been closed
   * or if there is an error during writing. Caller needs to deal with the exception. Suggest to
   * close the writer when exception is thrown as block data could be corrupted which would lead
   * to issues when reading the log block later.
   *
   * @param logEntry The log entry to write.
   */
  def writeLog(logEntry: LogLine): Unit = {
    if (hasBeenClosed) {
      throw SparkException.internalError(
        "Writer already closed. Cannot write more data.",
        category = "STORAGE"
      )
    }

    try {
      objOut.writeObject(logEntry)
      recordsWritten = true
    } catch {
      case e: Exception =>
        logError(log"Failed to write log entry.", e)
        throw e
    }
  }

  def save(blockId: LogBlockId): Unit = {
    if (hasBeenClosed) {
      throw SparkException.internalError(
        "Writer already closed. Cannot save.",
        category = "STORAGE"
      )
    }

    try {
      if (blockId.logBlockType != logBlockType) {
        throw SparkException.internalError(
          s"LogBlockWriter is for $logBlockType, but got blockId $blockId")
      }

      objOut.flush()
      objOut.close()
      objOut = null

      if(recordsWritten) {
        totalBytesWritten = cos.getCount
        // Save log block to BlockManager and delete the tmpFile.
        val success = saveToBlockManager(blockId, totalBytesWritten)
        if (!success) {
          throw SparkException.internalError(s"Failed to save log block $blockId to BlockManager")
        }
      }
    } finally {
      close()
    }
  }

  def close(): Unit = {
    if (hasBeenClosed) {
      return
    }

    try {
      if (objOut != null) {
        objOut.close()
      }
      if (tmpFile != null && tmpFile.exists()) {
        tmpFile.delete()
      }
    } catch {
      case e: Exception =>
        logWarning(log"Failed to close resources of LogBlockWriter", e)
    } finally {
      objOut = null
      cos = null
      hasBeenClosed = true
    }
  }

  // For test only.
  private[storage] def flush(): Unit = {
    if (objOut != null) {
      objOut.flush()
    }
  }

  private[storage] def saveToBlockManager(blockId: LogBlockId, blockSize: Long): Boolean = {
    blockManager.
      TempFileBasedBlockStoreUpdater(
        blockId,
        StorageLevel.DISK_ONLY,
        LogLine.getClassTag(logBlockType),
        tmpFile,
        blockSize)
      .save()
  }
}
```
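A minimal sketch of the single-writer lifecycle described in the class comment, assuming the writer is obtained through `BlockManager.getLogBlockWriter` and that `LogLine` takes the three arguments shown in the commit message example:

```scala
// Sketch only: `bm` is a BlockManager and "0" stands in for this executor's id.
val writer = bm.getLogBlockWriter(LogBlockType.TEST)
try {
  writer.writeLog(LogLine(100L, 1, "something happened"))
  // bytesWritten() reports the count from the underlying CountingOutputStream;
  // data still buffered inside the serializer is not yet counted.
  val buffered = writer.bytesWritten()
  // save() verifies the block type, stores the temp file via
  // TempFileBasedBlockStoreUpdater with StorageLevel.DISK_ONLY, then closes the writer.
  writer.save(TestLogBlockId(lastLogTime = 100L, executorId = "0"))
} finally {
  // Safe even after save(): close() returns immediately once hasBeenClosed is set.
  writer.close()
}
```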
