Using PySpark to write a Spark DataFrame into MemSQL

Hi,

I have a local MemSQL cluster set up on my Ubuntu 18 VM with 1 master node, 1 aggregator node, and 1 leaf node.

I want to read a Parquet file from S3 and save that DataFrame into a MemSQL table using df.write().

Following is what I have done, and it is working.
All I want to know is whether it is correct and okay to use in real time.

def read_from_s3(objName):
    # Reading from S3
    df = spark.read.parquet("s3n://python-s3-upload-test/" + objName)
    return df

df = read_from_s3('sample.parquet')

def save_to_memsql_db(df, table_name, db_name):
    # MemSQL speaks the MySQL wire protocol, so the stock MySQL JDBC driver works here
    df.write.format('jdbc').options(
        url='jdbc:mysql://localhost:3307/' + db_name,
        driver='com.mysql.jdbc.Driver',
        dbtable=table_name,
        user='root',
        password='root').mode('append').save()

save_to_memsql_db(df, 't2', 'books')

This code does work and writes my DataFrame into the MemSQL table.
I have not added any MemSQL connector;
I am using the MySQL connector, just pointed at the MemSQL port.
Let me know if this is good to use.

Thanks & Regards.

I don’t see any reason why it’s not safe. It looks like a regular JDBC call. I’ll ask people who know more about this to take a look.


To add to Hanson’s response, the code looks good using the JDBC driver directly as you demonstrated. This will work, but you may not get performance as good as you would with SingleStore’s Spark Connector:

There are many benefits to using the latest SingleStore Spark Connector (version 3.0) instead of a standard JDBC driver. Our connector has robust support for SQL pushdown and compression on ingest, so you get better overall performance. The connector is compatible with Spark’s DataSource API, so you can use PySpark, Scala, or Java, and it is integrated with Spark’s optimizer.
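For illustration, here is a minimal PySpark sketch of a read through the connector’s DataSource API. It assumes the connector package is already on the classpath and reuses the sample endpoint, credentials, database, and table names that appear elsewhere in this thread, so treat every value as a placeholder:

# Sketch only: assumes an existing SparkSession `spark` and the connector package on the classpath.
df = (spark.read
      .format("memsql")
      .option("ddlEndpoint", "memsql-master.cluster.internal")
      .option("user", "admin")
      .option("password", "s3cur3-pa$$word")
      .option("database", "books")
      .load("t2"))

# A filter like this can be pushed down and evaluated inside the database;
# the `id` column is hypothetical.
df.filter(df.id > 100).show()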

Is there a reason you don’t want to use the Spark Connector directly?

Here is some additional content (videos) showing the performance differences between using a JDBC connector and SingleStore’s Spark Connector for ingest and machine learning models:


Thank you so much for your reply,

I was going to use the MemSQL Spark connector, but even after adding the jar at "/home/vishwa/spark-3.0.1-bin-hadoop2.7/jars/memsql-connector_2.10-1.3.3.jar"

and using your settings:

spark.conf.set("spark.datasource.memsql.ddlEndpoint", "memsql-master.cluster.internal")
spark.conf.set("spark.datasource.memsql.dmlEndpoints", "memsql-master.cluster.internal,memsql-child-1.cluster.internal:3307")
spark.conf.set("spark.datasource.memsql.user", "admin")
spark.conf.set("spark.datasource.memsql.password", "s3cur3-pa$$word")

the following won’t work:

df = (spark
      .read
      .format("memsql")
      .option("ddlEndpoint", "memsql-master.cluster.internal")
      .option("user", "admin")
      .load("foo"))

I don’t have the specific error right now; I can share it with you on Monday.

Until then, can you please let me know: if I use a normal JDBC call just to write data directly from S3 (Parquet file) to a MemSQL table, with only additional type 2 CDC columns, is it okay to use in real time?

Also, how do I add your connector, or any other connector, to an EMR PySpark cluster?

I am new to this and I am still learning. I hope you will help as you did for my previous question.

Thank you so much :slight_smile:

Thank you, Hanson,

Please let me know if it is okay to use in real time.
Also, if I use the JDBC connector to write data into a huge MemSQL cluster, how will my performance be affected? For example, if I write 100k records from a Parquet file to a MemSQL table, how much time will it take?

And the same question I asked your colleague: if I decide to use the MemSQL Spark connector, how do I add it to the EMR PySpark environment, or to my local PySpark environment on Ubuntu (which I am using for learning)?

If possible, can you please share the direct jar download link and the Spark DataFrame read configuration needed to use the MemSQL Spark connector?

Again, Thank you for your help.

Looking forward to hearing from you again. :slight_smile:

Hello again - No problem, and thank you for your replies.

You should be able to use the JDBC call you provided to achieve this, but the performance could be better if you use the connector, as mentioned above. However, it is difficult to say by how much, because that depends on the data and other hardware characteristics.
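If you do stay on the plain JDBC path for now, Spark’s JDBC data source has a couple of write-side knobs worth experimenting with. This is only a sketch; the values are placeholders, not recommendations, and `df` is assumed to be the DataFrame read from S3 earlier in the thread:

# Sketch only: partitioning and batchsize tuning for a plain JDBC write.
(df.repartition(8)                      # one JDBC connection per partition
   .write.format("jdbc")
   .option("url", "jdbc:mysql://localhost:3307/books")
   .option("driver", "com.mysql.jdbc.Driver")
   .option("dbtable", "t2")
   .option("user", "root")
   .option("password", "root")
   .option("batchsize", 10000)          # rows per JDBC batch insert (default is 1000)
   .mode("append")
   .save())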

There are a couple of reasons why the connector version may not be working for you based on the info you provided:

  1. It appears you are using the older version of the memsql connector in your example (e.g., memsql-connector_2.10-1.3.3.jar), but with the newer syntax. The newer syntax won’t work with the older connector. All the artifacts for the latest connector are available here (via Maven). You can also access the GitHub repo here.
  2. Your hostname and user/password must map to your actual cluster settings; the example we provided shows the Master (ddlEndpoint) hostname as memsql-master.cluster.internal and the password as s3cur3-pa$$word. Did you align these with your cluster details? Otherwise, it will not work.
  3. What version of Spark are you using?
    The MemSQL 3.0.* connector versions are compatible with Spark 2.3 and 2.4. The 3.1.* connector (in beta) is compatible with Spark 3. If you are using Spark 3, you should test our 3.1.0-beta1 release. If you are using Spark 2.3 or 2.4, you should test with our production releases of the 3.0.* connector (e.g., everything from 3.0.0 to 3.0.5 here). See the sketch after this list for picking a matching coordinate.

To add the dependency to your cluster, you technically do not need the jar file. If you use the jar file and build it yourself, then you will need to install all of its dependencies manually as well:

To add the dependency when using PySpark, you can use the following command:

$SPARK_HOME/bin/spark-shell --packages com.memsql:memsql-spark-connector_2.11:3.0.<insert-connector-version>-spark-<insert-spark-version>

For example, if you wanted to use Spark Connector version 3.0.5 w/ Spark 2.4, this would be:

$SPARK_HOME/bin/spark-shell --packages com.memsql:memsql-spark-connector_2.11:3.0.5-spark-2.4.4

Alternatively, you can use Maven or SBT to add the dependency.

If you need to add the jar file manually, you will also have to install these dependencies:

"org.mariadb.jdbc"       % "mariadb-java-client"     % "2.+",
"io.spray"               %% "spray-json"             % "1.3.5",

If you do use the jar file, you can use a command similar to the one above, passing the jar and its dependencies via:

$SPARK_HOME/bin/spark-shell --jars <path_to_spark-connector>,<path_to_additional_lib1>,<path_to_additional_lib2>
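If you would rather do this from Python than from the shell, one possible alternative (a sketch, with placeholder paths) is to point spark.jars at the local files when building the SparkSession:

from pyspark.sql import SparkSession

# Sketch only: the jar paths below are placeholders for the connector jar
# and the dependencies listed above.
spark = (SparkSession.builder
         .appName("memsql-connector-from-local-jars")
         .config("spark.jars",
                 "/path/to/memsql-spark-connector.jar,"
                 "/path/to/mariadb-java-client.jar,"
                 "/path/to/spray-json.jar")
         .getOrCreate())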

Best,
Roxanna


Hi Roxanna,

Thank you for a quick response.
I use Spark version 3, so to use it with your connector I can use:
$SPARK_HOME/bin/spark-shell --packages com.memsql:memsql-spark-connector_2.11:3.0.1-spark-3.0.1
correct?

And the configuration to connect to the MemSQL cluster in my local VM should be as follows, right?

spark.conf.set("spark.datasource.memsql.ddlEndpoint", "localhost")
spark.conf.set("spark.datasource.memsql.dmlEndpoints", "localhost:3308")
spark.conf.set("spark.datasource.memsql.user", "root")
spark.conf.set("spark.datasource.memsql.password", "root")

I saw the video you shared (Fast #ETL in #Spark and #SingleStore - the SingleStore Spark Connector 3.0 - YouTube), so as per that video I can add the MemSQL Spark connector directly from my Jupyter notebook, right?

Let’s say my MemSQL cluster is set up in an AWS VPC and I want to connect to it through an EMR PySpark Jupyter notebook. What configuration do I need, and can you please make a video on it?

If I create an EMR cluster in the same VPC where my MemSQL cluster is installed, I should be able to access it directly through the MemSQL connector, right? This question also comes from the video you shared (Fast #ETL in #Spark and #SingleStore - the SingleStore Spark Connector 3.0 - YouTube), as it is not shown where that Jupyter notebook is running.

I am still learning, so I hope you guys will help me again.
Thank you again and hoping to hear from you soon :slight_smile:

Best,
Vishwajeet

Hi Roxanna,

Here is the code I am using to connect with Spark 3.0.1:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "com.memsql:memsql-spark-connector_2.11:3.0.1-spark-3.0.1" pyspark-shell'

spark.conf.set("spark.datasource.memsql.disablePushdown", "false")

(Not including the code that creates the DataFrame.)

(df.write.format("memsql")
    .option("ddlEndpoint", "localhost")
    .option("user", "root")
    .option("password", "root")
    .option("database", "test")
    .option("truncate", "false")
    .mode("overwrite")
    .save("t1"))

Error:
An error occurred while calling o48.save.
: java.lang.ClassNotFoundException: Failed to find data source: memsql. Please find packages at Third-Party Projects | Apache Spark

Can you please tell me how I should add that connector?

For your reference, here are some details:
Spark version: spark-3.0.1-bin-hadoop2.7
OS: Ubuntu 18
MemSQL version: 7.1.11
Python: 3.8.3 (Anaconda)

This is my learning cluster setup.

Please do let me know what needs to be configured so I can connect using memsql spark connector :slight_smile:

Best,
Vishwajeet

Hi @vishwajeetdabholkar,
you provided the wrong spark-connector package version; please try the same with this one:

 --packages com.memsql:memsql-spark-connector_2.12:3.1.0-beta1-spark-3.0.0

Best Regards,
Blinov Ivan


Hi Ivan,

Tried as you suggested:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "com.memsql:memsql-spark-connector_2.12:3.1.0-beta1-spark-3.0.0" pyspark-shell'

df11 = (spark.read.format("memsql")
        .option("ddlEndpoint", "localhost")
        .option("user", "root")
        .option("password", "root")
        .option("database", "test")
        .load("t1"))

I am still getting the following error:

Py4JJavaError: An error occurred while calling o43.load.
: java.lang.ClassNotFoundException: Failed to find data source: memsql. Please find packages at Third-Party Projects | Apache Spark

For your reference, here are some details:
Spark version: spark-3.0.1-bin-hadoop2.7
OS: Ubuntu 18
MemSQL version: 7.1.11
Python: 3.8.3 (Anaconda)

Thank you and looking forward to your reply.

Best,
Vishwajeet

Hi Vishwajeet,
I am not sure that

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "com.memsql:memsql-spark-connector_2.12:3.1.0-beta1-spark-3.0.0" pyspark-shell'

is the right way to provide packages. Could you please provide packages when you run spark-shell, like this:

$SPARK_HOME/bin/spark-shell --packages com.memsql:memsql-spark-connector_2.12:3.1.0-beta1-spark-3.0.0

Best,
Blinov Ivan


Hi Ivan,
Please take a look at the following images.
The screenshots are from: Fast #ETL in #Spark and #SingleStore - the SingleStore Spark Connector 3.0 - YouTube

Can you please check this once and let me know?
Thanks,
Vishwajeet

Vishwajeet,
will you be able to have a quick call today to discuss all your questions?
If so, could you please share your availability and time zone, and an email address to send an invite to?

Best,
Blinov Ivan


Hi Ivan,

Thank you for all the help.
I am able to read data from the MemSQL table with the Spark connector details you gave.
As for the show() issue, I downgraded my Java version (from Java 11 to Java 8) and now everything is working fine!

For others who might face the same issue, I used the following configurations:

For Spark 2.4:
MemSQL 7
Python 3.7
Java 8

and when starting pyspark, use:

pyspark --packages com.memsql:memsql-spark-connector_2.11:3.0.5-spark-2.4.4
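If you are doing this from a Jupyter notebook instead of the pyspark shell, a rough sketch of the equivalent is to put the package coordinate into spark.jars.packages before any SparkSession exists; the coordinate is the Spark 2.4 one above, and the endpoint, credentials, database, and table are the local test values used earlier in this thread:

from pyspark.sql import SparkSession

# Sketch only: run this before any SparkSession is created, otherwise the
# connector package will not be downloaded onto the classpath.
spark = (SparkSession.builder
         .appName("memsql-connector-test")
         .config("spark.jars.packages",
                 "com.memsql:memsql-spark-connector_2.11:3.0.5-spark-2.4.4")
         .config("spark.datasource.memsql.ddlEndpoint", "localhost")
         .config("spark.datasource.memsql.user", "root")
         .config("spark.datasource.memsql.password", "root")
         .getOrCreate())

df = spark.read.format("memsql").option("database", "test").load("t1")
df.show()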

Thanks,
Vishwajeet


Hi Vishwajeet,
I was happy to help.
If you have any questions feel free to contact us.

Best Regards,
Blinov Ivan


Hi Vishwa,
I’m facing an issue while connecting to Spark in Python, even after trying the same "pyspark --packages com.memsql:memsql-spark-connector_2.11:3.0.5-spark-2.4.4" in a Jupyter notebook.

Please help me resolve this.

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "com.memsql:memsql-spark-connector_2.11:3.0.5-spark-2.4.4" pyspark-shell'