Feasibility of reading the result of a SQL query into a PySpark DataFrame using the SingleStore Spark Connector

Hi, I am new to SingleStore and PySpark.
I am referring to the link below for information on how to use PySpark and SingleStore together:

I have run into a requirement wherein I have to load the result of a SQL query into a PySpark DataFrame.
The SQL will be of the form:
SELECT * FROM table1 LEFT JOIN table2 LEFT JOIN table3 LEFT JOIN table4

Based on the information in the link above, it seems this can be done using the “dbtable” or “query” options.
Can you point me to any documentation on exactly how to do this? I can’t find any suitable examples of this feature.

Any help would be greatly appreciated.

Hi @nandaswayam, welcome to SingleStore forums!

I’m working on getting an answer to your question. Please stay tuned!

Hi @nandaswayam,
There are several ways you can load the result.

  1. Pass the entire SQL query to SingleStore with the “query” option:
```
df = spark.read \
    .format("singlestore") \
    .option("ddlEndpoint", "singlestore-ciab-for-zeppelin:3306") \
    .option("user", "root") \
    .option("password", "my_password") \
    .option("database", "demoDB") \
    .option("query", "SELECT * FROM table1 JOIN table2 JOIN table3 JOIN table4") \
    .load()
```

To use LEFT JOIN, you will need to specify the join condition in the query (e.g. SELECT * FROM table1 LEFT JOIN table2 ON a = b). This applies to all of the methods below.
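As a minimal sketch, method 1 with explicit LEFT JOIN conditions might look like this (the aliases and join keys such as t1.id = t2.t1_id are hypothetical placeholders, not from your schema):

```
# Sketch of method 1 with explicit LEFT JOIN conditions.
# The join keys below are hypothetical; adjust them to your schema.
query = """
    SELECT *
    FROM table1 t1
    LEFT JOIN table2 t2 ON t1.id = t2.t1_id
    LEFT JOIN table3 t3 ON t1.id = t3.t1_id
    LEFT JOIN table4 t4 ON t1.id = t4.t1_id
"""

df = spark.read \
    .format("singlestore") \
    .option("ddlEndpoint", "singlestore-ciab-for-zeppelin:3306") \
    .option("user", "root") \
    .option("password", "my_password") \
    .option("database", "demoDB") \
    .option("query", query) \
    .load()
```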

  2. Create a DataFrame for each table and join them. When you read the data, the connector will try to use SQL pushdown to send the whole computation to the database as a single SQL query (a LEFT JOIN variant is sketched after the code below).
```
df1 = spark.read \
    .format("singlestore") \
    .option("ddlEndpoint", "singlestore-ciab-for-zeppelin:3306") \
    .option("user", "root") \
    .option("password", "my_password") \
    .load("demoDB.table1")

df2 = spark.read \
    .format("singlestore") \
    .option("ddlEndpoint", "singlestore-ciab-for-zeppelin:3306") \
    .option("user", "root") \
    .option("password", "my_password") \
    .load("demoDB.table2")

df3 = spark.read \
    .format("singlestore") \
    .option("ddlEndpoint", "singlestore-ciab-for-zeppelin:3306") \
    .option("user", "root") \
    .option("password", "my_password") \
    .load("demoDB.table3")

df4 = spark.read \
    .format("singlestore") \
    .option("ddlEndpoint", "singlestore-ciab-for-zeppelin:3306") \
    .option("user", "root") \
    .option("password", "my_password") \
    .load("demoDB.table4")

df = df1.join(df2).join(df3).join(df4)
```
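Note that join() without a condition produces a cross join. A minimal sketch of the same pipeline as LEFT JOINs, assuming a hypothetical common key column named id in each table:

```
# Sketch: LEFT JOINs via the DataFrame API. The shared key column "id"
# is a hypothetical placeholder; use your actual join keys.
df = df1 \
    .join(df2, on="id", how="left") \
    .join(df3, on="id", how="left") \
    .join(df4, on="id", how="left")
```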
  3. Use Spark SQL. The connector will try to convert the Spark SQL into a SQL query that can be executed in SingleStore, in the same way as in the previous method. You can read a little more about this way of creating Spark tables here: CREATE DATASOURCE TABLE - Spark 3.4.1 Documentation
spark.sql("create database demoDB")
spark.sql("use demoDB")
spark.sql("create table table1 using singlestore options ('dbtable'='demoDB.table1', 'ddlEndpoint'='singlestore-ciab-for-zeppelin:3306', 'user'='root', 'password'='my_password')")
spark.sql("create table table2 using singlestore options ('dbtable'='demoDB.table2', 'ddlEndpoint'='singlestore-ciab-for-zeppelin:3306', 'user'='root', 'password'='my_password')")
spark.sql("create table table3 using singlestore options ('dbtable'='demoDB.table3', 'ddlEndpoint'='singlestore-ciab-for-zeppelin:3306', 'user'='root', 'password'='my_password')")
spark.sql("create table table4 using singlestore options ('dbtable'='demoDB.table4', 'ddlEndpoint'='singlestore-ciab-for-zeppelin:3306', 'user'='root', 'password'='my_password')")
df = spark.sql("select * from table1 join table2 join table3 join table4") 
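As with the other methods, LEFT JOINs need explicit conditions. A sketch over the tables registered above, under the same hypothetical shared-id assumption:

```
# Sketch: LEFT JOINs in Spark SQL over the registered tables.
# The join keys (t1.id = t2.id, ...) are hypothetical placeholders.
df = spark.sql("""
    SELECT *
    FROM table1 t1
    LEFT JOIN table2 t2 ON t1.id = t2.id
    LEFT JOIN table3 t3 ON t1.id = t3.id
    LEFT JOIN table4 t4 ON t1.id = t4.id
""")
```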

In all of these examples, you can set some options globally so that you don’t have to repeat them every time. For example:

```
spark.conf.set("spark.datasource.singlestore.user", "root")
```
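A sketch of setting the rest of the connection options globally as well (same placeholder values as in the examples above), so that each read only needs to name the table or query:

```
# Set connection options once per Spark session instead of per read.
# Values are the same placeholders used in the examples above.
spark.conf.set("spark.datasource.singlestore.ddlEndpoint",
               "singlestore-ciab-for-zeppelin:3306")
spark.conf.set("spark.datasource.singlestore.user", "root")
spark.conf.set("spark.datasource.singlestore.password", "my_password")

# Reads can then be as short as:
df1 = spark.read.format("singlestore").load("demoDB.table1")
```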

Here you can find a demo with PySpark examples: https://github.com/memsql/singlestore-spark-connector/tree/master/demo

This doesn’t work for me. When I try to do

```
df = df1.join(df2).join(df3).join(df4)
```

it seems like Spark loads all the data to the edge node and computes there. But when I do

```
executeStatementFromSingleStore(
  Map(
    ("database", "x"),
    ("query", "SELECT * FROM table1 JOIN table2 JOIN table3 JOIN table4")))
```

this actually runs inside SingleStore and is much faster.
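One way to check whether the join was actually pushed down (a debugging sketch, not from the thread above): inspect the physical plan. If pushdown succeeded, the plan should show a single SingleStore relation containing the generated SQL, rather than Spark-side join operators. It is also worth confirming the connector’s disablePushdown option has not been set.

```
# Debugging sketch: verify SQL pushdown for a joined DataFrame.
# "id" is a hypothetical join key; use your actual columns.
df = df1.join(df2, on="id", how="left")
df.explain(True)   # look for one SingleStore scan containing the generated SQL

# Pushdown is enabled by default; make sure it hasn't been disabled:
spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")
```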