Reading and writing data from and to Azure Synapse SQL (dedicated SQL pool) using native connectors
In this recipe, you will learn how to read and write data to Azure Synapse Analytics using Azure Databricks.
Azure Synapse Analytics is a data warehouse hosted in the cloud that leverages massively parallel processing (MPP) to run complex queries across large volumes of data.
Azure Synapse can be accessed from Databricks using the Azure Synapse connector. DataFrames can be directly loaded as a table in a Synapse Spark pool. Azure Blob storage is used as temporary storage to upload data between Azure Databricks and Azure Synapse with the Azure Synapse connector.
Getting ready
You will need to ensure you have the following items before starting to work on this recipe:
- An Azure Synapse Analytics workspace. You can follow the steps mentioned in the following link to create Synapse Workspace and SQL Pool.
https://docs.microsoft.com/en-in/azure/synapse-analytics/get-started-create-workspace
- We are using Azure Databrick Runtime 7.3 LTS with Spark Cluster 3.0.1
- A database master key for Azure Synapse Dedicated SQL Pool. Run the following command in the SQL Pool to create the Master Key.
CREATE MASTER KEY
- Account Name and Storage keys for Azure Storage Blob —get these by following the steps mentioned in the Mounting ADLS Gen-2 and Azure Blob storage to Azure DBFS recipe of this chapter
- Service principal details—get these by following the steps mentioned in the Mounting ADLS Gen-2 and Azure Blob storage to Azure DBFS recipe of this chapter
- A mounted ADLS Gen-2 storage account—get this by following the steps mentioned in the Mounting ADLS Gen-2 and Azure Blob storage to Azure DBFS recipe of this chapter
- Use the same Customer
csvFiles
that was used in the Reading and writing data from and to Azure Blob recipe. - Get the JDBC connection string of Synapse Workspace. Go to the Dedicated SQL Pool that you have created in the Synapse Workspace and in the Overview tab click on Show database connection strings. Under the Connection strings tab, click on JDBC and copy the connection string under JDBC (SQL authentication)
You can follow the steps by running the steps in the 2_5.Reading and Writing Data from and to Azure Synapse.ipynb
notebook in your local cloned repository in the Chapter02
folder.
Upload the csvFiles
folder in the Chapter02/Customer
folder to the ADLS Gen2 account in the rawdata
file system. We have mounted the rawdata
container as /mnt/Gen2Source
.
How to do it…
In this section, you will see the steps for reading data from an ADLS Gen2 storage account and writing data to Azure Synapse Analytics using the Azure Synapse connector:
- You need to mount the storage location for accessing the data files from the storage account. You can find detailed steps on how to mount ADLS Gen-2 in the Mounting ADLS Gen-2 and Azure Blob Storage to Azure Databricks File System recipe. Run the following to mount ADLS Gen-2 Storage Account.
storageAccount="teststoragegen2" mountpoint = "/mnt/Gen2Source" storageEndPoint ="abfss://rawdata@{}.dfs.core.windows.net/".format(storageAccount) print ('Mount Point ='+mountpoint) #ClientId, TenantId and Secret is for the Application(ADLSGen2App) was have created as part of this recipe clientID ="xxx-xxx-xx-xxx" tenantID ="xx-xxx-xxx-xxx" clientSecret ="xx-xxx-xxx-xxx" oauth2Endpoint = "https://login.microsoftonline.com/{}/oauth2/token".format(tenantID) configs = {"fs.azure.account.auth.type": "OAuth", "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", "fs.azure.account.oauth2.client.id": clientID, "fs.azure.account.oauth2.client.secret": clientSecret, "fs.azure.account.oauth2.client.endpoint": oauth2Endpoint} dbutils.fs.mount( source = storageEndPoint, mount_point = mountpoint, extra_configs = configs)
- Copy and run the following code to see the
csv
files in the ADLS Gen-2 account:display(dbutils.fs.ls("/mnt/Gen2Source/Customer/csvFiles "))
- Provide the Azure Storage account configuration like Storage Account name and Storage Key. This is used by both the Azure Databricks cluster and Azure Synapse Dedicated SQL Pool to access a common Blob Storage Account for exchanging data between them. Azure Synapse connector triggers the Spark job in Azure Databricks cluster to read and write data from and to the common Blob Storage Account.
blobStorage = "stgcccookbook.blob.core.windows.net" blobContainer = "synapse" blobAccessKey = "xxxxxxxxxxxxxxxxxxxxxxxxx"
- The following code will be used for specifying an intermediate temporary folder required for moving data between Azure Databricks and Azure Synapse:
tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs"
- Execute the following code to store the Azure Blob storage access keys in the Notebook session configuration. This configuration will not impact other Notebooks attached to the same cluster.
acntInfo = "fs.azure.account.key."+ blobStorage spark.conf.set( acntInfo, blobAccessKey)
- Run the following code to load the DataFrame from the
csv
files in the ADLS Gen-2 Storage Account:customerDF = spark.read.format("csv").option("header",True).option("inferSchema", True).load("dbfs:/mnt/Gen2Source/Customer/csvFiles")
- Run the following command to store the JDBC URL for the Synapse Dedicated SQL Pool in a variable.
# We have changed trustServerCertificate=true from trustServerCertificate=false. In certain cases, you might get errors like ''' The driver could not establish a secure connection to SQL Server by using Secure Sockets Layer (SSL) encryption. Error: "Failed to validate the server name in a certificate during Secure Sockets Layer (SSL) initialization.". ClientConnectionId:sadasd-asds-asdasd [ErrorCode = 0] [SQLState = 08S ''' sqlDwUrl="jdbc:sqlserver://synapsetestdemoworkspace.sql.azuresynapse.net:1433;database=sqldwpool1;user=sqladminuser@synapsetestdemoworkspace;password=xxxxxxx;encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;" db_table = "dbo.customer"
- Execute the following code to load the
customerDF
DataFrame as a table into Azure Synapse Dedicated SQL Pool. This creates a table namedCustomerTable
in the SQL database. You can query the table using SQL Server Management Studio (SSMS) or from Synapse Studio. This is the default save mode where when writing a DataFrame to Dedicated SQL Pool, if table already exists then an exception is thrown else it will create a table and populated the data in the table:customerDF.write \ .format("com.databricks.spark.sqldw")\ .option("url", sqlDwUrl)\ .option("forwardSparkAzureStorageCredentials", "true")\ .option("dbTable", db_table)\ .option("tempDir", tempDir)\ .save()
- Connect to the SQL database using SSMS, and you can query data from the
CustomerTable
table: - Run the following code which will append the data to an existing
dbo.Customer
table.# This code is writing to data into SQL Pool with append save option. In append save option data is appended to existing table. customerDF.write \ .format("com.databricks.spark.sqldw")\ .option("url", sqlDwUrl)\ .option("forwardSparkAzureStorageCredentials", "true")\ .option("dbTable", db_table)\ .option("tempDir", tempDir)\ .mode("append")\ .save()
- Run the following code which will overwrite the data in an existing
dbo.Customer
table.customerDF.write \ .format("com.databricks.spark.sqldw")\ .option("url", sqlDwUrl)\ .option("forwardSparkAzureStorageCredentials", "true")\ .option("dbTable", db_table)\ .option("tempDir", tempDir)\ .mode("overwrite")\ .save()
- Run the following code to read data from Azure Synapse Dedicated SQL Pool using an Azure Synapse connector:
customerTabledf = spark.read \ .format("com.databricks.spark.sqldw") \ .option("url", sqlDwUrl) \ .option("tempDir", tempDir) \ .option("forwardSparkAzureStorageCredentials", "true") \ .option("dbTable", db_table) \ .load()
- You can see the result in the DataFrame by using the following code:
customerTabledf.show()
- Instead of table you can use query to read data from Dedicated SQL Pool. You can see from the following code that we have used
option("query",query)
to run a query on the database.query= " select C_MKTSEGMENT, count(*) as Cnt from [dbo].[customer] group by C_MKTSEGMENT" df_query = spark.read \ .format("com.databricks.spark.sqldw") \ .option("url", sqlDwUrl) \ .option("tempDir", tempDir) \ .option("forwardSparkAzureStorageCredentials", "true") \ .option("query", query) \ .load()
- You can execute the following code to display the results.
display(df_query.limit(5))
By the end of this recipe, you have learnt how to read and write data from and to Synapse Dedicated SQL Pool using the Azure Synapse Connector from Azure Databricks.
How it works…
In Azure Synapse, data loading and unloading operations are performed by PolyBase and are triggered by the Azure Synapse connector through JDBC.
For Databricks Runtime 7.0 and above, the Azure Synapse connector through JDBC uses COPY
to load data into Azure Synapse.
The Azure Synapse connector triggers Apache Spark jobs to read and write data to the Blob storage container.
We are using spark.conf.set
(acntInfo, blobAccessKey) so that Spark connects to the Storage Blob container using the built-in connectors. We can use ADLS Gen-2 as well to the store the data read from Synapse or data written to Synapse Dedicated SQL Pool.
Spark connects to Synapse using the JDBC drivers with username and password. Azure Synapse connects to Azure Storage Blob account using account key and it can use Managed Service Identity as well to connect.
Following is the JDBC connection string used in the Notebook. In this case we have changed trustServerCertificate value to true
from false
(which is default value) and by doing so, Microsoft JDBC Driver for SQL Server won't validate the SQL Server TLS certificate. This change must be done only in test environments and not in production.
sqlDwUrl="jdbc:sqlserver://synapsetestdemoworkspace.sql.azuresynapse.net:1433;database=sqldwpool1;user=sqladminuser@synapsetestdemoworkspace;password=xxxxxxx;encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;"
If you are not creating the Master Key in the Synapse Dedicated SQL Pool you will be getting the following error while reading the data from the Dedicated SQL Pool from Azure Databricks Notebook.
Underlying SQLException(s): - com.microsoft.sqlserver.jdbc.SQLServerException: Please create a master key in the database or open the master key in the session before performing this operation. [ErrorCode = 15581] [SQLState = S0006]
When you are writing the data to Dedicated SQL Pools you will see the following messages while code is getting executed n the Notebook.
Waiting for Azure Synapse Analytics to load intermediate data from wasbs://[email protected]/tempDirs/2021-08-07/04-19-43-514/cb6dd9a2-da6b-4748-a34d-0961d2df981f/ into "dbo"."customer" using COPY.
Spark is writing the csv
files to the common Blob Storage as parquet files and then Synapse uses COPY
statement to load the parquet files to the final tables. You can check in Blob Storage Account, and you will find the parquet files created. When you look at the DAG for the execution (you will learn more about DAG in upcoming recipes) you will find that Spark is reading the CSV files and writing to Blob Storage as parquet format.
When you are reading data from Synapse Dedicated SQL Pool tables you will see the following messages in the Notebook while the code is getting executed.
Waiting for Azure Synapse Analytics to unload intermediate data under wasbs://[email protected]/tempDirs/2021-08-07/04-24-56-781/esds43c1-bc9e-4dc4-bc07-c4368d20c467/ using Polybase
Azure Synapse is writing the data from Dedicated SQL Pool to common Blob storage as parquet files first with snappy compression and then its is read by Spark and displayed to the end user when customerTabledf.show()
is executed.
We are setting up the account key and secret for the common Blob Storage in the session configuration associated with the Notebook using spark.conf.set
and setting the value forwardSparkAzureStorageCredentials as true
. By setting the value as true
, Azure Synapse connector will forward the Storage Account access key to the Azure Synapse Dedicated Pool by creating a temporary Azure Database Scoped Credentials.