Data Science | AI | DataOps | Engineering
backgroundGrey.png

Blog

Data Science & Data Engineering blogs

Seeding Files With dbt

File seeding is a useful way of maintaining and deploying static, or very slowly changing, reference data for a data model to use repeatably and reliably across environments, whilst benefitting from source control.

Seeding Files with dbt

Seed files exist in the `seed` directory of dbt and can be added like any other csv to a directory.

You can then run the `dbt seed` command which will insert the data into a table in Databricks, or whatever target system you’ve got set up for dbt.

For this, I’m using our list of airports from our previous example, having exported it as a csv and deposited it in our `seeds` folder.

As mentioned earlier, `dbt seed` will load the data into Databricks, as a managed delta table – meaning that the delta files will exist in dbfs rather than your data lake. `dbt run` will also load the data into Databricks as part of the dbt execution process.

Seeded files can be referenced by other queries in the same way as if they were a preceding view or table using the `ref` syntax e.g. `{{ ref('airports') }}`, therefore our DAG can be simpler, as we no longer need to have our bronze or silver tables for this data – going straight in as gold.

SELECT 
    monotonically_increasing_id() AS AirportKey,
    AirportCode,
    City,
    State,
    Country
FROM {{ ref('airports') }}

We’re simplifying our code base and data dependencies, thereby increasing the resiliency of our pipelines.

What does dbt file seeding actually do?

I think file seeding is a great concept and practice for managing slowly changing or static reference data. However, when I started interrogating the compiled code that dbt produces for a seed file, I’m less than impressed.

I appreciate that dbt is a SQL based tool and that it has many target systems beyond Databricks, such as Big Query and Snowflake, so I expect some element of code reusability across the systems. What I wasn’t expecting was the reality of such as practice:

Every value in the seed file is inserted into the table using ugly and, arguably, inefficient code. What I also don’t like about file seeding, and dbt in general, while using it with Databricks, is that the data that is created and transformed exists in dbfs via managed delta tables rather than in an accessible data lake, which means that the only way of accessing the data is through Databricks. If you are an organization which has many analytical tools for analytics and data science use cases – using a centralized serving and query layer isn’t always the most optimal approach.

File Seeding Without dbt

We’ve ascertained that I’m not a fan of how dbt does the file seeding, but I am a fan of file seeding in general. How do we go about seeding files without dbt?

The answer: Databricks Notebooks and the Databricks API.

The Notebook

The Notebook that will be used will read in the data from your seeded file and save it as a managed or unmanaged delta table. The benefit of using a notebook, is that you can use widgets which allow you to pass through parameters into the notebook to make it more flexible and dynamic.

We’re passing through fileName, tableName, schemaName (equivalent to database) and an optional writePath parameters through to the notebook, so that it knows which file to read and where to write it to. If a writePath is provided, the table moves from being a managed delta table, to an unmanaged delta table. The difference between the two is described at Databricks.

dbutils.widgets.text("fileName", "", "File Name")
dbutils.widgets.text("tableName", "", "Table Name")
dbutils.widgets.text("schemaName", "Enriched", "Schema Name")
dbutils.widgets.text("writePath", "", "Write Path (optional)")
# Setup variables to hold parameter inputs
fileName = dbutils.widgets.get("fileName")
tableName = dbutils.widgets.get("tableName")
schemaName = dbutils.widgets.get("schemaName")
writePath = dbutils.widgets.get("writePath")
# COMMAND ----------
df = (spark.read.option("header", True).option("delimiter", ",").csv(f'dbfs:/data/{fileName}'))
spark.sql(f"CREATE DATABASE IF NOT EXISTS {schemaName}")
if not writePath:
  df.write.mode("overwrite").saveAsTable(f'{schemaName}.{tableName}')
else:
  df.write.mode("overwrite").option("path", writePath).saveAsTable(f'{schemaName}.{tableName}')

The API

The Databricks API is, in my opinion, an under-used feature of Databricks. The API allows for automation of Databricks workspaces, jobs and other elements. We’ll be using it for uploading our seed file, running our notebook and checking the status of the notebook job.

The API scripts require a profiles.yml file, similar to what it is like in dbt where you can specify your different connections and endpoints.

For example, we have our connection to our Databricks workspace and a separate one for our SQL Warehouse.

In the code repository, this file will be missing – so you’ll need to create it yourself.

We’ll be using lots of Python to call the API but, as it is an API, you can use any language you want to seed the files yourself.

We’ve got a function to store the notebook parameters, another to run the notebook, another to check the run status of the notebook, as well as the general script that reads from the profiles, calls the functions and uploads the file to Databricks.

def notebook_parameters(file_name, table_name, schema_name, write_path):
    return {
        "fileName": file_name,
        "tableName": table_name,
        "schemaName": schema_name,
        "writePath": write_path,
    }
 def run_notebook(rest_url, token, cluster_id, notebook_path, notebook_params):
    # Create a new run
    jsonData = {
            'existing_cluster_id':cluster_id, 
            'notebook_task': {
                'notebook_path': notebook_path,
                'base_parameters': notebook_params
            }
        }
    print(jsonData)
    response = requests.post(rest_url, headers={'Authorization': 'Bearer %s' % token }, json=jsonData) # noqa E501
    print("response:",response)
    if response.status_code == 200:
        print(response.status_code)
        jsonResponse = response.json()
        print(jsonResponse['run_id'])
         return jsonResponse['run_id']
    else:
        raise Exception(response.text)
 def check_job_status(rest_url, token, run_id):
    # Check the status of a run
    run_identifier = int(run_id)
    response = requests.get(rest_url, headers={'Authorization': 'Bearer %s' % token }, json = {'run_id' : run_identifier})
     jsonResponse = response.json()
     life_cycle_state = jsonResponse['state']['life_cycle_state']
    # result_state = jsonResponse['state']['result_state']
     return life_cycle_state #, result_state
 with open('profiles.yml') as f:
    data = load(f, Loader=loader.SafeLoader)
    host = data['databricks']['outputs']['dev']['host']
    token = data['databricks']['outputs']['dev']['token']
    cluster_id = data['engineering']['outputs']['dev']['cluster_id']
 base_url = f'https://{host}/api/2.0/dbfs/put'
 file_name = "airports.csv"
 file_location = (
    "../seeds/"
    + file_name
)

 We are uploading to dbfs as a staging area, but we’re then reading from that staged file to create our delta table.

f = open(file_location, "rb")
files = {"content": (file_location, f)}
response = requests.post(
    base_url,
    files=files,
    headers={'Authorization': 'Bearer %s' % token },
    data={
        "path": f"/data/{file_name}",
        "overwrite": "true",
    },
)
if response.status_code == 200:
    print(response.status_code)
else:
    raise Exception(response.text)
 notebook_parameters = register_seed_as_delta.notebook_parameters(file_name, "Airports", "Enriched", "")
 print(notebook_parameters)
 dbrks_jobs_url = f"https://{host}/api/2.1/jobs/runs/submit"
 notebook_path = "/process_seed"
 submit_notebook = register_seed_as_delta.run_notebook(dbrks_jobs_url, token, cluster_id, notebook_path, notebook_parameters)
 dbrks_check_job_status_url = f"https://{host}/api/2.1/jobs/runs/get"
 life_cycle_state = "PENDING"
# result_state= ""
while life_cycle_state != "TERMINATED":
    # life_cycle_state, result_state = check_job_status.check_job_status(dbrks_check_job_status_url, token, submit_notebook)
    life_cycle_state = check_job_status.check_job_status(dbrks_check_job_status_url, token, submit_notebook)

 Summary

We’ve explored the concepts of file seeding, as well as how easy it is use file seeding within dbt. We’ve had a look at the limitations of dbt, namely the compiled code and the fact it uses managed tables, as well as an alternative to dbt for file seeding.

Hope you’ve enjoyed reading and following along – the full code for this post can be found in GitHub.

Ust OldfieldComment