
[PoC] Partition Level Parallelism #30

Draft
wants to merge 4 commits into base: master
Conversation

@ankitsultana (Owner) commented Mar 3, 2023

Approach:

Say a leaf stage has VirtualServers VS: [V0, V1, V2, V3]. We say that the leaf stage has a non-empty ColocationKey if and only if we can assign all selected segments such that each segment S with partition-id=P goes to VirtualServer VS[P % len(VS)].

This check is done in PartitionWorkerManager itself, which marks those leaf stages as "localized". In GreedyShuffleRewriter#visitTableScan we then return a non-empty colocation key if the stage was marked localized.
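
To make the invariant concrete, here is a minimal sketch of that check, with a generic parameter standing in for VirtualServer (names are hypothetical, not the PR's actual PartitionWorkerManager code):

import java.util.List;
import java.util.Map;

final class LocalizationCheck {
  // Hypothetical sketch: a leaf stage can keep a non-empty colocation key only
  // if every selected segment with partition-id P is hosted on VS[P % len(VS)].
  // <S> stands in for VirtualServer.
  static <S> boolean isLocalized(List<S> virtualServers,
      Map<String, Integer> segmentToPartitionId, Map<String, S> segmentToAssignedServer) {
    int n = virtualServers.size();
    for (Map.Entry<String, Integer> e : segmentToPartitionId.entrySet()) {
      S expected = virtualServers.get(e.getValue() % n);
      if (!expected.equals(segmentToAssignedServer.get(e.getKey()))) {
        return false;  // this segment breaks the P % len(VS) invariant
      }
    }
    return true;  // all segments are colocated; the leaf stage can be marked "localized"
  }
}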

Also, the shuffle-rewrite now does NOT re-assign virtual servers. It simply performs the following checks:

  1. Whether the colocation keys allow skipping the shuffle.
  2. Whether the number of query-partitions is the same.

Some other points:

  1. The virtualId of a VirtualServer (aka the query-partition id) is global. On current master, setting stageParallelism=4 spawns 4 threads in each server. With this PR, the behavior is to have 4 workers in total across all servers. If the number of servers needed to serve the query is higher than the parallelism, an error is returned; this can happen when the selected segments span more than stageParallelism servers (see the sketch below).
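
As a rough illustration of that last point (hypothetical names, not code from this PR), the dispatcher-side guard could look like:

import java.util.Set;

final class ParallelismGuard {
  // Hypothetical sketch: with stageParallelism=4 there are 4 workers in total
  // across all servers, so the query fails fast when the selected segments
  // span more servers than the configured parallelism.
  static void checkParallelism(int stageParallelism, Set<String> serversNeeded) {
    if (serversNeeded.size() > stageParallelism) {
      throw new IllegalStateException("Selected segments span " + serversNeeded.size()
          + " servers, which exceeds stageParallelism=" + stageParallelism);
    }
  }
}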

Comment on lines +60 to +62
public Map<String, Integer> getSegmentToPartitionMap() {
  return _partitionIdMap;
}


it would be nice if we had a typical 2-stage partition architecture

a 1-stage partition includes

  1. partition key column(s)
  2. partition method
  3. partition count

for example:
class PartitionInstance implements Partitioning {
  List<ServerInstance> _serverInstances;  
}
class PartitionTable implements Partitioning {
  Map<String, List<Partitioning>> _partitionIdToInstanceListMapping;
}

and the RoutingTable is a special 2-stage partition

class RoutingTable implements Partitioning {
  Map<String, List<Partitioning>> _partitionIdToSegmentListMapping;
}

for example, the following list of segments:

S1, P1, on Server1
S2, P1, on Server1
S3, P1, on Server2
S4, P2, on Server3
S5, P2, on Server4
S6, P2, on Server4
S7, P2, on Server5

The Routing Table will look like

_partitionIdToSegmentListMapping: 
{
  "P1": [ {"S1": ["Server1"], "S2": ["Server1"], "S3": ["Server2"]} ],
  "P2": [ {"S4": ["Server3"], "S5": ["Server4"], "S6": ["Server4"], "S7": ["Server5"] } ]
}
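
For concreteness, a small runnable sketch that materializes that example mapping with plain Java collections (the Partitioning/RoutingTable shape above is this comment's proposal, not the PR's current code):

import java.util.List;
import java.util.Map;

final class RoutingTableExample {
  public static void main(String[] args) {
    // partition -> segment -> server(s) for the S1..S7 example above
    Map<String, Map<String, List<String>>> partitionIdToSegmentListMapping = Map.of(
        "P1", Map.of(
            "S1", List.of("Server1"),
            "S2", List.of("Server1"),
            "S3", List.of("Server2")),
        "P2", Map.of(
            "S4", List.of("Server3"),
            "S5", List.of("Server4"),
            "S6", List.of("Server4"),
            "S7", List.of("Server5")));
    System.out.println(partitionIdToSegmentListMapping);
  }
}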


rethinking this a bit: it seems to me that the RoutingTable should follow straightforwardly from how the routing is produced and dispatched. thus

  • the server --> partition --> segment shape might be better suited for dispatch.
  • the partition --> segment --> server route is easier to construct based on the info the routing manager has


^ btw this operates under the assumption that partition-to-instance assignment for the same replica group only returns a single server (contradicting the current instance-partition API, which can return multiple)
it is more similar to the real-time table


@walterddr left a comment


several remarks from my end, for the partition-definition changes only.

_tableCache = tableCache;
}

public PartitionWorkerManager get(QueryPlan queryPlan, Map<Integer, List<Integer>> stageTree, long requestId,


suggest not doing the provider for now, unless we want some pluggability. (IMO the partition worker manager should be the default)

@ankitsultana (Owner, Author)

Yes +1

Comment on lines +762 to +768
Map<String, Integer> calculatePartitionInfo(Set<String> selectedSegments) {
  Map<String, Integer> result = new HashMap<>();
  for (String segment : selectedSegments) {
    result.put(segment, _segmentMetadataCache.getPartitionId(segment));
  }
  return result;
}


we can

  1. send the segment metadata info directly over to the server, or
  2. keep a partition-to-segment reverse map updated during helix changes

if we have too many segments (e.g. > 10,000), the single-threaded nature of the broker request handler is going to be a problem
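
A rough sketch of option 2, assuming the Helix change listener can push updates into a broker-side cache (all names here are hypothetical):

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class PartitionToSegmentsCache {
  // Hypothetical partition -> segments reverse map, maintained incrementally on
  // Helix changes so the request path never scans all segments per query.
  private final Map<Integer, Set<String>> _partitionToSegments = new ConcurrentHashMap<>();

  // Called from the Helix change listener when a segment is added or refreshed.
  void onSegmentAdded(String segment, int partitionId) {
    _partitionToSegments.computeIfAbsent(partitionId, k -> ConcurrentHashMap.newKeySet()).add(segment);
  }

  // Called when a segment is removed.
  void onSegmentRemoved(String segment, int partitionId) {
    Set<String> segments = _partitionToSegments.get(partitionId);
    if (segments != null) {
      segments.remove(segment);
    }
  }

  Set<String> getSegments(int partitionId) {
    return _partitionToSegments.getOrDefault(partitionId, Set.of());
  }
}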

@@ -25,25 +25,39 @@

public class RoutingTable {
private final Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap;
private final Map<String, ServerInstance> _segmentToServerMap;


i was planning to keep 2 alternatives

// for backward compatibility
Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap;
// for future dispatch; an unpartitioned dispatch will contain a `Collections.singletonMap(-1, oldSegmentLists)`
Map<ServerInstance, Map<Integer, List<String>>> _serverInstanceToPartitionedSegmentsMap;

and let the RoutingManager manage

  • partition->segments map
  • server->partitions map

individually (updated upon helix changes)
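
A short sketch of the backward-compatible conversion implied above, with a generic parameter standing in for ServerInstance (illustrative only):

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class UnpartitionedDispatch {
  // Hypothetical helper: an unpartitioned dispatch reuses the legacy per-server
  // segment lists under the sentinel partition id -1. <S> stands in for ServerInstance.
  static <S> Map<S, Map<Integer, List<String>>> toUnpartitioned(Map<S, List<String>> serverToSegments) {
    Map<S, Map<Integer, List<String>>> result = new HashMap<>();
    for (Map.Entry<S, List<String>> e : serverToSegments.entrySet()) {
      result.put(e.getKey(), Collections.singletonMap(-1, e.getValue()));
    }
    return result;
  }
}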

@@ -46,12 +45,12 @@ public class StageMetadata implements Serializable {
private List<String> _scannedTables;

// used for assigning server/worker nodes.
private List<VirtualServer> _serverInstances;
private List<VirtualServer> _virtualServers;


I am actually

  1. modifying this back to ServerInstance and
  2. making it a map: Map<ServerInstance, List<Integer>> instancesToPartitionMapping

again, use -1 here for non-partitioned, i.e. all servers should be responsible for all partitions
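
A tiny runnable example of that shape, with String keys standing in for ServerInstance and -1 marking a non-partitioned assignment (illustrative only):

import java.util.List;
import java.util.Map;

final class StageMetadataShapeExample {
  public static void main(String[] args) {
    // Partitioned dispatch: each server owns an explicit set of partitions.
    Map<String, List<Integer>> partitioned = Map.of(
        "server1", List.of(0, 2),
        "server2", List.of(1, 3));
    // Non-partitioned dispatch: every server is responsible for all partitions.
    Map<String, List<Integer>> nonPartitioned = Map.of(
        "server1", List.of(-1),
        "server2", List.of(-1));
    System.out.println(partitioned + " / " + nonPartitioned);
  }
}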

@ankitsultana force-pushed the master branch 2 times, most recently from d58cba3 to fe98bb0 on May 11, 2023 at 20:31