Remove the unused max_num_stream Pipeline parameter. Deprecate `max_streams` in Python. (NVIDIA#5720)

* Remove the unused max_num_stream Pipeline parameter.
* Deprecate max_streams argument in Python API.

---------

Signed-off-by: Michał Zientkiewicz <[email protected]>
mzient authored Nov 21, 2024
1 parent 79d4bf0 commit 36ad55e
Showing 15 changed files with 31 additions and 76 deletions.
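
The user-facing effect on the Python side is easiest to see up front. The sketch below is illustrative, not part of the commit: it assumes a working DALI build, uses `device_id=None` to stay CPU-only, and assumes the deprecation message is routed through Python's `warnings` machinery.

import warnings

from nvidia.dali import Pipeline

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    # max_streams now defaults to None; any other value only triggers
    # a deprecation warning and is otherwise ignored.
    pipe = Pipeline(batch_size=8, num_threads=2, device_id=None, max_streams=2)

assert any("max_streams" in str(w.message) for w in caught)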
6 changes: 1 addition & 5 deletions dali/benchmark/decoder_bench.cc
@@ -1,4 +1,4 @@
-// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -122,7 +122,6 @@ BENCHMARK_DEFINE_F(DecoderBench, ImageDecoder_GPU)(benchmark::State& st) {
       OpSpec("ImageDecoder")
           .AddArg("device", "mixed")
           .AddArg("output_type", img_type)
-          .AddArg("max_streams", num_thread)
           .AddArg("hybrid_huffman_threshold", std::numeric_limits<unsigned int>::max())
           .AddArg("use_batched_decode", false)
           .AddInput("raw_jpegs", "cpu")
@@ -145,7 +144,6 @@ BENCHMARK_DEFINE_F(DecoderBench, ImageDecoderCachedThreshold_GPU)(benchmark::State& st) {
       OpSpec("ImageDecoder")
           .AddArg("device", "mixed")
           .AddArg("output_type", img_type)
-          .AddArg("max_streams", num_thread)
           .AddArg("use_batched_decode", false)
           .AddArg("cache_size", 1000)  // megabytes
           .AddArg("cache_threshold", 250*250*3)
@@ -170,7 +168,6 @@ BENCHMARK_DEFINE_F(DecoderBench, ImageDecoderCachedLargest_GPU)(benchmark::State& st) {
       OpSpec("ImageDecoder")
          .AddArg("device", "mixed")
          .AddArg("output_type", img_type)
-          .AddArg("max_streams", num_thread)
          .AddArg("use_batched_decode", false)
          .AddArg("cache_size", 1000)  // megabytes
          .AddArg("cache_type", "largest")
@@ -194,7 +191,6 @@ BENCHMARK_DEFINE_F(DecoderBench, ImageDecoderBatched_GPU)(benchmark::State& st) {
       OpSpec("ImageDecoder")
          .AddArg("device", "mixed")
          .AddArg("output_type", img_type)
-          .AddArg("max_streams", num_thread)
          .AddArg("use_batched_decode", true)
          .AddInput("raw_jpegs", "cpu")
          .AddOutput("images", "gpu"));
1 change: 0 additions & 1 deletion dali/benchmark/resnet50_bench.cc
@@ -306,7 +306,6 @@ BENCHMARK_DEFINE_F(RN50, nvJPEGPipe)(benchmark::State& st) { // NOLINT
       OpSpec("ImageDecoder")
          .AddArg("device", "mixed")
          .AddArg("output_type", img_type)
-          .AddArg("max_streams", num_thread)
          .AddArg("use_batched_decode", false)
          .AddInput("raw_jpegs", "cpu")
          .AddOutput("images", "gpu"));
3 changes: 1 addition & 2 deletions dali/benchmark/resnet50_nvjpeg_bench.cc
@@ -1,4 +1,4 @@
-// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -49,7 +49,6 @@ BENCHMARK_DEFINE_F(RealRN50, nvjpegPipe)(benchmark::State& st) { // NOLINT
       OpSpec("ImageDecoder")
          .AddArg("device", "mixed")
          .AddArg("output_type", img_type)
-          .AddArg("max_streams", num_thread)
          .AddArg("use_batched_decode", false)
          .AddInput("raw_jpegs", "cpu")
          .AddOutput("images", "gpu"));
3 changes: 1 addition & 2 deletions dali/pipeline/executor/async_pipelined_executor.h
@@ -34,10 +34,9 @@ class DLL_PUBLIC AsyncPipelinedExecutor : public PipelinedExecutor {
  public:
   DLL_PUBLIC inline AsyncPipelinedExecutor(int batch_size, int num_thread, int device_id,
                                            size_t bytes_per_sample_hint, bool set_affinity = false,
-                                           int max_num_stream = -1,
                                            QueueSizes prefetch_queue_depth = QueueSizes{2, 2})
       : PipelinedExecutor(batch_size, num_thread, device_id, bytes_per_sample_hint, set_affinity,
-                          max_num_stream, prefetch_queue_depth),
+                          prefetch_queue_depth),
         cpu_thread_(device_id, set_affinity, "CPU executor"),
         mixed_thread_(device_id, set_affinity, "Mixed executor"),
         gpu_thread_(device_id, set_affinity, "GPU executor") {}
5 changes: 2 additions & 3 deletions dali/pipeline/executor/async_separated_pipelined_executor.h
@@ -34,10 +34,9 @@ class DLL_PUBLIC AsyncSeparatedPipelinedExecutor : public SeparatedPipelinedExecutor {
  public:
   DLL_PUBLIC inline AsyncSeparatedPipelinedExecutor(
       int batch_size, int num_thread, int device_id, size_t bytes_per_sample_hint,
-      bool set_affinity = false, int max_num_stream = -1,
-      QueueSizes prefetch_queue_depth = QueueSizes{2, 2})
+      bool set_affinity = false, QueueSizes prefetch_queue_depth = QueueSizes{2, 2})
       : SeparatedPipelinedExecutor(batch_size, num_thread, device_id, bytes_per_sample_hint,
-                                   set_affinity, max_num_stream, prefetch_queue_depth),
+                                   set_affinity, prefetch_queue_depth),
         cpu_thread_(device_id, set_affinity, "CPU executor"),
         mixed_thread_(device_id, set_affinity, "Mixed executor"),
         gpu_thread_(device_id, set_affinity, "GPU executor") {}
4 changes: 1 addition & 3 deletions dali/pipeline/executor/executor_factory.cc
@@ -28,7 +28,6 @@ namespace {
 
 auto MakeExec2Config(int batch_size, int num_thread, int device_id,
                      size_t bytes_per_sample_hint, bool set_affinity,
-                     int max_num_stream,
                      QueueSizes prefetch_queue_depth) {
   exec2::Executor2::Config cfg{};
   cfg.async_output = false;
@@ -90,12 +89,11 @@ std::unique_ptr<ExecutorBase> GetExecutorImpl(
 std::unique_ptr<ExecutorBase> GetExecutor(bool pipelined, bool separated, bool async, bool dynamic,
                                           int batch_size, int num_thread, int device_id,
                                           size_t bytes_per_sample_hint, bool set_affinity,
-                                          int max_num_stream,
                                           QueueSizes prefetch_queue_depth) {
   return GetExecutorImpl(
       pipelined, separated, async, dynamic,
       batch_size, num_thread, device_id, bytes_per_sample_hint, set_affinity,
-      max_num_stream, prefetch_queue_depth);
+      prefetch_queue_depth);
 }
 
 }  // namespace dali
1 change: 0 additions & 1 deletion dali/pipeline/executor/executor_factory.h
@@ -26,7 +26,6 @@ DLL_PUBLIC
 std::unique_ptr<ExecutorBase> GetExecutor(bool pipelined, bool separated, bool async, bool dynamic,
                                           int batch_size, int num_thread, int device_id,
                                           size_t bytes_per_sample_hint, bool set_affinity = false,
-                                          int max_num_stream = -1,
                                           QueueSizes prefetch_queue_depth = QueueSizes{2, 2});
 
 }  // namespace dali
1 change: 0 additions & 1 deletion dali/pipeline/executor/executor_impl.h
@@ -74,7 +74,6 @@ class DLL_PUBLIC Executor : public ExecutorBase, public QueuePolicy {
  public:
   DLL_PUBLIC inline Executor(int max_batch_size, int num_thread, int device_id,
                              size_t bytes_per_sample_hint, bool set_affinity = false,
-                             int max_num_stream = -1,
                              QueueSizes prefetch_queue_depth = QueueSizes{2, 2})
       : max_batch_size_(max_batch_size),
         device_id_(device_id),
3 changes: 1 addition & 2 deletions dali/pipeline/executor/pipelined_executor.h
@@ -40,10 +40,9 @@ class DLL_PUBLIC PipelinedExecutorImpl : public Executor<WorkspacePolicy, QueuePolicy> {
  public:
   DLL_PUBLIC inline PipelinedExecutorImpl(int batch_size, int num_thread, int device_id,
                                           size_t bytes_per_sample_hint, bool set_affinity = false,
-                                          int max_num_stream = -1,
                                           QueueSizes prefetch_queue_depth = {2, 2})
       : Executor<WorkspacePolicy, QueuePolicy>(batch_size, num_thread, device_id,
-                                               bytes_per_sample_hint, set_affinity, max_num_stream,
+                                               bytes_per_sample_hint, set_affinity,
                                                prefetch_queue_depth) {
   }
 
24 changes: 5 additions & 19 deletions dali/pipeline/pipeline.cc
@@ -97,18 +97,18 @@ void InitializeMemoryResources() {
 Pipeline::Pipeline(int max_batch_size, int num_threads, int device_id, int64_t seed,
                    bool pipelined_execution, int prefetch_queue_depth,
                    bool async_execution, bool dynamic_execution, size_t bytes_per_sample_hint,
-                   bool set_affinity, int max_num_stream) {
+                   bool set_affinity) {
   InitializeMemoryResources();
   Init(max_batch_size, num_threads, device_id, seed, pipelined_execution, separated_execution_,
-       async_execution, dynamic_execution, bytes_per_sample_hint, set_affinity, max_num_stream,
+       async_execution, dynamic_execution, bytes_per_sample_hint, set_affinity,
       QueueSizes{prefetch_queue_depth});
 }
 
 Pipeline::Pipeline(const string &serialized_pipe,
                    int batch_size, int num_threads, int device_id,
                    bool pipelined_execution, int prefetch_queue_depth,
                    bool async_execution, bool dynamic_execution,
-                   size_t bytes_per_sample_hint, bool set_affinity, int max_num_stream,
+                   size_t bytes_per_sample_hint, bool set_affinity,
                    int64_t seed) {
   InitializeMemoryResources();
   dali_proto::PipelineDef def;
@@ -141,7 +141,6 @@ Pipeline::Pipeline(const string &serialized_pipe,
        dynamic_execution,
        bytes_per_sample_hint,
        set_affinity,
-       max_num_stream,
        QueueSizes{prefetch_queue_depth});
 
   // from serialized pipeline, construct new pipeline
@@ -179,14 +178,13 @@ Pipeline::~Pipeline() {
 void Pipeline::Init(int max_batch_size, int num_threads, int device_id, int64_t seed,
                     bool pipelined_execution, bool separated_execution,
                     bool async_execution, bool dynamic_execution,
-                    size_t bytes_per_sample_hint, bool set_affinity, int max_num_stream,
+                    size_t bytes_per_sample_hint, bool set_affinity,
                     QueueSizes prefetch_queue_depth) {
   DALI_ENFORCE(device_id == CPU_ONLY_DEVICE_ID || cuInitChecked(),
                "You are trying to create a GPU DALI pipeline, while CUDA is not available. "
                "Please install CUDA or set `device_id = None` in Pipeline constructor. "
                "If running inside Docker container, you may need to use `--gpus` option.");
 
-  // guard cudaDeviceGetStreamPriorityRange call
   DeviceGuard g(device_id);
   this->max_batch_size_ = max_batch_size;
   this->num_threads_ = num_threads;
@@ -198,22 +196,10 @@ void Pipeline::Init(int max_batch_size, int num_threads, int device_id, int64_t
   this->dynamic_execution_ = dynamic_execution;
   this->bytes_per_sample_hint_ = bytes_per_sample_hint;
   this->set_affinity_ = set_affinity;
-  this->max_num_stream_ = max_num_stream;
   this->prefetch_queue_depth_ = prefetch_queue_depth;
   this->separated_execution_ = (prefetch_queue_depth.cpu_size != prefetch_queue_depth.gpu_size);
   DALI_ENFORCE(max_batch_size_ > 0, "Max batch size must be greater than 0");
 
-  int lowest_cuda_stream_priority = 0, highest_cuda_stream_priority = 0;
-  // do it only for the GPU pipeline
-  if (device_id != CPU_ONLY_DEVICE_ID) {
-    CUDA_CALL(cudaDeviceGetStreamPriorityRange(&lowest_cuda_stream_priority,
-                                               &highest_cuda_stream_priority));
-  }
-  const auto min_priority_value =
-      std::min(lowest_cuda_stream_priority, highest_cuda_stream_priority);
-  const auto max_priority_value =
-      std::max(lowest_cuda_stream_priority, highest_cuda_stream_priority);
-
   seed_.resize(MAX_SEEDS);
   current_seed_ = 0;
   std::seed_seq ss{this->original_seed_};
@@ -463,7 +449,7 @@ void Pipeline::Build(std::vector<PipelineOutputDesc> output_descs) {
   executor_ =
       GetExecutor(pipelined_execution_, separated_execution_, async_execution_, dynamic_execution_,
                   max_batch_size_, num_threads_, device_id_, bytes_per_sample_hint_, set_affinity_,
-                  max_num_stream_, prefetch_queue_depth_);
+                  prefetch_queue_depth_);
   executor_->EnableMemoryStats(enable_memory_stats_);
   executor_->EnableCheckpointing(checkpointing_);
   executor_->Init();
10 changes: 2 additions & 8 deletions dali/pipeline/pipeline.h
@@ -83,21 +83,17 @@ class DLL_PUBLIC Pipeline {
    *                    Defaults to 0. Ignored when dynamic_execution is true.
    * @param set_affinity indicates whether thread affinity should be
    *                     configured in the thread pool. Defaults to 'false'.
-   * @param max_num_stream set an upper limit on the number of cudaStreams
-   *                       that can be allocated by the pipeline.
    */
   DLL_PUBLIC Pipeline(int max_batch_size, int num_threads, int device_id, int64_t seed = -1,
                       bool pipelined_execution = true, int prefetch_queue_depth = 2,
                       bool async_execution = true, bool dynamic_execution = false,
-                      size_t bytes_per_sample_hint = 0, bool set_affinity = false,
-                      int max_num_stream = -1);
+                      size_t bytes_per_sample_hint = 0, bool set_affinity = false);
 
   DLL_PUBLIC Pipeline(const string &serialized_pipe,
                       int max_batch_size = -1, int num_threads = -1, int device_id = -1,
                       bool pipelined_execution = true, int prefetch_queue_depth = 2,
                       bool async_execution = true, bool dynamic_execution = false,
                       size_t bytes_per_sample_hint = 0, bool set_affinity = false,
-                      int max_num_stream = -1,
                       int64_t seed = -1);
 
   virtual DLL_PUBLIC ~Pipeline();
@@ -584,8 +580,7 @@ class DLL_PUBLIC Pipeline {
    */
   void Init(int batch_size, int num_threads, int device_id, int64_t seed, bool pipelined_execution,
             bool separated_execution, bool async_execution, bool dynamic_execution,
-            size_t bytes_per_sample_hint,
-            bool set_affinity, int max_num_stream,
+            size_t bytes_per_sample_hint, bool set_affinity,
             QueueSizes prefetch_queue_depth = QueueSizes{2});
 
   struct EdgeMeta {
@@ -713,7 +708,6 @@ class DLL_PUBLIC Pipeline {
   bool dynamic_execution_ = false;
   size_t bytes_per_sample_hint_ = 0;
   int set_affinity_ = 0;
-  int max_num_stream_ = 0;
   int next_logical_id_ = 0;
   int next_internal_logical_id_ = -1;
   QueueSizes prefetch_queue_depth_{};
16 changes: 6 additions & 10 deletions dali/python/backend_impl.cc
@@ -2024,12 +2024,11 @@ PYBIND11_MODULE(backend_impl, m) {
            bool pipelined_execution = true, int prefetch_queue_depth = 2,
            bool async_execution = true, bool dynamic_execution = false,
            size_t bytes_per_sample_hint = 0,
-           bool set_affinity = false, int max_num_stream = -1) {
+           bool set_affinity = false) {
          return std::make_unique<PyPipeline>(
              batch_size, num_threads, device_id, seed,
              pipelined_execution, prefetch_queue_depth, async_execution, dynamic_execution,
-              bytes_per_sample_hint, set_affinity,
-              max_num_stream);
+              bytes_per_sample_hint, set_affinity);
        }),
        "batch_size"_a,
        "num_threads"_a,
@@ -2040,22 +2039,20 @@ PYBIND11_MODULE(backend_impl, m) {
        "exec_async"_a = true,
        "exec_dynamic"_a = false,
        "bytes_per_sample_hint"_a = 0,
-        "set_affinity"_a = false,
-        "max_num_stream"_a = -1
+        "set_affinity"_a = false
        )
    // initialize from serialized pipeline
    .def(py::init(
        [](string serialized_pipe,
           int batch_size = -1, int num_threads = -1, int device_id = -1,
           bool pipelined_execution = true, int prefetch_queue_depth = 2,
           bool async_execution = true, bool dynamic_execution = false,
-           size_t bytes_per_sample_hint = 0, bool set_affinity = false, int max_num_stream = -1) {
+           size_t bytes_per_sample_hint = 0, bool set_affinity = false) {
          return std::make_unique<PyPipeline>(
              serialized_pipe,
              batch_size, num_threads, device_id, pipelined_execution,
              prefetch_queue_depth, async_execution, dynamic_execution,
-              bytes_per_sample_hint, set_affinity,
-              max_num_stream);
+              bytes_per_sample_hint, set_affinity);
        }),
        "serialized_pipe"_a,
        "batch_size"_a = -1,
@@ -2066,8 +2063,7 @@ PYBIND11_MODULE(backend_impl, m) {
        "exec_async"_a = true,
        "exec_dynamic"_a = true,
        "bytes_per_sample_hint"_a = 0,
-        "set_affinity"_a = false,
-        "max_num_stream"_a = -1
+        "set_affinity"_a = false
        )
    .def("AddOperator",
         static_cast<int (Pipeline::*)(const OpSpec &, const std::string &)>
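
A hedged sketch of the binding-level consequence (not from the diff): with the `"max_num_stream"_a` keyword gone, the raw backend constructor rejects the old argument name. The positional values below are arbitrary placeholders, and the TypeError fires before any pipeline is actually built.

from nvidia.dali import backend_impl

try:
    # The max_num_stream keyword was removed from the pybind11 signature
    # by this commit, so pybind11 raises TypeError at call time.
    backend_impl.Pipeline(8, 2, -1, -1, True, 2, True, False, 0, False,
                          max_num_stream=-1)
except TypeError:
    pass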
21 changes: 7 additions & 14 deletions dali/python/nvidia/dali/pipeline.py
@@ -123,11 +123,8 @@ class Pipeline(object):
     set_affinity : bool, optional, default = False
         Whether to set CPU core affinity to the one closest to the
         GPU being used.
-    max_streams : int, optional, default = -1
-        Limit the number of CUDA streams used by the executor.
-        Value of -1 does not impose a limit.
-        This parameter is currently unused (and behavior of
-        unrestricted number of streams is assumed).
+    max_streams : int, deprecated, default = None
+        Deprecated, this parameter has no effect.
     default_cuda_stream_priority : int, optional, default = None
         Deprecated, this parameter has no effect.
     enable_memory_stats : bool, optional, default = False
@@ -231,7 +228,7 @@ def __init__(
         exec_async=True,
         bytes_per_sample=0,
         set_affinity=False,
-        max_streams=-1,
+        max_streams=None,
         default_cuda_stream_priority=None,
         *,
         enable_memory_stats=False,
@@ -250,6 +247,8 @@ def __init__(
             exec_dynamic = experimental_exec_dynamic
         if default_cuda_stream_priority is not None:
             _show_warning("The `default_cuda_stream_priority` is deprecated and has no effect.")
+        if max_streams is not None:
+            _show_warning("The `max_streams` is deprecated and has no effect.")
         self._pipe = None
         self._sinks = []
         self._max_batch_size = batch_size
@@ -280,7 +279,6 @@ def __init__(
         self._exec_dynamic = exec_dynamic
         self._bytes_per_sample = bytes_per_sample
         self._set_affinity = set_affinity
-        self._max_streams = max_streams
         self._py_num_workers = py_num_workers
         self._py_start_method = py_start_method
         if py_callback_pickler is not None and py_start_method == "fork":
@@ -429,8 +427,8 @@ def set_affinity(self):
 
     @property
     def max_streams(self):
-        """Reserved for future use."""
-        return self._max_streams
+        """Deprecated, unused; returns -1."""
+        return -1
 
     @property
     def prefetch_queue_depth(self):
@@ -898,7 +896,6 @@ def _init_pipeline_backend(self):
             self._exec_dynamic,
             self._bytes_per_sample,
             self._set_affinity,
-            self._max_streams,
         )
         self._pipe.SetExecutionTypes(self._exec_pipelined, self._exec_separated, self._exec_async)
         self._pipe.SetQueueSizes(self._cpu_queue_size, self._gpu_queue_size)
@@ -1595,7 +1592,6 @@ def deserialize(cls, serialized_pipeline=None, filename=None, **kwargs):
             kw.get("exec_dynamic", False),
             kw.get("bytes_per_sample", 0),
             kw.get("set_affinity", False),
-            kw.get("max_streams", -1),
         )
         if pipeline.device_id != types.CPU_ONLY_DEVICE_ID:
             b.check_cuda_runtime()
@@ -1618,7 +1614,6 @@ def deserialize(cls, serialized_pipeline=None, filename=None, **kwargs):
         pipeline._exec_async = kw.get("exec_async", True)
         pipeline._bytes_per_sample = kw.get("bytes_per_sample", 0)
         pipeline._set_affinity = kw.get("set_affinity", False)
-        pipeline._max_streams = kw.get("max_streams", -1)
 
         return pipeline
 
@@ -1641,7 +1636,6 @@ def deserialize_and_build(self, serialized_pipeline):
             self._exec_dynamic,
             self._bytes_per_sample,
             self._set_affinity,
-            self._max_streams,
         )
         self._pipe.SetExecutionTypes(self._exec_pipelined, self._exec_separated, self._exec_async)
         self._pipe.SetQueueSizes(self._cpu_queue_size, self._gpu_queue_size)
@@ -2014,7 +2008,6 @@ def pipeline_def(
     exec_async: bool = True,
     bytes_per_sample: int = 0,
     set_affinity: bool = False,
-    max_streams: int = -1,
     enable_memory_stats: bool = False,
     enable_checkpointing: bool = False,
     checkpoint: Optional[Any] = None,
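
To round off the Python changes, a short illustrative snippet (again a sketch, not commit code): since `_max_streams` is no longer stored on the pipeline object, the compatibility property always reports -1, including for deserialized pipelines.

from nvidia.dali import Pipeline

pipe = Pipeline(batch_size=4, num_threads=2, device_id=None)  # CPU-only
print(pipe.max_streams)  # always -1 after this commit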
(The diffs for the remaining 2 of the 15 changed files did not load and are not shown.)
