The community-supported Teradata package provides an integration with Teradata Vantage.
For more information, see the dagster-teradata GitHub repository.
dagster-teradata with Teradata Vantage
This guide shows how to integrate Dagster with Teradata Vantage to build and manage ETL pipelines. It provides step-by-step instructions for installing and configuring the required packages, setting up a Dagster project, and implementing a pipeline that interacts with Teradata Vantage.
Prerequisites
- Access to a Teradata Vantage instance. Note: If you need a test instance of Vantage, you can provision one for free at https://clearscape.teradata.com
- Python 3.9 or higher. Python 3.13 is recommended.
- pip
Install dagster-teradata
With your virtual environment active, the next step is to install dagster and the Teradata provider package (dagster-teradata) to interact with Teradata Vantage.
- Install the Required Packages:
  With uv: uv add dagster-teradata
  With pip: pip install dagster-teradata
- Note about Optional Dependencies:
  a) dagster-teradata relies on dagster-aws for ingesting data from an S3 bucket into Teradata Vantage. Since dagster-aws is an optional dependency, install it by running:
  With uv: uv add dagster-teradata[aws]
  With pip: pip install dagster-teradata[aws]
  b) dagster-teradata also relies on dagster-azure for ingesting data from an Azure Blob Storage container into Teradata Vantage. To install this dependency, run:
  With uv: uv add dagster-teradata[azure]
  With pip: pip install dagster-teradata[azure]
- Verify the Installation: To confirm that Dagster is correctly installed, run: dagster --version
  If Dagster is installed correctly, the command prints its version.
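The same check can be done from Python, which also covers the dagster-teradata package. The sketch below uses only the standard library; the helper name installed_version is made up for illustration and is not part of Dagster.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package):
    """Return the installed version string of a package, or None if it is absent."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


# Report both packages this guide depends on.
for name in ("dagster", "dagster-teradata"):
    print(f"{name}: {installed_version(name) or 'not installed'}")
```

If either line reports "not installed", the active environment is probably not the one the packages were installed into.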
Initialize a Dagster Project
Now that you have the necessary packages installed, the next step is to create a new Dagster project.
Scaffold a New Dagster Project
Run the following command:
dagster project scaffold --name dagster-quickstart
This command will create a new project named dagster-quickstart. It will automatically generate the following directory structure:
dagster-quickstart
│   pyproject.toml
│   README.md
│   setup.cfg
│   setup.py
│
├───dagster_quickstart
│       assets.py
│       definitions.py
│       __init__.py
│
└───dagster_quickstart_tests
        test_assets.py
        __init__.py
For more about this directory structure, refer to the Dagster project documentation.
Create Sample Data
To simulate an ETL pipeline, create a CSV file with sample data that your pipeline will process.
Create the CSV File: The scaffold does not generate a data directory, so first create dagster_quickstart/data/. Inside it, create a file named sample_data.csv with the following content:
id,name,age,city
1,Alice,28,New York
2,Bob,35,San Francisco
3,Charlie,42,Chicago
4,Diana,31,Los Angeles
This file represents sample data that will be used as input for your ETL pipeline.
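If you prefer to generate the file rather than type it, a short script can do so. This is a minimal sketch using the standard csv module; write_sample_data is a hypothetical helper name, and the rows are copied from the sample above.

```python
import csv
from pathlib import Path

# Header plus the four sample rows used in this guide.
ROWS = [
    ("id", "name", "age", "city"),
    (1, "Alice", 28, "New York"),
    (2, "Bob", 35, "San Francisco"),
    (3, "Charlie", 42, "Chicago"),
    (4, "Diana", 31, "Los Angeles"),
]


def write_sample_data(path):
    """Write the sample rows to `path`, creating parent directories as needed."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerows(ROWS)
    return path
```

From the project root, calling write_sample_data("dagster_quickstart/data/sample_data.csv") creates both the data directory and the file.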
Define Assets for the ETL Pipeline
Now, we'll define a series of assets for the ETL pipeline inside the assets.py file.
Edit the assets.py File: Open the dagster_quickstart/assets.py file and add the following code to define the pipeline:
import pandas as pd
from dagster import asset
@asset(required_resource_keys={"teradata"})
def read_csv_file(context):
    df = pd.read_csv("dagster_quickstart/data/sample_data.csv")
    context.log.info(df)
    return df
@asset(required_resource_keys={"teradata"})
def drop_table(context):
    result = context.resources.teradata.drop_table(["tmp_table"])
    context.log.info(result)
@asset(required_resource_keys={"teradata"})
def create_table(context, drop_table):
    result = context.resources.teradata.execute_query('''CREATE TABLE tmp_table (
                                                            id INTEGER,
                                                            name VARCHAR(50),
                                                            age INTEGER,
                                                            city VARCHAR(50));''')
    context.log.info(result)
# read_csv_file is already a dependency through the function argument,
# so it must not be listed again in deps.
@asset(required_resource_keys={"teradata"})
def insert_rows(context, create_table, read_csv_file):
    data_tuples = [tuple(row) for row in read_csv_file.to_numpy()]
    for row in data_tuples:
        result = context.resources.teradata.execute_query(
            f"INSERT INTO tmp_table (id, name, age, city) VALUES ({row[0]}, '{row[1]}', {row[2]}, '{row[3]}');"
        )
        context.log.info(result)
@asset(required_resource_keys={"teradata"})
def read_table(context, insert_rows):
    result = context.resources.teradata.execute_query("select * from tmp_table;", True)
    context.log.info(result)
This Dagster pipeline defines a series of assets that interact with Teradata. It starts by reading data from a CSV file, then drops and recreates a table in Teradata. After that, it inserts rows from the CSV into the table and finally retrieves the data from the table.
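The insert_rows asset interpolates CSV values directly into the INSERT statement, so a value containing a single quote (for example, O'Brien) would produce invalid SQL. One way to guard against that is to double embedded quotes before building the statement. The helpers below, sql_literal and build_insert, are hypothetical and not part of dagster-teradata. They are a sketch for trusted sample data, not a substitute for parameter binding with untrusted input.

```python
import numbers


def sql_literal(value):
    """Render a value as a SQL literal: numbers as-is, text quoted with '' escaping."""
    if isinstance(value, numbers.Number):
        return str(value)
    return "'" + str(value).replace("'", "''") + "'"


def build_insert(table, columns, row):
    """Build a single-row INSERT statement for the given table and columns."""
    column_list = ", ".join(columns)
    values = ", ".join(sql_literal(v) for v in row)
    return f"INSERT INTO {table} ({column_list}) VALUES ({values});"


print(build_insert("tmp_table", ["id", "name", "age", "city"], (5, "O'Brien", 40, "Boston")))
```

Inside insert_rows, the resulting string could be passed to execute_query in place of the f-string.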
Define the Pipeline Definitions
The next step is to configure the pipeline by defining the necessary resources and jobs.
Edit the definitions.py File: Open dagster_quickstart/definitions.py and define your Dagster pipeline as follows:
from dagster import EnvVar, Definitions
from dagster_teradata import TeradataResource
from .assets import read_csv_file, read_table, create_table, drop_table, insert_rows
# Define the pipeline and resources
defs = Definitions(
    assets=[read_csv_file, read_table, create_table, drop_table, insert_rows],
    resources={
        "teradata": TeradataResource(
            host=EnvVar("TERADATA_HOST"),
            user=EnvVar("TERADATA_USER"),
            password=EnvVar("TERADATA_PASSWORD"),
            database=EnvVar("TERADATA_DATABASE"),
        )
    }
)
This code sets up a Dagster project that interacts with Teradata by defining assets and resources:
- It imports the necessary names from Dagster and dagster-teradata.
- It imports asset functions (read_csv_file, read_table, create_table, drop_table, insert_rows) from the assets.py module.
- It registers these assets with Dagster using Definitions, allowing Dagster to track and execute them.
- It defines a Teradata resource (TeradataResource) that reads database connection details from environment variables (TERADATA_HOST, TERADATA_USER, TERADATA_PASSWORD, TERADATA_DATABASE).
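Because the resource reads its connection details from the environment, it can help to confirm that the four variables are set before starting Dagster. The following is a minimal standard-library sketch; missing_teradata_vars is a hypothetical helper, not a Dagster API.

```python
import os

# Variable names taken from the TeradataResource definition above.
REQUIRED_VARS = (
    "TERADATA_HOST",
    "TERADATA_USER",
    "TERADATA_PASSWORD",
    "TERADATA_DATABASE",
)


def missing_teradata_vars(environ=None):
    """Return the names of required variables that are unset or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not environ.get(name)]


missing = missing_teradata_vars()
if missing:
    print("Missing environment variables: " + ", ".join(missing))
else:
    print("All Teradata connection variables are set.")
```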
Running the Pipeline
After setting up the project, you can now run your Dagster pipeline:
- Start the Dagster Dev Server: In your terminal, navigate to the root directory of your project and run: dagster dev
  The Dagster logs are displayed directly in the terminal, including any errors encountered during startup. Once you see a message similar to the following, the Dagster webserver is running successfully and you can proceed to the next step:
  2025-02-04 09:15:46 +0530 - dagster-webserver - INFO - Serving dagster-webserver on http://127.0.0.1:3000 in process 32564
- Access the Dagster UI: Open a web browser and navigate to http://127.0.0.1:3000. This will open the Dagster UI where you can manage and monitor your pipelines.
- Run the Pipeline:
  - In the top navigation of the Dagster UI, click Assets > View global asset lineage.
  - Click Materialize to execute the pipeline.
  - In the popup window, click View to see the details of the pipeline run.
- Monitor the Run: The Dagster UI allows you to visualize the pipeline's progress, view logs, and inspect the status of each step. You can switch between different views to see the execution logs and metadata for each asset.
Below are some of the operations provided by the TeradataResource:
1. Execute a Query (execute_query)
This operation executes a SQL query within Teradata Vantage.
Args:
- sql (str) – The query to be executed.
- fetch_results (bool, optional) – If True, fetch the query results. Defaults to False.
- single_result_row (bool, optional) – If True, return only the first row of the result set. Effective only if fetch_results is True. Defaults to False.
2. Execute Multiple Queries (execute_queries)
This operation executes a series of SQL queries within Teradata Vantage.
Args:
- sql_queries (Sequence[str]) – List of queries to be executed in series.
- fetch_results (bool, optional) – If True, fetch the query results. Defaults to False.
- single_result_row (bool, optional) – If True, return only the first row of the result set. Effective only if fetch_results is True. Defaults to False.
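As an illustration of the call shape, the sketch below groups a CREATE TABLE and two INSERT statements into one execute_queries call. To keep it runnable without a Vantage connection, it uses RecordingTeradata, a made-up stand-in that only records the statements it receives. In an asset, the teradata argument would be context.resources.teradata. The function name rebuild_table is also an assumption for illustration.

```python
class RecordingTeradata:
    """Stand-in for TeradataResource that only records what it is asked to run."""

    def __init__(self):
        self.batches = []

    def execute_queries(self, sql_queries, fetch_results=False, single_result_row=False):
        # A real resource would send each statement to Vantage in order.
        self.batches.append(list(sql_queries))


def rebuild_table(teradata):
    """Create tmp_table and seed it with two rows in a single execute_queries call."""
    statements = [
        "CREATE TABLE tmp_table (id INTEGER, name VARCHAR(50), age INTEGER, city VARCHAR(50));",
        "INSERT INTO tmp_table (id, name, age, city) VALUES (1, 'Alice', 28, 'New York');",
        "INSERT INTO tmp_table (id, name, age, city) VALUES (2, 'Bob', 35, 'San Francisco');",
    ]
    teradata.execute_queries(statements)
    return statements


recorder = RecordingTeradata()
rebuild_table(recorder)
print(len(recorder.batches[0]), "statements submitted")
```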
3. Drop a Database (drop_database)
This operation drops one or more databases from Teradata Vantage.
Args:
- databases (Union[str, Sequence[str]]) – Database name or list of database names to drop.
4. Drop a Table (drop_table)
This operation drops one or more tables from Teradata Vantage.
Args:
- tables (Union[str, Sequence[str]]) – Table name or list of table names to drop.
Data Transfer from AWS S3 to Teradata Vantage Using dagster-teradata:
import os
# Only the names used below are imported; importing s3_resource here would be
# shadowed by the s3_resource variable defined next.
from dagster import job, op, Definitions, DagsterError
from dagster_aws.s3 import S3Resource
from dagster_teradata import TeradataResource
s3_resource = S3Resource(
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    aws_session_token=os.getenv("AWS_SESSION_TOKEN"),
)
td_resource = TeradataResource(
    host=os.getenv("TERADATA_HOST"),
    user=os.getenv("TERADATA_USER"),
    password=os.getenv("TERADATA_PASSWORD"),
    database=os.getenv("TERADATA_DATABASE"),
)
@op(required_resource_keys={"teradata"})
def drop_existing_table(context):
     context.resources.teradata.drop_table("people")
     return "Tables Dropped"
@op(required_resource_keys={"teradata", "s3"})
def ingest_s3_to_teradata(context, status):
    if status == "Tables Dropped":
        context.resources.teradata.s3_to_teradata(s3_resource, os.getenv("AWS_S3_LOCATION"), "people")
    else:
        raise DagsterError("Tables not dropped")
@job(resource_defs={"teradata": td_resource, "s3": s3_resource})
def example_job():
     ingest_s3_to_teradata(drop_existing_table())
defs = Definitions(
    jobs=[example_job]
)
The s3_to_teradata method is used to load data from an S3 bucket into a Teradata table. It leverages Teradata Vantage Native Object Store (NOS), which allows direct querying and loading of external object store data (like AWS S3) into Teradata tables.
Arguments Supported by s3_to_teradata
- s3 (S3Resource): The S3Resource object used to interact with the S3 bucket.
- s3_source_key (str): The URI specifying the location of the S3 bucket. The URI format is: /s3/YOUR-BUCKET.s3.amazonaws.com/YOUR-BUCKET-NAME
  For more details, refer to: Teradata Documentation - Native Object Store
- teradata_table (str): The name of the Teradata table to which the data will be loaded.
- public_bucket (bool): Indicates whether the provided S3 bucket is public. If True, the objects within the bucket can be accessed via a URL without authentication. If False, the bucket is considered private, and authentication must be provided. Defaults to False.
- teradata_authorization_name (str): The name of the Teradata Authorization Database Object, which controls access to the S3 object store.
  For more details, refer to: Teradata Vantage Native Object Store - Setting Up Access
Data Transfer from Azure Blob to Teradata Vantage Using dagster-teradata:
import os
# Only the names used below are imported.
from dagster import job, op, Definitions, DagsterError
from dagster_azure.adls2 import ADLS2Resource, ADLS2SASToken
from dagster_teradata import TeradataResource
azure_resource = ADLS2Resource(
    storage_account="",
    credential=ADLS2SASToken(token=""),
)
td_resource = TeradataResource(
    host=os.getenv("TERADATA_HOST"),
    user=os.getenv("TERADATA_USER"),
    password=os.getenv("TERADATA_PASSWORD"),
    database=os.getenv("TERADATA_DATABASE"),
)
@op(required_resource_keys={"teradata"})
def drop_existing_table(context):
     context.resources.teradata.drop_table("people")
     return "Tables Dropped"
@op(required_resource_keys={"teradata", "azure"})
def ingest_azure_to_teradata(context, status):
    if status == "Tables Dropped":
        context.resources.teradata.azure_blob_to_teradata(azure_resource, "/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/", "people", True)
    else:
        raise DagsterError("Tables not dropped")
@job(resource_defs={"teradata": td_resource, "azure": azure_resource})
def example_job():
     ingest_azure_to_teradata(drop_existing_table())
defs = Definitions(
    jobs=[example_job]
)
The azure_blob_to_teradata method is used to load data from Azure Data Lake Storage (ADLS) into a Teradata table. This method leverages Teradata Vantage Native Object Store (NOS) to directly query and load external object store data (such as Azure Blob Storage) into Teradata.
Arguments Supported by azure_blob_to_teradata
- azure (ADLS2Resource): The ADLS2Resource object used to interact with the Azure Blob Storage.
- blob_source_key (str): The URI specifying the location of the Azure Blob object. The format is: /az/YOUR-STORAGE-ACCOUNT.blob.core.windows.net/YOUR-CONTAINER/YOUR-BLOB-LOCATION
  For more details, refer to the Teradata documentation: Teradata Documentation - Native Object Store
- teradata_table (str): The name of the Teradata table where the data will be loaded.
- public_bucket (bool, optional): Indicates whether the Azure Blob container is public. If True, the objects in the container can be accessed without authentication. Defaults to False.
- teradata_authorization_name (str, optional): The name of the Teradata Authorization Database Object used to control access to the Azure Blob object store. This is required for secure access to private containers. Defaults to an empty string.
  For more details, refer to the documentation: Teradata Vantage Native Object Store - Setting Up Access
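The blob_source_key format is easy to mistype, so it can be assembled from its parts. This is a minimal sketch that follows the /az/... pattern given above; azure_blob_source_key is a hypothetical helper and is not provided by dagster-teradata.

```python
def azure_blob_source_key(storage_account, container, blob_location=""):
    """Build '/az/<account>.blob.core.windows.net/<container>/<blob location>'."""
    for label, value in (("storage_account", storage_account), ("container", container)):
        if not value or "/" in value:
            raise ValueError(f"{label} must be a non-empty name without '/'")
    base = f"/az/{storage_account}.blob.core.windows.net/{container}/"
    # Strip a leading slash so the result never contains '//' after the container.
    return base + blob_location.lstrip("/")


# Reproduces the URI used in the example job above.
print(azure_blob_source_key("akiaxox5jikeotfww4ul", "td-usgs", "CSVDATA/09380000/2018/06/"))
```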
Transfer data from Private Blob Storage Container to Teradata instance
To transfer data from a private Blob Storage container to a Teradata instance, you need the following:
- An Azure account. You can start with a free account.
- An Azure storage account.
- A blob container under the Azure storage account.
- CSV, JSON, or Parquet files uploaded to the blob container.
- A Teradata Authorization object created with the Azure Blob Storage account name and the account secret key:
  CREATE AUTHORIZATION azure_authorization USER 'azuretestquickstart' PASSWORD 'AZURE_BLOB_ACCOUNT_SECRET_KEY'
  Note: Replace AZURE_BLOB_ACCOUNT_SECRET_KEY with the access key of the Azure storage account azuretestquickstart.
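To avoid pasting the access key into a script, the statement can be rendered from an environment variable. The sketch below is illustrative only: create_authorization_sql is a hypothetical helper, and the environment variable name AZURE_BLOB_ACCOUNT_SECRET_KEY is an assumption borrowed from the placeholder above.

```python
import os


def create_authorization_sql(authorization_name, storage_account, secret_key):
    """Render the CREATE AUTHORIZATION statement, doubling quotes inside values."""

    def quote(text):
        return "'" + text.replace("'", "''") + "'"

    return (
        f"CREATE AUTHORIZATION {authorization_name} "
        f"USER {quote(storage_account)} PASSWORD {quote(secret_key)}"
    )


# The variable name is an assumption; the placeholder keeps the sketch
# runnable when the variable is unset.
secret = os.environ.get("AZURE_BLOB_ACCOUNT_SECRET_KEY", "<secret-key>")
statement = create_authorization_sql("azure_authorization", "azuretestquickstart", secret)
```

The rendered statement contains the secret, so it should not be logged. The authorization name (here azure_authorization) is the value to pass as teradata_authorization_name.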
Manage VantageCloud Lake Compute Clusters with dagster-teradata:
from dagster import Definitions, DagsterError, op, materialize, job
from dagster_dbt import DbtCliResource
from dagster_teradata import teradata_resource, TeradataResource
from .assets import jaffle_shop_dbt_assets
from .project import jaffle_shop_project
from .schedules import schedules
@op(required_resource_keys={"teradata"})
def create_compute_cluster(context):
    context.resources.teradata.create_teradata_compute_cluster(
        "ShippingCG01",
        "Shipping",
        "STANDARD",
        "TD_COMPUTE_MEDIUM",
        "MIN_COMPUTE_COUNT(1) MAX_COMPUTE_COUNT(1) INITIALLY_SUSPENDED('FALSE')",
    )
    return "Compute Cluster Created"
@op(required_resource_keys={"teradata", "dbt"})
def run_dbt(context, status):
    if status == "Compute Cluster Created":
        materialize(
            [jaffle_shop_dbt_assets],
            resources={
                "dbt": DbtCliResource(project_dir=jaffle_shop_project)
            }
        )
        return "DBT Run Completed"
    else:
        raise DagsterError("DBT Run Failed")
@op(required_resource_keys={"teradata"})
def drop_compute_cluster(context, status):
    if status == "DBT Run Completed":
        context.resources.teradata.drop_teradata_compute_cluster("ShippingCG01", "Shipping", True)
    else:
        raise DagsterError("DBT Run Failed")
@job(resource_defs={"teradata": teradata_resource, "dbt": DbtCliResource})
def example_job():
    drop_compute_cluster(run_dbt(create_compute_cluster()))
defs = Definitions(
    assets=[jaffle_shop_dbt_assets],
    jobs=[example_job],
    schedules=schedules,
    resources={
        "dbt": DbtCliResource(project_dir=jaffle_shop_project),
        "teradata": TeradataResource(),
    },
)
Teradata VantageCloud Lake provides compute cluster management capabilities that let users dynamically allocate, suspend, resume, and delete compute resources. dagster-teradata supports these operations, so compute clusters can be managed directly within Dagster pipelines and compute capacity can be matched to workload demand, which helps control cost. The following operations are available:
1. Create a Compute Cluster (create_teradata_compute_cluster)
This operation provisions a new compute cluster within Teradata VantageCloud Lake using dagster-teradata. It enables users to define the cluster's configuration, including compute profiles, resource allocation, and query execution strategies, directly within a Dagster job.
Args:
- compute_profile_name (str) – Specifies the name of the compute profile.
- compute_group_name (str) – Identifies the compute group to which the profile belongs.
- query_strategy (str, optional, default="STANDARD") – Defines the method used by the Teradata Optimizer to execute SQL queries efficiently. Acceptable values:
  - STANDARD – The default strategy at the database level, optimized for general query execution.
  - ANALYTIC – Optimized for complex analytical workloads.
- compute_map (Optional[str], default=None) – Maps compute resources to specific nodes within the cluster.
- compute_attribute (Optional[str], default=None) – Specifies additional configuration attributes for the compute profile, such as: MIN_COMPUTE_COUNT(1) MAX_COMPUTE_COUNT(5) INITIALLY_SUSPENDED('FALSE')
- timeout (int, optional, default=constants.CC_OPR_TIME_OUT) – The maximum duration (in seconds) to wait for the cluster creation process to complete. Default: 20 minutes.
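The compute_attribute value is a plain string, so it can be generated rather than typed by hand. The helper below is a hypothetical sketch. It reproduces the example string shown in the argument list, and it assumes that 'TRUE' is the counterpart of the documented 'FALSE' value and that the minimum count should not exceed the maximum.

```python
def compute_attribute(min_count, max_count, initially_suspended=False):
    """Build a compute_attribute string in the format shown in the argument list."""
    # Assumption: a minimum larger than the maximum is never intended.
    if min_count > max_count:
        raise ValueError("min_count must not exceed max_count")
    # Assumption: 'TRUE' mirrors the documented 'FALSE' value.
    suspended = "TRUE" if initially_suspended else "FALSE"
    return (
        f"MIN_COMPUTE_COUNT({min_count}) MAX_COMPUTE_COUNT({max_count}) "
        f"INITIALLY_SUSPENDED('{suspended}')"
    )


print(compute_attribute(1, 5))
```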
2. Suspend a Compute Cluster (suspend_teradata_compute_cluster)
This operation temporarily suspends a compute cluster within Teradata VantageCloud Lake using dagster-teradata, reducing resource consumption while retaining the compute profile for future use.
Args:
- compute_profile_name (str) – Specifies the name of the compute profile.
- compute_group_name (str) – Identifies the compute group associated with the profile.
- timeout (int, optional, default=constants.CC_OPR_TIME_OUT) – The maximum wait time for the suspension process to complete. Default: 20 minutes.
3. Resume a Compute Cluster (resume_teradata_compute_cluster)
This operation restores a previously suspended compute cluster using dagster-teradata, allowing workloads to resume execution within a Dagster pipeline.
Args:
- compute_profile_name (str) – Specifies the name of the compute profile.
- compute_group_name (str) – Identifies the compute group associated with the profile.
- timeout (int, optional, default=constants.CC_OPR_TIME_OUT) – The maximum wait time for the resumption process to complete. Default: 20 minutes.
4. Delete a Compute Cluster (drop_teradata_compute_cluster)
This operation removes a compute cluster from Teradata VantageCloud Lake using dagster-teradata, with an option to delete the associated compute group. You can run this operation directly from your Dagster workflow.
Args:
- compute_profile_name (str) – Specifies the name of the compute profile.
- compute_group_name (str) – Identifies the compute group associated with the profile.
- delete_compute_group (bool, optional, default=False) – Determines whether the compute group should be deleted:
  - True – Deletes the compute group.
  - False – Retains the compute group without modifications.
These operations are designed to be fully integrated into dagster-teradata for managing compute clusters in Teradata VantageCloud Lake. By utilizing these operations within Dagster jobs, users can optimize resource allocation, perform complex transformations, and automate compute cluster management to align with workload demands.
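One pattern these operations suggest is resuming a cluster only while a workload runs and suspending it afterwards, even if the workload fails. The sketch below expresses that as a context manager. running_cluster and RecordingTeradata are made-up names: the recorder stands in for the Teradata resource so the example runs without a VantageCloud Lake connection, and only the two method names and their first two arguments come from the descriptions above.

```python
from contextlib import contextmanager


@contextmanager
def running_cluster(teradata, compute_profile_name, compute_group_name):
    """Resume a compute cluster for the duration of a block, then suspend it."""
    teradata.resume_teradata_compute_cluster(compute_profile_name, compute_group_name)
    try:
        yield
    finally:
        # Suspend even if the workload raises, so the cluster does not keep running.
        teradata.suspend_teradata_compute_cluster(compute_profile_name, compute_group_name)


class RecordingTeradata:
    """Stand-in resource that records calls instead of contacting Vantage."""

    def __init__(self):
        self.calls = []

    def resume_teradata_compute_cluster(self, profile, group):
        self.calls.append(("resume", profile, group))

    def suspend_teradata_compute_cluster(self, profile, group):
        self.calls.append(("suspend", profile, group))


recorder = RecordingTeradata()
with running_cluster(recorder, "ShippingCG01", "Shipping"):
    recorder.calls.append(("workload",))
```

Inside an op, context.resources.teradata would take the place of the recorder.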
Further reading
- dagster-teradata with Teradata Vantage
- Data Transfer from AWS S3 to Teradata Vantage Using dagster-teradata
- Data Transfer from Azure Blob to Teradata Vantage Using dagster-teradata
- Manage VantageCloud Lake Compute Clusters with dagster-teradata
- Teradata Authorization
- Teradata VantageCloud Lake Compute Clusters