diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 91bfe70d53..03197530ca 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -236,6 +236,30 @@ index 0efe0877e9b..423d3b3d76d 100644 -- -- SELECT_HAVING -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/select_having.sql +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala +index c7c09bf7c79..5eaa5222142 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala +@@ -280,7 +280,8 @@ abstract class CTEInlineSuiteBase + } + } + +- test("CTE Predicate push-down and column pruning") { ++ test("CTE Predicate push-down and column pruning", ++ IgnoreComet("Comet changes the exchange reuse count asserted by this test")) { + withTempView("t") { + Seq((0, 1), (1, 2)).toDF("c1", "c2").createOrReplaceTempView("t") + val df = sql( +@@ -330,7 +331,8 @@ abstract class CTEInlineSuiteBase + } + } + +- test("CTE Predicate push-down and column pruning - combined predicate") { ++ test("CTE Predicate push-down and column pruning - combined predicate", ++ IgnoreComet("Comet changes the exchange reuse count asserted by this test")) { + withTempView("t") { + Seq((0, 1, 2), (1, 2, 3)).toDF("c1", "c2", "c3").createOrReplaceTempView("t") + val df = sql( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index cf40e944c09..bdd5be4f462 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -497,7 +521,7 @@ index f33432ddb6f..b375e285dde 100644 } assert(scanOption.isDefined) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala -index a6b295578d6..91acca4306f 100644 +index a6b295578d6..1167bbe6554 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -260,7 +260,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite @@ -1474,7 +1498,7 @@ index ac710c32296..2854b433dd3 100644 import testImplicits._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala -index 593bd7bb4ba..32af28b0238 100644 +index 593bd7bb4ba..7f68e3bd8d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -26,9 +26,11 @@ import org.scalatest.time.SpanSugar._ @@ -1742,7 +1766,27 @@ index 593bd7bb4ba..32af28b0238 100644 withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { val (_, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT key FROM testData GROUP BY key") -@@ -1599,7 +1628,7 @@ class AdaptiveQueryExecSuite +@@ -1541,7 +1570,8 @@ class AdaptiveQueryExecSuite + } + } + +- test("SPARK-35442: Support propagate empty relation through aggregate") { ++ test("SPARK-35442: Support propagate empty relation through aggregate", ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4412")) { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult( + "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key") +@@ -1560,7 +1590,8 @@ class AdaptiveQueryExecSuite + } + } + +- test("SPARK-35442: Support propagate empty relation through union") { ++ test("SPARK-35442: Support propagate empty relation through union", ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4412")) { + def checkNumUnion(plan: SparkPlan, numUnion: Int): Unit = { + assert( + collect(plan) { +@@ -1599,7 +1630,7 @@ class AdaptiveQueryExecSuite val (_, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT id FROM v1 GROUP BY id DISTRIBUTE BY id") assert(collect(adaptivePlan) { @@ -1751,7 +1795,7 @@ index 593bd7bb4ba..32af28b0238 100644 }.length == 1) } } -@@ -1679,7 +1708,8 @@ class AdaptiveQueryExecSuite +@@ -1679,7 +1710,8 @@ class AdaptiveQueryExecSuite } } @@ -1761,7 +1805,7 @@ index 593bd7bb4ba..32af28b0238 100644 def hasRepartitionShuffle(plan: SparkPlan): Boolean = { find(plan) { case s: ShuffleExchangeLike => -@@ -1864,6 +1894,9 @@ class AdaptiveQueryExecSuite +@@ -1864,6 +1896,9 @@ class AdaptiveQueryExecSuite def checkNoCoalescePartitions(ds: Dataset[Row], origin: ShuffleOrigin): Unit = { assert(collect(ds.queryExecution.executedPlan) { case s: ShuffleExchangeExec if s.shuffleOrigin == origin && s.numPartitions == 2 => s @@ -1771,7 +1815,7 @@ index 593bd7bb4ba..32af28b0238 100644 }.size == 1) ds.collect() val plan = ds.queryExecution.executedPlan -@@ -1872,6 +1905,9 @@ class AdaptiveQueryExecSuite +@@ -1872,6 +1907,9 @@ class AdaptiveQueryExecSuite }.isEmpty) assert(collect(plan) { case s: ShuffleExchangeExec if s.shuffleOrigin == origin && s.numPartitions == 2 => s @@ -1781,7 +1825,15 @@ index 593bd7bb4ba..32af28b0238 100644 }.size == 1) checkAnswer(ds, testData) } -@@ -2028,7 +2064,8 @@ class AdaptiveQueryExecSuite +@@ -1901,6 +1939,7 @@ class AdaptiveQueryExecSuite + df.collect() + assert(collect(df.queryExecution.executedPlan) { + case u: UnionExec => u ++ case u: CometUnionExec => u.originalPlan.asInstanceOf[UnionExec] + }.size == numUnion) + assert(collect(df.queryExecution.executedPlan) { + case r: AQEShuffleReadExec => r +@@ -2028,7 +2067,8 @@ class AdaptiveQueryExecSuite } } @@ -1791,7 +1843,7 @@ index 593bd7bb4ba..32af28b0238 100644 withTempView("t1", "t2") { def checkJoinStrategy(shouldShuffleHashJoin: Boolean): Unit = { Seq("100", "100000").foreach { size => -@@ -2114,7 +2151,8 @@ class AdaptiveQueryExecSuite +@@ -2114,7 +2154,8 @@ class AdaptiveQueryExecSuite } } @@ -1801,7 +1853,7 @@ index 593bd7bb4ba..32af28b0238 100644 withTempView("v") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", -@@ -2213,7 +2251,7 @@ class AdaptiveQueryExecSuite +@@ -2213,7 +2254,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " + s"JOIN skewData2 ON key1 = key2 GROUP BY key1") val shuffles1 = collect(adaptive1) { @@ -1810,7 +1862,7 @@ index 593bd7bb4ba..32af28b0238 100644 } assert(shuffles1.size == 3) // shuffles1.head is the top-level shuffle under the Aggregate operator -@@ -2226,7 +2264,7 @@ class AdaptiveQueryExecSuite +@@ -2226,7 +2267,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " + s"JOIN skewData2 ON key1 = key2") val shuffles2 = collect(adaptive2) { @@ -1819,7 +1871,7 @@ index 593bd7bb4ba..32af28b0238 100644 } if (hasRequiredDistribution) { assert(shuffles2.size == 3) -@@ -2260,7 +2298,8 @@ class AdaptiveQueryExecSuite +@@ -2260,7 +2301,8 @@ class AdaptiveQueryExecSuite } } @@ -1829,7 +1881,7 @@ index 593bd7bb4ba..32af28b0238 100644 CostEvaluator.instantiate( classOf[SimpleShuffleSortCostEvaluator].getCanonicalName, spark.sparkContext.getConf) intercept[IllegalArgumentException] { -@@ -2404,6 +2443,7 @@ class AdaptiveQueryExecSuite +@@ -2404,6 +2446,7 @@ class AdaptiveQueryExecSuite val (_, adaptive) = runAdaptiveAndVerifyResult(query) assert(adaptive.collect { case sort: SortExec => sort @@ -1837,7 +1889,7 @@ index 593bd7bb4ba..32af28b0238 100644 }.size == 1) val read = collect(adaptive) { case read: AQEShuffleReadExec => read -@@ -2421,7 +2461,8 @@ class AdaptiveQueryExecSuite +@@ -2421,7 +2464,8 @@ class AdaptiveQueryExecSuite } } @@ -1847,7 +1899,7 @@ index 593bd7bb4ba..32af28b0238 100644 withTempView("v") { withSQLConf( SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "true", -@@ -2533,7 +2574,7 @@ class AdaptiveQueryExecSuite +@@ -2533,7 +2577,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " + "JOIN skewData3 ON value2 = value3") val shuffles1 = collect(adaptive1) { @@ -1856,7 +1908,7 @@ index 593bd7bb4ba..32af28b0238 100644 } assert(shuffles1.size == 4) val smj1 = findTopLevelSortMergeJoin(adaptive1) -@@ -2544,7 +2585,7 @@ class AdaptiveQueryExecSuite +@@ -2544,7 +2588,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " + "JOIN skewData3 ON value1 = value3") val shuffles2 = collect(adaptive2) { @@ -2327,7 +2379,7 @@ index bf5c51b89bb..f7402b7d883 100644 val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SparkException]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala -index 3a0bd35cb70..b28f06a757f 100644 +index 3a0bd35cb70..99b70606261 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.debug @@ -2338,7 +2390,27 @@ index 3a0bd35cb70..b28f06a757f 100644 import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext -@@ -124,7 +125,8 @@ class DebuggingSuite extends DebuggingSuiteBase with DisableAdaptiveExecutionSui +@@ -41,7 +42,8 @@ abstract class DebuggingSuiteBase extends SharedSparkSession { + testData.as[TestData].debug() + } + +- test("debugCodegen") { ++ test("debugCodegen", ++ IgnoreComet("Comet changes the WholeStageCodegen subtree count")) { + val df = spark.range(10).groupBy(col("id") * 2).count() + df.collect() + val res = codegenString(df.queryExecution.executedPlan) +@@ -50,7 +52,8 @@ abstract class DebuggingSuiteBase extends SharedSparkSession { + assert(res.contains("Object[]")) + } + +- test("debugCodegenStringSeq") { ++ test("debugCodegenStringSeq", ++ IgnoreComet("Comet changes the WholeStageCodegen subtree count")) { + val df = spark.range(10).groupBy(col("id") * 2).count() + df.collect() + val res = codegenStringSeq(df.queryExecution.executedPlan) +@@ -124,7 +127,8 @@ class DebuggingSuite extends DebuggingSuiteBase with DisableAdaptiveExecutionSui | id LongType: {}""".stripMargin)) } @@ -2448,6 +2520,29 @@ index d083cac48ff..3c11bcde807 100644 import testImplicits._ +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +index fdc633f3556..99474724c26 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +@@ -36,7 +36,7 @@ import org.apache.spark.internal.config.Status._ + import org.apache.spark.rdd.RDD + import org.apache.spark.resource.ResourceProfile + import org.apache.spark.scheduler._ +-import org.apache.spark.sql.{DataFrame, SparkSession} ++import org.apache.spark.sql.{DataFrame, IgnoreComet, SparkSession} + import org.apache.spark.sql.catalyst.InternalRow + import org.apache.spark.sql.catalyst.expressions.Attribute + import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +@@ -699,7 +699,8 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes + } + + test("SPARK-29894 test Codegen Stage Id in SparkPlanInfo", +- DisableAdaptiveExecution("WSCG rule is applied later in AQE")) { ++ DisableAdaptiveExecution("WSCG rule is applied later in AQE"), ++ IgnoreComet("Comet changes the WholeStageCodegen subtree count")) { + // with AQE on, the WholeStageCodegen rule is applied when running QueryStageExec. + val df = createTestDataFrame.select(count("*")) + val sparkPlanInfo = SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 266bb343526..f8ad838e2b2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 51c5054f91..e50d76d036 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -217,6 +217,30 @@ index 0efe0877e9b..423d3b3d76d 100644 -- -- SELECT_HAVING -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/select_having.sql +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala +index 73f5b742715..5de6754c70d 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala +@@ -280,7 +280,8 @@ abstract class CTEInlineSuiteBase + } + } + +- test("CTE Predicate push-down and column pruning") { ++ test("CTE Predicate push-down and column pruning", ++ IgnoreComet("Comet changes the exchange reuse count asserted by this test")) { + withTempView("t") { + Seq((0, 1), (1, 2)).toDF("c1", "c2").createOrReplaceTempView("t") + val df = sql( +@@ -330,7 +331,8 @@ abstract class CTEInlineSuiteBase + } + } + +- test("CTE Predicate push-down and column pruning - combined predicate") { ++ test("CTE Predicate push-down and column pruning - combined predicate", ++ IgnoreComet("Comet changes the exchange reuse count asserted by this test")) { + withTempView("t") { + Seq((0, 1, 2), (1, 2, 3)).toDF("c1", "c2", "c3").createOrReplaceTempView("t") + val df = sql( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index e5494726695..00937f025c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -1439,7 +1463,7 @@ index 5a413c77754..207b66e1d7b 100644 import testImplicits._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala -index 2f8e401e743..dbcf3171946 100644 +index 2f8e401e743..f408405807b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -27,9 +27,11 @@ import org.scalatest.time.SpanSugar._ @@ -1714,7 +1738,27 @@ index 2f8e401e743..dbcf3171946 100644 withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { val (_, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT key FROM testData GROUP BY key") -@@ -1625,7 +1655,7 @@ class AdaptiveQueryExecSuite +@@ -1567,7 +1597,8 @@ class AdaptiveQueryExecSuite + } + } + +- test("SPARK-35442: Support propagate empty relation through aggregate") { ++ test("SPARK-35442: Support propagate empty relation through aggregate", ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4412")) { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult( + "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key") +@@ -1586,7 +1617,8 @@ class AdaptiveQueryExecSuite + } + } + +- test("SPARK-35442: Support propagate empty relation through union") { ++ test("SPARK-35442: Support propagate empty relation through union", ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4412")) { + def checkNumUnion(plan: SparkPlan, numUnion: Int): Unit = { + assert( + collect(plan) { +@@ -1625,7 +1657,7 @@ class AdaptiveQueryExecSuite val (_, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT id FROM v1 GROUP BY id DISTRIBUTE BY id") assert(collect(adaptivePlan) { @@ -1723,7 +1767,7 @@ index 2f8e401e743..dbcf3171946 100644 }.length == 1) } } -@@ -1705,7 +1735,8 @@ class AdaptiveQueryExecSuite +@@ -1705,7 +1737,8 @@ class AdaptiveQueryExecSuite } } @@ -1733,7 +1777,7 @@ index 2f8e401e743..dbcf3171946 100644 def hasRepartitionShuffle(plan: SparkPlan): Boolean = { find(plan) { case s: ShuffleExchangeLike => -@@ -1890,6 +1921,9 @@ class AdaptiveQueryExecSuite +@@ -1890,6 +1923,9 @@ class AdaptiveQueryExecSuite def checkNoCoalescePartitions(ds: Dataset[Row], origin: ShuffleOrigin): Unit = { assert(collect(ds.queryExecution.executedPlan) { case s: ShuffleExchangeExec if s.shuffleOrigin == origin && s.numPartitions == 2 => s @@ -1743,7 +1787,7 @@ index 2f8e401e743..dbcf3171946 100644 }.size == 1) ds.collect() val plan = ds.queryExecution.executedPlan -@@ -1898,6 +1932,9 @@ class AdaptiveQueryExecSuite +@@ -1898,6 +1934,9 @@ class AdaptiveQueryExecSuite }.isEmpty) assert(collect(plan) { case s: ShuffleExchangeExec if s.shuffleOrigin == origin && s.numPartitions == 2 => s @@ -1753,7 +1797,15 @@ index 2f8e401e743..dbcf3171946 100644 }.size == 1) checkAnswer(ds, testData) } -@@ -2054,7 +2091,8 @@ class AdaptiveQueryExecSuite +@@ -1927,6 +1966,7 @@ class AdaptiveQueryExecSuite + df.collect() + assert(collect(df.queryExecution.executedPlan) { + case u: UnionExec => u ++ case u: CometUnionExec => u.originalPlan.asInstanceOf[UnionExec] + }.size == numUnion) + assert(collect(df.queryExecution.executedPlan) { + case r: AQEShuffleReadExec => r +@@ -2054,7 +2094,8 @@ class AdaptiveQueryExecSuite } } @@ -1763,7 +1815,7 @@ index 2f8e401e743..dbcf3171946 100644 withTempView("t1", "t2") { def checkJoinStrategy(shouldShuffleHashJoin: Boolean): Unit = { Seq("100", "100000").foreach { size => -@@ -2140,7 +2178,8 @@ class AdaptiveQueryExecSuite +@@ -2140,7 +2181,8 @@ class AdaptiveQueryExecSuite } } @@ -1773,7 +1825,7 @@ index 2f8e401e743..dbcf3171946 100644 withTempView("v") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", -@@ -2239,7 +2278,7 @@ class AdaptiveQueryExecSuite +@@ -2239,7 +2281,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " + s"JOIN skewData2 ON key1 = key2 GROUP BY key1") val shuffles1 = collect(adaptive1) { @@ -1782,7 +1834,7 @@ index 2f8e401e743..dbcf3171946 100644 } assert(shuffles1.size == 3) // shuffles1.head is the top-level shuffle under the Aggregate operator -@@ -2252,7 +2291,7 @@ class AdaptiveQueryExecSuite +@@ -2252,7 +2294,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " + s"JOIN skewData2 ON key1 = key2") val shuffles2 = collect(adaptive2) { @@ -1791,7 +1843,7 @@ index 2f8e401e743..dbcf3171946 100644 } if (hasRequiredDistribution) { assert(shuffles2.size == 3) -@@ -2286,7 +2325,8 @@ class AdaptiveQueryExecSuite +@@ -2286,7 +2328,8 @@ class AdaptiveQueryExecSuite } } @@ -1801,7 +1853,7 @@ index 2f8e401e743..dbcf3171946 100644 CostEvaluator.instantiate( classOf[SimpleShuffleSortCostEvaluator].getCanonicalName, spark.sparkContext.getConf) intercept[IllegalArgumentException] { -@@ -2452,6 +2492,7 @@ class AdaptiveQueryExecSuite +@@ -2452,6 +2495,7 @@ class AdaptiveQueryExecSuite val (_, adaptive) = runAdaptiveAndVerifyResult(query) assert(adaptive.collect { case sort: SortExec => sort @@ -1809,7 +1861,7 @@ index 2f8e401e743..dbcf3171946 100644 }.size == 1) val read = collect(adaptive) { case read: AQEShuffleReadExec => read -@@ -2469,7 +2510,8 @@ class AdaptiveQueryExecSuite +@@ -2469,7 +2513,8 @@ class AdaptiveQueryExecSuite } } @@ -1819,7 +1871,7 @@ index 2f8e401e743..dbcf3171946 100644 withTempView("v") { withSQLConf( SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "true", -@@ -2581,7 +2623,7 @@ class AdaptiveQueryExecSuite +@@ -2581,7 +2626,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " + "JOIN skewData3 ON value2 = value3") val shuffles1 = collect(adaptive1) { @@ -1828,7 +1880,7 @@ index 2f8e401e743..dbcf3171946 100644 } assert(shuffles1.size == 4) val smj1 = findTopLevelSortMergeJoin(adaptive1) -@@ -2592,7 +2634,7 @@ class AdaptiveQueryExecSuite +@@ -2592,7 +2637,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " + "JOIN skewData3 ON value1 = value3") val shuffles2 = collect(adaptive2) { @@ -1837,7 +1889,7 @@ index 2f8e401e743..dbcf3171946 100644 } assert(shuffles2.size == 4) val smj2 = findTopLevelSortMergeJoin(adaptive2) -@@ -2850,6 +2892,7 @@ class AdaptiveQueryExecSuite +@@ -2850,6 +2895,7 @@ class AdaptiveQueryExecSuite }.size == (if (firstAccess) 1 else 0)) assert(collect(initialExecutedPlan) { case s: SortExec => s @@ -1845,7 +1897,7 @@ index 2f8e401e743..dbcf3171946 100644 }.size == (if (firstAccess) 2 else 0)) assert(collect(initialExecutedPlan) { case i: InMemoryTableScanLike => i -@@ -2980,7 +3023,9 @@ class AdaptiveQueryExecSuite +@@ -2980,7 +3026,9 @@ class AdaptiveQueryExecSuite val plan = df.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec] assert(plan.inputPlan.isInstanceOf[TakeOrderedAndProjectExec]) @@ -2290,7 +2342,7 @@ index 3f47c5e506f..8e8d2a1634d 100644 val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SparkException]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala -index b8f3ea3c6f3..bbd44221288 100644 +index b8f3ea3c6f3..0d92198bd83 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.debug @@ -2301,7 +2353,27 @@ index b8f3ea3c6f3..bbd44221288 100644 import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext -@@ -125,7 +126,8 @@ class DebuggingSuite extends DebuggingSuiteBase with DisableAdaptiveExecutionSui +@@ -42,7 +43,8 @@ abstract class DebuggingSuiteBase extends SharedSparkSession { + testData.as[TestData].debug() + } + +- test("debugCodegen") { ++ test("debugCodegen", ++ IgnoreComet("Comet changes the WholeStageCodegen subtree count")) { + val df = spark.range(10).groupBy(col("id") * 2).count() + df.collect() + val res = codegenString(df.queryExecution.executedPlan) +@@ -51,7 +53,8 @@ abstract class DebuggingSuiteBase extends SharedSparkSession { + assert(res.contains("Object[]")) + } + +- test("debugCodegenStringSeq") { ++ test("debugCodegenStringSeq", ++ IgnoreComet("Comet changes the WholeStageCodegen subtree count")) { + val df = spark.range(10).groupBy(col("id") * 2).count() + df.collect() + val res = codegenStringSeq(df.queryExecution.executedPlan) +@@ -125,7 +128,8 @@ class DebuggingSuite extends DebuggingSuiteBase with DisableAdaptiveExecutionSui | id LongType: {}""".stripMargin)) } @@ -2411,6 +2483,29 @@ index d083cac48ff..3c11bcde807 100644 import testImplicits._ +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +index 67206e9c655..b90e914c113 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +@@ -35,7 +35,7 @@ import org.apache.spark.internal.config.Status._ + import org.apache.spark.rdd.RDD + import org.apache.spark.resource.ResourceProfile + import org.apache.spark.scheduler._ +-import org.apache.spark.sql.{DataFrame, SparkSession} ++import org.apache.spark.sql.{DataFrame, IgnoreComet, SparkSession} + import org.apache.spark.sql.catalyst.InternalRow + import org.apache.spark.sql.catalyst.expressions.Attribute + import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +@@ -699,7 +699,8 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes + } + + test("SPARK-29894 test Codegen Stage Id in SparkPlanInfo", +- DisableAdaptiveExecution("WSCG rule is applied later in AQE")) { ++ DisableAdaptiveExecution("WSCG rule is applied later in AQE"), ++ IgnoreComet("Comet changes the WholeStageCodegen subtree count")) { + // with AQE on, the WholeStageCodegen rule is applied when running QueryStageExec. + val df = createTestDataFrame.select(count("*")) + val sparkPlanInfo = SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 746f289c393..e5dc13b87d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala diff --git a/dev/diffs/4.0.2.diff b/dev/diffs/4.0.2.diff index 5bfc423cd5..cdcbbfa6e1 100644 --- a/dev/diffs/4.0.2.diff +++ b/dev/diffs/4.0.2.diff @@ -332,6 +332,30 @@ index 21a3ce1e122..f4762ab98f0 100644 SET spark.sql.ansi.enabled = false; -- In COMPENSATION views get invalidated if the type can't cast +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala +index 80450b79a9a..41d1bf57938 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala +@@ -282,7 +282,8 @@ abstract class CTEInlineSuiteBase + } + } + +- test("CTE Predicate push-down and column pruning") { ++ test("CTE Predicate push-down and column pruning", ++ IgnoreComet("Comet changes the exchange reuse count asserted by this test")) { + withTempView("t") { + Seq((0, 1), (1, 2)).toDF("c1", "c2").createOrReplaceTempView("t") + val df = sql( +@@ -332,7 +333,8 @@ abstract class CTEInlineSuiteBase + } + } + +- test("CTE Predicate push-down and column pruning - combined predicate") { ++ test("CTE Predicate push-down and column pruning - combined predicate", ++ IgnoreComet("Comet changes the exchange reuse count asserted by this test")) { + withTempView("t") { + Seq((0, 1, 2, 3), (1, 2, 3, 4)).toDF("c1", "c2", "c3", "c4").createOrReplaceTempView("t") + val df = sql( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 0f42502f1d9..e9ff802141f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -2023,7 +2047,7 @@ index a3cfdc5a240..3793b6191bf 100644 }) checkAnswer(distinctWithId, Seq(Row(1, 0), Row(1, 0))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala -index 272be70f9fe..d38a6d41a47 100644 +index 272be70f9fe..8cea57eef5c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -28,12 +28,14 @@ import org.apache.spark.SparkException @@ -2301,7 +2325,27 @@ index 272be70f9fe..d38a6d41a47 100644 withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { val (_, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT key FROM testData GROUP BY key") -@@ -1721,7 +1751,7 @@ class AdaptiveQueryExecSuite +@@ -1663,7 +1693,8 @@ class AdaptiveQueryExecSuite + } + } + +- test("SPARK-35442: Support propagate empty relation through aggregate") { ++ test("SPARK-35442: Support propagate empty relation through aggregate", ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4412")) { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult( + "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key") +@@ -1682,7 +1713,8 @@ class AdaptiveQueryExecSuite + } + } + +- test("SPARK-35442: Support propagate empty relation through union") { ++ test("SPARK-35442: Support propagate empty relation through union", ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4412")) { + def checkNumUnion(plan: SparkPlan, numUnion: Int): Unit = { + assert( + collect(plan) { +@@ -1721,7 +1753,7 @@ class AdaptiveQueryExecSuite val (_, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT id FROM v1 GROUP BY id DISTRIBUTE BY id") assert(collect(adaptivePlan) { @@ -2310,7 +2354,7 @@ index 272be70f9fe..d38a6d41a47 100644 }.length == 1) } } -@@ -1801,7 +1831,8 @@ class AdaptiveQueryExecSuite +@@ -1801,7 +1833,8 @@ class AdaptiveQueryExecSuite } } @@ -2320,7 +2364,7 @@ index 272be70f9fe..d38a6d41a47 100644 def hasRepartitionShuffle(plan: SparkPlan): Boolean = { find(plan) { case s: ShuffleExchangeLike => -@@ -1986,6 +2017,9 @@ class AdaptiveQueryExecSuite +@@ -1986,6 +2019,9 @@ class AdaptiveQueryExecSuite def checkNoCoalescePartitions(ds: Dataset[Row], origin: ShuffleOrigin): Unit = { assert(collect(ds.queryExecution.executedPlan) { case s: ShuffleExchangeExec if s.shuffleOrigin == origin && s.numPartitions == 2 => s @@ -2330,7 +2374,7 @@ index 272be70f9fe..d38a6d41a47 100644 }.size == 1) ds.collect() val plan = ds.queryExecution.executedPlan -@@ -1994,6 +2028,9 @@ class AdaptiveQueryExecSuite +@@ -1994,6 +2030,9 @@ class AdaptiveQueryExecSuite }.isEmpty) assert(collect(plan) { case s: ShuffleExchangeExec if s.shuffleOrigin == origin && s.numPartitions == 2 => s @@ -2340,7 +2384,15 @@ index 272be70f9fe..d38a6d41a47 100644 }.size == 1) checkAnswer(ds, testData) } -@@ -2150,7 +2187,8 @@ class AdaptiveQueryExecSuite +@@ -2023,6 +2062,7 @@ class AdaptiveQueryExecSuite + df.collect() + assert(collect(df.queryExecution.executedPlan) { + case u: UnionExec => u ++ case u: CometUnionExec => u.originalPlan.asInstanceOf[UnionExec] + }.size == numUnion) + assert(collect(df.queryExecution.executedPlan) { + case r: AQEShuffleReadExec => r +@@ -2150,7 +2190,8 @@ class AdaptiveQueryExecSuite } } @@ -2350,7 +2402,7 @@ index 272be70f9fe..d38a6d41a47 100644 withTempView("t1", "t2") { def checkJoinStrategy(shouldShuffleHashJoin: Boolean): Unit = { Seq("100", "100000").foreach { size => -@@ -2236,7 +2274,8 @@ class AdaptiveQueryExecSuite +@@ -2236,7 +2277,8 @@ class AdaptiveQueryExecSuite } } @@ -2360,7 +2412,7 @@ index 272be70f9fe..d38a6d41a47 100644 withTempView("v") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", -@@ -2335,7 +2374,7 @@ class AdaptiveQueryExecSuite +@@ -2335,7 +2377,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " + s"JOIN skewData2 ON key1 = key2 GROUP BY key1") val shuffles1 = collect(adaptive1) { @@ -2369,7 +2421,7 @@ index 272be70f9fe..d38a6d41a47 100644 } assert(shuffles1.size == 3) // shuffles1.head is the top-level shuffle under the Aggregate operator -@@ -2348,7 +2387,7 @@ class AdaptiveQueryExecSuite +@@ -2348,7 +2390,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " + s"JOIN skewData2 ON key1 = key2") val shuffles2 = collect(adaptive2) { @@ -2378,7 +2430,7 @@ index 272be70f9fe..d38a6d41a47 100644 } if (hasRequiredDistribution) { assert(shuffles2.size == 3) -@@ -2382,7 +2421,8 @@ class AdaptiveQueryExecSuite +@@ -2382,7 +2424,8 @@ class AdaptiveQueryExecSuite } } @@ -2388,7 +2440,7 @@ index 272be70f9fe..d38a6d41a47 100644 CostEvaluator.instantiate( classOf[SimpleShuffleSortCostEvaluator].getCanonicalName, spark.sparkContext.getConf) intercept[IllegalArgumentException] { -@@ -2548,6 +2588,7 @@ class AdaptiveQueryExecSuite +@@ -2548,6 +2591,7 @@ class AdaptiveQueryExecSuite val (_, adaptive) = runAdaptiveAndVerifyResult(query) assert(adaptive.collect { case sort: SortExec => sort @@ -2396,7 +2448,7 @@ index 272be70f9fe..d38a6d41a47 100644 }.size == 1) val read = collect(adaptive) { case read: AQEShuffleReadExec => read -@@ -2565,7 +2606,8 @@ class AdaptiveQueryExecSuite +@@ -2565,7 +2609,8 @@ class AdaptiveQueryExecSuite } } @@ -2406,7 +2458,7 @@ index 272be70f9fe..d38a6d41a47 100644 withTempView("v") { withSQLConf( SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "true", -@@ -2677,7 +2719,7 @@ class AdaptiveQueryExecSuite +@@ -2677,7 +2722,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " + "JOIN skewData3 ON value2 = value3") val shuffles1 = collect(adaptive1) { @@ -2415,7 +2467,7 @@ index 272be70f9fe..d38a6d41a47 100644 } assert(shuffles1.size == 4) val smj1 = findTopLevelSortMergeJoin(adaptive1) -@@ -2688,7 +2730,7 @@ class AdaptiveQueryExecSuite +@@ -2688,7 +2733,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " + "JOIN skewData3 ON value1 = value3") val shuffles2 = collect(adaptive2) { @@ -2424,7 +2476,7 @@ index 272be70f9fe..d38a6d41a47 100644 } assert(shuffles2.size == 4) val smj2 = findTopLevelSortMergeJoin(adaptive2) -@@ -2946,6 +2988,7 @@ class AdaptiveQueryExecSuite +@@ -2946,6 +2991,7 @@ class AdaptiveQueryExecSuite }.size == (if (firstAccess) 1 else 0)) assert(collect(initialExecutedPlan) { case s: SortExec => s @@ -2432,7 +2484,7 @@ index 272be70f9fe..d38a6d41a47 100644 }.size == (if (firstAccess) 2 else 0)) assert(collect(initialExecutedPlan) { case i: InMemoryTableScanLike => i -@@ -2958,6 +3001,7 @@ class AdaptiveQueryExecSuite +@@ -2958,6 +3004,7 @@ class AdaptiveQueryExecSuite }.isEmpty) assert(collect(finalExecutedPlan) { case s: SortExec => s @@ -2904,7 +2956,7 @@ index 09ed6955a51..236a4e99824 100644 } test(s"parquet widening conversion $fromType -> $toType") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala -index b8f3ea3c6f3..bbd44221288 100644 +index b8f3ea3c6f3..0d92198bd83 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.debug @@ -2915,7 +2967,27 @@ index b8f3ea3c6f3..bbd44221288 100644 import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext -@@ -125,7 +126,8 @@ class DebuggingSuite extends DebuggingSuiteBase with DisableAdaptiveExecutionSui +@@ -42,7 +43,8 @@ abstract class DebuggingSuiteBase extends SharedSparkSession { + testData.as[TestData].debug() + } + +- test("debugCodegen") { ++ test("debugCodegen", ++ IgnoreComet("Comet changes the WholeStageCodegen subtree count")) { + val df = spark.range(10).groupBy(col("id") * 2).count() + df.collect() + val res = codegenString(df.queryExecution.executedPlan) +@@ -51,7 +53,8 @@ abstract class DebuggingSuiteBase extends SharedSparkSession { + assert(res.contains("Object[]")) + } + +- test("debugCodegenStringSeq") { ++ test("debugCodegenStringSeq", ++ IgnoreComet("Comet changes the WholeStageCodegen subtree count")) { + val df = spark.range(10).groupBy(col("id") * 2).count() + df.collect() + val res = codegenStringSeq(df.queryExecution.executedPlan) +@@ -125,7 +128,8 @@ class DebuggingSuite extends DebuggingSuiteBase with DisableAdaptiveExecutionSui | id LongType: {}""".stripMargin)) } @@ -3066,6 +3138,28 @@ index 89f22186f7e..425233f00b2 100644 before { StateStore.stop() require(!StateStore.isMaintenanceRunning) +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +index 800a58f0c1d..22a3c49414d 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +@@ -35,6 +35,7 @@ import org.apache.spark.internal.config.Status._ + import org.apache.spark.rdd.RDD + import org.apache.spark.resource.ResourceProfile + import org.apache.spark.scheduler._ ++import org.apache.spark.sql.IgnoreComet + import org.apache.spark.sql.catalyst.InternalRow + import org.apache.spark.sql.catalyst.expressions.Attribute + import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +@@ -699,7 +700,8 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes + } + + test("SPARK-29894 test Codegen Stage Id in SparkPlanInfo", +- DisableAdaptiveExecution("WSCG rule is applied later in AQE")) { ++ DisableAdaptiveExecution("WSCG rule is applied later in AQE"), ++ IgnoreComet("Comet changes the WholeStageCodegen subtree count")) { + // with AQE on, the WholeStageCodegen rule is applied when running QueryStageExec. + val df = createTestDataFrame.select(count("*")) + val sparkPlanInfo = SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index c4b09c4b289..75c3437788e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala diff --git a/dev/diffs/4.1.1.diff b/dev/diffs/4.1.1.diff index 5cf6326dbf..e870c2b895 100644 --- a/dev/diffs/4.1.1.diff +++ b/dev/diffs/4.1.1.diff @@ -346,6 +346,30 @@ index 21a3ce1e122..f4762ab98f0 100644 SET spark.sql.ansi.enabled = false; -- In COMPENSATION views get invalidated if the type can't cast +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala +index c1a001117be..4545ba2fc20 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala +@@ -282,7 +282,8 @@ abstract class CTEInlineSuiteBase + } + } + +- test("CTE Predicate push-down and column pruning") { ++ test("CTE Predicate push-down and column pruning", ++ IgnoreComet("Comet changes the exchange reuse count asserted by this test")) { + withTempView("t") { + Seq((0, 1), (1, 2)).toDF("c1", "c2").createOrReplaceTempView("t") + val df = sql( +@@ -332,7 +333,8 @@ abstract class CTEInlineSuiteBase + } + } + +- test("CTE Predicate push-down and column pruning - combined predicate") { ++ test("CTE Predicate push-down and column pruning - combined predicate", ++ IgnoreComet("Comet changes the exchange reuse count asserted by this test")) { + withTempView("t") { + Seq((0, 1, 2, 3), (1, 2, 3, 4)).toDF("c1", "c2", "c3", "c4").createOrReplaceTempView("t") + val df = sql( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 0d807aeae4d..6d7744e771b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -2140,7 +2164,7 @@ index a3cfdc5a240..3793b6191bf 100644 }) checkAnswer(distinctWithId, Seq(Row(1, 0), Row(1, 0))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala -index 3e7d26f74bd..04cfdf075ab 100644 +index 3e7d26f74bd..6354c9a62e9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -27,12 +27,14 @@ import org.apache.spark.SparkException @@ -2418,7 +2442,27 @@ index 3e7d26f74bd..04cfdf075ab 100644 withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { val (_, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT key FROM testData GROUP BY key") -@@ -1891,7 +1921,7 @@ class AdaptiveQueryExecSuite +@@ -1833,7 +1863,8 @@ class AdaptiveQueryExecSuite + } + } + +- test("SPARK-35442: Support propagate empty relation through aggregate") { ++ test("SPARK-35442: Support propagate empty relation through aggregate", ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4412")) { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult( + "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key") +@@ -1852,7 +1883,8 @@ class AdaptiveQueryExecSuite + } + } + +- test("SPARK-35442: Support propagate empty relation through union") { ++ test("SPARK-35442: Support propagate empty relation through union", ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4412")) { + def checkNumUnion(plan: SparkPlan, numUnion: Int): Unit = { + assert( + collect(plan) { +@@ -1891,7 +1923,7 @@ class AdaptiveQueryExecSuite val (_, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT id FROM v1 GROUP BY id DISTRIBUTE BY id") assert(collect(adaptivePlan) { @@ -2427,7 +2471,7 @@ index 3e7d26f74bd..04cfdf075ab 100644 }.length == 1) } } -@@ -1972,7 +2002,8 @@ class AdaptiveQueryExecSuite +@@ -1972,7 +2004,8 @@ class AdaptiveQueryExecSuite } } @@ -2437,7 +2481,7 @@ index 3e7d26f74bd..04cfdf075ab 100644 def hasRepartitionShuffle(plan: SparkPlan): Boolean = { find(plan) { case s: ShuffleExchangeLike => -@@ -2159,6 +2190,9 @@ class AdaptiveQueryExecSuite +@@ -2159,6 +2192,9 @@ class AdaptiveQueryExecSuite def checkNoCoalescePartitions(ds: Dataset[Row], origin: ShuffleOrigin): Unit = { assert(collect(ds.queryExecution.executedPlan) { case s: ShuffleExchangeExec if s.shuffleOrigin == origin && s.numPartitions == 2 => s @@ -2447,7 +2491,7 @@ index 3e7d26f74bd..04cfdf075ab 100644 }.size == 1) ds.collect() val plan = ds.queryExecution.executedPlan -@@ -2167,6 +2201,9 @@ class AdaptiveQueryExecSuite +@@ -2167,6 +2203,9 @@ class AdaptiveQueryExecSuite }.isEmpty) assert(collect(plan) { case s: ShuffleExchangeExec if s.shuffleOrigin == origin && s.numPartitions == 2 => s @@ -2457,7 +2501,15 @@ index 3e7d26f74bd..04cfdf075ab 100644 }.size == 1) checkAnswer(ds, testData) } -@@ -2331,7 +2368,8 @@ class AdaptiveQueryExecSuite +@@ -2196,6 +2235,7 @@ class AdaptiveQueryExecSuite + df.collect() + assert(collect(df.queryExecution.executedPlan) { + case u: UnionExec => u ++ case u: CometUnionExec => u.originalPlan.asInstanceOf[UnionExec] + }.size == numUnion) + assert(collect(df.queryExecution.executedPlan) { + case r: AQEShuffleReadExec => r +@@ -2331,7 +2371,8 @@ class AdaptiveQueryExecSuite } } @@ -2467,7 +2519,7 @@ index 3e7d26f74bd..04cfdf075ab 100644 withTempView("t1", "t2") { def checkJoinStrategy(shouldShuffleHashJoin: Boolean): Unit = { Seq("100", "100000").foreach { size => -@@ -2417,7 +2455,8 @@ class AdaptiveQueryExecSuite +@@ -2417,7 +2458,8 @@ class AdaptiveQueryExecSuite } } @@ -2477,7 +2529,7 @@ index 3e7d26f74bd..04cfdf075ab 100644 withTempView("v") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", -@@ -2516,7 +2555,7 @@ class AdaptiveQueryExecSuite +@@ -2516,7 +2558,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " + s"JOIN skewData2 ON key1 = key2 GROUP BY key1") val shuffles1 = collect(adaptive1) { @@ -2486,7 +2538,7 @@ index 3e7d26f74bd..04cfdf075ab 100644 } assert(shuffles1.size == 3) // shuffles1.head is the top-level shuffle under the Aggregate operator -@@ -2529,7 +2568,7 @@ class AdaptiveQueryExecSuite +@@ -2529,7 +2571,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " + s"JOIN skewData2 ON key1 = key2") val shuffles2 = collect(adaptive2) { @@ -2495,7 +2547,7 @@ index 3e7d26f74bd..04cfdf075ab 100644 } if (hasRequiredDistribution) { assert(shuffles2.size == 3) -@@ -2563,7 +2602,8 @@ class AdaptiveQueryExecSuite +@@ -2563,7 +2605,8 @@ class AdaptiveQueryExecSuite } } @@ -2505,7 +2557,7 @@ index 3e7d26f74bd..04cfdf075ab 100644 CostEvaluator.instantiate( classOf[SimpleShuffleSortCostEvaluator].getCanonicalName, spark.sparkContext.getConf) intercept[IllegalArgumentException] { -@@ -2729,6 +2769,7 @@ class AdaptiveQueryExecSuite +@@ -2729,6 +2772,7 @@ class AdaptiveQueryExecSuite val (_, adaptive) = runAdaptiveAndVerifyResult(query) assert(adaptive.collect { case sort: SortExec => sort @@ -2513,7 +2565,7 @@ index 3e7d26f74bd..04cfdf075ab 100644 }.size == 1) val read = collect(adaptive) { case read: AQEShuffleReadExec => read -@@ -2746,7 +2787,8 @@ class AdaptiveQueryExecSuite +@@ -2746,7 +2790,8 @@ class AdaptiveQueryExecSuite } } @@ -2523,7 +2575,7 @@ index 3e7d26f74bd..04cfdf075ab 100644 withTempView("v") { withSQLConf( SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "true", -@@ -2858,7 +2900,7 @@ class AdaptiveQueryExecSuite +@@ -2858,7 +2903,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " + "JOIN skewData3 ON value2 = value3") val shuffles1 = collect(adaptive1) { @@ -2532,7 +2584,7 @@ index 3e7d26f74bd..04cfdf075ab 100644 } assert(shuffles1.size == 4) val smj1 = findTopLevelSortMergeJoin(adaptive1) -@@ -2869,7 +2911,7 @@ class AdaptiveQueryExecSuite +@@ -2869,7 +2914,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " + "JOIN skewData3 ON value1 = value3") val shuffles2 = collect(adaptive2) { @@ -2541,7 +2593,7 @@ index 3e7d26f74bd..04cfdf075ab 100644 } assert(shuffles2.size == 4) val smj2 = findTopLevelSortMergeJoin(adaptive2) -@@ -3127,6 +3169,7 @@ class AdaptiveQueryExecSuite +@@ -3127,6 +3172,7 @@ class AdaptiveQueryExecSuite }.size == (if (firstAccess) 1 else 0)) assert(collect(initialExecutedPlan) { case s: SortExec => s @@ -2549,7 +2601,7 @@ index 3e7d26f74bd..04cfdf075ab 100644 }.size == (if (firstAccess) 2 else 0)) assert(collect(initialExecutedPlan) { case i: InMemoryTableScanLike => i -@@ -3139,6 +3182,7 @@ class AdaptiveQueryExecSuite +@@ -3139,6 +3185,7 @@ class AdaptiveQueryExecSuite }.isEmpty) assert(collect(finalExecutedPlan) { case s: SortExec => s @@ -2557,6 +2609,14 @@ index 3e7d26f74bd..04cfdf075ab 100644 }.isEmpty) assert(collect(initialExecutedPlan) { case i: InMemoryTableScanLike => i +@@ -3381,6 +3428,7 @@ class AdaptiveQueryExecSuite + df.collect() + assert(collect(df.queryExecution.executedPlan) { + case u: UnionExec => u ++ case u: CometUnionExec => u.originalPlan.asInstanceOf[UnionExec] + }.size == numUnion) + assert(collect(df.queryExecution.executedPlan) { + case r: AQEShuffleReadExec => r diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala index 47b935a2880..3e9b87f5c32 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala @@ -3087,7 +3147,7 @@ index 1cc6d3afbee..8275727fbb4 100644 private def testWithTempDir(name: String)(block: File => Unit): Unit = test(name) { withTempDir { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala -index b8f3ea3c6f3..bbd44221288 100644 +index b8f3ea3c6f3..0d92198bd83 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.debug @@ -3098,7 +3158,27 @@ index b8f3ea3c6f3..bbd44221288 100644 import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext -@@ -125,7 +126,8 @@ class DebuggingSuite extends DebuggingSuiteBase with DisableAdaptiveExecutionSui +@@ -42,7 +43,8 @@ abstract class DebuggingSuiteBase extends SharedSparkSession { + testData.as[TestData].debug() + } + +- test("debugCodegen") { ++ test("debugCodegen", ++ IgnoreComet("Comet changes the WholeStageCodegen subtree count")) { + val df = spark.range(10).groupBy(col("id") * 2).count() + df.collect() + val res = codegenString(df.queryExecution.executedPlan) +@@ -51,7 +53,8 @@ abstract class DebuggingSuiteBase extends SharedSparkSession { + assert(res.contains("Object[]")) + } + +- test("debugCodegenStringSeq") { ++ test("debugCodegenStringSeq", ++ IgnoreComet("Comet changes the WholeStageCodegen subtree count")) { + val df = spark.range(10).groupBy(col("id") * 2).count() + df.collect() + val res = codegenStringSeq(df.queryExecution.executedPlan) +@@ -125,7 +128,8 @@ class DebuggingSuite extends DebuggingSuiteBase with DisableAdaptiveExecutionSui | id LongType: {}""".stripMargin)) } @@ -3270,6 +3350,28 @@ index e839ccd35ec..d182aa07b44 100644 before { StateStore.stop() require(!StateStore.isMaintenanceRunning) +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +index 800a58f0c1d..22a3c49414d 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +@@ -35,6 +35,7 @@ import org.apache.spark.internal.config.Status._ + import org.apache.spark.rdd.RDD + import org.apache.spark.resource.ResourceProfile + import org.apache.spark.scheduler._ ++import org.apache.spark.sql.IgnoreComet + import org.apache.spark.sql.catalyst.InternalRow + import org.apache.spark.sql.catalyst.expressions.Attribute + import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +@@ -699,7 +700,8 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes + } + + test("SPARK-29894 test Codegen Stage Id in SparkPlanInfo", +- DisableAdaptiveExecution("WSCG rule is applied later in AQE")) { ++ DisableAdaptiveExecution("WSCG rule is applied later in AQE"), ++ IgnoreComet("Comet changes the WholeStageCodegen subtree count")) { + // with AQE on, the WholeStageCodegen rule is applied when running QueryStageExec. + val df = createTestDataFrame.select(count("*")) + val sparkPlanInfo = SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 83ebd24384c..32511091bb2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala diff --git a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala index 2714a7e466..5146138c4d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala +++ b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala @@ -131,6 +131,9 @@ object CometMax extends CometAggregateExpressionSerde[Max] { } object CometCount extends CometAggregateExpressionSerde[Count] { + + override def supportsMixedPartialFinal: Boolean = true + override def convert( aggExpr: AggregateExpression, expr: Count, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 7d5398ae62..3334f0c362 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -1813,6 +1813,12 @@ case class CometHashAggregateExec( this.output == other.output && this.groupingExpressions == other.groupingExpressions && this.aggregateExpressions == other.aggregateExpressions && + // resultExpressions carries any post-aggregate projection fused into the Final + // aggregate (e.g. count+1 vs count-1 in count-bug decorrelation). It must + // participate in identity, otherwise two aggregates differing only in that + // projection canonicalize identically and an enclosing broadcast exchange is + // wrongly deduplicated by exchange reuse. See issue #4242. + this.resultExpressions == other.resultExpressions && this.input == other.input && this.modes == other.modes && this.child == other.child && @@ -1823,7 +1829,14 @@ case class CometHashAggregateExec( } override def hashCode(): Int = - Objects.hashCode(output, groupingExpressions, aggregateExpressions, input, modes, child) + Objects.hashCode( + output, + groupingExpressions, + aggregateExpressions, + resultExpressions, + input, + modes, + child) override protected def outputExpressions: Seq[NamedExpression] = resultExpressions } diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10/extended.txt index 07af300183..309ce2c012 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10/extended.txt @@ -1,61 +1,62 @@ -TakeOrderedAndProject -+- HashAggregate [COMET: Spark Final aggregate without Comet Partial requires compatible intermediate buffer formats] - +- Exchange - +- HashAggregate - +- Project - +- BroadcastHashJoin - :- Project - : +- BroadcastHashJoin - : :- Project - : : +- Filter - : : +- BroadcastHashJoin - : : :- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)] - : : : :- CometNativeColumnarToRow - : : : : +- CometBroadcastHashJoin - : : : : :- CometFilter - : : : : : +- CometNativeScan parquet spark_catalog.default.customer - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometBroadcastHashJoin - : : : : :- CometNativeScan parquet spark_catalog.default.store_sales - : : : : : +- CometSubqueryBroadcast - : : : : : +- CometBroadcastExchange - : : : : : +- CometProject - : : : : : +- CometFilter - : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometBroadcastHashJoin - : : : :- CometNativeScan parquet spark_catalog.default.web_sales - : : : : +- ReusedSubquery - : : : +- CometBroadcastExchange - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- CometProject - : : +- CometBroadcastHashJoin - : : :- CometNativeScan parquet spark_catalog.default.catalog_sales - : : : +- ReusedSubquery - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.date_dim - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.customer_address - +- BroadcastExchange - +- CometNativeColumnarToRow - +- CometProject - +- CometFilter - +- CometNativeScan parquet spark_catalog.default.customer_demographics +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate + +- CometColumnarExchange + +- HashAggregate + +- Project + +- BroadcastHashJoin + :- Project + : +- BroadcastHashJoin + : :- Project + : : +- Filter + : : +- BroadcastHashJoin + : : :- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)] + : : : :- CometNativeColumnarToRow + : : : : +- CometBroadcastHashJoin + : : : : :- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.customer + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometBroadcastHashJoin + : : : : :- CometNativeScan parquet spark_catalog.default.store_sales + : : : : : +- CometSubqueryBroadcast + : : : : : +- CometBroadcastExchange + : : : : : +- CometProject + : : : : : +- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- BroadcastExchange + : : : +- CometNativeColumnarToRow + : : : +- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometNativeScan parquet spark_catalog.default.web_sales + : : : : +- ReusedSubquery + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- BroadcastExchange + : : +- CometNativeColumnarToRow + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometNativeScan parquet spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.date_dim + : +- BroadcastExchange + : +- CometNativeColumnarToRow + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.customer_address + +- BroadcastExchange + +- CometNativeColumnarToRow + +- CometProject + +- CometFilter + +- CometNativeScan parquet spark_catalog.default.customer_demographics -Comet accelerated 35 out of 54 eligible operators (64%). Final plan contains 5 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 38 out of 54 eligible operators (70%). Final plan contains 6 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q44/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q44/extended.txt index d2f31a9b37..65874ebdbc 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q44/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q44/extended.txt @@ -35,14 +35,7 @@ TakeOrderedAndProject : : +- CometSort : : +- CometExchange : : +- CometFilter - : : : +- Subquery - : : : +- CometNativeColumnarToRow - : : : +- CometHashAggregate - : : : +- CometExchange - : : : +- CometHashAggregate - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.store_sales + : : : +- ReusedSubquery : : +- CometHashAggregate : : +- CometExchange : : +- CometHashAggregate @@ -60,4 +53,4 @@ TakeOrderedAndProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.item -Comet accelerated 36 out of 55 eligible operators (65%). Final plan contains 6 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 30 out of 49 eligible operators (61%). Final plan contains 5 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index cd0beb56cc..ca670d52af 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -2067,6 +2067,62 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + // https://github.com/apache/datafusion-comet/issues/4242 + // AQE's PropagateEmptyRelationAfterAQE rule pattern-matches BaseAggregateExec only, not + // CometHashAggregateExec. With mixed Spark-Partial / Comet-Final COUNT, the Final escapes + // the rule and propagation of empty intermediate results stops, changing downstream results + // in some queries. + test("issue #4242: AQE PropagateEmptyRelation with mixed Spark Partial / Comet Final COUNT") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withTempDir { dir => + spark.range(10).toDF("a").write.parquet(dir.getCanonicalPath + "/t1") + spark.range(10).toDF("b").write.parquet(dir.getCanonicalPath + "/t2") + withTempView("t1", "t2") { + spark.read.parquet(dir.getCanonicalPath + "/t1").createOrReplaceTempView("t1") + spark.read.parquet(dir.getCanonicalPath + "/t2").createOrReplaceTempView("t2") + + // Empty left side joined with empty right side then grouped count. + val q1 = + """SELECT inner_a, COUNT(*) FROM ( + | SELECT t1.a AS inner_a FROM t1 LEFT OUTER JOIN t2 ON t1.a = t2.b + | WHERE t1.a > 100 AND t2.b > 100 + |) GROUP BY inner_a""".stripMargin + checkSparkAnswer(q1) + + // Global ungrouped count over an empty input should produce one row with COUNT = 0. + val q2 = "SELECT COUNT(*) FROM t1 WHERE a > 100" + checkSparkAnswer(q2) + } + } + } + } + + // https://github.com/apache/datafusion-comet/issues/4242 + // Mirrors the OR pattern in Spark's in-count-bug.sql. Decorrelating correlated IN with + // COUNT inside an OR is known to drop a row when partial/final aggregate stages are + // split between Spark and Comet. + test("issue #4242: count-bug decorrelation with correlated IN OR pattern") { + withTempView("t1", "t2") { + sql("CREATE TEMPORARY VIEW t1(c1, c2) AS VALUES (0, 1), (1, 2)") + sql("CREATE TEMPORARY VIEW t2(c1, c2) AS VALUES (0, 2), (0, 3)") + + val orQuery = + """SELECT * FROM t1 WHERE + | c1 IN (SELECT count(*) + 1 FROM t2 WHERE t2.c1 = t1.c1) OR + | c2 IN (SELECT count(*) - 1 FROM t2 WHERE t2.c1 = t1.c1)""".stripMargin + checkSparkAnswer(orQuery) + + val orAndNotInQuery = + """SELECT * FROM t1 WHERE + | (c1 IN (SELECT count(*) + 1 FROM t2 WHERE t2.c1 = t1.c1) OR + | c2 IN (SELECT count(*) - 1 FROM t2 WHERE t2.c1 = t1.c1)) AND + | c1 NOT IN (SELECT count(*) FROM t2 WHERE t2.c1 = t1.c2)""".stripMargin + checkSparkAnswer(orAndNotInQuery) + } + } + protected def checkSparkAnswerAndNumOfAggregates(query: String, numAggregates: Int): Unit = { val df = sql(query) checkSparkAnswer(df)