AWS (dagster-aws)
Utilities for interfacing with AWS with Dagster.
S3
- dagster_aws.s3.S3Resource ResourceDefinition [source]
- Resource that gives access to S3. The underlying S3 session is created by calling boto3.session.Session(profile_name). The returned resource object is an S3 client, an instance of botocore.client.S3. Example:
 from dagster import job, op, Definitions
 from dagster_aws.s3 import S3Resource

 @op
 def example_s3_op(s3: S3Resource):
     return s3.get_client().list_objects_v2(
         Bucket='my-bucket',
         Prefix='some-key'
     )

 @job
 def example_job():
     example_s3_op()

 Definitions(
     jobs=[example_job],
     resources={'s3': S3Resource(region_name='us-west-1')}
 )
- dagster_aws.s3.S3PickleIOManager IOManagerDefinition [source]
- Persistent IO manager using S3 for storage. Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for S3 and the backing bucket. Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at "<base_dir>/<asset_key>". If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir. Subsequent materializations of an asset will overwrite previous materializations of that asset. With a base directory of "/my/base/path", an asset with key AssetKey(["one", "two", "three"]) would be stored in a file called "three" in a directory with path "/my/base/path/one/two/". Example usage:
 from dagster import asset, Definitions
 from dagster_aws.s3 import S3PickleIOManager, S3Resource

 @asset
 def asset1():
     # create df ...
     return df

 @asset
 def asset2(asset1):
     return asset1[:5]

 Definitions(
     assets=[asset1, asset2],
     resources={
         "io_manager": S3PickleIOManager(
             s3_resource=S3Resource(),
             s3_bucket="my-cool-bucket",
             s3_prefix="my-cool-prefix",
         )
     }
 )
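The asset path rule described above can be sketched in plain Python. This only illustrates the documented "<base_dir>/<asset_key>" layout and is not the IO manager's actual implementation; `asset_storage_path` is a hypothetical helper name.

```python
from typing import Sequence


def asset_storage_path(base_dir: str, asset_key: Sequence[str]) -> str:
    """Join a base directory and asset key components into one path.

    Hypothetical helper that mirrors the documented "<base_dir>/<asset_key>" rule.
    """
    if not asset_key:
        raise ValueError("asset_key must have at least one component")
    # Earlier components become parent directories; the last one is the file name.
    return "/".join([base_dir.rstrip("/"), *asset_key])


print(asset_storage_path("/my/base/path", ["one", "two", "three"]))
```

Because the path depends only on the asset key, a second materialization resolves to the same location, which is why it overwrites the first.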
- class dagster_aws.s3.S3ComputeLogManager [source]
- Logs compute function stdout and stderr to S3. Users should not instantiate this class directly. Instead, use a YAML block in dagster.yaml such as the following:
 compute_logs:
   module: dagster_aws.s3.compute_log_manager
   class: S3ComputeLogManager
   config:
     bucket: "mycorp-dagster-compute-logs"
     local_dir: "/tmp/cool"
     prefix: "dagster-test-"
     use_ssl: true
     verify: true
     verify_cert_path: "/path/to/cert/bundle.pem"
     endpoint_url: "http://alternate-s3-host.io"
     skip_empty_files: true
     upload_interval: 30
     upload_extra_args:
       ServerSideEncryption: "AES256"
     show_url_only: false
     region: "us-west-1"
- Parameters: - bucket (str) – The name of the S3 bucket to which to log.
- local_dir (Optional[str]) – Path to the local directory in which to stage logs. Default: dagster_shared.seven.get_system_temp_directory().
- prefix (Optional[str]) – Prefix for the log file keys.
- use_ssl (Optional[bool]) – Whether or not to use SSL. Default True.
- verify (Optional[bool]) – Whether or not to verify SSL certificates. Default True.
- verify_cert_path (Optional[str]) – A filename of the CA cert bundle to use. Only used if verify set to False.
- endpoint_url (Optional[str]) – Override for the S3 endpoint url.
- skip_empty_files (Optional[bool]) – Skip upload of empty log files.
- upload_interval (Optional[int]) – Interval in seconds to upload partial log files to S3. By default, will only upload when the capture is complete.
- upload_extra_args (Optional[dict]) – Extra args for S3 file upload.
- show_url_only (Optional[bool]) – Only show the URL of the log file in the UI, instead of fetching and displaying the full content. Default False.
- region (Optional[str]) – The region of the S3 bucket. If not specified, will use the default region of the AWS session.
- inst_data (Optional[ConfigurableClassData]) – Serializable representation of the compute log manager when newed up from config.
 
- dagster_aws.s3.S3Coordinate DagsterType
- A dagster.DagsterType intended to make it easier to pass information about files on S3 from op to op. Objects of this type should be dicts with 'bucket' and 'key' keys, and may be hydrated from config in the intuitive way, e.g., for an input with the name s3_file:
 inputs:
   s3_file:
     value:
       bucket: my-bucket
       key: my-key
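As an illustration of the dict shape described above, the following sketch checks that a coordinate has both keys and formats it as an s3:// URI. The helper name `s3_coordinate_to_uri` is hypothetical and not part of dagster-aws.

```python
from typing import Any, Mapping


def s3_coordinate_to_uri(coordinate: Mapping[str, Any]) -> str:
    """Validate a {'bucket': ..., 'key': ...} dict and format it as an s3:// URI."""
    missing = [name for name in ("bucket", "key") if name not in coordinate]
    if missing:
        raise KeyError(f"S3 coordinate is missing: {', '.join(missing)}")
    return f"s3://{coordinate['bucket']}/{coordinate['key']}"


print(s3_coordinate_to_uri({"bucket": "my-bucket", "key": "my-key"}))
```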
File Manager
- class dagster_aws.s3.S3FileHandle [source]
- A reference to a file on S3. 
- dagster_aws.s3.S3FileManagerResource ResourceDefinition [source]
- Base class for Dagster resources that utilize structured config. This class is a subclass of both ResourceDefinition and Config. Example definition:
 class WriterResource(ConfigurableResource):
     prefix: str

     def output(self, text: str) -> None:
         print(f"{self.prefix}{text}")
- Example usage:
 @asset
 def asset_that_uses_writer(writer: WriterResource):
     writer.output("text")

 defs = Definitions(
     assets=[asset_that_uses_writer],
     resources={"writer": WriterResource(prefix="a_prefix")},
 )
- You can optionally use this class to model configuration only and vend an object of a different type for use at runtime. This is useful when you want one object to manage configuration and a separate object at runtime, or when you want to directly use a third-party class that you do not control. To do this, override the create_resource method to return a different object.
 class WriterResource(ConfigurableResource):
     prefix: str

     def create_resource(self, context: InitResourceContext) -> Writer:
         # Writer is a pre-existing class defined elsewhere
         return Writer(self.prefix)
- Example usage:
 @asset
 def use_preexisting_writer_as_resource(writer: ResourceParam[Writer]):
     writer.output("text")

 defs = Definitions(
     assets=[use_preexisting_writer_as_resource],
     resources={"writer": WriterResource(prefix="a_prefix")},
 )
ECS
- dagster_aws.ecs.EcsRunLauncher RunLauncher [source]
- RunLauncher that starts a task in ECS for each Dagster job run. - Parameters: - inst_data (Optional[ConfigurableClassData]) – If not provided, defaults to None.
- task_definition – If not provided, defaults to None.
- container_name (str) – If not provided, defaults to “run”.
- secrets (Optional[list[str]]) – If not provided, defaults to None.
- secrets_tag (str) – If not provided, defaults to “dagster”.
- env_vars (Optional[Sequence[str]]) – If not provided, defaults to None.
- include_sidecars (bool) – If not provided, defaults to False.
- use_current_ecs_task_config (bool) – If not provided, defaults to True.
- run_task_kwargs (Optional[Mapping[str, Any]]) – If not provided, defaults to None.
- run_resources (Optional[dict[str, Any]]) – If not provided, defaults to None.
- run_ecs_tags (Optional[list[dict[str, Optional[str]]]]) – If not provided, defaults to None.
- propagate_tags (Optional[dict[str, Any]]) – If not provided, defaults to None.
- task_definition_prefix (str) – If not provided, defaults to “run”.
 
- dagster_aws.ecs.ecs_executor ExecutorDefinition [source]
- Executor which launches steps as ECS tasks. To use the ecs_executor, set it as the executor_def when defining a job:
 from dagster_aws.ecs import ecs_executor
 from dagster import job, op

 @op(
     tags={"ecs/cpu": "256", "ecs/memory": "512"},
 )
 def ecs_op():
     pass

 @job(executor_def=ecs_executor)
 def ecs_job():
     ecs_op()
- Then you can configure the executor with run config as follows:
 execution:
   config:
     cpu: 1024
     memory: 2048
     ephemeral_storage: 10
     task_overrides:
       containerOverrides:
         - name: run
           environment:
             - name: MY_ENV_VAR
               value: "my_value"
- max_concurrent limits the number of ECS tasks that will execute concurrently for one run. By default there is no limit: steps run with as much parallelism as the DAG allows. Note that this is not a global limit.
- Configuration set on the ECS tasks created by the EcsRunLauncher will also be set on the tasks created by the ecs_executor.
- Configuration set using tags on a @job will only apply at the run level. For configuration to apply at each step, it must be set using tags for each @op.
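The run config shown in YAML above can also be built as a Python dict. The sketch below is a hypothetical helper (`ecs_executor_run_config` is not part of dagster-aws), and it assumes that max_concurrent sits next to cpu and memory under execution.config, which the text above does not state explicitly.

```python
from typing import Any, Dict, Optional


def ecs_executor_run_config(
    cpu: int, memory: int, max_concurrent: Optional[int] = None
) -> Dict[str, Any]:
    """Build a run config dict mirroring the YAML layout for the ecs_executor."""
    config: Dict[str, Any] = {"cpu": cpu, "memory": memory}
    if max_concurrent is not None:
        # Assumption: max_concurrent is a sibling of cpu/memory in the executor config.
        config["max_concurrent"] = max_concurrent
    return {"execution": {"config": config}}


print(ecs_executor_run_config(cpu=1024, memory=2048, max_concurrent=4))
```

A dict like this would typically be passed as the run_config when launching the job.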
RDS
- dagster_aws.rds.RDSResource ResourceDefinition [source]
- A resource for interacting with the AWS RDS service. It wraps both the AWS RDS client (https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds.html) and the AWS RDS Data client (https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds-data.html). The AWS RDS client (RDSResource.get_rds_client()) allows access to the management layer of RDS (creating, starting, configuring databases). The AWS RDS Data client (RDSResource.get_data_client()) allows executing queries on the SQL databases themselves. Note that the AWS RDS Data service is only available for Aurora databases. To access data in other types of RDS databases, use the corresponding SQL client directly (e.g. Postgres/MySQL). Example:
 from dagster import Definitions, asset
 from dagster_aws.rds import RDSResource

 @asset
 def my_table(rds_resource: RDSResource):
     with rds_resource.get_rds_client() as rds_client:
         rds_client.describe_db_instances()['DBInstances']
     with rds_resource.get_data_client() as data_client:
         data_client.execute_statement(
             resourceArn="RESOURCE_ARN",
             secretArn="SECRET_ARN",
             sql="SELECT * from mytable",
         )

 Definitions(
     assets=[my_table],
     resources={
         "rds_resource": RDSResource(
             region_name="us-west-1"
         )
     }
 )
Redshift
- dagster_aws.redshift.RedshiftClientResource ResourceDefinition
- This resource enables connecting to a Redshift cluster and issuing queries against that cluster. Example:
 from dagster import Definitions, asset, EnvVar
 from dagster_aws.redshift import RedshiftClientResource

 @asset
 def example_redshift_asset(context, redshift: RedshiftClientResource):
     redshift.get_client().execute_query('SELECT 1', fetch_results=True)

 redshift_configured = RedshiftClientResource(
     host='my-redshift-cluster.us-east-1.redshift.amazonaws.com',
     port=5439,
     user='dagster',
     password=EnvVar("DAGSTER_REDSHIFT_PASSWORD"),
     database='dev',
 )

 Definitions(
     assets=[example_redshift_asset],
     resources={'redshift': redshift_configured},
 )
Testing
- dagster_aws.redshift.FakeRedshiftClientResource ResourceDefinition
- This resource enables connecting to a Redshift cluster and issuing queries against that cluster. Example:
 from dagster import Definitions, asset, EnvVar
 from dagster_aws.redshift import RedshiftClientResource

 @asset
 def example_redshift_asset(context, redshift: RedshiftClientResource):
     redshift.get_client().execute_query('SELECT 1', fetch_results=True)

 redshift_configured = RedshiftClientResource(
     host='my-redshift-cluster.us-east-1.redshift.amazonaws.com',
     port=5439,
     user='dagster',
     password=EnvVar("DAGSTER_REDSHIFT_PASSWORD"),
     database='dev',
 )

 Definitions(
     assets=[example_redshift_asset],
     resources={'redshift': redshift_configured},
 )
EMR
- dagster_aws.emr.emr_pyspark_step_launcher ResourceDefinition [source]
- superseded: This API has been superseded. While there is no plan to remove this functionality, for new projects, we recommend using Dagster Pipes. For more information, see https://docs.dagster.io/guides/build/external-pipelines.
- spark_config:
- cluster_id: Name of the job flow (cluster) on which to execute.
- region_name: The AWS region that the cluster is in.
- action_on_failure: The EMR action to take when the cluster step fails: https://docs.aws.amazon.com/emr/latest/APIReference/API_StepConfig.html
- staging_bucket: S3 bucket to use for passing files between the plan process and EMR process.
- staging_prefix: S3 key prefix inside the staging_bucket to use for files passed between the plan process and EMR process.
- wait_for_logs: If set, the system will wait for EMR logs to appear on S3. Note that logs are copied every 5 minutes, so enabling this will add several minutes to the job runtime.
- local_job_package_path: Absolute path to the package that contains the job definition(s) whose steps will execute remotely on EMR. This is a path on the local filesystem of the process executing the job. The expectation is that this package will also be available on the Python path of the launched process running the Spark step on EMR, either deployed on step launch via the deploy_local_job_package option, referenced on S3 via the s3_job_package_path option, or installed on the cluster via bootstrap actions.
- local_pipeline_package_path: (legacy) Absolute path to the package that contains the pipeline definition(s) whose steps will execute remotely on EMR. This is a path on the local filesystem of the process executing the pipeline. The expectation is that this package will also be available on the Python path of the launched process running the Spark step on EMR, either deployed on step launch via the deploy_local_pipeline_package option, referenced on S3 via the s3_pipeline_package_path option, or installed on the cluster via bootstrap actions.
- deploy_local_job_package: If set, before every step run, the launcher will zip up all the code in local_job_package_path, upload it to S3, and pass it to spark-submit's --py-files option. This gives the remote process access to up-to-date user code. If not set, the assumption is that some other mechanism is used for distributing code to the EMR cluster. If this option is set to True, s3_job_package_path should not also be set.
- deploy_local_pipeline_package: (legacy) If set, before every step run, the launcher will zip up all the code in local_job_package_path, upload it to S3, and pass it to spark-submit's --py-files option. This gives the remote process access to up-to-date user code. If not set, the assumption is that some other mechanism is used for distributing code to the EMR cluster. If this option is set to True, s3_job_package_path should not also be set.
- s3_job_package_path: If set, this path will be passed to the --py-files option of spark-submit. This should usually be a path to a zip file. If this option is set, deploy_local_job_package should not be set to True.
- s3_pipeline_package_path: If set, this path will be passed to the --py-files option of spark-submit. This should usually be a path to a zip file. If this option is set, deploy_local_pipeline_package should not be set to True.
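The mutual-exclusion rules in the options above can be checked before the launcher is configured. The sketch below is a hypothetical pre-flight check and not part of dagster-aws; `check_package_options` is an illustrative name, and only the option names come from the list above.

```python
from typing import Any, Mapping

# Pairs of options that the documentation says should not be combined.
_EXCLUSIVE_PAIRS = [
    ("deploy_local_job_package", "s3_job_package_path"),
    ("deploy_local_pipeline_package", "s3_pipeline_package_path"),
]


def check_package_options(config: Mapping[str, Any]) -> None:
    """Raise ValueError if a deploy flag is True while its S3 path is also set."""
    for deploy_flag, s3_path in _EXCLUSIVE_PAIRS:
        if config.get(deploy_flag) is True and config.get(s3_path):
            raise ValueError(f"Set either {deploy_flag}=True or {s3_path}, not both")


# A config that only deploys the local package passes the check.
check_package_options({"deploy_local_job_package": True})
```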
 
- dagster_aws.emr.EmrClusterState =<enum 'EmrClusterState'> [source]
- Cluster state for EMR. 
- dagster_aws.emr.EmrStepState =<enum 'EmrStepState'> [source]
- Step state for EMR. 
CloudWatch
- dagster_aws.cloudwatch.cloudwatch_logger LoggerDefinition [source]
- Core class for defining loggers. - Loggers are job-scoped logging handlers, which will be automatically invoked whenever Dagster messages are logged from within a job. - Parameters: - logger_fn (Callable[[InitLoggerContext], logging.Logger]) – User-provided function to instantiate the logger. This logger will be automatically invoked whenever the methods on context.log are called from within job compute logic.
- config_schema (Optional[ConfigSchema]) – The schema for the config. Configuration data available in init_context.logger_config. If not set, Dagster will accept any config provided.
- description (Optional[str]) – A human-readable description of this logger.
 
SecretsManager
Resources which surface SecretsManager secrets for use in Dagster resources and jobs.
- dagster_aws.secretsmanager.SecretsManagerResource ResourceDefinition [source]
- beta: This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases. Resource that gives access to AWS SecretsManager. The underlying SecretsManager session is created by calling boto3.session.Session(profile_name). The returned resource object is a SecretsManager client, an instance of botocore.client.SecretsManager. Example:
 from dagster import Definitions, job, op
 from dagster_aws.secretsmanager import SecretsManagerResource

 @op
 def example_secretsmanager_op(secretsmanager: SecretsManagerResource):
     return secretsmanager.get_client().get_secret_value(
         SecretId='arn:aws:secretsmanager:region:aws_account_id:secret:appauthexample-AbCdEf'
     )

 @job
 def example_job():
     example_secretsmanager_op()

 Definitions(
     jobs=[example_job],
     resources={
         'secretsmanager': SecretsManagerResource(
             region_name='us-west-1'
         )
     }
 )
- dagster_aws.secretsmanager.SecretsManagerSecretsResource ResourceDefinition [source]
- beta: This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases. Resource that provides a dict which maps selected SecretsManager secrets to their string values. Also optionally sets chosen secrets as environment variables. Example:
 import os
 from dagster import Definitions, job, op
 from dagster_aws.secretsmanager import SecretsManagerSecretsResource

 @op
 def example_secretsmanager_secrets_op(secrets: SecretsManagerSecretsResource):
     return secrets.fetch_secrets().get("my-secret-name")

 @op
 def example_secretsmanager_secrets_op_2(secrets: SecretsManagerSecretsResource):
     with secrets.secrets_in_environment():
         return os.getenv("my-other-secret-name")

 @job
 def example_job():
     example_secretsmanager_secrets_op()
     example_secretsmanager_secrets_op_2()

 Definitions(
     jobs=[example_job],
     resources={
         'secrets': SecretsManagerSecretsResource(
             region_name='us-west-1',
             secrets_tag="dagster",
             add_to_environment=True,
         )
     }
 )
- Note that your ops must also declare that they require this resource, or it will not be initialized for the execution of their compute functions.
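To illustrate the general pattern behind exposing secrets as environment variables for the duration of a block, here is a minimal standard-library sketch. It is not the resource's implementation; `values_in_environment` and the variable name used below are hypothetical.

```python
import os
from contextlib import contextmanager
from typing import Iterator, Mapping


@contextmanager
def values_in_environment(values: Mapping[str, str]) -> Iterator[None]:
    """Temporarily export `values` as environment variables, then restore."""
    # Remember what was there before so the environment can be restored.
    previous = {name: os.environ.get(name) for name in values}
    os.environ.update(values)
    try:
        yield
    finally:
        for name, old in previous.items():
            if old is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = old


with values_in_environment({"DEMO_SECRET_NAME": "example-value"}):
    print(os.getenv("DEMO_SECRET_NAME"))
```

Scoping the variables to a `with` block keeps secret values out of the process environment once the work that needs them has finished.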
SSM
- dagster_aws.ssm.SSMResource ResourceDefinition [source]
- beta: This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases. Resource that gives access to AWS Systems Manager Parameter Store. The underlying Parameter Store session is created by calling boto3.session.Session(profile_name). The returned resource object is a Systems Manager client, an instance of botocore.client.ssm. Example:
 from typing import Any
 from dagster import Definitions, job, op
 from dagster_aws.ssm import SSMResource

 @op
 def example_ssm_op(ssm: SSMResource):
     return ssm.get_client().get_parameter(
         Name="a_parameter"
     )

 @job
 def example_job():
     example_ssm_op()

 Definitions(
     jobs=[example_job],
     resources={
         'ssm': SSMResource(
             region_name='us-west-1'
         )
     }
 )
- dagster_aws.ssm.ParameterStoreResource ResourceDefinition [source]
- beta: This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases. Resource that provides a dict which maps selected SSM Parameter Store parameters to their string values. Optionally sets selected parameters as environment variables. Example:
 import os
 from typing import Dict
 from dagster import Definitions, job, op
 from dagster_aws.ssm import ParameterStoreResource, ParameterStoreTag

 @op
 def example_parameter_store_op(parameter_store: ParameterStoreResource):
     return parameter_store.fetch_parameters().get("my-parameter-name")

 @op
 def example_parameter_store_op_2(parameter_store: ParameterStoreResource):
     with parameter_store.parameters_in_environment():
         return os.getenv("my-other-parameter-name")

 @job
 def example_job():
     example_parameter_store_op()
     example_parameter_store_op_2()

 defs = Definitions(
     jobs=[example_job],
     resources={
         'parameter_store': ParameterStoreResource(
             region_name='us-west-1',
             parameter_tags=[ParameterStoreTag(key='my-tag-key', values=['my-tag-value'])],
             add_to_environment=True,
             with_decryption=True,
         )
     },
 )
- class dagster_aws.ssm.ParameterStoreTag [source]
- beta: This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.
Pipes
Context Injectors
- class dagster_aws.pipes.PipesS3ContextInjector [source]
- A context injector that injects context by writing to a temporary S3 location. - Parameters: - bucket (str) – The S3 bucket to write to.
- client (boto3.client) – A boto3 client to use to write to S3.
- key_prefix (Optional[str]) – An optional prefix to use for the S3 key. Defaults to a random string.
 
- class dagster_aws.pipes.PipesLambdaEventContextInjector [source]
- Injects context via AWS Lambda event input. Should be paired with dagster_pipes.PipesMappingParamsLoader on the Lambda side.
Message Readers
- class dagster_aws.pipes.PipesS3MessageReader [source]
- Message reader that reads messages by periodically reading message chunks from a specified S3 bucket. - If log_readers is passed, this reader will also start the passed readers when the first message is received from the external process. - Parameters: - interval (float) – interval in seconds between attempts to download a chunk
- bucket (str) – The S3 bucket to read from.
- client (boto3.client) – A boto3 S3 client.
- log_readers (Optional[Sequence[PipesLogReader]]) – A set of log readers for logs on S3.
- include_stdio_in_messages (bool) – Whether to send stdout/stderr to Dagster via Pipes messages. Defaults to False.
 
- class dagster_aws.pipes.PipesCloudWatchMessageReader [source]
- Message reader that consumes AWS CloudWatch logs to read pipes messages. 
Clients
- class dagster_aws.pipes.PipesLambdaClient [source]
- A pipes client for invoking AWS Lambda. - By default, context is injected via the Lambda input event and messages are parsed out of the 4k tail of logs. - Parameters: - client (boto3.client) – The boto Lambda client used to call invoke.
- context_injector (Optional[PipesContextInjector]) – A context injector to use to inject context into the lambda function. Defaults to PipesLambdaEventContextInjector.
- message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages from the lambda function. Defaults to PipesLambdaLogsMessageReader.
 - run [source]
- Synchronously invoke a lambda function, enriched with the pipes protocol. - Parameters: - function_name (str) – The name of the function to use.
- event (Mapping[str, Any]) – A JSON serializable object to pass as input to the lambda.
- context (Union[OpExecutionContext, AssetExecutionContext]) – The context of the currently executing Dagster op or asset.
 - Returns: Wrapper containing results reported by the external process. Return type: PipesClientCompletedInvocation
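Since the event passed to run must be JSON serializable, it can be worth checking this before invoking the function. The sketch below uses only the standard library; `ensure_json_serializable` is a hypothetical helper and not part of dagster-aws.

```python
import json
from typing import Any, Mapping


def ensure_json_serializable(event: Mapping[str, Any]) -> str:
    """Return the JSON encoding of `event`, or raise ValueError if it cannot be encoded."""
    try:
        return json.dumps(event)
    except TypeError as exc:
        raise ValueError(f"Lambda event is not JSON serializable: {exc}") from exc


print(ensure_json_serializable({"some_parameter_value": 1}))
```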
 
- class dagster_aws.pipes.PipesGlueClient [source]
- A pipes client for invoking AWS Glue jobs. - Parameters: - context_injector (Optional[PipesContextInjector]) – A context injector to use to inject context into the Glue job, for example, PipesS3ContextInjector.
- message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages from the Glue job run. Defaults to PipesCloudWatchMessageReader. When provided with PipesCloudWatchMessageReader, it will be used to receive logs and events from the .../output/<job-run-id> CloudWatch log stream created by AWS Glue. Note that AWS Glue routes both stderr and stdout from the main job process into this log stream.
- client (Optional[boto3.client]) – The boto Glue client used to launch the Glue job
- forward_termination (bool) – Whether to cancel the Glue job run when the Dagster process receives a termination signal.
 - run [source]
- Start a Glue job, enriched with the pipes protocol. - See also: AWS API Documentation - Parameters: - context (Union[OpExecutionContext, AssetExecutionContext]) – The context of the currently executing Dagster op or asset.
- start_job_run_params (Dict) – Parameters for the start_job_run boto3 Glue client call.
- extras (Optional[Dict[str, Any]]) – Additional Dagster metadata to pass to the Glue job.
 - Returns: Wrapper containing results reported by the external process. Return type: PipesClientCompletedInvocation
 
- class dagster_aws.pipes.PipesECSClient [source]
- A pipes client for running AWS ECS tasks. - Parameters: - client (Any) – The boto ECS client used to launch the ECS task
- context_injector (Optional[PipesContextInjector]) – A context injector to use to inject context into the ECS task. Defaults to PipesEnvContextInjector.
- message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages from the ECS task. Defaults to PipesCloudWatchMessageReader.
- forward_termination (bool) – Whether to cancel the ECS task when the Dagster process receives a termination signal.
 - run [source]
- Run ECS tasks, enriched with the pipes protocol. - Parameters: - context (Union[OpExecutionContext, AssetExecutionContext]) – The context of the currently executing Dagster op or asset.
- run_task_params (dict) – Parameters for the run_task boto3 ECS client call. Must contain the taskDefinition key. See Boto3 API Documentation.
- extras (Optional[Dict[str, Any]]) – Additional information to pass to the Pipes session in the external process.
- pipes_container_name (Optional[str]) – If running more than one container in the task, and using PipesCloudWatchMessageReader, specify the container name which will be running Pipes.
- waiter_config (Optional[WaiterConfig]) – Optional waiter configuration to use. Defaults to 70 days (Delay: 6, MaxAttempts: 1000000).
 - Returns: Wrapper containing results reported by the external process. Return type: PipesClientCompletedInvocation
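The 70-day figure quoted for waiter_config follows from the delay and attempt count given above. A quick check, assuming the waiter sleeps Delay seconds between each of MaxAttempts polls (`waiter_max_wait_days` is a hypothetical helper for illustration):

```python
SECONDS_PER_DAY = 86_400


def waiter_max_wait_days(delay_seconds: int, max_attempts: int) -> float:
    """Upper bound on total wait time, in days, for a polling waiter."""
    return delay_seconds * max_attempts / SECONDS_PER_DAY


# Delay: 6, MaxAttempts: 1000000 gives 6,000,000 seconds in total.
print(round(waiter_max_wait_days(6, 1_000_000), 1))  # 69.4
```

So the default works out to just under 70 days of polling.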
 
- class dagster_aws.pipes.PipesEMRClient [source]
- A pipes client for running jobs on AWS EMR. - Parameters: - message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages from the EMR jobs. Recommended to use PipesS3MessageReader with expect_s3_message_writer set to True.
- client (Optional[boto3.client]) – The boto3 EMR client used to interact with AWS EMR.
- context_injector (Optional[PipesContextInjector]) – A context injector to use to inject context into AWS EMR job. Defaults to PipesEnvContextInjector.
- forward_termination (bool) – Whether to cancel the EMR job if the Dagster process receives a termination signal.
- wait_for_s3_logs_seconds (int) – The number of seconds to wait for S3 logs to be written after execution completes.
- s3_application_logs_prefix (str) – The prefix to use when looking for application logs in S3. Defaults to containers. Another common value is steps (for non-yarn clusters).
 - run [source]
- Run a job on AWS EMR, enriched with the pipes protocol. - Starts a new EMR cluster for each invocation. - Parameters: - context (Union[OpExecutionContext, AssetExecutionContext]) – The context of the currently executing Dagster op or asset.
- run_job_flow_params (Optional[dict]) – Parameters for the run_job_flow boto3 EMR client call. See Boto3 EMR API Documentation.
- extras (Optional[Dict[str, Any]]) – Additional information to pass to the Pipes session in the external process.
 - Returns: Wrapper containing results reported by the external process. Return type: PipesClientCompletedInvocation
 
- class dagster_aws.pipes.PipesEMRContainersClient [source]
- preview: This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use. A pipes client for running workloads on AWS EMR Containers. - Parameters: - client (Optional[boto3.client]) – The boto3 AWS EMR Containers client used to interact with AWS EMR Containers.
- context_injector (Optional[PipesContextInjector]) – A context injector to use to inject context into AWS EMR Containers workload. Defaults to PipesEnvContextInjector.
- message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages from the AWS EMR Containers workload. It’s recommended to use PipesS3MessageReader.
- forward_termination (bool) – Whether to cancel the AWS EMR Containers workload if the Dagster process receives a termination signal.
- pipes_params_bootstrap_method (Literal["args", "env"]) – The method to use to inject parameters into the AWS EMR Containers workload. Defaults to “args”.
- waiter_config (Optional[WaiterConfig]) – Optional waiter configuration to use. Defaults to 70 days (Delay: 6, MaxAttempts: 1000000).
 - run [source]
- Run a workload on AWS EMR Containers, enriched with the pipes protocol. - Parameters: - context (Union[OpExecutionContext, AssetExecutionContext]) – The context of the currently executing Dagster op or asset.
- params (dict) – Parameters for the start_job_run boto3 AWS EMR Containers client call. See Boto3 EMR Containers API Documentation.
- extras (Optional[Dict[str, Any]]) – Additional information to pass to the Pipes session in the external process.
 - Returns: Wrapper containing results reported by the external process. Return type: PipesClientCompletedInvocation
 
- class dagster_aws.pipes.PipesEMRServerlessClient [source]
- A pipes client for running workloads on AWS EMR Serverless. - Parameters: - client (Optional[boto3.client]) – The boto3 AWS EMR Serverless client used to interact with AWS EMR Serverless.
- context_injector (Optional[PipesContextInjector]) – A context injector to use to inject context into AWS EMR Serverless workload. Defaults to PipesEnvContextInjector.
- message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages from the AWS EMR Serverless workload. Defaults to PipesCloudWatchMessageReader.
- forward_termination (bool) – Whether to cancel the AWS EMR Serverless workload if the Dagster process receives a termination signal.
- poll_interval (float) – The interval in seconds to poll the AWS EMR Serverless workload for status updates. Defaults to 5 seconds.
 - run [source]
- Run a workload on AWS EMR Serverless, enriched with the pipes protocol. - Parameters: - context (Union[OpExecutionContext, AssetExecutionContext]) – The context of the currently executing Dagster op or asset.
- params (dict) – Parameters for the start_job_run boto3 AWS EMR Serverless client call. See Boto3 EMR Serverless API Documentation.
- extras (Optional[Dict[str, Any]]) – Additional information to pass to the Pipes session in the external process.
 - Returns: Wrapper containing results reported by the external process. Return type: PipesClientCompletedInvocation
 
Legacy
- dagster_aws.s3.ConfigurablePickledObjectS3IOManager IOManagerDefinition [source]
- deprecated: This API will be removed in version 2.0. Please use S3PickleIOManager instead. Renamed to S3PickleIOManager. See S3PickleIOManager for documentation.
- dagster_aws.s3.s3_resource ResourceDefinition [source]
- Resource that gives access to S3. - The underlying S3 session is created by calling - boto3.session.Session(profile_name). The returned resource object is an S3 client, an instance of botocore.client.S3.- Example: - from dagster import build_op_context, job, op
 from dagster_aws.s3 import s3_resource
 @op(required_resource_keys={'s3'})
 def example_s3_op(context):
 return context.resources.s3.list_objects_v2(
 Bucket='my-bucket',
 Prefix='some-key'
 )
 @job(resource_defs={'s3': s3_resource})
 def example_job():
 example_s3_op()
 example_job.execute_in_process(
 run_config={
 'resources': {
 's3': {
 'config': {
 'region_name': 'us-west-1',
 }
 }
 }
 }
 )- Note that your ops must also declare that they require this resource with required_resource_keys, or it will not be initialized for the execution of their compute functions. - You may configure this resource as follows: - resources:
      s3:
        config:
          region_name: "us-west-1"
          # Optional[str]: Specifies a custom region for the S3 session. Default is chosen
          # through the ordinary boto credential chain.
          use_unsigned_session: false
          # Optional[bool]: Specifies whether to use an unsigned S3 session. Default: False
          endpoint_url: "http://localhost"
          # Optional[str]: Specifies a custom endpoint for the S3 session. Default is None.
          profile_name: "dev"
          # Optional[str]: Specifies a custom profile for the S3 session. Default is the default
          # profile as specified in the ~/.aws/credentials file.
          use_ssl: true
          # Optional[bool]: Whether or not to use SSL. By default, SSL is used.
          verify: None
          # Optional[str]: Whether or not to verify SSL certificates. By default, SSL certificates are verified.
          # You can also specify this argument if you want to use a different CA cert bundle than the one used by botocore.
          aws_access_key_id: None
          # Optional[str]: The access key to use when creating the client.
          aws_secret_access_key: None
          # Optional[str]: The secret key to use when creating the client.
          aws_session_token: None
          # Optional[str]: The session token to use when creating the client.
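Because the resource is a plain S3 client, a single list_objects_v2 call returns at most one page of results. The sketch below follows continuation tokens to collect every key under a prefix. The helper iter_object_keys and the FakeS3Client stand-in are hypothetical names introduced only for illustration; the response fields used (Contents, Key, IsTruncated, NextContinuationToken) and the ContinuationToken request parameter are those of the S3 ListObjectsV2 API.

```python
from typing import Any, Dict, Iterator


def iter_object_keys(client: Any, bucket: str, prefix: str = "") -> Iterator[str]:
    """Yield every key under prefix, requesting further pages as needed."""
    kwargs: Dict[str, Any] = {"Bucket": bucket, "Prefix": prefix}
    while True:
        response = client.list_objects_v2(**kwargs)
        # "Contents" is absent when a page has no matching objects.
        for obj in response.get("Contents", []):
            yield obj["Key"]
        if not response.get("IsTruncated"):
            break
        kwargs["ContinuationToken"] = response["NextContinuationToken"]


class FakeS3Client:
    """In-memory stand-in for the S3 client, used only for this illustration."""

    def __init__(self, keys, page_size=2):
        self._keys = sorted(keys)
        self._page_size = page_size

    def list_objects_v2(self, Bucket, Prefix="", ContinuationToken=None):
        matching = [k for k in self._keys if k.startswith(Prefix)]
        start = int(ContinuationToken) if ContinuationToken else 0
        page = matching[start:start + self._page_size]
        end = start + len(page)
        response = {
            "Contents": [{"Key": k} for k in page],
            "IsTruncated": end < len(matching),
        }
        if response["IsTruncated"]:
            response["NextContinuationToken"] = str(end)
        return response


fake = FakeS3Client(["some-key/a", "some-key/b", "some-key/c", "other/d"])
keys = list(iter_object_keys(fake, "my-bucket", "some-key"))
```

Inside an op, the same helper could be called with context.resources.s3 in place of the fake client.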
- dagster_aws.s3.s3_pickle_io_manager IOManagerDefinition [source]
- Persistent IO manager using S3 for storage. Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for S3 and the backing bucket.
  Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at "<base_dir>/<asset_key>". If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.
  Subsequent materializations of an asset will overwrite previous materializations of that asset. With a base directory of "/my/base/path", an asset with key AssetKey(["one", "two", "three"]) would be stored in a file called "three" in a directory with path "/my/base/path/one/two/".
  Example usage:
- Attach this IO manager to a set of assets.
    from dagster import Definitions, asset
    from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

    @asset
    def asset1():
        # create df ...
        return df

    @asset
    def asset2(asset1):
        return asset1[:5]

    Definitions(
        assets=[asset1, asset2],
        resources={
            "io_manager": s3_pickle_io_manager.configured(
                {"s3_bucket": "my-cool-bucket", "s3_prefix": "my-cool-prefix"}
            ),
            "s3": s3_resource,
        },
    )
- Attach this IO manager to your job to make it available to your ops.
    from dagster import job
    from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

    @job(
        resource_defs={
            "io_manager": s3_pickle_io_manager.configured(
                {"s3_bucket": "my-cool-bucket", "s3_prefix": "my-cool-prefix"}
            ),
            "s3": s3_resource,
        },
    )
    def my_job():
        ...
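The asset path rule and the overwrite behavior described for this IO manager can be sketched without touching S3. The code below is a simplified in-memory model, not the library's implementation: asset_path and InMemoryPickleStore are hypothetical names, and a dict stands in for the bucket.

```python
import pickle
import posixpath
from typing import Any, Dict, Sequence


def asset_path(base_dir: str, asset_key: Sequence[str]) -> str:
    """Join the base directory and key components; the last component is the file name."""
    return posixpath.join(base_dir, *asset_key)


class InMemoryPickleStore:
    """Dict-backed stand-in for a bucket, used only for this illustration."""

    def __init__(self, base_dir: str) -> None:
        self.base_dir = base_dir
        self.objects: Dict[str, bytes] = {}

    def save(self, asset_key: Sequence[str], obj: Any) -> None:
        # Writing to the same path replaces any earlier materialization.
        self.objects[asset_path(self.base_dir, asset_key)] = pickle.dumps(obj)

    def load(self, asset_key: Sequence[str]) -> Any:
        return pickle.loads(self.objects[asset_path(self.base_dir, asset_key)])


store = InMemoryPickleStore("/my/base/path")
key = ["one", "two", "three"]
store.save(key, [1, 2, 3])
store.save(key, [4, 5, 6])  # second materialization overwrites the first

path = asset_path("/my/base/path", key)
loaded = store.load(key)
```

Since values are pickled, anything returned from an asset must be picklable, and downstream code should only unpickle data from a bucket it trusts.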
 
- dagster_aws.s3.s3_file_manager ResourceDefinition [source]
- FileManager that provides abstract access to S3. Implements the FileManager API.
- dagster_aws.redshift.redshift_resource ResourceDefinition
- This resource enables connecting to a Redshift cluster and issuing queries against that cluster.
  Example:
    from dagster import build_op_context, op
    from dagster_aws.redshift import redshift_resource

    @op(required_resource_keys={'redshift'})
    def example_redshift_op(context):
        return context.resources.redshift.execute_query('SELECT 1', fetch_results=True)

    redshift_configured = redshift_resource.configured({
        'host': 'my-redshift-cluster.us-east-1.redshift.amazonaws.com',
        'port': 5439,
        'user': 'dagster',
        'password': 'dagster',
        'database': 'dev',
    })
    context = build_op_context(resources={'redshift': redshift_configured})
    assert example_redshift_op(context) == [(1,)]
- dagster_aws.secretsmanager.secretsmanager_resource ResourceDefinition [source]
- Beta: This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.
  Resource that gives access to AWS SecretsManager. The underlying SecretsManager session is created by calling boto3.session.Session(profile_name). The returned resource object is a SecretsManager client, an instance of botocore.client.SecretsManager.
  Example:
    from dagster import job, op
    from dagster_aws.secretsmanager import secretsmanager_resource

    @op(required_resource_keys={'secretsmanager'})
    def example_secretsmanager_op(context):
        return context.resources.secretsmanager.get_secret_value(
            SecretId='arn:aws:secretsmanager:region:aws_account_id:secret:appauthexample-AbCdEf'
        )

    @job(resource_defs={'secretsmanager': secretsmanager_resource})
    def example_job():
        example_secretsmanager_op()

    example_job.execute_in_process(
        run_config={
            'resources': {
                'secretsmanager': {
                    'config': {
                        'region_name': 'us-west-1',
                    }
                }
            }
        }
    )
  You may configure this resource as follows:
    resources:
      secretsmanager:
        config:
          region_name: "us-west-1"
          # Optional[str]: Specifies a custom region for the SecretsManager session. Default is chosen
          # through the ordinary boto credential chain.
          profile_name: "dev"
          # Optional[str]: Specifies a custom profile for the SecretsManager session. Default is the default
          # profile as specified in the ~/.aws/credentials file.
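The op above returns the raw get_secret_value response rather than the secret itself. As a small sketch, the helper below pulls the value out of such a response, which carries either a SecretString or a SecretBinary field. The name secret_value and the sample response contents are hypothetical; parsing the string as JSON assumes the secret was stored as key/value pairs.

```python
import json
from typing import Any, Dict, Union


def secret_value(response: Dict[str, Any]) -> Union[str, bytes]:
    """Return the secret payload from a get_secret_value response dict."""
    if "SecretString" in response:
        return response["SecretString"]
    # Binary secrets arrive under a separate key.
    return response["SecretBinary"]


# Hand-written sample shaped like a response; the values are placeholders.
sample_response = {
    "Name": "appauthexample",
    "SecretString": '{"username": "dagster", "password": "example"}',
}

credentials = json.loads(secret_value(sample_response))
binary = secret_value({"Name": "binary-example", "SecretBinary": b"\x00\x01"})
```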
- dagster_aws.secretsmanager.secretsmanager_secrets_resource ResourceDefinition [source]
- Beta: This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.
  Resource that provides a dict which maps selected SecretsManager secrets to their string values. Also optionally sets chosen secrets as environment variables.
  Example:
    import os
    from dagster import job, op
    from dagster_aws.secretsmanager import secretsmanager_secrets_resource

    @op(required_resource_keys={'secrets'})
    def example_secretsmanager_secrets_op(context):
        return context.resources.secrets.get("my-secret-name")

    @op(required_resource_keys={'secrets'})
    def example_secretsmanager_secrets_op_2(context):
        return os.getenv("my-other-secret-name")

    @job(resource_defs={'secrets': secretsmanager_secrets_resource})
    def example_job():
        example_secretsmanager_secrets_op()
        example_secretsmanager_secrets_op_2()

    example_job.execute_in_process(
        run_config={
            'resources': {
                'secrets': {
                    'config': {
                        'region_name': 'us-west-1',
                        'secrets_tag': 'dagster',
                        'add_to_environment': True,
                    }
                }
            }
        }
    )
  Note that your ops must also declare that they require this resource with required_resource_keys, or it will not be initialized for the execution of their compute functions.
  You may configure this resource as follows:
    resources:
      secretsmanager:
        config:
          region_name: "us-west-1"
          # Optional[str]: Specifies a custom region for the SecretsManager session. Default is chosen
          # through the ordinary boto credential chain.
          profile_name: "dev"
          # Optional[str]: Specifies a custom profile for the SecretsManager session. Default is the default
          # profile as specified in the ~/.aws/credentials file.
          secrets: ["arn:aws:secretsmanager:region:aws_account_id:secret:appauthexample-AbCdEf"]
          # Optional[List[str]]: Specifies a list of secret ARNs to pull from SecretsManager.
          secrets_tag: "dagster"
          # Optional[str]: Specifies a tag; all secrets which have the tag set will be pulled
          # from SecretsManager.
          add_to_environment: true
          # Optional[bool]: Whether to set the selected secrets as environment variables. Defaults
          # to false.
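The two ways of reading a secret shown above (from the resource dict, or from the environment when add_to_environment is set) can be modeled with a small context manager. This is an illustrative sketch rather than the resource's actual code: secrets_in_environment is a hypothetical name, the secret values are placeholders standing in for a SecretsManager lookup, and restoring the environment on exit is an assumption of this sketch.

```python
import os
from contextlib import contextmanager
from typing import Dict, Iterator


@contextmanager
def secrets_in_environment(
    secrets: Dict[str, str], add_to_environment: bool = False
) -> Iterator[Dict[str, str]]:
    """Yield the secrets dict, optionally exporting it as environment variables."""
    previous = {name: os.environ.get(name) for name in secrets}
    if add_to_environment:
        os.environ.update(secrets)
    try:
        yield secrets
    finally:
        if add_to_environment:
            # Put back whatever was set before entering (assumption of this sketch).
            for name, old in previous.items():
                if old is None:
                    os.environ.pop(name, None)
                else:
                    os.environ[name] = old


# Placeholder values standing in for secrets fetched from SecretsManager.
fetched = {"my-secret-name": "s3cr3t", "my-other-secret-name": "t0ps3cr3t"}

before = os.getenv("my-other-secret-name")
with secrets_in_environment(fetched, add_to_environment=True) as secrets:
    from_dict = secrets.get("my-secret-name")
    from_env = os.getenv("my-other-secret-name")
after = os.getenv("my-other-secret-name")
```

With add_to_environment left at its default of false, only the dict lookup would return a value.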