add allow_nonexistent_upstream_partitions for StaticPartitionMapping #28183

Open
j-blackwell opened this issue Mar 3, 2025 · 0 comments

What's the use case?

The use case we've come across is a static-partitioned asset where some partitions are populated by one upstream dependency and the remaining partitions by another. Currently, if an upstream asset lacks one of the downstream partitions, the downstream asset cannot be materialised. The workarounds are to load unnecessary data through AllPartitionMapping, with potentially very high redundant network costs, or to use 'hacky' solutions that load asset values from the defs object. Am I missing an easier solution?

e.g.

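import dagster as dg
import pandas as pd

# Partitions are declared with StaticPartitionsDefinition;
# StaticPartitionMapping describes how keys map between two such definitions.
partition_a = dg.StaticPartitionsDefinition(["a"])

@dg.asset(partitions_def=partition_a)  # other asset arguments elided
def upstream_a():
    ...

partition_b = dg.StaticPartitionsDefinition(["b"])

@dg.asset(partitions_def=partition_b)  # other asset arguments elided
def upstream_b():
    ...

partition_all = dg.StaticPartitionsDefinition(["a", "b"])

# Each downstream partition exists in only one of the two upstream assets.
@dg.asset(partitions_def=partition_all)
def downstream_asset(upstream_a: pd.DataFrame, upstream_b: pd.DataFrame):
    return pd.concat([upstream_a, upstream_b])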
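
To make the gap concrete, here is a small plain-Python illustration (no Dagster involved) of which downstream partitions have no same-named partition in each upstream asset. The helper `missing_upstream_partitions` is a hypothetical name used only for this sketch; it is not part of Dagster.

```python
def missing_upstream_partitions(downstream_keys, upstream_keys):
    """Return the downstream keys that have no same-named upstream partition."""
    upstream = set(upstream_keys)
    # Preserve the downstream ordering so the result is deterministic.
    return [key for key in downstream_keys if key not in upstream]


downstream_keys = ["a", "b"]  # partition_all in the example above

gaps = {
    "upstream_a": missing_upstream_partitions(downstream_keys, ["a"]),
    "upstream_b": missing_upstream_partitions(downstream_keys, ["b"]),
}
print(gaps)
```

Every downstream partition is missing from exactly one of the two upstream assets, which is the situation the requested option is meant to tolerate.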

Ideas of implementation

TimeWindowPartitionMapping already has this functionality (its allow_nonexistent_upstream_partitions option), which is very useful. I have borrowed its implementation here:

    @cached_method
    def _check_upstream(self, *, upstream_partitions_def: StaticPartitionsDefinition):
        """Validate that the mapping from upstream to downstream is only defined on upstream keys."""
        check.inst_param(
            upstream_partitions_def,
            "upstream_partitions_def",
            StaticPartitionsDefinition,
            "StaticPartitionMapping can only be defined between two StaticPartitionsDefinitions",
        )
        if self.allow_nonexistent_upstream_partitions:
            # If allowed to have nonexistent upstream partitions, do not consider
            # out of range partitions to be invalid
            return
        upstream_keys = upstream_partitions_def.get_partition_keys()
        extra_keys = set(self._mapping.keys()).difference(upstream_keys)
        if extra_keys:
            raise ValueError(
                f"mapping source partitions not in the upstream partitions definition: {extra_keys}"
            )
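
The intended behaviour of the check above can be tried out without Dagster using a minimal standalone sketch. Everything here is an assumption made for illustration: `check_upstream` and `upstream_keys_for` are hypothetical free functions, the mapping is a plain dict from upstream key to a set of downstream keys, and silently dropping nonexistent upstream keys during resolution is one possible reading of the proposal, not something the issue specifies.

```python
def check_upstream(mapping, upstream_keys, allow_nonexistent_upstream_partitions=False):
    """Raise if the mapping names upstream keys that do not exist, unless allowed."""
    if allow_nonexistent_upstream_partitions:
        # Out-of-range upstream partitions are tolerated, so skip validation.
        return
    extra_keys = set(mapping).difference(upstream_keys)
    if extra_keys:
        raise ValueError(
            "mapping source partitions not in the upstream partitions "
            f"definition: {sorted(extra_keys)}"
        )


def upstream_keys_for(mapping, upstream_keys, downstream_key,
                      allow_nonexistent_upstream_partitions=False):
    """Resolve the existing upstream keys that feed one downstream key."""
    check_upstream(mapping, upstream_keys, allow_nonexistent_upstream_partitions)
    existing = set(upstream_keys)
    # Assumption: upstream keys that do not exist are simply dropped.
    return sorted(
        up for up, downs in mapping.items()
        if downstream_key in downs and up in existing
    )


# Identity mapping over the downstream keys; upstream_a only defines "a".
mapping = {"a": {"a"}, "b": {"b"}}
upstream_a_keys = ["a"]

found_a = upstream_keys_for(mapping, upstream_a_keys, "a",
                            allow_nonexistent_upstream_partitions=True)
found_b = upstream_keys_for(mapping, upstream_a_keys, "b",
                            allow_nonexistent_upstream_partitions=True)
```

With the flag set, downstream partition "b" resolves to no upstream partitions of upstream_a instead of raising. How the downstream asset should then receive that empty input is left open here.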

Additional information

Thanks all :)
I am happy to draft a PR if this suggestion works for you.

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.
