Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@
},
"INVALID_SQL_ARG" : {
"message" : [
"The argument <name> of `sql()` is invalid. Consider to replace it by a SQL literal statement."
"The argument <name> of `sql()` is invalid. Consider to replace it by a SQL literal."
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed this to address the review comment at #38864 (comment)

]
},
"INVALID_SQL_SYNTAX" : {
Expand Down Expand Up @@ -1164,7 +1164,7 @@
},
"UNBOUND_SQL_PARAMETER" : {
"message" : [
"Found the unbound parameter: <name>. Please, fix `args` and provide a mapping of the parameter to a SQL literal statement."
"Found the unbound parameter: <name>. Please, fix `args` and provide a mapping of the parameter to a SQL literal."
]
},
"UNCLOSED_BRACKETED_COMMENT" : {
Expand Down Expand Up @@ -5219,4 +5219,4 @@
"grouping() can only be used with GroupingSets/Cube/Rollup"
]
}
}
}
2 changes: 2 additions & 0 deletions python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ Upgrading from PySpark 3.3 to 3.4
* In Spark 3.4, the ``Series.concat`` sort parameter will be respected to follow pandas 1.4 behaviors.

* In Spark 3.4, the ``DataFrame.__setitem__`` will make a copy and replace pre-existing arrays, which will NOT be over-written to follow pandas 1.4 behaviors.

* In Spark 3.4, the ``SparkSession.sql`` and the Pandas on Spark API ``sql`` have a new parameter ``args`` which provides binding of named parameters to their SQL literals.
20 changes: 18 additions & 2 deletions python/pyspark/pandas/sql_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import os
import string
from typing import Any, Optional, Union, List, Sequence, Mapping, Tuple
from typing import Any, Dict, Optional, Union, List, Sequence, Mapping, Tuple
import uuid
import warnings

Expand All @@ -43,6 +43,7 @@
def sql(
query: str,
index_col: Optional[Union[str, List[str]]] = None,
args: Dict[str, str] = {},
**kwargs: Any,
) -> DataFrame:
"""
Expand All @@ -57,6 +58,8 @@ def sql(
* pandas Series
* string

Also the method can bind named parameters to SQL literals from `args`.

Parameters
----------
query : str
Expand Down Expand Up @@ -99,6 +102,12 @@ def sql(
e f 3 6

Also note that the index name(s) should be matched to the existing name.
args : dict
        A dictionary of named parameters (prefixed with the `:` marker in the
        query) mapped to the SQL literals to substitute for them.

.. versionadded:: 3.4.0
Comment thread
MaxGekk marked this conversation as resolved.

kwargs
other variables that the user want to set that can be referenced in the query

Expand Down Expand Up @@ -152,6 +161,13 @@ def sql(
0 1
1 2
2 3

    And substitute named parameters with the `:` prefix by SQL literals.

>>> ps.sql("SELECT * FROM range(10) WHERE id > :bound1", args={"bound1":"7"})
id
0 8
1 9
"""
if os.environ.get("PYSPARK_PANDAS_SQL_LEGACY") == "1":
from pyspark.pandas import sql_processor
Expand All @@ -166,7 +182,7 @@ def sql(
session = default_session()
formatter = PandasSQLStringFormatter(session)
try:
sdf = session.sql(formatter.format(query, **kwargs))
sdf = session.sql(formatter.format(query, **kwargs), args)
finally:
formatter.clear()

Expand Down
23 changes: 19 additions & 4 deletions python/pyspark/sql/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1293,20 +1293,26 @@ def prepare(obj: Any) -> Any:
df._schema = struct
return df

def sql(self, sqlQuery: str, **kwargs: Any) -> DataFrame:
def sql(self, sqlQuery: str, args: Dict[str, str] = {}, **kwargs: Any) -> DataFrame:
"""Returns a :class:`DataFrame` representing the result of the given query.
When ``kwargs`` is specified, this method formats the given string by using the Python
standard formatter.
standard formatter. The method binds named parameters to SQL literals from `args`.

.. versionadded:: 2.0.0

.. versionchanged:: 3.4.0
Support Spark Connect.
Support Spark Connect and parameterized SQL.

Parameters
----------
sqlQuery : str
SQL query string.
args : dict
            A dictionary of named parameters (prefixed with the `:` marker in the
            query) mapped to the SQL literals to substitute for them.

.. versionadded:: 3.4.0
Comment thread
HyukjinKwon marked this conversation as resolved.

kwargs : dict
Other variables that the user wants to set that can be referenced in the query

Expand Down Expand Up @@ -1380,13 +1386,22 @@ def sql(self, sqlQuery: str, **kwargs: Any) -> DataFrame:
| 2| 4|
| 3| 6|
+---+---+

        And substitute named parameters with the `:` prefix by SQL literals.

>>> spark.sql("SELECT * FROM {df} WHERE {df[B]} > :minB", {"minB" : "5"}, df=mydf).show()
+---+---+
| A| B|
+---+---+
| 3| 6|
+---+---+
"""

formatter = SQLStringFormatter(self)
if len(kwargs) > 0:
sqlQuery = formatter.format(sqlQuery, **kwargs)
try:
return DataFrame(self._jsparkSession.sql(sqlQuery), self)
return DataFrame(self._jsparkSession.sql(sqlQuery, args), self)
finally:
if len(kwargs) > 0:
formatter.clear()
Expand Down