Skip to content

Commit 029393a

Browse files
pan3793 and sarutak
authored and committed
[SPARK-54054][CONNECT] Support row position for SparkConnectResultSet
### What changes were proposed in this pull request? This PR implements below methods of the `java.sql.ResultSet` interface for `SparkConnectResultSet` ``` boolean isBeforeFirst() throws SQLException; boolean isAfterLast() throws SQLException; boolean isFirst() throws SQLException; boolean isLast() throws SQLException; int getRow() throws SQLException; void setFetchDirection(int direction) throws SQLException; int getFetchDirection() throws SQLException; ``` ### Why are the changes needed? Implement more JDBC APIs. ### Does this PR introduce _any_ user-facing change? No, it's new feature. ### How was this patch tested? New UTs are added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #52756 from pan3793/SPARK-54054. Lead-authored-by: Cheng Pan <[email protected]> Co-authored-by: Cheng Pan <[email protected]> Signed-off-by: Kousuke Saruta <[email protected]>
1 parent 30ccf8d commit 029393a

File tree

3 files changed

+191
-10
lines changed

3 files changed

+191
-10
lines changed

sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSet.scala

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import java.util.Calendar
2525

2626
import org.apache.spark.sql.Row
2727
import org.apache.spark.sql.connect.client.SparkResult
28+
import org.apache.spark.sql.connect.client.jdbc.util.JdbcErrorUtils
2829

2930
class SparkConnectResultSet(
3031
sparkResult: SparkResult[Row],
@@ -34,16 +35,25 @@ class SparkConnectResultSet(
3435

3536
private var currentRow: Row = _
3637

37-
private var _wasNull: Boolean = false
38+
// cursor is 1-based, range in [0, length + 1]
39+
// - 0 means beforeFirstRow
40+
// - value in [1, length] means the row number
41+
// - length + 1 means afterLastRow
42+
private var cursor: Int = 0
3843

44+
private var _wasNull: Boolean = false
3945
override def wasNull: Boolean = _wasNull
4046

4147
override def next(): Boolean = {
4248
val hasNext = iterator.hasNext
4349
if (hasNext) {
4450
currentRow = iterator.next()
51+
cursor += 1
4552
} else {
4653
currentRow = null
54+
if (cursor > 0 && cursor == sparkResult.length) {
55+
cursor += 1
56+
}
4757
}
4858
hasNext
4959
}
@@ -253,13 +263,25 @@ class SparkConnectResultSet(
253263
override def getBigDecimal(columnLabel: String): java.math.BigDecimal =
254264
throw new SQLFeatureNotSupportedException
255265

256-
override def isBeforeFirst: Boolean = throw new SQLFeatureNotSupportedException
266+
override def isBeforeFirst: Boolean = {
267+
checkOpen()
268+
cursor < 1 && sparkResult.length > 0
269+
}
257270

258-
override def isAfterLast: Boolean = throw new SQLFeatureNotSupportedException
271+
override def isFirst: Boolean = {
272+
checkOpen()
273+
cursor == 1
274+
}
259275

260-
override def isFirst: Boolean = throw new SQLFeatureNotSupportedException
276+
override def isLast: Boolean = {
277+
checkOpen()
278+
cursor > 0 && cursor == sparkResult.length
279+
}
261280

262-
override def isLast: Boolean = throw new SQLFeatureNotSupportedException
281+
override def isAfterLast: Boolean = {
282+
checkOpen()
283+
cursor > 0 && cursor > sparkResult.length
284+
}
263285

264286
override def beforeFirst(): Unit = throw new SQLFeatureNotSupportedException
265287

@@ -269,19 +291,37 @@ class SparkConnectResultSet(
269291

270292
override def last(): Boolean = throw new SQLFeatureNotSupportedException
271293

272-
override def getRow: Int = throw new SQLFeatureNotSupportedException
294+
override def getRow: Int = {
295+
checkOpen()
296+
297+
if (cursor < 1 || cursor > sparkResult.length) {
298+
0
299+
} else {
300+
cursor
301+
}
302+
}
273303

274304
override def absolute(row: Int): Boolean = throw new SQLFeatureNotSupportedException
275305

276306
override def relative(rows: Int): Boolean = throw new SQLFeatureNotSupportedException
277307

278308
override def previous(): Boolean = throw new SQLFeatureNotSupportedException
279309

280-
override def setFetchDirection(direction: Int): Unit =
281-
throw new SQLFeatureNotSupportedException
310+
override def setFetchDirection(direction: Int): Unit = {
311+
checkOpen()
312+
assert(this.getType == ResultSet.TYPE_FORWARD_ONLY)
282313

283-
override def getFetchDirection: Int =
284-
throw new SQLFeatureNotSupportedException
314+
if (direction != ResultSet.FETCH_FORWARD) {
315+
throw new SQLException(
316+
s"Fetch direction ${JdbcErrorUtils.stringifyFetchDirection(direction)} is not supported " +
317+
s"for ${JdbcErrorUtils.stringifyResultSetType(ResultSet.TYPE_FORWARD_ONLY)} result set.")
318+
}
319+
}
320+
321+
override def getFetchDirection: Int = {
322+
checkOpen()
323+
ResultSet.FETCH_FORWARD
324+
}
285325

286326
override def setFetchSize(rows: Int): Unit =
287327
throw new SQLFeatureNotSupportedException

sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcErrorUtils.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,20 @@ private[jdbc] object JdbcErrorUtils {
3737
case _ =>
3838
throw new IllegalArgumentException(s"Invalid holdability: $holdability")
3939
}
40+
41+
def stringifyResultSetType(typ: Int): String = typ match {
42+
case ResultSet.TYPE_FORWARD_ONLY => "FORWARD_ONLY"
43+
case ResultSet.TYPE_SCROLL_INSENSITIVE => "SCROLL_INSENSITIVE"
44+
case ResultSet.TYPE_SCROLL_SENSITIVE => "SCROLL_SENSITIVE"
45+
case _ =>
46+
throw new IllegalArgumentException(s"Invalid ResultSet type: $typ")
47+
}
48+
49+
def stringifyFetchDirection(direction: Int): String = direction match {
50+
case ResultSet.FETCH_FORWARD => "FETCH_FORWARD"
51+
case ResultSet.FETCH_REVERSE => "FETCH_REVERSE"
52+
case ResultSet.FETCH_UNKNOWN => "FETCH_UNKNOWN"
53+
case _ =>
54+
throw new IllegalArgumentException(s"Invalid fetch direction: $direction")
55+
}
4056
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connect.client.jdbc
19+
20+
import java.sql.{Array => _, _}
21+
22+
import org.apache.spark.sql.connect.client.jdbc.test.JdbcHelper
23+
import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession}
24+
25+
class SparkConnectResultSetSuite extends ConnectFunSuite with RemoteSparkSession
26+
with JdbcHelper {
27+
28+
def jdbcUrl: String = s"jdbc:sc://localhost:$serverPort"
29+
30+
test("type, concurrency, fetch direction of result set") {
31+
withExecuteQuery("SELECT 1") { rs =>
32+
rs.getType === ResultSet.TYPE_FORWARD_ONLY
33+
rs.getConcurrency === ResultSet.CONCUR_READ_ONLY
34+
rs.getFetchDirection === ResultSet.FETCH_FORWARD
35+
Seq(ResultSet.FETCH_FORWARD, ResultSet.FETCH_REVERSE,
36+
ResultSet.FETCH_UNKNOWN).foreach { direction =>
37+
if (direction == ResultSet.FETCH_FORWARD) {
38+
rs.setFetchDirection(direction)
39+
} else {
40+
intercept[SQLException] {
41+
rs.setFetchDirection(direction)
42+
}
43+
}
44+
}
45+
}
46+
}
47+
48+
test("row position for empty result set") {
49+
withExecuteQuery("SELECT * FROM range(0)") { rs =>
50+
assert(rs.getRow === 0)
51+
assert(!rs.isBeforeFirst)
52+
assert(!rs.isFirst)
53+
assert(!rs.isLast)
54+
assert(!rs.isAfterLast)
55+
56+
assert(!rs.next())
57+
58+
assert(rs.getRow === 0)
59+
assert(!rs.isBeforeFirst)
60+
assert(!rs.isFirst)
61+
assert(!rs.isLast)
62+
assert(!rs.isAfterLast)
63+
}
64+
}
65+
66+
test("row position for one row result set") {
67+
withExecuteQuery("SELECT * FROM range(1)") { rs =>
68+
assert(rs.getRow === 0)
69+
assert(rs.isBeforeFirst)
70+
assert(!rs.isFirst)
71+
assert(!rs.isLast)
72+
assert(!rs.isAfterLast)
73+
74+
assert(rs.next())
75+
76+
assert(rs.getRow === 1)
77+
assert(!rs.isBeforeFirst)
78+
assert(rs.isFirst)
79+
assert(rs.isLast)
80+
assert(!rs.isAfterLast)
81+
82+
assert(!rs.next())
83+
84+
assert(rs.getRow === 0)
85+
assert(!rs.isBeforeFirst)
86+
assert(!rs.isFirst)
87+
assert(!rs.isLast)
88+
assert(rs.isAfterLast)
89+
}
90+
}
91+
92+
test("row position for multiple rows result set") {
93+
withExecuteQuery("SELECT * FROM range(2)") { rs =>
94+
assert(rs.getRow === 0)
95+
assert(rs.isBeforeFirst)
96+
assert(!rs.isFirst)
97+
assert(!rs.isLast)
98+
assert(!rs.isAfterLast)
99+
100+
assert(rs.next())
101+
102+
assert(rs.getRow === 1)
103+
assert(!rs.isBeforeFirst)
104+
assert(rs.isFirst)
105+
assert(!rs.isLast)
106+
assert(!rs.isAfterLast)
107+
108+
assert(rs.next())
109+
110+
assert(rs.getRow === 2)
111+
assert(!rs.isBeforeFirst)
112+
assert(!rs.isFirst)
113+
assert(rs.isLast)
114+
assert(!rs.isAfterLast)
115+
116+
assert(!rs.next())
117+
118+
assert(rs.getRow === 0)
119+
assert(!rs.isBeforeFirst)
120+
assert(!rs.isFirst)
121+
assert(!rs.isLast)
122+
assert(rs.isAfterLast)
123+
}
124+
}
125+
}

0 commit comments

Comments
 (0)