Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions csrc/host_ir/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ class HostIrContainer final : public Fusion {
Stream* getDefaultStream();

private:
// Consider using a linkedlist so insertion is faster.
std::vector<Expr*> top_level_exprs_;

// Indexed by group ID. This way, parallel compilation can write to disjoint
// locations without having to precompute a global index.
std::vector<std::unique_ptr<KernelExecutor>> kernel_executors_;

Stream* default_stream_ = nullptr;
};

Expand Down
18 changes: 17 additions & 1 deletion csrc/host_ir/host_ir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <kernel_ir.h>
#include <multidevice/communication.h>
#include <ops/all_ops.h>
#include <transform_replay.h>
#include <utils.h>

namespace nvfuser::hir {
Expand Down Expand Up @@ -450,6 +451,19 @@ std::string ShardByStream::toInlineString(int indent_size) const {
NVF_CHECK(false, "Cannot be printed inline");
}

TensorView* shardByStream(TensorView* in, Val* stream_index) {
auto* out = ops::newValLike(in, *in->getDataType())->as<TensorView>();

TransformReplay::selfReplay(in->domain(), out->domain());
// This is conservative and suboptimal. Consider reusing the algorithm in
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will resolved using #5316?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. It's one of the cases where out's contiguity ought to be different from in due to the slicing effect.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh okay, got it!
So in such cases the replay may in fact overwrite a correct contiguity as most users of selfReplay create the new TensorDomain using ops API, which sets the contiguity correctly. This is something we should consider for #5316.

// https://github.com/NVIDIA/Fuser/blob/33337e9b0b82dc88bc305d9956101f0c8a8a0c60/csrc/alias_analysis.cpp#L199
// to decide contiguity.
out->setAllocationDomain(out->getLoopDomain(), false);

IrBuilder::create<ShardByStream>(out, in, stream_index);
return out;
}

ForLoop::ForLoop(IrBuilderPasskey passkey, Val* index, Val* start, Val* stop)
: Expr(passkey, {index, start, stop}, {}, {}) {
NVF_ERROR(passkey.ir_container_ != nullptr);
Expand All @@ -473,7 +487,9 @@ std::string ForLoop::toInlineString(int indent_size) const {
NVF_CHECK(false, "Cannot be printed inline");
}

ForLoop* createForLoopFromIterDomain(Val* index, IterDomain* iter_domain) {
/*static*/ ForLoop* ForLoop::createFromIterDomain(
Val* index,
IterDomain* iter_domain) {
return IrBuilder::create<ForLoop>(
index, iter_domain->start(), iter_domain->stop());
}
Expand Down
8 changes: 6 additions & 2 deletions csrc/host_ir/host_ir.h
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,10 @@ class ShardByStream : public Expr {
}
};

// Creates a ShardByStream without needing the output TensorView. Returns the
// output TensorView.
TensorView* shardByStream(TensorView* in, Val* stream_index);

class ForLoop : public Expr {
public:
using Expr::Expr;
Expand All @@ -492,6 +496,8 @@ class ForLoop : public Expr {

NVFUSER_DECLARE_CLONE_AND_CREATE

static ForLoop* createFromIterDomain(Val* index, IterDomain* iter_domain);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm on the fence about this. The method is coupled with the ForLoop class so I moved here to save some typing. The downside is less access control because createFromIterDomain could access private fields/methods of ForLoop.


std::string toString(int indent_size = 0) const override;
std::string toInlineString(int indent_size = 0) const override;
const char* getOpString() const override {
Expand Down Expand Up @@ -519,6 +525,4 @@ class ForLoop : public Expr {
}
};

ForLoop* createForLoopFromIterDomain(Val* index, IterDomain* iter_domain);

} // namespace nvfuser::hir
120 changes: 115 additions & 5 deletions csrc/host_ir/lowering.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <host_ir/lower_to_communication.h>
#include <host_ir/lowering.h>
#include <host_ir/pass/insert_deallocations.h>
#include <multidevice/utils.h>
#include <runtime/executor_abstract.h>

namespace nvfuser {
Expand All @@ -36,6 +37,30 @@ void recomputeOutputTvs(Expr* e, IrCloner& ir_cloner) {
}
}

// Finds the stream-parallelized IterDomain in the loop domain of a TensorView,
// or nullptr if not found. This is different from `getShardedIterDomain(tv,
// ParallelType::Stream)`, which searches the allocation domain. Consider
// unifying them into one function with an extra DomainType parameter.
IterDomain* findStreamIterDomain(TensorView* tv) {
const std::vector<IterDomain*>& loop = tv->getLoopDomain();
// FinalizeMultideviceDomains pass puts the stream IterDomain to the
// front.
if (!loop.empty() && loop.front()->isStream()) {
return loop.front();
}
return nullptr;
}

// Finds the stream IterDomain in the outputs of a segment.
IterDomain* findStreamIterDomain(const std::vector<Val*>& outs) {
for (auto* out : ir_utils::filterByType<TensorView>(outs)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we are finding the stream ID in any of the outputs of a segment? Why not use the above variation directly with any of the segment outputs as they must have mapped stream IDs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I'm not sure about CPU-scalar TensorViews from composite ops. But I should probably harden the check to enforce every TensorView to have a Stream IterDomain. Wdyt?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In their blackbox state, it does not look we can currently support SDPA ops, for example. So adding an assert makes sense to signal something is wrong. I guess this is something I need to fix in PropagateShardingsPass also.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In their blackbox state, it does not look we can currently support SDPA ops, for example.

Why not? At least, batch and/or head can be easily parallelized on stream without changing the implementation of the SDPA op, assuming ShardByStreams are added properly of course.

if (auto* stream_id = findStreamIterDomain(out)) {
return stream_id;
}
}
return nullptr;
}

void lowerSegment(
const SegmentedGroup& group,
const AliasInfoMap& aliases,
Expand Down Expand Up @@ -72,15 +97,99 @@ void lowerSegment(
}
} break;
case SchedulerType::ExprEval: {
// push back segment's exprs into the container as top level
// expressions
for (auto* e : group.stablyOrderedExprs()) {
// Pseudocode:
// clang-format off
// ```
// clone all expressions and store the copies to a list
// if no expressions are stream parallelized:
// append the list to the top level
// return
// for each non-input TensorView:
// if it needs an out-of-loop allocation:
// create an Allocate and append it to the top level
// create a new, empty for loop
// for each cloned expression:
// for each input or output TensorView of that expression:
// shard it by stream if it's allocated outside the loop
// add the cloned expression to the loop body with the maybe-sharded inputs and outputs
// ```
// clang-format on
std::vector<Expr*> cloned_exprs;
cloned_exprs.reserve(group.exprs().size());
for (Expr* e : group.stablyOrderedExprs()) {
auto* e_clone = ir_cloner.clone(e);
recomputeOutputTvs(e, ir_cloner);
hic.pushBackTopLevelExprs(e_clone);
cloned_exprs.push_back(e_clone);
}

std::vector<Val*> cloned_outs = ir_cloner.clone(group.outputs());
// All expressions in the group are expected to be stream parallelized in
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we enforce this constraint? If so is there an assertion somewhere?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't but we should. I'm waiting for a isResharding-like method to do that easily.

// the same way. So it's safe to find the stream IterDomain from any of
// them. Ideally, loop domains should be tied to expressions not
// TensorViews.
IterDomain* stream_id = findStreamIterDomain(cloned_outs);
if (stream_id == nullptr) {
for (Expr* e : cloned_exprs) {
hic.pushBackTopLevelExprs(e);
}
} else {
for (Expr* e : cloned_exprs) {
for (auto* out : ir_utils::filterByType<TensorView>(e->outputs())) {
if (getShardedIterDomain(out, ParallelType::Stream) == nullptr) {
auto* allocate =
IrBuilder::create<kir::Allocate>(out, MemoryType::Global);
hic.pushBackTopLevelExprs(allocate);
}
}
}

auto* stream_index = IrBuilder::create<Val>(DataType::Index);
auto* for_loop =
hir::ForLoop::createFromIterDomain(stream_index, stream_id);
hic.pushBackTopLevelExprs(for_loop);

std::unordered_map<Val*, Val*> replacement_map;
for (Expr* e : cloned_exprs) {
for (auto ins_or_out :
{ir_utils::filterByType<TensorView>(e->inputs()),
ir_utils::filterByType<TensorView>(e->outputs())}) {
for (auto* tv : ins_or_out) {
if (replacement_map.count(tv) > 0) {
continue;
}
if (findStreamIterDomain(tv) != nullptr &&
getShardedIterDomain(tv, ParallelType::Stream) == nullptr) {
// Loop is stream parallelized but allocation is not.
TensorView* sharded_tv = hir::shardByStream(tv, stream_index);
for_loop->body().push_back(sharded_tv->definition());
replacement_map[tv] = sharded_tv;
}
}
}

std::vector<Val*> new_inputs;
std::transform(
e->inputs().begin(),
e->inputs().end(),
std::back_inserter(new_inputs),
[&replacement_map](Val* input) {
return getOrDefault(replacement_map, input, input);
});
std::vector<Val*> new_outputs;
std::transform(
e->outputs().begin(),
e->outputs().end(),
std::back_inserter(new_outputs),
[&replacement_map](Val* output) {
return getOrDefault(replacement_map, output, output);
});
Expr* new_e = e->newObjectFunc()(
e->container(), new_inputs, new_outputs, e->attributes());
for_loop->body().push_back(new_e);
}
}
} break;
default:
default: {
const int group_id = group.groupId();

// Copy the input/output TensorViews to the container.
Expand Down Expand Up @@ -123,6 +232,7 @@ void lowerSegment(
cloned_outs,
cache_id);
hic.pushBackTopLevelExprs(launch_kernel);
}
}
}
} // namespace
Expand Down
2 changes: 2 additions & 0 deletions csrc/multidevice/multidevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#pragma once

#include <vector>

#include <c10/core/Device.h>

namespace nvfuser {
Expand Down
5 changes: 0 additions & 5 deletions csrc/preseg_passes/finalize_multidevice_domains.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@ void setLoopAndAllocationDomain(TensorView* tv, bool is_resharding) {
} // namespace

void FinalizeMultideviceDomainsPass::runPass(Fusion* fusion) {
bool has_mesh = validateMeshes(fusion);
if (!has_mesh) {
return;
}

for (Expr* expr : fusion->exprs()) {
auto inputs = ir_utils::filterByType<TensorView>(expr->inputs());
auto outputs = ir_utils::filterByType<TensorView>(expr->outputs());
Expand Down
2 changes: 1 addition & 1 deletion csrc/runtime/fusion_executor_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class NVF_API FusionExecutorCache {
//! what inputs and the fusion look like. This may be useful in some
//! cases as our analysis of index type may be overly conservative
//! for intermediate tensors.
//! WARING: Correctness is not guaranteed.
//! WANRING: Correctness is not guaranteed.
//! TODO: Check usage of forced_index_type. It's a lot of plumbing, what's the
//! value.
KernelArgumentHolder runFusionWithInputs(
Expand Down
7 changes: 2 additions & 5 deletions csrc/transform_replay.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,13 @@
// clang-format on
#pragma once

#include <unordered_map>

#include <exceptions.h>
#include <ir/internal_nodes.h>
#include <scheduler/tools/maxinfo_propagator.h>
#include <visibility.h>

#include <algorithm>
#include <unordered_map>
#include <unordered_set>
#include <vector>

namespace nvfuser {

/*
Expand Down
55 changes: 54 additions & 1 deletion tests/cpp/test_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,19 @@
#include <fusion_guard.h>
#include <ir/interface_nodes.h>
#include <ops/arith.h>
#include <ops/composite.h>
#include <runtime/fusion_executor_cache.h>
#include <tests/cpp/utils.h>
#include <tests/cpp/validator.h>

namespace nvfuser {

using StreamTest = NVFuserTest;
class StreamTest : public NVFuserTest {
public:
StreamTest() {
EnableOptionsGuard::getCurOptions().set(EnableOption::HostIrLowering);
}
};

TEST_F(StreamTest, AddPerStream) {
constexpr int64_t c = 3;
Expand Down Expand Up @@ -55,4 +63,49 @@ TEST_F(StreamTest, AddPerStream) {
<< out_tensor << " vs " << expected_out_tensor;
}

TEST_F(StreamTest, Matmul) {
constexpr int64_t c = 3;

auto fusion = std::make_unique<Fusion>();
{
FusionGuard fg(fusion.get());
TensorView* in = makeSymbolicTensor(2);
TensorView* w = makeSymbolicTensor(2);
TensorView* out = matmul(in, w);
fusion->addInput(in);
fusion->addInput(w);
fusion->addOutput(out);

w->outer_split(1, c);
w->axis(1)->parallelize(ParallelType::Stream);
out->outer_split(1, c);
out->axis(1)->parallelize(ParallelType::Stream);
}

auto options = at::TensorOptions().dtype(at::kFloat).device(at::kCUDA);
at::Tensor in_tensor = at::randn({5, 7}, options);
at::Tensor w_tensor = at::randn({7, c * 2}, options);

// With NVFUSER_DUMP=host_ir, you'll see the host IR container like the
// following:
// clang-format off
// %HostIrContainer { (T0_g_float[iS0{i0}, iS1{i2}], T1_g_float[istreamIdx7{3}, iS11{i2}, iS8{( ceilDiv(i4, 3) )}]) -> (T2_g_float[istreamIdx9{3}, iS4{i0}, iS10{( ceilDiv(i4, 3) )}, rS6{i2}]) :
// FOR i18 from 0 to 3:
// T2_g_float[istreamIdx9{3}, iS4{i0}, iS10{( ceilDiv(i4, 3) )}, rS6{i2}]
// = matmul(T0_g_float[iS0{i0}, iS1{i2}],
// T1_g_float[istreamIdx7{3}, iS11{i2}, iS8{( ceilDiv(i4, 3) )}])
// } // %HostIrContainer
// clang-format on
FusionExecutorCache executor_cache(std::move(fusion));
auto out_tensor = executor_cache.runFusionWithInputs({in_tensor, w_tensor})[0]
.as<at::Tensor>();

testValidate(
executor_cache.fusion(),
{out_tensor},
{in_tensor, w_tensor},
__LINE__,
__FILE__);
}

} // namespace nvfuser