Note
Access to this page requires authorization. You can try signing in or changing directories.
Access to this page requires authorization. You can try changing directories.
Applies to: ✅ Microsoft Fabric ✅ Azure Data Explorer ✅ Azure Monitor ✅ Microsoft Sentinel
The shuffle query is a semantic-preserving transformation used with a set of operators that support the shuffle strategy. Depending on the data involved, querying with the shuffle strategy can yield better performance. It's better to use the shuffle query strategy when the shuffle key (a join key, summarize key, make-series key or partition key) has a high cardinality and the regular operator query hits query limits.
You can use the following operators with the shuffle command:
To use the shuffle query strategy, add the expression hint.strategy = shuffle or hint.shufflekey = <key>. When you use hint.strategy=shuffle, the operator data will be shuffled by all the keys. Use this expression when the compound key is unique but each key isn't unique enough, so you'll shuffle the data using all the keys of the shuffled operator.
When partitioning data with the shuffle strategy, the data load is shared on all cluster nodes. Each node processes one partition of the data. The default number of partitions is equal to the number of cluster nodes.
The partition number can be overridden by using the syntax hint.num_partitions = total_partitions, which will control the number of partitions. This is useful when the cluster has a small number of cluster nodes and the default partitions number will be small, and the query fails or takes a long execution time.
Note
Using many partitions may consume more cluster resources and degrade performance. Choose the partition number carefully by starting with the hint.strategy = shuffle and start increasing the partitions gradually.
In some cases, the hint.strategy = shuffle is ignored, and the query won't run in shuffle strategy. This can happen when:
- The joinoperator has anothershuffle-compatible operator (join,summarize,make-seriesorpartition) on the left side or the right side.
- The summarizeoperator appears after anothershuffle-compatible operator (join,summarize,make-seriesorpartition) in the query.
Syntax
With hint.strategy = shuffle
T | DataExpression | join  hint.strategy = shuffle ( DataExpression )
T | summarize hint.strategy = shuffle DataExpression
T | Query | partition hint.strategy = shuffle  ( SubQuery )
With hint.shufflekey = key
T | DataExpression | join  hint.shufflekey = key ( DataExpression )
T | summarize hint.shufflekey = key DataExpression
T | make-series hint.shufflekey = key DataExpression
T | Query | partition  hint.shufflekey = key ( SubQuery )
Learn more about syntax conventions.
Parameters
| Name | Type | Required | Description | 
|---|---|---|---|
| T | string | ✔️ | The tabular source whose data is to be processed by the operator. | 
| DataExpression | string | An implicit or explicit tabular transformation expression. | |
| Query | string | A transformation expression run on the records of T. | |
| key | string | Use a joinkey,summarizekey,make-serieskey orpartitionkey. | |
| SubQuery | string | A transformation expression. | 
Note
Either DataExpression or Query must be specified depending on the chosen syntax.
Examples
The example in this section shows how to use the syntax to help you get started.
The examples in this article use publicly available tables in the help cluster, such as the
StormEventstable in the Samples database.
The examples in this article use publicly available tables, such as the
Weathertable in the Weather analytics sample gallery. You may need to modify the the table name in the example query to match the table in your workspace.
Use summarize with shuffle
The shuffle strategy query with summarize operator shares the load on all cluster nodes, where each node processes one partition of the data.
StormEvents
| summarize hint.strategy = shuffle count(), avg(InjuriesIndirect) by State
| count 
Output
| Count | 
|---|
| 67 | 
Use join with shuffle
StormEvents
| where State has "West"
| where EventType has "Flood"
| join hint.strategy=shuffle 
    (
    StormEvents
    | where EventType has "Hail"
    | project EpisodeId, State, DamageProperty
    )
    on State
| count
Output
| Count | 
|---|
| 103 | 
Use make-series with shuffle
StormEvents
| where State has "North"
| make-series hint.shufflekey = State sum(DamageProperty) default = 0 on StartTime in range(datetime(2007-01-01 00:00:00.0000000), datetime(2007-01-31 23:59:00.0000000), 15d) by State
Output
| State | sum_DamageProperty | StartTime | 
|---|---|---|
| NORTH DAKOTA | [60000,0,0] | ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.0000000Z"] | 
| NORTH CAROLINA | [20000,0,1000] | ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.0000000Z"] | 
| ATLANTIC NORTH | [0,0,0] | ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.0000000Z"] | 
Use partition with shuffle
StormEvents
| partition hint.strategy=shuffle by EpisodeId
(
    top 3 by DamageProperty
    | project EpisodeId, State, DamageProperty
)
| count
Output
| Count | 
|---|
| 22345 | 
Compare hint.strategy=shuffle and hint.shufflekey=key
When you use hint.strategy=shuffle, the shuffled operator will be shuffled by all the keys. In the following example, the query shuffles the data using both EpisodeId and EventId as keys:
StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| join kind = inner hint.strategy=shuffle (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId
| count
Output
| Count | 
|---|
| 14 | 
The following query uses hint.shufflekey = key. The query above is equivalent to this query.
StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| join kind = inner hint.shufflekey = EpisodeId hint.shufflekey = EventId (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId
Output
| Count | 
|---|
| 14 | 
Shuffle the data with multiple keys
In some cases, the hint.strategy=shuffle will be ignored, and the query won't run in shuffle strategy. For example, in the following example, the join has summarize on its left side, so using hint.strategy=shuffle won't apply shuffle strategy to the query:
StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| summarize count() by EpisodeId, EventId
| join kind = inner hint.strategy=shuffle (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId
Output
| EpisodeId | EventId | ... | EpisodeId1 | EventId1 | ... | 
|---|---|---|---|---|---|
| 1030 | 4407 | ... | 1030 | 4407 | ... | 
| 1030 | 13721 | ... | 1030 | 13721 | ... | 
| 2477 | 12530 | ... | 2477 | 12530 | ... | 
| 2103 | 10237 | ... | 2103 | 10237 | ... | 
| 2103 | 10239 | ... | 2103 | 10239 | ... | 
| ... | ... | ... | ... | ... | ... | 
To overcome this issue and run in shuffle strategy, choose the key that is common for the summarize and join operations. In this case, this key is EpisodeId. Use the hint hint.shufflekey to specify the shuffle key on the join to hint.shufflekey = EpisodeId:
StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| summarize count() by EpisodeId, EventId
| join kind = inner hint.shufflekey=EpisodeId (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId
Output
| EpisodeId | EventId | ... | EpisodeId1 | EventId1 | ... | 
|---|---|---|---|---|---|
| 1030 | 4407 | ... | 1030 | 4407 | ... | 
| 1030 | 13721 | ... | 1030 | 13721 | ... | 
| 2477 | 12530 | ... | 2477 | 12530 | ... | 
| 2103 | 10237 | ... | 2103 | 10237 | ... | 
| 2103 | 10239 | ... | 2103 | 10239 | ... | 
| ... | ... | ... | ... | ... | ... | 
Use summarize with shuffle to improve performance
In this example, using the summarize operator with shuffle strategy improves performance. The source table has 150M records and the cardinality of the group by key is 10M, which is spread over 10 cluster nodes.
Using summarize operator without shuffle strategy, the query ends after 1:08 and the memory usage peak is ~3 GB:
orders
| summarize arg_max(o_orderdate, o_totalprice) by o_custkey 
| where o_totalprice < 1000
| count
Output
| Count | 
|---|
| 1086 | 
While using shuffle strategy with summarize, the query ends after ~7 seconds and the memory usage peak is 0.43 GB:
orders
| summarize hint.strategy = shuffle arg_max(o_orderdate, o_totalprice) by o_custkey 
| where o_totalprice < 1000
| count
Output
| Count | 
|---|
| 1086 | 
The following example demonstrates performance on a cluster that has two cluster nodes, with a table that has 60M records, where the cardinality of the group by key is 2M.
Running the query without hint.num_partitions will use only two partitions (as cluster nodes number) and the following query will take ~1:10 mins:
lineitem 
| summarize hint.strategy = shuffle dcount(l_comment), dcount(l_shipdate) by l_partkey 
| consume
If setting the partitions number to 10, the query will end after 23 seconds:
lineitem 
| summarize hint.strategy = shuffle hint.num_partitions = 10 dcount(l_comment), dcount(l_shipdate) by l_partkey 
| consume
Use join with shuffle to improve performance
The following example shows how using shuffle strategy with the join operator improves performance.
The examples were sampled on a cluster with 10 nodes where the data is spread over all these nodes.
The query's left-side source table has 15M records where the cardinality of the join key is ~14M. The query's right-side source has 150M records and the cardinality of the join key is 10M. The query ends after ~28 seconds and the memory usage peak is 1.43 GB:
customer
| join
    orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkey
When using shuffle strategy with a join operator, the query ends after ~4 seconds and the memory usage peak is 0.3 GB:
customer
| join
    hint.strategy = shuffle orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkey
In another example, we try the same queries on a larger dataset with the following conditions:
- Left-side source of the joinis 150M and the cardinality of the key is 148M.
- Right-side source of the joinis 1.5B, and the cardinality of the key is ~100M.
The query with just the join operator hits limits and times-out after 4 mins. However, when using shuffle strategy with the join operator, the query ends after ~34 seconds and the memory usage peak is 1.23 GB.
The following example shows the improvement on a cluster that has two cluster nodes, with a table of 60M records, where the cardinality of the join key is 2M.
Running the query without hint.num_partitions will use only two partitions (as cluster nodes number) and the following query will take ~1:10 mins:
lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
    hint.shufflekey = l_partkey   part
on $left.l_partkey == $right.p_partkey
| consume
When setting the partitions number to 10, the query will end after 23 seconds:
lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
    hint.shufflekey = l_partkey  hint.num_partitions = 10    part
on $left.l_partkey == $right.p_partkey
| consume