Dagster & Sling (Pythonic)
If you are just getting started with the Sling integration, we recommend using the new Sling component.
Sling provides an easy-to-use YAML configuration layer for loading data from files, replicating data between databases, exporting custom SQL queries to cloud storage, and much more.
How it works
The Dagster integration allows you to derive Dagster assets from a replication configuration file. The typical pattern for building an ELT pipeline with Sling has three steps:
- Define a Sling replication.yaml file that specifies the source and target connections, as well as which streams to sync.
- Create a SlingResource and pass a list of SlingConnectionResource objects, one for each connection, to the connections parameter, ensuring each resource uses the same name given to the connection in the Sling configuration.
- Use the @dagster_sling.sling_assets decorator to define an asset that runs the Sling replication job and yields from the SlingResource's replicate method to run the sync.
We'll walk you through each of these steps in this guide.
Prerequisites
To follow the steps in this guide:
- Familiarize yourself with Sling's replication configuration, if you've never worked with Sling before. The replication configuration is a YAML file that specifies the source and target connections, as well as which streams to sync. The dagster-sling integration uses this configuration to build assets for both sources and destinations.
- Install the dagster-sling library with either uv or pip:
  uv: uv add dagster-sling
  pip: pip install dagster-sling
  Refer to the Dagster installation guide for more info.
Step 1: Set up a Sling replication configuration
Dagster's Sling integration is built around Sling's replication configuration. You may provide either a path to an existing replication.yaml file or construct a dictionary that represents the configuration in Python. This configuration is passed to the Sling CLI to run the replication job.
replication.yaml
This example creates a replication configuration in a replication.yaml file:
# replication.yaml
source: MY_POSTGRES
target: MY_SNOWFLAKE
defaults:
  mode: full-refresh
  object: '{stream_schema}_{stream_table}'
streams:
  public.accounts:
  public.users:
  public.finance_departments:
    object: 'departments'
Python
This example creates a replication configuration using Python:
replication_config = {
    "source": "MY_POSTGRES",
    "target": "MY_DUCKDB",
    "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    "streams": {
        "public.accounts": None,
        "public.users": None,
        "public.finance_departments": {"object": "departments"},
    },
}
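To make the object template concrete, the following is a minimal plain-Python sketch of how the default `{stream_schema}_{stream_table}` template and the per-stream `object` override in the configuration above resolve to target names. Sling performs this expansion itself at run time; the `target_object` helper is hypothetical, exists only for illustration, and assumes stream names of the form `schema.table`.

```python
# Plain-Python illustration; Sling itself performs this expansion at run time.
replication_config = {
    "source": "MY_POSTGRES",
    "target": "MY_DUCKDB",
    "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    "streams": {
        "public.accounts": None,
        "public.users": None,
        "public.finance_departments": {"object": "departments"},
    },
}


def target_object(stream_name, stream_config, defaults):
    """Return the target object name for one stream (hypothetical helper)."""
    # A stream-level "object" takes precedence over the default template.
    template = (stream_config or {}).get("object", defaults["object"])
    # Assumes stream names of the form "schema.table".
    schema, table = stream_name.split(".", 1)
    return template.format(stream_schema=schema, stream_table=table)


for name, config in replication_config["streams"].items():
    print(name, "->", target_object(name, config, replication_config["defaults"]))
```

Under these assumptions, public.accounts and public.users map to public_accounts and public_users, while public.finance_departments maps to departments because its stream-level object overrides the default.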
Step 2: Create a Sling resource
Next, you'll create a SlingResource object that contains references to the connections specified in the replication configuration:
# pyright: reportCallIssue=none
from dagster_sling import SlingConnectionResource, SlingResource
from dagster import EnvVar
sling_resource = SlingResource(
    connections=[
        # Using a connection string from an environment variable
        SlingConnectionResource(
            name="MY_POSTGRES",
            type="postgres",
            connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
        ),
        # Using a hard-coded connection string
        SlingConnectionResource(
            name="MY_DUCKDB",
            type="duckdb",
            connection_string="duckdb:///var/tmp/duckdb.db",
        ),
        # Using a keyword-argument constructor
        SlingConnectionResource(
            name="MY_SNOWFLAKE",
            type="snowflake",
            host=EnvVar("SNOWFLAKE_HOST"),
            user=EnvVar("SNOWFLAKE_USER"),
            role="REPORTING",
        ),
    ]
)
A SlingResource takes a connections parameter, where each SlingConnectionResource represents a connection to a source or target database. You may provide as many connections to the SlingResource as needed.
The name parameter in the SlingConnectionResource should match the source and target keys in the replication configuration.
You can pass a connection string or arbitrary keyword arguments to the SlingConnectionResource to specify the connection details. Refer to Sling's connections reference for the specific connection types and parameters.
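Because the connection names must line up with the source and target keys, a quick check before wiring things together can catch typos early. The sketch below is plain Python and not part of dagster-sling; `missing_connections` is a hypothetical helper that compares the names referenced by a replication configuration dictionary against the names you plan to give your SlingConnectionResource objects.

```python
def missing_connections(replication_config, connection_names):
    """Return source/target names the config references but the list does not define.

    Hypothetical helper for illustration; not part of dagster-sling.
    """
    referenced = {replication_config[key] for key in ("source", "target")}
    return sorted(referenced - set(connection_names))


replication_config = {
    "source": "MY_POSTGRES",
    "target": "MY_DUCKDB",
    "streams": {"public.accounts": None},
}

# Names as they would be passed to SlingConnectionResource(name=...)
print(missing_connections(replication_config, ["MY_POSTGRES", "MY_DUCKDB", "MY_SNOWFLAKE"]))
print(missing_connections(replication_config, ["MY_POSTGRES"]))
```

The first call reports nothing missing; the second reports MY_DUCKDB, which the configuration names as its target but the list does not define.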
Step 3: Define the Sling assets
Next, define a Sling asset using the @dagster_sling.sling_assets decorator. Dagster will read the replication configuration to produce assets.
Each stream will render two assets, one for the source stream and one for the target destination. You can override how assets are named by passing in a custom DagsterSlingTranslator object.
from dagster_sling import SlingResource, sling_assets
from dagster import Definitions, file_relative_path
replication_config = file_relative_path(__file__, "../sling_replication.yaml")
sling_resource = SlingResource(connections=[...])  # Add connections here
@sling_assets(replication_config=replication_config)
def my_assets(context, sling: SlingResource):
    yield from sling.replicate(context=context)
    for row in sling.stream_raw_logs():
        context.log.info(row)
Step 4: Create the Definitions object
The last step is to include the Sling assets and resource in a Definitions object. This enables Dagster tools to load everything we've defined:
defs = Definitions(
    assets=[
        my_assets,
    ],
    resources={
        "sling": sling_resource,
    },
)
That's it! You should now be able to view your assets in the Dagster UI and run the replication job.
Examples
Example 1: Database to database
To set up a Sling sync between two databases, such as Postgres and Snowflake, you could do something like the following:
# pyright: reportCallIssue=none
# pyright: reportOptionalMemberAccess=none
from dagster_sling import SlingConnectionResource, SlingResource, sling_assets
from dagster import EnvVar
source = SlingConnectionResource(
    name="MY_PG",
    type="postgres",
    host="localhost",
    port=5432,
    database="my_database",
    user="my_user",
    password=EnvVar("PG_PASS"),
)
target = SlingConnectionResource(
    name="MY_SF",
    type="snowflake",
    host="hostname.snowflake",
    user="username",
    database="database",
    password=EnvVar("SF_PASSWORD"),
    role="role",
)
sling = SlingResource(
    connections=[
        source,
        target,
    ]
)
replication_config = {
    "source": "MY_PG",
    "target": "MY_SF",
    "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    "streams": {
        "public.accounts": None,
        "public.users": None,
        "public.finance_departments": {"object": "departments"},
    },
}
@sling_assets(replication_config=replication_config)
def my_assets(context, sling: SlingResource):
    yield from sling.replicate(context=context)
Example 2: File to database
To set up a Sling sync between a file in an object store and a database, such as from Amazon S3 to Snowflake, you could do something like the following:
from dagster_sling import SlingConnectionResource, SlingResource, sling_assets
from dagster import EnvVar
target = SlingConnectionResource(
    name="MY_SF",
    type="snowflake",
    host="hostname.snowflake",
    user="username",
    database="database",
    password=EnvVar("SF_PASSWORD"),
    role="role",
)
source = SlingConnectionResource(
    name="MY_S3",
    type="s3",
    bucket="sling-bucket",
    access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
    secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
)
sling = SlingResource(connections=[source, target])
replication_config = {
    "source": "MY_S3",
    "target": "MY_SF",
    "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    "streams": {
        "s3://sling-bucket/my_file.parquet": {
            "object": "marts.my_table",
            "primary_key": "id",
        },
    },
}
@sling_assets(replication_config=replication_config)
def my_assets(context, sling: SlingResource):
    yield from sling.replicate(context=context)
Advanced usage
Creating Sling jobs and schedules
Once you have your Sling assets, you can define a job to materialize them.
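import dagster as dg
from dagster_sling import SlingResource
# Assumes my_sling_assets is a @sling_assets-decorated function, and that source and
# target are SlingConnectionResource objects like those in the examples above.
my_sling_assets_job = dg.define_asset_job(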
    name="my_sling_assets_job",
    selection=[my_sling_assets],
)
defs = dg.Definitions(
    assets=[my_sling_assets],
    jobs=[my_sling_assets_job],
    resources={
        "sling": SlingResource(
            connections=[
                source,
                target,
            ]
        )
    },
)
Jobs created for your Sling assets can also be scheduled:
my_sling_assets_schedule = dg.ScheduleDefinition(
    job=my_sling_assets_job,
    cron_schedule="0 0 * * *",  # Runs at midnight daily
)
defs = dg.Definitions(
    assets=[my_sling_assets],
    jobs=[my_sling_assets_job],
    schedules=[my_sling_assets_schedule],
    resources={
        "sling": SlingResource(
            connections=[
                source,
                target,
            ]
        )
    },
)
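The cron_schedule string above uses the standard five-field cron layout: minute, hour, day of month, month, and day of week. As a small aid for reading or adjusting it, here is a plain-Python sketch that labels each field; `describe_cron` is a hypothetical helper, not a Dagster API, and it does not validate the field values themselves.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression):
    """Map each field of a five-field cron expression to its name (hypothetical helper)."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


print(describe_cron("0 0 * * *"))     # the midnight-daily schedule used above
print(describe_cron("30 6 * * 1-5"))  # 06:30 on Monday through Friday
```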
Customize upstream dependencies
By default, Dagster sets upstream dependencies when generating asset specs for your Sling assets. To do so, Dagster parses information about assets that are upstream of specific Sling assets from the Sling replication configuration itself. You can customize how upstream dependencies are set on your Sling assets by passing an instance of a custom DagsterSlingTranslator subclass to the sling_assets decorator.
from collections.abc import Mapping
from typing import Any
import dagster as dg
from dagster_sling import DagsterSlingTranslator, SlingResource, sling_assets
# Assumes replication_config is defined as in the examples above.
class CustomDagsterSlingTranslator(DagsterSlingTranslator):
    def get_asset_spec(self, stream_definition: Mapping[str, Any]) -> dg.AssetSpec:
        """Overrides asset spec to override upstream asset key to be a single source asset."""
        # We create the default asset spec using super()
        default_spec = super().get_asset_spec(stream_definition)
        # We set an upstream dependency for our assets
        return default_spec.replace_attributes(
            deps=[dg.AssetKey("common_upstream_sling_dependency")],
        )
@sling_assets(
    replication_config=replication_config,
    dagster_sling_translator=CustomDagsterSlingTranslator(),
)
def my_sling_assets(context, sling: SlingResource):
    yield from sling.replicate(context=context)
Note that super() is called in the overridden method to generate the default asset spec. It is best practice to generate the default asset spec before customizing it.
Define downstream dependencies
Dagster allows you to define assets that are downstream of specific Sling streams using their asset keys. The asset key for a Sling stream can be retrieved using the DagsterSlingTranslator. The example below defines my_downstream_asset as a downstream dependency of my_sling_stream:
import dagster as dg
from dagster_sling import DagsterSlingTranslator, SlingResource, sling_assets
from dagster_sling.asset_decorator import get_streams_from_replication
# Assumes replication_config is defined as in the examples above.
@sling_assets(
    replication_config=replication_config,
)
def my_sling_assets(context, sling: SlingResource):
    yield from sling.replicate(context=context)
my_sling_stream_asset_key = next(
    iter(
        [
            # Take the key of the default asset spec for the matching stream
            DagsterSlingTranslator().get_asset_spec(stream_definition=stream).key
            for stream in get_streams_from_replication(replication_config)
            if stream["name"] == "my_sling_stream"
        ]
    )
)
@dg.asset(deps=[my_sling_stream_asset_key])
def my_downstream_asset(): ...
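One thing to watch in the lookup above: next(iter([...])) raises StopIteration when no stream has the requested name. The following plain-Python sketch shows a more forgiving lookup. It uses a hypothetical stand-in, `iter_streams`, in place of get_streams_from_replication, and it assumes each stream is a mapping with "name" and "config" entries; that shape is inferred from the stream["name"] filter above, not confirmed here.

```python
def iter_streams(replication_config):
    """Hypothetical stand-in for get_streams_from_replication.

    Assumed shape: one {"name": ..., "config": ...} mapping per stream.
    """
    for name, config in replication_config.get("streams", {}).items():
        yield {"name": name, "config": config}


def find_stream(replication_config, stream_name):
    """Return the matching stream mapping, or None if no stream has that name."""
    matches = (s for s in iter_streams(replication_config) if s["name"] == stream_name)
    # The second argument to next() is returned instead of raising StopIteration.
    return next(matches, None)


replication_config = {
    "source": "MY_POSTGRES",
    "target": "MY_DUCKDB",
    "streams": {"public.accounts": None, "public.users": None},
}

print(find_stream(replication_config, "public.users"))
print(find_stream(replication_config, "my_sling_stream"))
```

Returning None lets the calling code fail with a clear message of its own choosing rather than an unexplained StopIteration at import time.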
In the downstream asset, you may want direct access to the contents of the Sling asset. To do so, you can customize the code within your @asset-decorated function to load upstream data.
APIs in this guide
| Name | Description | 
|---|---|
| @dagster_sling.sling_assets | The core Sling asset factory for building syncs | 
| SlingResource | The Sling resource used for handing credentials to databases and object stores | 
| DagsterSlingTranslator | A translator for specifying how to map between Sling and Dagster types | 
| SlingConnectionResource | A Sling connection resource for specifying database and storage connection credentials | 
About Sling
Sling provides an easy-to-use YAML configuration layer for loading data from files, replicating data between databases, exporting custom SQL queries to cloud storage, and much more.
Key Features
- Data Movement: Transfer data between different storage systems and databases efficiently
- Flexible Connectivity: Support for numerous databases, data warehouses, and file storage systems
- Transformation Capabilities: Built-in data transformation features during transfer
- Multiple Operation Modes: Support for various replication modes including full-refresh, incremental, and snapshot
- Production-Ready: Deployable with monitoring, scheduling, and error handling