Databricks as a target
Databricks is a cloud-native SQL analytics engine that provides high-performance, scalable analytics for data lakes, data warehouses, and other big data use cases. In this document, we will discuss how to use Databricks SQL as a target in Data Integration.
Prerequisites
Unity Catalog
Databricks Unity Catalog is a feature of the Databricks Unified Data Analytics Platform that allows organizations to manage, discover, and collaborate on their data assets. By offering consolidated access control, auditing, lineage, and data exploration across Databricks workspaces, Unity Catalog simplifies the management of data assets. In this section, we cover how to work with Databricks Unity Catalog.
Catalog prerequisites
- A Premium plan or a higher tier for your Databricks account.
- You must be able to set up IAM roles, IAM policies, and S3 buckets in AWS.
- Create your first metastore and attach a Workspace.
- Enable the Unity Catalog toggle in your SQL Warehouse.
Create your first metastore and attach a workspace
In order to use the Unity Catalog, it is necessary to first create a metastore. A metastore is a centralized repository where metadata is stored and managed. This metadata includes information such as data types, file formats, and other characteristics that help users locate and use the data they need.
After creating the metastore, the next step is to link it to a workspace. Connecting the metastore to a workspace enables you to conveniently manage and access your data using the Unity catalog.
To create a Metastore and link it to a Workspace, follow the instructions below:
- Access the Databricks account console and log in.
- Navigate to the Catalog icon and click on it.
- Select the option Create metastore.
- Provide a name for the Metastore and specify the Region where it will be deployed. It is recommended to place the workspaces, metastore, and cloud storage location in the same cloud region for optimal performance.
- Enter the S3 bucket path for the storage bucket and the IAM role name. You can exclude "s3://" when entering the path.
- Click Create to complete the process.
- Once prompted, select the workspaces you want to associate with the metastore and click Assign.
Enable the Unity Catalog toggle in your SQL warehouse
- Log in to your Databricks workspace.
- Locate the SQL Warehouse and click it.
- Find the warehouse you are working with, click the three-dot menu, and select Edit.
- In the Advanced options section, enable the Unity Catalog toggle.
How to work with Unity Catalog
- Your access to catalogs in the current metastore is limited to all catalogs if you are a metastore admin, or only the catalogs you own or hold the USAGE privilege on (see the example after this list).
- If no catalog name is specified, Data Integration automatically loads into the default hive_metastore catalog.
- Ensure that all prerequisites have been met.
- Make sure you have a valid Databricks connection in Data Integration.
- Input the name of the Catalog.
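For reference, catalog access can be inspected and granted with standard Databricks SQL. The catalog and principal names below are hypothetical, and depending on your Unity Catalog privilege model version the catalog privilege may be named USE CATALOG rather than USAGE:
-- Inspect existing grants on the catalog used by Data Integration (hypothetical catalog name)
SHOW GRANTS ON CATALOG data_integration_catalog;
-- Grant catalog access to the principal used by Data Integration (hypothetical principal)
GRANT USE CATALOG ON CATALOG data_integration_catalog TO `etl_user@example.com`;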
Schemas and tables
Databricks is a data warehousing platform that relies on a hierarchy of schemas and tables. The conventional approach to working with Databricks involves creating tables beforehand and then populating them with data. This process can be time-consuming and may require advanced technical skills.
However, Data Integration eliminates this upfront table creation step. Data Integration automatically creates the necessary tables if they don't already exist and populates them with the extracted data. Additionally, Data Integration handles all required changes and updates to your tables automatically. This simplifies your data integration process and helps you focus on extracting insights from your data.
Create a new schema
It is advisable to set up a dedicated Schema for Data Integration on Databricks. In order to do so, proceed with the following steps:
- Go to the Databricks workspace console, click on Queries, and then select Create Query.
- In the query editor, paste and run the following SQL command.
CREATE SCHEMA IF NOT EXISTS `data_integration`
COMMENT 'Data Integration Tables Schema';
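To confirm the schema was created, you can run a quick check such as the following (assuming the schema name above):
-- Verify the schema exists and inspect its properties
SHOW SCHEMAS LIKE 'data_integration';
DESCRIBE SCHEMA EXTENDED data_integration;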
Loading modes
Data Integration provides three loading modes when Databricks is used as a target.
Store in a custom location
Data Integration provides an option for storing external tables in DBFS (Databricks File System) or in an external location.
DBFS
DBFS (Databricks File System) is a distributed file system that is optimized for analytics workloads and provides a scalable and reliable way to store and manage data. With DBFS, users can store data in a variety of file formats, including Parquet, CSV, and JSON, and access it directly using Spark SQL commands.
External tables are a powerful feature in Databricks that enable users to work with data stored in a variety of locations and file formats. With DBFS support, it is easy to manage and manipulate large datasets while retaining the flexibility and scalability that data workloads require.
To specify a DBFS prefix, users can check the "Store in a Custom Location" box and define the DBFS parameter.
The location will be set automatically under the abfss://<some_path>/<some_sub_path> path.
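For reference, a table stored under a DBFS prefix is conceptually similar to the hand-written DDL below; the schema, table, columns, and path are illustrative, and Data Integration generates the actual statements for you.
-- Illustrative only: a Delta table whose files live under a DBFS prefix
CREATE TABLE IF NOT EXISTS data_integration.events (
  event_id BIGINT,
  payload  STRING
)
USING DELTA
LOCATION 'dbfs:/custom/data_integration/events';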
External location
In addition to DBFS, Data Integration provides an option for storing tables in an external location. External tables in Databricks are stored outside of Databricks' file system, in locations that can be accessed and managed directly. This provides greater flexibility and scalability, as it allows users to work with data stored in a variety of file formats and external sources.
Creating an external table is straightforward. Users can specify an external location prefix when creating a Delta table, by checking the "Store in a Custom Location" box and setting the External Location parameter.
The location will be set automatically under the <storage>://<bucket_name>/<some_path> path.
This makes it easy to work with data stored in external locations, such as Amazon S3 or Azure Blob Storage, without having to move it into Databricks' file system.
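The equivalent pattern for an external location such as Amazon S3 is sketched below; again, the table, columns, and bucket path are hypothetical, and Data Integration issues the real DDL.
-- Illustrative only: a Delta table stored in an external S3 location
CREATE TABLE IF NOT EXISTS data_integration.customers (
  customer_id BIGINT,
  email       STRING
)
USING DELTA
LOCATION 's3://my-company-bucket/data_integration/customers';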
Creating the target database if not exists
Data Integration checks that the database used in the process exists in the Databricks workspace. If the database doesn't exist, Data Integration creates it for you.
Your personal access token must have sufficient permissions to create new databases in the system; otherwise, the source to target process may fail. You can ask your Databricks account manager or admin to provide you with a token that has CREATE DATABASE permissions in Databricks.
Default database name: if no database is provided in the process, the default database is default.
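Conceptually, this check amounts to a statement like the one below (the database name is illustrative). In Databricks SQL, DATABASE and SCHEMA are interchangeable keywords, so this is equivalent to the CREATE SCHEMA statement shown earlier.
-- Data Integration effectively ensures the target database exists before loading
CREATE DATABASE IF NOT EXISTS data_integration;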
Staging table loading
- Creating a table with a tmp_ prefix and the correct metadata.
- Loading the data into the tmp_ table using the COPY command. If the source data format is CSV, the tmp_ table is created using the fields in the mapping (type-converted). If JSON, a one-column table with a string data type is created (see the sketch after this list).
- Flattening the data (if the source is JSON) using JSON extract functions and casting with the cast([column] as [dataType]) as [alias] syntax.
- Auto-scaling integer columns to the right integer data type based on their data length (SMALLINT->INTEGER->BIGINT->NUMERIC).
- Dropping the old tmp_ table with the json_data field (if it exists).
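Below is a minimal sketch of this staging flow for a JSON source, assuming a hypothetical orders feed; the table names, columns, and storage path are illustrative, and the exact statements Data Integration generates may differ.
-- 1. One-column staging table that receives the raw JSON strings
CREATE TABLE IF NOT EXISTS data_integration.tmp_orders (json_data STRING);

-- 2. Load the files with COPY INTO (each line lands as a raw JSON string)
COPY INTO data_integration.tmp_orders
FROM (SELECT value AS json_data FROM 's3://my-company-bucket/staging/orders/')
FILEFORMAT = TEXT;

-- 3. Flatten and cast into a typed staging table
CREATE OR REPLACE TABLE data_integration.tmp_orders_typed AS
SELECT
  CAST(get_json_object(json_data, '$.order_id')   AS BIGINT)    AS order_id,
  CAST(get_json_object(json_data, '$.amount')     AS DOUBLE)    AS amount,
  CAST(get_json_object(json_data, '$.ordered_at') AS TIMESTAMP) AS ordered_at
FROM data_integration.tmp_orders;

-- 4. Drop the old staging table that holds the raw json_data field
DROP TABLE IF EXISTS data_integration.tmp_orders;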
Load from STG table to target table
In this stage, all of the following steps are performed in one transaction (based on Postgres' ability to run DDL and DML inside one transaction), as sketched after this list:
- Defining and updating any metadata changes on the target table, excluding dropping columns.
- If the target table does not exist - create it using a CREATE TABLE AS SELECT clause.
- If the loading mode is append - clone the target table aside, and add/drop/alter columns in the cloned table.
- Load the data to the target table using the loading mode (as described above).
- Use DEEP CLONE to clone the tmp_ table into the target table.
- Drop the tmp_ tables in the process.
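A minimal sketch of this final stage, continuing the hypothetical orders example above (names are illustrative, and the real statements are generated by Data Integration):
-- On the first run, the target table is created directly from the staging table
CREATE TABLE IF NOT EXISTS data_integration.orders AS
SELECT * FROM data_integration.tmp_orders_typed;

-- On subsequent runs, DEEP CLONE replaces the target table contents from staging
CREATE OR REPLACE TABLE data_integration.orders
DEEP CLONE data_integration.tmp_orders_typed;

-- Finally, the tmp_ tables used in the process are dropped
DROP TABLE IF EXISTS data_integration.tmp_orders_typed;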
The first step in the loading process is creating the target Schema if it doesn't already exist. Data Integration checks and ensures that the Schema is present in the Databricks workspace. If the Schema is not found, Data Integration creates the Schema for the user.
The personal access token provided to the user should have sufficient permissions to create new Schemas in the system; otherwise, the source to target process may fail. The user should contact their Databricks account manager or admin to provide them with a token that has CREATE SCHEMA permissions in Databricks. If no Schema is provided in the process, the default Schema name is "default".
The next step is staging table loading, which involves creating a table with the "tmp_" prefix and the correct metadata. Data Integration loads the data into the "tmp_" table using the COPY command. If the source data format is CSV, the "tmp_" table is created using the fields in the mapping, which are type-converted. If the source data format is JSON, a one-column table with a string data type is created.
Data Integration then flattens the data using JSON Extract Functions (if the source data format is JSON) and casts it with cast([column] as [dataType]) as [alias] syntax.
Data Integration also auto-scales integer columns to the right integer data type based on their data length (SMALLINT->INTEGER->BIGINT->NUMERIC). After that, Data Integration drops the old "tmp_" table with the json_data field (if it exists).
In the last stage, Data Integration loads data from the staging table to the target table. All the following steps are performed in one transaction based on Postgres' ability to run DDL and DML inside one transaction: Data Integration defines and updates any metadata changes on the target table, excluding dropping columns. If the target table does not exist, Data Integration creates it using a CREATE TABLE AS SELECT clause. If the append loading mode is selected, Data Integration clones the target table and adds/drops/alters columns in the cloned table. Data Integration then loads the data to the target table using the loading mode. Finally, Data Integration uses DEEP CLONE to clone the tmp_ table into the target table and drops the tmp_ tables used in the process.