@@ -72,55 +72,30 @@ uint64_t SpillWriteFile::write(std::unique_ptr<folly::IOBuf> iobuf) {
72
72
return writtenBytes;
73
73
}
74
74
75
- SpillWriterBase::SpillWriterBase (
76
- uint64_t writeBufferSize,
77
- uint64_t targetFileSize,
75
+ SpillWriter::SpillWriter (
76
+ const RowTypePtr& type,
77
+ const std::vector<SpillSortKey>& sortingKeys,
78
+ common::CompressionKind compressionKind,
78
79
const std::string& pathPrefix,
80
+ uint64_t targetFileSize,
81
+ uint64_t writeBufferSize,
79
82
const std::string& fileCreateConfig,
80
83
common::UpdateAndCheckSpillLimitCB& updateAndCheckSpillLimitCb,
81
84
memory::MemoryPool* pool,
82
85
folly::Synchronized<common::SpillStats>* stats)
83
- : pool_(pool),
84
- stats_ (stats),
85
- updateAndCheckSpillLimitCb_(updateAndCheckSpillLimitCb),
86
- fileCreateConfig_(fileCreateConfig),
86
+ : type_(type),
87
+ sortingKeys_ (sortingKeys),
88
+ compressionKind_(compressionKind),
87
89
pathPrefix_(pathPrefix),
90
+ targetFileSize_(targetFileSize),
88
91
writeBufferSize_(writeBufferSize),
89
- targetFileSize_(targetFileSize) {}
90
-
91
- SpillFiles SpillWriterBase::finish () {
92
- checkNotFinished ();
93
- SCOPE_EXIT {
94
- finished_ = true ;
95
- };
96
- finishFile ();
97
- return std::move (finishedFiles_);
98
- }
99
-
100
- void SpillWriterBase::finishFile () {
101
- checkNotFinished ();
102
- flush ();
103
- closeFile ();
104
- VELOX_CHECK_NULL (currentFile_);
105
- }
106
-
107
- uint64_t SpillWriterBase::flush () {
108
- if (bufferEmpty ()) {
109
- return 0 ;
110
- }
111
-
112
- auto * file = ensureFile ();
113
- VELOX_CHECK_NOT_NULL (file);
114
- uint64_t writtenBytes{0 };
115
- uint64_t flushTimeNs{0 };
116
- uint64_t writeTimeNs{0 };
117
- flushBuffer (file, writtenBytes, flushTimeNs, writeTimeNs);
118
- updateWriteStats (writtenBytes, flushTimeNs, writeTimeNs);
119
- updateAndCheckSpillLimitCb_ (writtenBytes);
120
- return writtenBytes;
121
- }
92
+ fileCreateConfig_(fileCreateConfig),
93
+ updateAndCheckSpillLimitCb_(updateAndCheckSpillLimitCb),
94
+ pool_(pool),
95
+ serde_(getNamedVectorSerde(VectorSerde::Kind::kPresto )),
96
+ stats_(stats) {}
122
97
123
- SpillWriteFile* SpillWriterBase ::ensureFile () {
98
+ SpillWriteFile* SpillWriter ::ensureFile () {
124
99
if ((currentFile_ != nullptr ) && (currentFile_->size () > targetFileSize_)) {
125
100
closeFile ();
126
101
}
@@ -133,110 +108,63 @@ SpillWriteFile* SpillWriterBase::ensureFile() {
133
108
return currentFile_.get ();
134
109
}
135
110
136
- void SpillWriterBase ::closeFile () {
111
+ void SpillWriter ::closeFile () {
137
112
if (currentFile_ == nullptr ) {
138
113
return ;
139
114
}
140
115
currentFile_->finish ();
141
116
updateSpilledFileStats (currentFile_->size ());
142
- addFinishedFile (currentFile_.get ());
117
+ finishedFiles_.push_back (SpillFileInfo{
118
+ .id = currentFile_->id (),
119
+ .type = type_,
120
+ .path = currentFile_->path (),
121
+ .size = currentFile_->size (),
122
+ .sortingKeys = sortingKeys_,
123
+ .compressionKind = compressionKind_});
143
124
currentFile_.reset ();
144
125
}
145
126
146
- uint64_t SpillWriterBase::writeWithBufferControl (
147
- const std::function<uint64_t ()>& writeCb) {
148
- checkNotFinished ();
149
-
150
- uint64_t timeNs{0 };
151
- uint64_t rowsWritten{0 };
152
- {
153
- NanosecondTimer timer (&timeNs);
154
- rowsWritten = writeCb ();
155
- }
156
- updateAppendStats (rowsWritten, timeNs);
127
+ size_t SpillWriter::numFinishedFiles () const {
128
+ return finishedFiles_.size ();
129
+ }
157
130
158
- if (bufferSize () < writeBufferSize_) {
131
+ uint64_t SpillWriter::flush () {
132
+ if (batch_ == nullptr ) {
159
133
return 0 ;
160
134
}
161
- return flush ();
162
- }
163
-
164
- void SpillWriterBase::updateWriteStats (
165
- uint64_t spilledBytes,
166
- uint64_t flushTimeNs,
167
- uint64_t writeTimeNs) {
168
- auto statsLocked = stats_->wlock ();
169
- statsLocked->spilledBytes += spilledBytes;
170
- statsLocked->spillFlushTimeNanos += flushTimeNs;
171
- statsLocked->spillWriteTimeNanos += writeTimeNs;
172
- ++statsLocked->spillWrites ;
173
- common::updateGlobalSpillWriteStats (spilledBytes, flushTimeNs, writeTimeNs);
174
- }
175
-
176
- void SpillWriterBase::updateSpilledFileStats (uint64_t fileSize) {
177
- ++stats_->wlock ()->spilledFiles ;
178
- addThreadLocalRuntimeStat (
179
- " spillFileSize" , RuntimeCounter (fileSize, RuntimeCounter::Unit::kBytes ));
180
- common::incrementGlobalSpilledFiles ();
181
- }
182
-
183
- void SpillWriterBase::updateAppendStats (
184
- uint64_t numRows,
185
- uint64_t serializationTimeNs) {
186
- auto statsLocked = stats_->wlock ();
187
- statsLocked->spilledRows += numRows;
188
- statsLocked->spillSerializationTimeNanos += serializationTimeNs;
189
- common::updateGlobalSpillAppendStats (numRows, serializationTimeNs);
190
- }
191
135
192
- SpillWriter::SpillWriter (
193
- const RowTypePtr& type,
194
- const std::vector<SpillSortKey>& sortingKeys,
195
- common::CompressionKind compressionKind,
196
- const std::string& pathPrefix,
197
- uint64_t targetFileSize,
198
- uint64_t writeBufferSize,
199
- const std::string& fileCreateConfig,
200
- common::UpdateAndCheckSpillLimitCB& updateAndCheckSpillLimitCb,
201
- memory::MemoryPool* pool,
202
- folly::Synchronized<common::SpillStats>* stats)
203
- : SpillWriterBase(
204
- writeBufferSize,
205
- targetFileSize,
206
- pathPrefix,
207
- fileCreateConfig,
208
- updateAndCheckSpillLimitCb,
209
- pool,
210
- stats),
211
- type_(type),
212
- sortingKeys_(sortingKeys),
213
- compressionKind_(compressionKind),
214
- serde_(getNamedVectorSerde(VectorSerde::Kind::kPresto )) {}
136
+ auto * file = ensureFile ();
137
+ VELOX_CHECK_NOT_NULL (file);
215
138
216
- void SpillWriter::flushBuffer (
217
- SpillWriteFile* file,
218
- uint64_t & writtenBytes,
219
- uint64_t & flushTimeNs,
220
- uint64_t & writeTimeNs) {
221
139
IOBufOutputStream out (
222
140
*pool_, nullptr , std::max<int64_t >(64 * 1024 , batch_->size ()));
141
+ uint64_t flushTimeNs{0 };
223
142
{
224
143
NanosecondTimer timer (&flushTimeNs);
225
144
batch_->flush (&out);
226
145
}
227
146
batch_.reset ();
228
147
148
+ uint64_t writeTimeNs{0 };
149
+ uint64_t writtenBytes{0 };
229
150
auto iobuf = out.getIOBuf ();
230
151
{
231
152
NanosecondTimer timer (&writeTimeNs);
232
153
writtenBytes = file->write (std::move (iobuf));
233
154
}
155
+ updateWriteStats (writtenBytes, flushTimeNs, writeTimeNs);
156
+ updateAndCheckSpillLimitCb_ (writtenBytes);
157
+ return writtenBytes;
234
158
}
235
159
236
160
uint64_t SpillWriter::write (
237
161
const RowVectorPtr& rows,
238
162
const folly::Range<IndexRange*>& indices) {
239
- return writeWithBufferControl ([&]() {
163
+ checkNotFinished ();
164
+
165
+ uint64_t timeNs{0 };
166
+ {
167
+ NanosecondTimer timer (&timeNs);
240
168
if (batch_ == nullptr ) {
241
169
serializer::presto::PrestoVectorSerde::PrestoOptions options = {
242
170
kDefaultUseLosslessTimestamp ,
@@ -250,18 +178,56 @@ uint64_t SpillWriter::write(
250
178
&options);
251
179
}
252
180
batch_->append (rows, indices);
253
- return rows->size ();
254
- });
181
+ }
182
+ updateAppendStats (rows->size (), timeNs);
183
+ if (batch_->size () < writeBufferSize_) {
184
+ return 0 ;
185
+ }
186
+ return flush ();
255
187
}
256
188
257
- void SpillWriter::addFinishedFile (SpillWriteFile* file) {
258
- finishedFiles_.push_back (SpillFileInfo{
259
- .id = file->id (),
260
- .type = type_,
261
- .path = file->path (),
262
- .size = file->size (),
263
- .sortingKeys = sortingKeys_,
264
- .compressionKind = compressionKind_});
189
+ void SpillWriter::updateAppendStats (
190
+ uint64_t numRows,
191
+ uint64_t serializationTimeNs) {
192
+ auto statsLocked = stats_->wlock ();
193
+ statsLocked->spilledRows += numRows;
194
+ statsLocked->spillSerializationTimeNanos += serializationTimeNs;
195
+ common::updateGlobalSpillAppendStats (numRows, serializationTimeNs);
196
+ }
197
+
198
+ void SpillWriter::updateWriteStats (
199
+ uint64_t spilledBytes,
200
+ uint64_t flushTimeNs,
201
+ uint64_t fileWriteTimeNs) {
202
+ auto statsLocked = stats_->wlock ();
203
+ statsLocked->spilledBytes += spilledBytes;
204
+ statsLocked->spillFlushTimeNanos += flushTimeNs;
205
+ statsLocked->spillWriteTimeNanos += fileWriteTimeNs;
206
+ ++statsLocked->spillWrites ;
207
+ common::updateGlobalSpillWriteStats (
208
+ spilledBytes, flushTimeNs, fileWriteTimeNs);
209
+ }
210
+
211
+ void SpillWriter::updateSpilledFileStats (uint64_t fileSize) {
212
+ ++stats_->wlock ()->spilledFiles ;
213
+ addThreadLocalRuntimeStat (
214
+ " spillFileSize" , RuntimeCounter (fileSize, RuntimeCounter::Unit::kBytes ));
215
+ common::incrementGlobalSpilledFiles ();
216
+ }
217
+
218
+ void SpillWriter::finishFile () {
219
+ checkNotFinished ();
220
+ flush ();
221
+ closeFile ();
222
+ VELOX_CHECK_NULL (currentFile_);
223
+ }
224
+
225
+ SpillFiles SpillWriter::finish () {
226
+ checkNotFinished ();
227
+ auto finishGuard = folly::makeGuard ([this ]() { finished_ = true ; });
228
+
229
+ finishFile ();
230
+ return std::move (finishedFiles_);
265
231
}
266
232
267
233
std::vector<std::string> SpillWriter::testingSpilledFilePaths () const {
0 commit comments