
API Documentation

Dagger

Bases: Service

The Dagger class to create the workflow engine.

__create_app()

Initializes an instance of Faust.

Returns:

Type Description
faust.App

Instance of Faust

__init__(*, broker, datadir=None, store=StoreEnum.AEROSPIKE.value, application_name='dagger', package_name='dagger', kafka_partitions=1, task_update_topic=None, tasks_topic='dagger_task_topic', bootstrap_topic='dagger_bootstrap', beacon=None, kafka_broker_list=None, loop=None, tracing_sensor=None, datadog_sensor=None, trigger_interval=60, max_tasks_per_trigger=2000, restart_tasks_on_boot=True, aerospike_config=None, enable_changelog=True, max_correletable_keys_in_values=15000, schema_registry_url=None, message_serializer=None, delete_workflow_on_complete=False, task_update_callbacks=[], logging_config=None, **kwargs)

Initialize an instance of Dagger.

Parameters:

Name Type Description Default
broker str

Kafka broker address, e.g. kafka://0.0.0.0:9092.

required
datadir str

Directory where db data will reside. Defaults to None.

None
store str

DB to use. Defaults to the Aerospike store (StoreEnum.AEROSPIKE.value).

StoreEnum.AEROSPIKE.value
application_name str

Name of application. Defaults to "dagger".

'dagger'
package_name str

Name of package. Defaults to "dagger".

'dagger'
kafka_partitions int

Number of Kafka partitions. Defaults to 1.

1
task_update_topic Optional[str]

Name of the topic where tasks whose status has been updated will be sent. Defaults to None.

None
tasks_topic str

Name of topic where new tasks will be sent for execution. Defaults to "dagger_task_topic".

'dagger_task_topic'
bootstrap_topic str

Name of the topic where tasks will be sent for execution after a restart. Defaults to "dagger_bootstrap".

'dagger_bootstrap'
beacon NodeT

Beacon used to track services in a dependency graph. Defaults to None.

None
loop asyncio.AbstractEventLoop

Asyncio event loop to attach to. Defaults to None.

None
tracing_sensor Sensor

Tracing Sensor to use for OpenTelemetry. The global tracer has to be initialized in the client. Defaults to None

None
datadog_sensor DatadogMonitor

datadog statsD sensor

None
aerospike_config AerospikeConfig

Config for Aerospike if enabled

None
enable_changelog bool

Flag to enable/disable events on the table changelog topic

True
max_correletable_keys_in_values int

maximum number of ids to store in the value part before chunking

15000
schema_registry_url str

the schema registry URL

None
message_serializer MessageSerializer

the message serializer instance using the schema registry

None
delete_workflow_on_complete bool

deletes the workflow instance when complete

False
task_update_callbacks List[Callable[[ITemplateDAGInstance], Awaitable[None]]]

callbacks when a workflow instance is updated

[]
logging_config

the logging config to use

None
**kwargs Any

Other Faust keyword arguments.

{}
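
A minimal sketch of constructing the engine with the parameters documented above. The import path, broker address, data directory, and application name are assumptions for illustration; only the keyword arguments shown in the signature are used.

```python
# Import path is assumed from the project layout and may differ by version.
from dagger.service.services import Dagger

workflow_engine = Dagger(
    broker="kafka://localhost:9092",   # placeholder broker address
    datadir="/tmp/dagger-data/",       # placeholder directory for local db data
    store="rocksdb://",                # RocksDB store; the default is the Aerospike store
    application_name="orders-app",     # placeholder application name
    kafka_partitions=8,
    restart_tasks_on_boot=True,
)
```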

__post_init__()

This method is called after initialization of dagger

add_topic(topic_name, topic)

Associate a topic instance with a name.

Parameters:

Name Type Description Default
topic_name str

Name of topic.

required
topic Topic

Instance of Topic.

required

chunk_and_store_correlatable_tasks(cor_instance, value, workflow_id) async

This method chunks the key's value list so that it does not overflow the value-size limit of the datastore used by Dagger, as defined by max_correletable_keys_in_values.

Parameters:

Name Type Description Default
cor_instance CorreletableKeyTasks

the CorreletableKeyTasks instance

required
value UUID

the new value to add to the lookup_keys

required
workflow_id UUID

the id of the workflow to which the cor_instance belongs

required

create_topic(topic_name, key_type=None, value_type=None, key_serializer=None, value_serializer=None) classmethod

Create a Kafka topic using Faust

Parameters:

Name Type Description Default
topic_name str

Name of topic

required
key_type Optional[ModelArg]

Key type for the topic. Defaults to str.

None
value_type Optional[ModelArg]

Value type for the topic. Defaults to str.

None

Returns:

Type Description
TopicT

Instance of Faust Topic
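
A hedged sketch of creating a topic and associating it with the engine via add_topic; the topic name and key/value types are placeholders.

```python
# create_topic is documented above as a classmethod returning a Faust Topic.
payments_topic = Dagger.create_topic("payments_topic", key_type=str, value_type=str)
# Register the topic instance under a name so tasks can look it up later with get_topic().
workflow_engine.add_topic("payments_topic", payments_topic)
```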

get_correletable_key_instances(cor_instance) async

Gathers all the chunked values of the cor_instance

Parameters:

Name Type Description Default
cor_instance CorreletableKeyTasks

the CorreletableKeyTasks to gather

required

Returns:

Type Description
List[CorreletableKeyTasks]

A list of all the chunked CorreletableKeyTasks

get_db_options()

Get the DB options on the dagger store

Returns:

Type Description
Mapping[str, Any]

the DB options

get_instance(id, log=True) async

Get an instance of an ITask given its id.

Parameters:

Name Type Description Default
id UUID

Id of the ITask.

required
log bool

if set to False, suppresses logging

True

Returns:

Type Description
ITask

Instance of the ITask.
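
A small sketch of looking up a stored instance by id, assuming the workflow_engine object from the construction example above.

```python
from uuid import UUID

async def fetch_workflow(workflow_id: UUID):
    # Look up the stored ITask/workflow instance by its id.
    return await workflow_engine.get_instance(workflow_id)
```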

get_monitoring_task(task, workflow_instance) async

Returns the monitoring task associated with this task

Parameters:

Name Type Description Default
task ITask

the task to check for

required

Returns:

Type Description
Optional[MonitoringTask]

the monitoring task instance or None

get_template(template_name)

Get the instance of a template that contains the workflow definition given its name.

Parameters:

Name Type Description Default
template_name str

Name of template

required

Returns:

Type Description
ITemplateDAG

Instance of Template that contains the workflow definition

Raises:

Type Description
TemplateDoesNotExist

Template does not exist
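
A sketch of looking up a registered template and handling the documented TemplateDoesNotExist error; the exception's import path is an assumption.

```python
# Import path for the exception is assumed and may differ by version.
from dagger.exceptions.exceptions import TemplateDoesNotExist

def lookup_template(name: str):
    try:
        return workflow_engine.get_template(name)
    except TemplateDoesNotExist:
        # Placeholder handling for an unregistered workflow name.
        return None
```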

get_topic(topic_name)

Get a topic based on the associated name.

Parameters:

Name Type Description Default
topic_name str

Name of topic.

required

Returns:

Type Description
TopicT

the instance of the topic

main(override_logging=False)

Main method that initializes the Dagger worker. This method is blocking.
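
A typical entry point, assuming the workflow_engine object from the construction example above.

```python
if __name__ == "__main__":
    # Blocking call: starts the Dagger (Faust) worker for this application.
    workflow_engine.main()
```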

register_process_template(process_template_name) classmethod

Registers a process template in Dagger.

Parameters:

Name Type Description Default
process_template_name str

Name of process to register in dagger.

required

register_template(template_name) classmethod

This method is used to register a workflow definition with Dagger. Refer to the examples in the documentation; a usage sketch follows the parameter list below.

Parameters:

Name Type Description Default
template_name str

Name of workflow to register

required
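
A hedged sketch of the decorator-style registration referenced above. The workflow name is a placeholder, order_process_template is a hypothetical IProcessTemplateDAG built with the process builders documented later, and imports of the builder classes are omitted because module paths vary by version.

```python
@Dagger.register_template("OrderWorkflow")  # workflow name is a placeholder
def order_workflow_template(template_name: str):
    # Build the workflow definition with the builder APIs documented below.
    builder = DefaultTemplateBuilder(workflow_engine)
    builder.set_name(template_name)
    builder.set_type(DefaultTemplateDAGInstance)
    builder.set_root(order_process_template)  # hypothetical root process definition
    return builder.build()
```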

remove_task_from_correletable_keys_table(task, workflow_instance) async

Removes a task from the correletable keys table

Parameters:

Name Type Description Default
task ITask

the task to remove from the correletable keys table

required
workflow_instance ITemplateDAGInstance

the workflow instance

required

submit(task, *, repartition=True) async

Submits a workflow instance for execution.

Parameters:

Name Type Description Default
task ITask

The workflow instance.

required
repartition bool

If True, it uses the repartitioning key to submit the task for execution on the configured Kafka topic. If False, it creates the workflow on the same node and submits it for execution.

True
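
A sketch of creating and submitting a workflow instance, assuming a template named "OrderWorkflow" was registered as above; the partition key and runtime parameter names are placeholders.

```python
from uuid import uuid1

async def start_order_workflow(order_id: str) -> None:
    template = workflow_engine.get_template("OrderWorkflow")  # placeholder workflow name
    instance = await template.create_instance(
        uuid1(),
        "order_id",          # partition_key_lookup: key defined in the runtime parameters
        repartition=False,   # create locally; submit() repartitions below
        order_id=order_id,   # runtime parameter (placeholder key)
    )
    await workflow_engine.submit(instance, repartition=True)
```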

update_correletable_key_for_task(task_instance, key=None, new_task=False, workflow_instance=None) async

Updates the correletable key for a SensorTask within the datastore.

Parameters:

Name Type Description Default
task_instance ITask

the SensorTask for which the correletable key is updated

required
key str

the correletable key name to update.

None
new_task bool

If the task is not new, then the entire runtime parameters and subsequent tasks need to be updated

False
workflow_instance ITemplateDAGInstance

the workflow instance

None

ITemplateDAGBuilder

Base class to define the structure of a workflow definition

__init__(app)

Parameters:

Name Type Description Default
app Service

The Dagger app instance

required

build() abstractmethod

Builds the ITemplateDAG object.

Returns:

Type Description
ITemplateDAG

The instance of ITemplateDAG

set_name(name) abstractmethod

Sets the name of the template.

Parameters:

Name Type Description Default
name str

Name of the template.

required

Returns:

Type Description
ITemplateDAGBuilder

An instance of the updated template builder.

set_root(template) abstractmethod

Sets the first process to execute in the template.

Parameters:

Name Type Description Default
template IProcessTemplateDAG

Instance of a process template containing the definition of the process.

required

Returns:

Type Description
ITemplateDAGBuilder

An instance of the updated template builder.

set_type(template_type) abstractmethod

Sets the type of the process.

Parameters:

Name Type Description Default
template_type Type[ITemplateDAGInstance]

The type of the process.

required

Returns:

Type Description
ITemplateDAGBuilder

An instance of the updated template builder.

IProcessTemplateDAGBuilder

Skeleton builder class used to build a process definition within a workflow.

__init__(app)

Parameters:

Name Type Description Default
app Service

The Dagger instance

required

build() abstractmethod

Builds the IProcessTemplateDAG object

Returns:

Type Description
IProcessTemplateDAG

the instance of IProcessTemplateDAG to create the workflow definition

set_name(process_name) abstractmethod

Sets the name of the process.

Parameters:

Name Type Description Default
process_name str

Name of the process.

required

Returns:

Type Description
IProcessTemplateDAGBuilder

An instance of the updated process template builder.

set_next_process(task) abstractmethod

Set the next process in the execution of the workflow definition.

Parameters:

Name Type Description Default
task IProcessTemplateDAG

Process template to be set as the next process.

required

Returns:

Type Description
IProcessTemplateDAGBuilder

An instance of the updated process template builder.

set_root_task(task) abstractmethod

Set the first task in the process definition of the workflow.

Parameters:

Name Type Description Default
task TaskTemplate

TaskTemplate to be set as the first task of the process execution.

required

Returns:

Type Description
IProcessTemplateDAGBuilder

An instance of the updated process template builder.

set_type(process_type) abstractmethod

Sets the type of the process.

Parameters:

Name Type Description Default
process_type Type[ITask]

The type of the process.

required

Returns:

Type Description
IProcessTemplateDAGBuilder

An instance of the updated process template builder.

TaskTemplateBuilder

Skeleton builder class used to build the definition of a task object within a workflow

__init__(app)

Parameters:

Name Type Description Default
app Service

The Dagger object

required

build() abstractmethod

Builds the TaskTemplate object

Returns:

Type Description
TaskTemplate

The TaskTemplate instance to add to the ProcessBuilder definition

set_allow_skip_to(allow_skip_to)

Set whether or not this task is allowed to be executed out of order (skipped to)

Parameters:

Name Type Description Default
allow_skip_to bool

If set to true a Sensor task can be executed if an event is received out of order

required

Returns:

Type Description
TaskTemplateBuilder

An instance of the updated template builder.

set_name(name) abstractmethod

Set the name of the task.

Parameters:

Name Type Description Default
name str

the name of the Task

required

Returns:

Type Description
TaskTemplateBuilder

An instance of the updated template builder.

set_next(task_template)

Set the next task in the process definition within the workflow

Parameters:

Name Type Description Default
task_template TaskTemplate

the next task template definition

required

Returns:

Type Description
TaskTemplateBuilder

An instance of the updated template builder.

set_type(task_type) abstractmethod

Sets the type of the task.

Parameters:

Name Type Description Default
task_type Type[ITask]

the type of the task to instantiate from the builder

required

Returns:

Type Description
TaskTemplateBuilder

An instance of the updated template builder.

ITemplateDAG

Skeleton class defining the attributes and functions of a template instance.

__init__(dag, name, app, template_type)

ITemplateDAG Constructor

Parameters:

Name Type Description Default
dag IProcessTemplateDAG

The definition of the first process to execute

required
name str

the name of the Workflow

required
app Service

the Dagger instance

required
template_type Type[ITemplateDAGInstance]

the type of the Workflow to instantiate

required

create_instance(id, partition_key_lookup, *, repartition=True, seed=None, **kwargs) abstractmethod async

Method for creating an instance of a workflow definition

Parameters:

Name Type Description Default
id UUID

The id of the workflow

required
partition_key_lookup str

Kafka topic partition key associated with the instance of the workflow. The key needs to be defined within the runtime parameters

required
repartition bool

Flag indicating whether this instance should be stored on the current node or by the owner of the partition defined by partition_key_lookup

True
seed random.Random

the seed to use to create all internal instances of the workflow

None
**kwargs

Other keyword arguments

{}

Returns:

Type Description
ITemplateDAGInstance

An instance of the workflow

set_dynamic_builders_for_process_template(name, process_template_builders) abstractmethod

Use these builders only when the processes of the workflow definition need to be determined at runtime, based on the runtime parameters of the workflow.

Parameters:

Name Type Description Default
name str

The name of the dynamic process builder

required
process_template_builders

the ProcessTemplateDagBuilder dynamic process builders

required

IProcessTemplateDAG

Skeleton class defining the attributes and functions of a process instance.

__init__(next_process_dag, app, name, process_type, root_task_dag, max_run_duration)

Constructor

Parameters:

Name Type Description Default
next_process_dag List[IProcessTemplateDAG]

The list of next processes to execute in this workflow

required
app Service

The Dagger instance

required
name str

the name of the process

required
process_type Type[ITask]

the type of the process to instantiate

required
root_task_dag Optional[TaskTemplate]

The workflow type instance(definition)

required
max_run_duration int

The maximum time this process can execute until it is marked as a failure (timeout)

required

create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs) abstractmethod async

Method for creating a process instance

Parameters:

Name Type Description Default
id UUID

The id of the instance

required
parent_id UUID

the id of the parent of this process(The workflow instance)

required
parent_name str

the name of the workflow

required
partition_key_lookup str

The kafka partitioning key to look up

required
repartition bool

If true, the instance is serialized and stored by the owner of the partition defined by the partitioning key

True
seed random.Random

The seed to use to create any child instances

None
workflow_instance ITemplateDAGInstance

the instance of the workflow

None
**kwargs

Other keyword arguments

{}

Returns:

Type Description
IProcessTemplateDAGInstance

the instance of the Process

set_dynamic_process_builders(process_template_builders) abstractmethod

Sets the dynamic process builders on the process instance determined at runtime. Only used when the processes of the workflow cannot be determined statically and need to be defined at runtime

Parameters:

Name Type Description Default
process_template_builders

the list of ProcessTemplateDagBuilder

required

set_parallel_process_template_dags(parallel_process_templates) abstractmethod

Method to set child_process_task_templates for parallel execution

Parameters:

Name Type Description Default
parallel_process_templates

List of parallel process templates

required

IDynamicProcessTemplateDAG

Bases: IProcessTemplateDAG

Skeleton class defining the attributes and functions of dynamic processes and task instances.

__init__(next_process_dags, app, name, max_run_duration)

Init method

Parameters:

Name Type Description Default
next_process_dags List[IProcessTemplateDAG]

The next Process to execute after the current process is complete

required
app Service

The Dagger instance

required
name str

The name of the process

required
max_run_duration int

the timeout on the process

required

create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs) async

Method for creating dynamic instance(s) of a template and returning the head of the list

Parameters:

Name Type Description Default
id UUID

The id of the instance to create

required
parent_id UUID

the id of the parent instance

required
parent_name str

the name of the parent workflow

required
partition_key_lookup str

The Kafka partitioning key used to serialize the storage of this instance

required
repartition bool

If true, the instance is stored on the node owning the partition defined by the partitioning key

True
seed random.Random

the seed to use to create any child instances

None
workflow_instance ITemplateDAGInstance

the instance of the workflow

None
**kwargs

Other keyword arguments

{}

TaskTemplate

Skeleton class defining the attributes and functions of process and task instances.

create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs) abstractmethod async

Method for creating an instance of a template definition

Parameters:

Name Type Description Default
id UUID

the ID of the instance

required
parent_id UUID

the ID of the parent instance if any

required
parent_name str

the name of the parent instance

required
partition_key_lookup str

The Kafka partitioning key

required
repartition bool

If true, the instance is serialized and stored by the node owning the partition defined by the partitioning key

True
seed random.Random

the seed to use to create any child instances

None
workflow_instance ITemplateDAGInstance

the workflow object

None
**kwargs

Other keyword arguments

{}

DefaultTaskTemplate

Bases: TaskTemplate

Default implementation of task template.

__init__(app, type, name, task_dag_template, allow_skip_to, reprocess_on_message=False)

Init method

Parameters:

Name Type Description Default
app Service

The Dagger Instance

required
type Type[ITask]

The type of the task defined within the workflow

required
name str

the name of the task

required
task_dag_template List[TaskTemplate]

The next task to execute after this task

required
allow_skip_to bool

Set to true if processing of the workflow can skip to this task

required
reprocess_on_message bool

the task is executed when invoked, irrespective of the state of the task

False

create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs) async

Creates an instance of the task defined by this template

Parameters:

Name Type Description Default
id UUID

the id of the instance to create

required
parent_id UUID

the id of the parent of this task to be created

required
parent_name str

the name of the parent of the task to be created

required
partition_key_lookup str

the Kafka partitioning key if this instance needs to be repartitioned

required
repartition bool

If true, the instance is stored in the node owning the partition defined by the partitioning key

True
seed random.Random

the seed to use to create any child instances

None
workflow_instance ITemplateDAGInstance

the workflow object

None
**kwargs Any

other keyword arguments

{}

DefaultKafkaTaskTemplate

Bases: DefaultTaskTemplate

Default implementation of KafkaCommandTask.

__init__(app, type, name, topic, task_dag_templates, allow_skip_to, reprocess_on_message=False)

Init method

Parameters:

Name Type Description Default
app Service

Dagger instance

required
type Type[KafkaCommandTask[KT, VT]]

The type of KafkaCommandTask

required
name str

the name of the task

required
topic Topic

The topic to send the command on

required
task_dag_templates List[TaskTemplate]

the next task to execute after this task

required
allow_skip_to bool

If true the execution of the workflow can jump to this task

required
reprocess_on_message bool

if true, the task is executed irrespective of the state of this task

False

create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs) async

Method for creating an instance of a KafkaCommandTask

Parameters:

Name Type Description Default
id UUID

The id of the instance

required
parent_id UUID

the id of the parent of this process(The workflow instance)

required
parent_name str

the name of the workflow

required
partition_key_lookup str

The kafka partitioning key to look up

required
repartition bool

If true, the instance is serialized and stored by the owner of the partition defined by the partitioning key

True
seed random.Random

The seed to use to create any child instances

None
workflow_instance ITemplateDAGInstance

the instance of the workflow

None
**kwargs Any

Other keyword arguments

{}

Returns:

Type Description
ITask

the instance of the Process

DefaultTriggerTaskTemplate

Bases: DefaultTaskTemplate

Default implementation of TriggerTask.

__init__(app, type, name, time_to_execute_key, task_dag_templates, allow_skip_to)

Init method

Parameters:

Name Type Description Default
app Service

The dagger instance

required
type Type[TriggerTask[KT, VT]]

The type of TriggerTask

required
name str

the name of the task

required
time_to_execute_key str

the key to look up in the runtime parameters to execute the trigger

required
allow_skip_to bool

Flag For skipping serial execution

required

create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs) async

Method for creating an instance of a TriggerTask

Parameters:

Name Type Description Default
id UUID

The id of the instance

required
parent_id UUID

the id of the parent of this process(The workflow instance)

required
parent_name str

the name of the workflow

required
partition_key_lookup str

The kafka partitioning key to look up

required
repartition bool

If true, the instance is serialized and stored by the owner of the partition defined by the partitioning key

True
seed random.Random

The seed to use to create any child instances

None
workflow_instance ITemplateDAGInstance

the instance of the workflow

None
**kwargs Any

Other keyword arguments

{}

Returns:

Type Description
ITask

the instance of the Process

DefaultIntervalTaskTemplate

Bases: DefaultTaskTemplate

Default implementation of IntervalTask.

__init__(app, type, name, time_to_execute_key, time_to_force_complete_key, interval_execute_period_key, task_dag_templates, allow_skip_to)

Init method

Parameters:

Name Type Description Default
app Service

Dagger instance

required
type Type[IntervalTask[KT, VT]]

The type of IntervalTask

required
name str

the name of the task

required
time_to_execute_key str

the key to look up in the runtime parameters to trigger the task

required
time_to_force_complete_key str

the key to look up to time out the task

required
interval_execute_period_key str

the frequency of execution from trigger time to timeout until the task succeeds

required
task_dag_templates List[TaskTemplate]

the next task to execute

required
allow_skip_to bool

Flag to skip serial execution

required

create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs) async

Method for creating an instance of IntervalTask

Parameters:

Name Type Description Default
id UUID

The id of the instance

required
parent_id UUID

the id of the parent of this process(The workflow instance)

required
parent_name str

the name of the workflow

required
partition_key_lookup str

The kafka partitioning key to look up

required
repartition bool

If true, the instance is serialized and stored by the owner of the partition defined by the partitioning key

True
seed random.Random

The seed to use to create any child instances

None
workflow_instance ITemplateDAGInstance

the instance of the workflow

None
**kwargs

Other keyword arguments

{}

Returns:

Type Description
ITask

the instance of the Process

ParallelCompositeTaskTemplate

Bases: DefaultTaskTemplate

Template to define the structure of parallel tasks to execute within a workflow

__init__(app, type, name, task_dag_templates, child_task_templates, allow_skip_to, reprocess_on_message=False, parallel_operator_type=TaskOperator.JOIN_ALL)

Init method

Parameters:

Name Type Description Default
app Service

The dagger instance

required
type Type[ITask]

The type of ParallelCompositeTask

required
name str

The name of the ParallelCompositeTask

required
task_dag_templates List[TaskTemplate]

the next task to execute after the parallel task completes

required
child_task_templates List[TaskTemplate]

the set of parallel tasks to execute

required
allow_skip_to bool

Skip serial execution of the workflow if this is set

required
reprocess_on_message bool

Re-execute the task irrespective of the state of the task

False
parallel_operator_type TaskOperator

Wait for all parallel tasks to complete or just one before transitioning to the next task in the workflow

TaskOperator.JOIN_ALL

DefaultTaskTemplateBuilder

Bases: TaskTemplateBuilder

Default Implementation of TaskTemplateBuilder

ParallelCompositeTaskTemplateBuilder

Bases: DefaultTaskTemplateBuilder

A type of DefaultTaskTemplateBuilder to create ParallelTasks

KafkaCommandTaskTemplateBuilder

Bases: DefaultTaskTemplateBuilder

A type of DefaultTaskTemplateBuilder to define KafkaCommandTasks

DecisionTaskTemplateBuilder

Bases: DefaultTaskTemplateBuilder

A type of DefaultTaskTemplateBuilder to define DecisionTasks

TriggerTaskTemplateBuilder

Bases: DefaultTaskTemplateBuilder

A type of DefaultTaskTemplateBuilder to define TriggerTasks

IntervalTaskTemplateBuilder

Bases: TriggerTaskTemplateBuilder

A type of DefaultTaskTemplateBuilder to define IntervalTasks

KafkaListenerTaskTemplateBuilder

Bases: DefaultTaskTemplateBuilder

A type of DefaultTaskTemplateBuilder to define KafkaListenerTasks

set_reprocess_on_message(reprocess_on_message)

Set whether or not this task should reprocess on_message if a correlated message is reprocessed.

DefaultTemplateBuilder

Bases: ITemplateDAGBuilder

Default implementation of template builder

build()

Builds the ITemplateDAG from the builder definition

Returns:

Type Description
ITemplateDAG

the instance of ITemplateDAG from the builder definition

set_name(name)

Sets the name of the template

Parameters:

Name Type Description Default
name str

The name of the template the builder is setting up

required

Returns:

Type Description
ITemplateDAGBuilder

The updated ITemplateDAGBuilder

set_root(template)

Sets the root process of this template

Parameters:

Name Type Description Default
template IProcessTemplateDAG

the parent Process Task

required

Returns:

Type Description
ITemplateDAGBuilder

the instance of ITemplateDAGBuilder

set_type(template_type)

Sets the type of the Task to instantiate from this builder

Parameters:

Name Type Description Default
template_type Type[ITemplateDAGInstance]

the Template Type

required

Returns:

Type Description
ITemplateDAGBuilder

the instance of ITemplateDAGBuilder
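
A hedged sketch of composing the default builders end to end: one task wrapped in a process, and that process wrapped in a workflow template. The task class, names, and exact constructor expectations are placeholders, and imports of the builder classes are omitted because module paths vary by version.

```python
# Task definition (task type and names are placeholders).
task_builder = DefaultTaskTemplateBuilder(workflow_engine)
task_builder.set_name("create_order")
task_builder.set_type(CreateOrderExecutorTask)  # an ExecutorTask subclass defined elsewhere
order_task_template = task_builder.build()

# Process definition wrapping the task.
process_builder = ProcessTemplateDagBuilder(workflow_engine)
process_builder.set_name("ORDER")
process_builder.set_type(DefaultProcessTemplateDAGInstance)
process_builder.set_root_task(order_task_template)
order_process_template = process_builder.build()

# Workflow template wrapping the process.
template_builder = DefaultTemplateBuilder(workflow_engine)
template_builder.set_name("OrderWorkflow")
template_builder.set_type(DefaultTemplateDAGInstance)
template_builder.set_root(order_process_template)
order_workflow = template_builder.build()
```

In practice this composition would typically live inside a function decorated with register_template, as sketched earlier.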

ProcessTemplateDAG

Bases: IProcessTemplateDAG

Class that encapsulates the definition of a Process

__init__(next_process_dag, app, name, process_type, root_task_dag, max_run_duration)

Constructor

Parameters:

Name Type Description Default
next_process_dag List[IProcessTemplateDAG]

The list of next processes to execute in this workflow

required
app Dagger

The Dagger instance

required
name str

the name of the process

required
process_type Type[ITask[KT, VT]]

the type of the process to instantiate

required
root_task_dag Optional[TaskTemplate]

The workflow type instance(definition)

required
max_run_duration int

The maximum time this process can execute until it is marked as a failure (timeout)

required

create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs) async

Method for creating a process instance

Parameters:

Name Type Description Default
id UUID

The id of the instance

required
parent_id UUID

the id of the parent of this process(The workflow instance)

required
parent_name str

the name of the workflow

required
partition_key_lookup str

The kafka partitioning key to look up

required
repartition bool

If true, the instance is serialized and stored by the owner of the partition defined by the partitioning key

True
seed random.Random

The seed to use to create any child instances

None
workflow_instance ITemplateDAGInstance

the instance of the workflow

None
**kwargs Any

Other keyword arguments

{}

Returns:

Type Description
IProcessTemplateDAGInstance[KT, VT]

the instance of the Process

ParallelCompositeProcessTemplateDAG

Bases: ProcessTemplateDAG

A Process Template to define a set of parallel Processes within a Process

__init__(next_process_dag, app, name, process_type, child_process_task_templates, parallel_operator_type)

init method

Parameters:

Name Type Description Default
next_process_dag List[IProcessTemplateDAG]

The next Process in the workflow definition

required
app Dagger

The Dagger instance

required
name str

The name of the ParallelCompositeProcessTask

required
process_type Type[ParallelCompositeTask[KT, VT]]

The type of ParallelCompositeTask to be instantiated from the definition

required
child_process_task_templates List[IProcessTemplateDAG]

The list of parallel processes to be created

required
parallel_operator_type TaskOperator

Wait for either all to complete or just one before transitioning to the next process in the workflow

required

create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs) async

Create a ParallelCompositeTask instance based on the template definition

Parameters:

Name Type Description Default
id UUID

The id of the instance

required
parent_id UUID

the id of the parent of this process(The workflow instance)

required
parent_name str

the name of the workflow

required
partition_key_lookup str

The kafka partitioning key to look up

required
repartition bool

If true, the instance is serialized and stored by the owner of the partition defined by the partitioning key

True
seed random.Random

The seed to use to create any child instances

None
workflow_instance ITemplateDAGInstance

the instance of the workflow

None
**kwargs Any

Other keyword arguments

{}

Returns:

Type Description
ParallelCompositeTask[KT, VT]

the instance of the Process

set_parallel_process_template_dags(parallel_process_templates)

Sets child_process_task_templates

Parameters:

Name Type Description Default
parallel_process_templates List[IProcessTemplateDAG]

List of parallel process templates

required

TemplateDAG

Bases: ITemplateDAG

Default Implementation of ITemplateDAG

__init__(dag, name, app, template_type)

Constructor

Parameters:

Name Type Description Default
dag IProcessTemplateDAG

The definition of the first process to execute

required
name str

the name of the Workflow

required
app Dagger

the Dagger instance

required
template_type Type[ITemplateDAGInstance[KT, VT]]

the type of the Workflow to instantiate

required

create_instance(id, partition_key_lookup, *, repartition=True, seed=None, **kwargs) async

Method for creating an instance of a workflow definition

Parameters:

Name Type Description Default
id UUID

The id of the workflow

required
partition_key_lookup str

Kafka topic partition key associated with the instance of the workflow. The key needs to be defined within the runtime parameters

required
repartition bool

Flag indicating whether this instance should be stored on the current node or by the owner of the partition defined by partition_key_lookup

True
seed random.Random

the seed to use to create all internal instances of the workflow

None
**kwargs

Other keyword arguments

{}

Returns:

Type Description
ITemplateDAGInstance[KT, VT]

An instance of the workflow

get_given_process(process_name)

Looks up a specific process template within a DAG

Parameters:

Name Type Description Default
process_name str

Name of the process

required

Returns:

Type Description
Optional[IProcessTemplateDAG]

Process Template if found, else None

set_given_num_of_parallel_processes_for_a_composite_process(no_of_processes, composite_process_name, parallel_template_builder)

This method creates and sets 'N' new parallel processes for a given process in a DAG

Parameters:

Name Type Description Default
no_of_processes int

Number of parallel processes required

required
composite_process_name str

Name of the process within which the parallel processes must reside

required
parallel_template_builder ProcessTemplateDagBuilder

A process template builder which needs to be cloned 'N' times and executed in parallel

required

set_parallel_process_template_dags_for_a_composite_process(name, parallel_process_templates)

Sets new process builders within a given process in a DAG

Parameters:

Name Type Description Default
name str

Name of the process within which the given process builders must reside

required
parallel_process_templates List[IProcessTemplateDAG]

List of process template builders

required

DynamicProcessTemplateDAG

Bases: IDynamicProcessTemplateDAG

A template to add Dynamic Processes to a workflow at runtime

__init__(next_process_dag, app, name, max_run_duration)

Init method

Parameters:

Name Type Description Default
next_process_dag List[IProcessTemplateDAG]

the next process in the workflow definition

required
app Dagger

The Dagger instance

required
name str

the name of the Dynamic Process

required
max_run_duration int

The timeout on the process to COMPLETE execution

required

create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs) async

Create a dynamic process instance based on the template definition

Parameters:

Name Type Description Default
id UUID

The id of the instance

required
parent_id UUID

the id of the parent of this process(The workflow instance)

required
parent_name str

the name of the workflow

required
partition_key_lookup str

The kafka partitioning key to look up

required
repartition bool

If true, the instance is serialized and stored by the owner of the partition defined by the partitioning key

True
seed random.Random

The seed to use to create any child instances

None
workflow_instance ITemplateDAGInstance

the instance of the workflow

None
**kwargs Any

Other keyword arguments

{}

Returns:

Type Description
IProcessTemplateDAGInstance[KT, VT]

the instance of the Process

ProcessTemplateDagBuilder

Bases: IProcessTemplateDAGBuilder

Default implementation of process builder

__init__(app)

Init method

Parameters:

Name Type Description Default
app Dagger

The Dagger instance

required

DynamicProcessTemplateDagBuilder

Bases: IProcessTemplateDAGBuilder

Skeleton builder class used to build a dynamic process object(s).

set_root_task(task)

Not implemented.

Raises:

Type Description
NotImplementedError

Not implemented.

set_type(process_type)

Not implemented.

Raises:

Type Description
NotImplementedError

Not implemented.

ParallelCompositeProcessTemplateDagBuilder

Bases: IProcessTemplateDAGBuilder

Skeleton builder class used to build a parallel process object(s).

set_max_run_duration(max_run_duration)

Not implemented.

Raises:

Type Description
NotImplementedError

Not implemented.

set_root_task(task)

Not implemented.

Raises:

Type Description
NotImplementedError

Not implemented.

TaskStatusEnum

Bases: Enum

Class to indicate State of the Task

COMPLETED = 'COMPLETED' class-attribute instance-attribute

The Task COMPLETED Execution

EXECUTING = 'EXECUTING' class-attribute instance-attribute

The task is currently EXECUTING

FAILURE = 'FAILURE' class-attribute instance-attribute

The Task Failed during Execution

NOT_STARTED = 'NOT_STARTED' class-attribute instance-attribute

The Task has NOT STARTED EXECUTION

SKIPPED = 'SKIPPED' class-attribute instance-attribute

The Task Skipped Execution

STOPPED = 'STOPPED' class-attribute instance-attribute

The Task execution was STOPPED

SUBMITTED = 'SUBMITTED' class-attribute instance-attribute

The Task was SUBMITTED for Execution

TaskType

Bases: Enum

The type of the Task

LEAF = 'LEAF' class-attribute instance-attribute

Task which has no children

PARALLEL_COMPOSITE = 'PARALLEL_COMPOSITE' class-attribute instance-attribute

A container for parallel tasks

ROOT = 'ROOT' class-attribute instance-attribute

The Root node of the workflow

SUB_DAG = 'SUB_DAG' class-attribute instance-attribute

A Process Task that consists of LEAF Tasks

TaskStatus

Bases: Record

Class to serialize the status of the Task

ITask

Bases: Record, Generic[KT, VT]

Class that every template, process, and task extends. Defines attributes and core functions that Dagger uses.

evaluate(**kwargs) abstractmethod async

Processes some inputs and determines the next ITask id.

Returns:

Type Description
Optional[UUID]

The next ITask id.

execute(runtime_parameters, workflow_instance=None) abstractmethod async

Executes the ITask.

Parameters:

Name Type Description Default
runtime_parameters Dict[str, str]

The runtime parameters of the task

required
workflow_instance ITask

The workflow object

None

get_correlatable_key(payload) abstractmethod

Get the lookup key,value associated with the task. Deprecated; use get_correlatable_key_from_payload instead.

Parameters:

Name Type Description Default
payload Any

The lookup key,value.

required

Returns:

Type Description
TaskLookupKey

used to associate a task with a message.

get_correlatable_key_from_payload(payload) async

Get the lookup key,value associated with the task. Deprecated; use get_correlatable_keys_from_payload instead.

Parameters:

Name Type Description Default
payload Any

The lookup key,value.

required

Returns:

Type Description
TaskLookupKey

used to associate a task with a message.

get_correlatable_keys_from_payload(payload) async

Get a list of lookup key,value associated with the task(s).

Parameters:

Name Type Description Default
payload Any

The lookup key,value.

required

Returns:

Type Description
List[TaskLookupKey]

used to associate tasks with a message.

get_remaining_tasks(next_dag_id, workflow_instance, tasks=None, end_task_id=None) async

Get the remaining tasks in the workflow.

Parameters:

Name Type Description Default
next_dag_id UUID

Current ITask id.

required
tasks Optional[List[ITask]]

List of previous ITasks. Defaults to None.

None
end_task_id UUID

The task id that the function should stop and return at. Defaults to None (so end of DAG).

None
workflow_instance ITemplateDAGInstance

The Workflow object

required

Returns:

Type Description
Optional[List[ITask]]

List of remaining ITasks appended to inputted list.

notify(status, workflow_instance) async

If not completed, runs the steps required for completion by calling on_complete(). This is used to signal a task that it can now complete

Parameters:

Name Type Description Default
status TaskStatus

the status of the task to set to when completed

required
workflow_instance Optional[ITemplateDAGInstance]

the Workflow object

required

on_complete(workflow_instance, status=TaskStatus(code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value), *, iterate=True) async

Sets the status of the ITask to completed and starts the next ITask if there is one.

Parameters:

Name Type Description Default
workflow_instance Optional[ITemplateDAGInstance]

The workflow object

required
status TaskStatus

The status of the task to set to

TaskStatus(code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value)

on_message(runtime_parameters, *args, **kwargs) abstractmethod async

Defines what to do when the task receives a message.

Parameters:

Name Type Description Default
runtime_parameters Dict[str, str]

The runtime parameters of the task

required

Returns:

Type Description
bool

True if the processing succeeds, False otherwise

start(workflow_instance) abstractmethod async

Starts the ITask.

Parameters:

Name Type Description Default
workflow_instance Optional[ITemplateDAGInstance]

The Workflow instance

required

stop(runtime_parameters, workflow_instance=None) abstractmethod async

Stops the ITask.

Parameters:

Name Type Description Default
runtime_parameters Dict[str, str]

The runtime parameters of the task

required
workflow_instance ITask

The workflow object

None

MonitoringTask

Bases: TriggerTask[KT, VT], abc.ABC

A type of TriggerTask that executes at a specific time and checks on the monitored task to execute some domain-specific logic

process_monitored_task(monitored_task, workflow_instance) abstractmethod async

Callback invoked when business logic has to be executed on the monitored task based on the time condition

Parameters:

Name Type Description Default
monitored_task ITask

the monitored task

required
workflow_instance Optional[ITemplateDAGInstance]

the workflow object

required

Returns:

Type Description
None

None

IntervalTask

Bases: TriggerTask[KT, VT], abc.ABC

A type of task that triggers at a trigger time and executes multiple times until the execution completes. The task is retried periodically after the trigger time until the timeout is reached

interval_execute(runtime_parameters) async

Task to run on an interval until either the trigger end time is reached or this method returns True.

Parameters:

Name Type Description Default
runtime_parameters Dict[str, VT]

The runtime parameters of the task

required

Returns:

Type Description
bool

If True, finish this task.
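
A hedged sketch of an IntervalTask subclass; the runtime parameter key and the payment_is_settled helper are hypothetical.

```python
class PaymentPollingTask(IntervalTask[str, str]):
    async def interval_execute(self, runtime_parameters):
        # Hypothetical external check; returning True finishes this interval task.
        payment_id = runtime_parameters.get("payment_id", "")
        return await payment_is_settled(payment_id)  # hypothetical async helper
```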

TriggerTask

Bases: ExecutorTask[KT, VT], abc.ABC

This task waits/halts the execution of the DAG until current time >= the trigger time on the task and then invokes the execute method defined by the task

ExecutorTask

Bases: ITask[KT, VT], abc.ABC

A simple ITask that executes some domain specific logic
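
A minimal sketch of an ExecutorTask subclass implementing the documented execute method; the runtime parameter keys are placeholders.

```python
class CreateOrderExecutorTask(ExecutorTask[str, str]):
    async def execute(self, runtime_parameters, workflow_instance=None):
        # Domain-specific logic goes here; the parameter keys are placeholders.
        order_id = runtime_parameters.get("order_id", "")
        runtime_parameters["order_created"] = "true"
```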

evaluate(**kwargs) async

Not implemented.

Raises:

Type Description
NotImplementedError

Not implemented.

on_message(runtime_parameters, *args, **kwargs) async

Not implemented.

Raises:

Type Description
NotImplementedError

Not implemented.

DefaultMonitoringTask

Bases: MonitoringTask[str, str]

Default Implementation of MonitoringTask

DecisionTask

Bases: ITask[KT, VT]

This type of task is similar to the case/switch statement in a programming language. It returns the next task to execute based on the execution logic. A decision task needs to implement the evaluate method to determine the next task id.

execute(runtime_parameters, workflow_instance=None) async

Not implemented.

Raises: NotImplementedError: Not implemented.

on_message(runtime_parameters, *args, **kwargs) async

Not implemented.

Raises: NotImplementedError: Not implemented.

SystemTask

Bases: ExecutorTask[str, str]

An internal Task for Dagger bookkeeping

evaluate(**kwargs) async

Not implemented.

Raises:

Type Description
NotImplementedError

Not implemented.

get_correlatable_key(payload)

Not implemented.

Raises:

Type Description
NotImplementedError

Not implemented.

on_complete(workflow_instance, status=TaskStatus(code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value), iterate=True) async

Not implemented.

Raises:

Type Description
NotImplementedError

Not implemented.

on_message(*args, **kwargs) async

Not implemented.

Raises:

Type Description
NotImplementedError

Not implemented.

SystemTimerTask

Bases: SystemTask

A type of SystemTask to execute internal Dagger Tasks

SensorTask

Bases: ITask[KT, VT], abc.ABC

A type of task that halts execution of the workflow until a condition is met. When the condition is met the on_message method on this task is invoked

evaluate(**kwargs) async

Not implemented.

Raises:

Type Description
NotImplementedError

Not implemented.

execute(runtime_parameters, workflow_instance=None) async

Not implemented.

Raises:

Type Description
NotImplementedError

Not implemented.

IMonitoredTask

Abstract interface to enable monitoring of a task

get_monitoring_task_type() abstractmethod

Get the TaskType to instantiate to monitor the current task

Returns:

Type Description
Type[MonitoringTask]

The Type of MonitoringTask

KafkaCommandTask

Bases: ExecutorTask[KT, VT], abc.ABC

This task is used to send a request/message on a Kafka topic defined using the template builder. This type of task is a child task in the execution graph and can be extended by implementing the execute method.
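
A hedged sketch of a KafkaCommandTask subclass that publishes a command message. The payload keys and topic name are placeholders, self.id is assumed to be the task's UUID, and the topic is looked up via the get_topic method documented above.

```python
import json

class PaymentCommandTask(KafkaCommandTask[str, str]):
    async def execute(self, runtime_parameters, workflow_instance=None):
        # Build the command payload; keys and topic name are placeholders.
        payload = json.dumps(
            {
                "order_id": runtime_parameters.get("order_id", ""),
                "payment_request_id": str(self.id),  # task id assumed to be a UUID
            }
        )
        await workflow_engine.get_topic("payments_command_topic").send(value=payload)
```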

KafkaListenerTask

Bases: SensorTask[KT, VT], abc.ABC

This task waits/halts the execution of the DAG until a message is received on the defined Kafka topic (in the template definition). Each task created using the DAG builder defines a durable key to correlate each received message on the topic against listener tasks. The engine handles the complexity of invoking the appropriate task instance based on the key in the payload.
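
A hedged sketch of a KafkaListenerTask subclass. The correlatable_key attribute name, the payload shape, and the (key, value) tuple form returned by get_correlatable_keys_from_payload are assumptions for illustration.

```python
import json

class PaymentListenerTask(KafkaListenerTask[str, str]):
    # Durable key used to correlate incoming messages with waiting tasks (attribute name assumed).
    correlatable_key = "payment_request_id"

    async def get_correlatable_keys_from_payload(self, payload):
        # Return the (key, value) pairs that identify the waiting task(s) for this message.
        data = json.loads(payload)
        return [(self.correlatable_key, data["payment_request_id"])]

    async def on_message(self, runtime_parameters, *args, **kwargs):
        # Return True when the message is processed successfully so the task can complete.
        return True
```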

INonLeafNodeTask

Bases: ITask[KT, VT], abc.ABC

An Abstract class for any Process/SUB_DAG node

stop(runtime_parameters, workflow_instance=None) async

Stops the ITask.

Parameters:

Name Type Description Default
runtime_parameters Dict[str, str]

The runtime parameters of the task

required
workflow_instance ITask

The workflow object

None

TaskOperator

Bases: Enum

An operator for Joining Parallel Tasks

ATLEAST_ONE = 'ATLEAST_ONE' class-attribute instance-attribute

Wait for at least one of the parallel tasks to reach a terminal state before beginning execution of the next task in the workflow definition

JOIN_ALL = 'JOIN_ALL' class-attribute instance-attribute

Waits for all the parallel tasks to reach a terminal state before executing the next task in the workflow definition

ParallelCompositeTask

Bases: ITask[KT, VT], abc.ABC

SUB-DAG task to execute parallel tasks and wait until all of them are in a terminal state before progressing to the next task. This task can be embedded as a child of the root node or a process node.

notify(status, workflow_instance=None) async

If not completed, runs the steps required for completion by calling on_complete().

stop(runtime_parameters, workflow_instance=None) async

Stops the ITask.

Parameters:

Name Type Description Default
runtime_parameters Dict[str, str]

The runtime parameters of the task

required
workflow_instance ITask

The workflow object

None

IProcessTemplateDAGInstance

Bases: INonLeafNodeTask[KT, VT], abc.ABC

A Process implementation of INonLeafNodeTask

CorrelatableMapValue

Bases: Record

An internal Class to store the correletable keys and their associated values for SensorTask

ITemplateDAGInstance

Bases: INonLeafNodeTask[KT, VT], abc.ABC

A root node implementation of INonLeafNodeTask

stop(runtime_parameters, workflow_instance=None) async

Stops the ITask.

Parameters:

Name Type Description Default
runtime_parameters Dict[str, str]

The runtime parameters of the task

required
workflow_instance ITask

The workflow object

None

MonitoredProcessTemplateDAGInstance

Bases: DefaultProcessTemplateDAGInstance, IMonitoredTask

Default implementation of a Monitored ProcessTask

on_complete(workflow_instance, status=TaskStatus(code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value), iterate=True) async

Sets the status of the ITask to completed and starts the next ITask if there is one.

Trigger

Bases: Record

Class to store the Trigger data: the time to execute a task.

get_trigger_key()

The key to store for the trigger instance

Returns:

Type Description
Tuple[Optional[UUID], Optional[UUID]]

The Key to store

DAGBuilderHelper

Helper class to model and connect task definition(s) for DAGs

__init__(dagger_app)

init method

Parameters:

Name Type Description Default
dagger_app Dagger

The Dagger app

required

Helper function to link processes together in a DAG definition

Parameters:

Name Type Description Default
process_builder_list List[Union[ProcessTemplateDagBuilder, IProcessTemplateDAGBuilder]]

The list of process definitions to link together

required

Returns:

Type Description
IProcessTemplateDAG

The instance of IProcessTemplateDAG

Link the task definitions together to define the order of tasks to execute

Parameters:

Name Type Description Default
tasks_builder_list List[TaskTemplateBuilder]

The list of TaskTemplateBuilders to link together, defining the chain of tasks

required

Returns:

Type Description
TaskTemplate

An instance of the TaskTemplate

generic_command_task_builder(*, topic, task_type, process_name, key_type=None, value_type=None, key_serializer=None, value_serializer=None)

Helper function to define the KafkaCommandTask definition

Parameters:

Name Type Description Default
topic str

The name of the topic to write the command to

required
task_type Type[ITask]

The type of KafkaCommandTask

required
process_name str

the name of the process it belongs to

required

Returns:

Type Description
KafkaCommandTaskTemplateBuilder

the instance of KafkaCommandTaskTemplateBuilder

generic_dynamic_process_builder(*, name, max_run_duration=0)

Helper function to build a dynamic process, to be determined at runtime based on the runtime parameters

Parameters:

Name Type Description Default
name str

The name of the dynamic process

required
max_run_duration int

The timeout on the process

0

Returns:

Type Description
DynamicProcessTemplateDagBuilder

The DynamicProcessTemplateDagBuilder

generic_executor_task_builder(*, task_type, name, allow_skip_to=False)

Helper Function to define an executor task

Parameters:

Name Type Description Default
task_type Type[ITask]

The type of the Task

required
name str

the Name of the Task

required
allow_skip_to bool

If true, the execution of the DAG can skip execution to this task out of order

False

Returns:

Type Description
DefaultTaskTemplateBuilder

The DefaultTaskTemplateBuilder instance

generic_listener_task_builder(*, topic, task_type, process_name, allow_skip_to=False, concurrency=1, reprocess_on_message=False, key_type=None, value_type=None, key_serializer=None, value_serializer=None)

Helper function to define a KafkaListenerTaskTemplateBuilder

Parameters:

Name Type Description Default
topic str

the topic name the listener listens to

required
task_type Type[ITask]

The type of KafkaListenerTask

required
process_name str

the name of the parent process this task belongs to

required
allow_skip_to bool

If true, the execution of the DAG can skip execution to this task out of order

False
concurrency int

Check Concurrency

1
reprocess_on_message bool

Re-executes the task when the message is received irrespective of the state of the task

False

Returns:

Type Description
KafkaListenerTaskTemplateBuilder

The instance of KafkaListenerTaskTemplateBuilder

generic_process_builder(*, process_name, root_task, process_type=DefaultProcessTemplateDAGInstance, max_run_duration=0)

Helper function to build a Process Definition

Parameters:

Name Type Description Default
process_name str

the name of the process, e.g. 'ORDERS'

required
root_task TaskTemplate

The first task to execute in this process

required
process_type Type[IProcessTemplateDAGInstance]

The class type of the Process

DefaultProcessTemplateDAGInstance
max_run_duration int

the timeout on the process

0

Returns:

Type Description
ProcessTemplateDagBuilder

The ProcessTemplateDagBuilder

generic_template(*, template_name, root_process, template_type=DefaultTemplateDAGInstance)

Helper function to define the Workflow Template

Parameters:

Name Type Description Default
template_name str

The name of the workflow

required
root_process IProcessTemplateDAG

The first process definition to execute in the workflow

required
template_type Type[ITemplateDAGInstance]

The type of ITemplateDAGInstance

DefaultTemplateDAGInstance

Returns:

Type Description
ITemplateDAG

The instance of ITemplateDAG
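
A hedged end-to-end sketch of composing a workflow with DAGBuilderHelper. The names of the task-linking and process-linking helpers are not shown in the rendered parameter lists above, so link_tasks and link_processes below are hypothetical placeholders; the task type and workflow names are also placeholders.

```python
helper = DAGBuilderHelper(workflow_engine)

# Task -> process -> template, using the generic_* helpers documented above.
order_task_builder = helper.generic_executor_task_builder(
    task_type=CreateOrderExecutorTask,  # ExecutorTask subclass defined elsewhere (placeholder)
    name="create_order",
)
order_process_builder = helper.generic_process_builder(
    process_name="ORDER",
    root_task=helper.link_tasks([order_task_builder]),  # hypothetical name for the task-linking helper
)
order_workflow_template = helper.generic_template(
    template_name="OrderWorkflow",
    root_process=helper.link_processes([order_process_builder]),  # hypothetical name for the process-linking helper
)
```

The resulting ITemplateDAG would typically be returned from a register_template-decorated function, as sketched earlier on this page.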

DaggerError

Bases: Exception

Base-class for all Dagger exceptions.

InvalidTaskType

Bases: DaggerError

Invalid Task Type.

TaskInvalidState

Bases: DaggerError

Invalid Task State.

TemplateDoesNotExist

Bases: DaggerError

Invalid Template Name

InvalidTriggerTimeForTask

Bases: DaggerError

Invalid trigger time