API Documentation
Dagger
Bases: Service
The Dagger class to create the workflow engine.
__create_app()
Initializes an instance of Faust.
Returns:
Type | Description |
---|---|
faust.App | Instance of Faust. |
__init__(*, broker, datadir=None, store=StoreEnum.AEROSPIKE.value, application_name='dagger', package_name='dagger', kafka_partitions=1, task_update_topic=None, tasks_topic='dagger_task_topic', bootstrap_topic='dagger_bootstrap', beacon=None, kafka_broker_list=None, loop=None, tracing_sensor=None, datadog_sensor=None, trigger_interval=60, max_tasks_per_trigger=2000, restart_tasks_on_boot=True, aerospike_config=None, enable_changelog=True, max_correletable_keys_in_values=15000, schema_registry_url=None, message_serializer=None, delete_workflow_on_complete=False, task_update_callbacks=[], logging_config=None, **kwargs)
Initialize an instance of Dagger.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
broker | str | Kafka broker address, e.g. kafka://0.0.0.0:9092. | required |
datadir | str | Directory where the DB data resides. | None |
store | str | Datastore to use. | StoreEnum.AEROSPIKE.value |
application_name | str | Name of the application. | 'dagger' |
package_name | str | Name of the package. | 'dagger' |
kafka_partitions | int | Number of Kafka partitions. | 1 |
task_update_topic | Optional[str] | Name of the topic to which tasks whose status has changed are sent. | None |
tasks_topic | str | Name of the topic to which new tasks are sent for execution. | 'dagger_task_topic' |
bootstrap_topic | str | Name of the topic to which tasks are sent for execution after a restart. | 'dagger_bootstrap' |
beacon | NodeT | Beacon used to track services in a dependency graph. | None |
loop | asyncio.AbstractEventLoop | Asyncio event loop to attach to. | None |
tracing_sensor | Sensor | Tracing sensor to use for OpenTelemetry. The global tracer must be initialized by the client. | None |
datadog_sensor | DatadogMonitor | Datadog StatsD sensor. | None |
aerospike_config | AerospikeConfig | Configuration for Aerospike, if enabled. | None |
enable_changelog | bool | Enables or disables events on the table changelog topic. | True |
max_correletable_keys_in_values | int | Maximum number of IDs stored in a single value before the value is chunked. | 15000 |
schema_registry_url | str | The schema registry URL. | None |
message_serializer | MessageSerializer | The message serializer instance that uses the schema registry. | None |
delete_workflow_on_complete | bool | Deletes the workflow instance when it completes. | False |
task_update_callbacks | List[Callable[[ITemplateDAGInstance], Awaitable[None]]] | Callbacks invoked when a workflow instance is updated. | [] |
logging_config | | The logging configuration to use. | None |
**kwargs | Any | Other Faust keyword arguments. | {} |
__post_init__()
This method is called after Dagger is initialized.
add_topic(topic_name, topic)
Associate a topic instance with a name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
topic_name | str | Name of the topic. | required |
topic | Topic | Instance of Topic. | required |
chunk_and_store_correlatable_tasks(cor_instance, value, workflow_id)
async
Chunks the list of values stored under a key so that no single value exceeds the value-size limit of the datastore used by Dagger. The maximum number of entries per chunk is set by max_correletable_keys_in_values.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cor_instance | CorreletableKeyTasks | The CorreletableKeyTasks instance. | required |
value | UUID | The new value to add to the lookup_keys. | required |
workflow_id | UUID | The ID of the workflow to which cor_instance belongs. | required |
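As an illustration of the chunking idea, the sketch below splits a list of lookup IDs into chunks of at most `max_items` entries, the role max_correletable_keys_in_values plays above. The helpers `chunk_ids` and `append_id` and the plain-list storage are hypothetical; Dagger's actual storage of CorreletableKeyTasks chunks may differ.

```python
from typing import List, TypeVar

T = TypeVar("T")


def chunk_ids(ids: List[T], max_items: int) -> List[List[T]]:
    """Split ids into consecutive chunks of at most max_items entries each.

    Hypothetical helper illustrating max_correletable_keys_in_values;
    not Dagger's internal implementation.
    """
    if max_items <= 0:
        raise ValueError("max_items must be positive")
    return [ids[i:i + max_items] for i in range(0, len(ids), max_items)]


def append_id(chunks: List[List[T]], value: T, max_items: int) -> None:
    """Append value to the last chunk, opening a new chunk when the last one is full."""
    if not chunks or len(chunks[-1]) >= max_items:
        chunks.append([])
    chunks[-1].append(value)
```

Appending one value at a time, as `chunk_and_store_correlatable_tasks` does with `value`, only ever touches the last chunk, so existing full chunks never need to be rewritten.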
create_topic(topic_name, key_type=None, value_type=None, key_serializer=None, value_serializer=None)
classmethod
Create a Kafka topic using Faust.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
topic_name | str | Name of the topic. | required |
key_type | Optional[ModelArg] | Key type for the topic. Defaults to str. | None |
value_type | Optional[ModelArg] | Value type for the topic. Defaults to str. | None |
Returns:
Type | Description |
---|---|
TopicT | Instance of a Faust topic. |
get_correletable_key_instances(cor_instance)
async
Gathers all the chunked values of cor_instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cor_instance | CorreletableKeyTasks | The CorreletableKeyTasks instance to gather. | required |
Returns:
Type | Description |
---|---|
List[CorreletableKeyTasks] | A list of all the chunked CorreletableKeyTasks. |
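A minimal sketch of gathering chunks concurrently, assuming each chunk is stored under a key derived from a base key and a chunk index. The in-memory `FakeStore`, its `get` coroutine, `gather_chunks`, and the `"{key}:{index}"` naming scheme are all hypothetical stand-ins for the real datastore.

```python
import asyncio
from typing import Dict, List, Optional


class FakeStore:
    """Hypothetical in-memory stand-in for the Dagger datastore."""

    def __init__(self, data: Dict[str, List[str]]) -> None:
        self._data = data

    async def get(self, key: str) -> Optional[List[str]]:
        await asyncio.sleep(0)  # yield control, as a real network lookup would
        return self._data.get(key)


async def gather_chunks(store: FakeStore, base_key: str, num_chunks: int) -> List[str]:
    """Fetch all chunks concurrently and flatten them in chunk order."""
    keys = [f"{base_key}:{i}" for i in range(num_chunks)]
    chunks = await asyncio.gather(*(store.get(k) for k in keys))
    # asyncio.gather preserves argument order; missing chunks (None) are skipped.
    return [item for chunk in chunks if chunk for item in chunk]
```

Usage: `asyncio.run(gather_chunks(store, "order-42", 2))` returns the flattened IDs from both chunks.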
get_db_options()
Get the DB options of the Dagger store.
Returns:
Type | Description |
---|---|
Mapping[str, Any] | The DB options. |
get_instance(id, log=True)
async
Get an instance of an ITask given its ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id | UUID | ID of the ITask. | required |
log | bool | Whether to log; set to False to suppress logging. | True |
Returns:
Type | Description |
---|---|
ITask | Instance of the ITask. |
get_monitoring_task(task, workflow_instance)
async
Returns the monitoring task associated with this task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | ITask | The task to check. | required |
Returns:
Type | Description |
---|---|
Optional[MonitoringTask] | The monitoring task instance, or None. |
get_template(template_name)
Get the instance of the template that contains the workflow definition, given its name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
template_name | str | Name of the template. | required |
Returns:
Type | Description |
---|---|
ITemplateDAG | Instance of the template that contains the workflow definition. |
Raises:
Type | Description |
---|---|
TemplateDoesNotExist | The template does not exist. |
get_topic(topic_name)
Get a topic based on the associated name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
topic_name | str | Name of the topic. | required |
Returns:
Type | Description |
---|---|
TopicT | The instance of the topic. |
main(override_logging=False)
Main method that initializes the Dagger worker. This method is blocking.
register_process_template(process_template_name)
classmethod
Registers a process template in Dagger.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
process_template_name | str | Name of the process to register in Dagger. | required |
register_template(template_name)
classmethod
Registers a workflow definition with Dagger. Refer to the examples in the documentation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
template_name | str | Name of the workflow to register. | required |
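The register/lookup pair (register_template and get_template) can be pictured as a name-keyed registry populated by a decorator. The sketch below is a self-contained stand-in: `TemplateRegistry` and `order_workflow` are hypothetical and only illustrate the pattern. In Dagger itself, registration goes through `register_template` and lookup through `get_template`, which raises `TemplateDoesNotExist` for unknown names; the local exception class here merely mirrors that name.

```python
from typing import Callable, Dict


class TemplateDoesNotExist(Exception):
    """Raised when no template is registered under the requested name."""


class TemplateRegistry:
    """Hypothetical name -> builder-function registry (illustration only)."""

    _builders: Dict[str, Callable[[], object]] = {}

    @classmethod
    def register_template(cls, template_name: str):
        def decorator(builder: Callable[[], object]) -> Callable[[], object]:
            cls._builders[template_name] = builder
            return builder  # return unchanged so the function stays usable
        return decorator

    @classmethod
    def get_template(cls, template_name: str) -> object:
        builder = cls._builders.get(template_name)
        if builder is None:
            raise TemplateDoesNotExist(template_name)
        return builder()


@TemplateRegistry.register_template("OrderWorkflow")
def order_workflow():
    # A real builder would return an ITemplateDAG; a dict keeps the sketch standalone.
    return {"name": "OrderWorkflow"}
```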
remove_task_from_correletable_keys_table(task, workflow_instance)
async
Removes a task from the correletable keys table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | ITask | The task to remove from the correletable keys table. | required |
workflow_instance | ITemplateDAGInstance | The workflow instance. | required |
submit(task, *, repartition=True)
async
Submits a workflow instance for execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | ITask | The workflow instance. | required |
repartition | bool | If True, the repartitioning key is used to submit the task for execution on the configured Kafka topic. If False, the workflow is created on the same node and submitted for execution there. | True |
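Conceptually, repartition=True routes the instance to whichever node owns the partition for its key. The sketch below shows a key-to-partition mapping with a stable hash. It is an illustration only: Kafka's default partitioner uses murmur2, not CRC32, and `partition_for` is a hypothetical helper, not part of Dagger.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a partition key to a partition index with a stable hash.

    CRC32 is used because Python's built-in hash() is salted per process for
    str and would not give the same answer on every node.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return zlib.crc32(key.encode("utf-8")) % num_partitions
```

Because the mapping depends only on the key, every node computes the same owner for a given workflow, which is what makes key-based routing consistent.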
update_correletable_key_for_task(task_instance, key=None, new_task=False, workflow_instance=None)
async
Updates the correletable key for a SensorTask within the datastore.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_instance | ITask | The SensorTask whose correletable key is updated. | required |
key | str | The name of the correletable key to update. | None |
new_task | bool | If the task is not new, the entire runtime parameters and subsequent tasks need to be updated. | False |
workflow_instance | ITemplateDAGInstance | The workflow instance. | None |
ITemplateDAGBuilder
Base class that defines the structure of a workflow definition.
__init__(app)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
app | Service | The Dagger app instance. | required |
build()
abstractmethod
Builds the ITemplateDAG object from this builder.
Returns:
Type | Description |
---|---|
ITemplateDAG | The instance of ITemplateDAG. |
set_name(name)
abstractmethod
Sets the name of the template.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the template. | required |
Returns:
Type | Description |
---|---|
ITemplateDAGBuilder | An instance of the updated template builder. |
set_root(template)
abstractmethod
Sets the first process to execute in the template.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
template | IProcessTemplateDAG | Instance of a process template containing the definition of the process. | required |
Returns:
Type | Description |
---|---|
ITemplateDAGBuilder | An instance of the updated template builder. |
set_type(template_type)
abstractmethod
Sets the type of the workflow instance created from the template.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
template_type | Type[ITemplateDAGInstance] | The type of the workflow instance. | required |
Returns:
Type | Description |
---|---|
ITemplateDAGBuilder | An instance of the updated template builder. |
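Each builder method above returns the updated builder, so calls can be chained. The toy `WorkflowTemplateBuilder` below is hypothetical and only demonstrates that fluent pattern, with a plain string standing in for the IProcessTemplateDAG root and a built-in type standing in for an ITemplateDAGInstance subclass.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class WorkflowTemplate:
    name: str
    root: str
    template_type: type


class WorkflowTemplateBuilder:
    """Hypothetical fluent builder mirroring set_name/set_root/set_type/build."""

    def __init__(self) -> None:
        self._name: Optional[str] = None
        self._root: Optional[str] = None
        self._type: Optional[type] = None

    def set_name(self, name: str) -> "WorkflowTemplateBuilder":
        self._name = name
        return self

    def set_root(self, root: str) -> "WorkflowTemplateBuilder":
        self._root = root
        return self

    def set_type(self, template_type: type) -> "WorkflowTemplateBuilder":
        self._type = template_type
        return self

    def build(self) -> WorkflowTemplate:
        if self._name is None or self._root is None or self._type is None:
            raise ValueError("name, root and type must all be set before build()")
        return WorkflowTemplate(self._name, self._root, self._type)


template = (
    WorkflowTemplateBuilder()
    .set_name("OrderWorkflow")
    .set_root("payment_process")
    .set_type(dict)
    .build()
)
```

Validating in `build()` rather than in each setter lets the calls appear in any order.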
IProcessTemplateDAGBuilder
Skeleton builder class used to build a process definition within a workflow.
__init__(app)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
app | Service | The Dagger instance. | required |
build()
abstractmethod
Builds the IProcessTemplateDAG object.
Returns:
Type | Description |
---|---|
IProcessTemplateDAG | The instance of IProcessTemplateDAG used to create the workflow definition. |
set_name(process_name)
abstractmethod
Sets the name of the process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
process_name | str | Name of the process. | required |
Returns:
Type | Description |
---|---|
IProcessTemplateDAGBuilder | An instance of the updated process template builder. |
set_next_process(task)
abstractmethod
Set the next process in the execution of the workflow definition.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | IProcessTemplateDAG | Process template to be set as the next process. | required |
Returns:
Type | Description |
---|---|
IProcessTemplateDAGBuilder | An instance of the updated process template builder. |
set_root_task(task)
abstractmethod
Set the first task in the process definition of the workflow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | TaskTemplate | TaskTemplate to be set as the first task of the process execution. | required |
Returns:
Type | Description |
---|---|
IProcessTemplateDAGBuilder | An instance of the updated process template builder. |
set_type(process_type)
abstractmethod
Sets the type of the process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
process_type | Type[ITask] | The type of the process. | required |
Returns:
Type | Description |
---|---|
IProcessTemplateDAGBuilder | An instance of the updated process template builder. |
TaskTemplateBuilder
Skeleton builder class used to build the definition of a task object within a workflow.
__init__(app)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
app | Service | The Dagger object. | required |
build()
abstractmethod
Builds the TaskTemplate object.
Returns:
Type | Description |
---|---|
TaskTemplate | The TaskTemplate instance to add to the ProcessBuilder definition. |
set_allow_skip_to(allow_skip_to)
Set whether this task may be executed out of order (skipped to).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
allow_skip_to | bool | If True, a Sensor task can be executed when an event is received out of order. | required |
Returns:
Type | Description |
---|---|
TaskTemplateBuilder | An instance of the updated template builder. |
set_name(name)
abstractmethod
Set the name of the task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the task. | required |
Returns:
Type | Description |
---|---|
TaskTemplateBuilder | An instance of the updated template builder. |
set_next(task_template)
Set the next task in the process definition within the workflow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_template | TaskTemplate | The next task template definition. | required |
Returns:
Type | Description |
---|---|
TaskTemplateBuilder | An instance of the updated template builder. |
set_type(task_type)
abstractmethod
Sets the type of the task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_type | Type[ITask] | The type of the task to instantiate from the builder. | required |
Returns:
Type | Description |
---|---|
TaskTemplateBuilder | An instance of the updated template builder. |
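set_next links task templates into a chain that the process walks in order. The sketch below models that linkage with a hypothetical `TaskNode` dataclass and a depth-first walker; the list of successors loosely mirrors the List[TaskTemplate] that DefaultTaskTemplate takes as task_dag_template, but none of these names are Dagger APIs.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class TaskNode:
    """Hypothetical stand-in for a task template: a name plus its next templates."""
    name: str
    next_tasks: List["TaskNode"] = field(default_factory=list)

    def set_next(self, task: "TaskNode") -> "TaskNode":
        self.next_tasks.append(task)
        return self  # fluent, like the builder methods above


def execution_order(root: TaskNode) -> List[str]:
    """Depth-first walk from the root, visiting each task once."""
    order: List[str] = []
    seen = set()
    stack = [root]
    while stack:
        node = stack.pop()
        if id(node) in seen:
            continue
        seen.add(id(node))
        order.append(node.name)
        # Push successors in reverse so the first successor is visited first.
        stack.extend(reversed(node.next_tasks))
    return order


notify = TaskNode("notify")
ship = TaskNode("ship").set_next(notify)
pay = TaskNode("pay").set_next(ship)
```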
ITemplateDAG
Skeleton class defining the attributes and functions of a template instance.
__init__(dag, name, app, template_type)
ITemplateDAG constructor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dag | IProcessTemplateDAG | The definition of the first process to execute. | required |
name | str | The name of the workflow. | required |
app | Service | The Dagger instance. | required |
template_type | Type[ITemplateDAGInstance] | The type of the workflow to instantiate. | required |
create_instance(id, partition_key_lookup, *, repartition=True, seed=None, **kwargs)
abstractmethod
async
Method for creating an instance of a workflow definition.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id | UUID | The ID of the workflow. | required |
partition_key_lookup | str | Kafka topic partition key associated with the workflow instance. The key must be defined within the runtime parameters. | required |
repartition | bool | Flag indicating whether this instance is stored on the current node or by the owner of the partition defined by partition_key_lookup. | True |
seed | random.Random | The seed used to create all internal instances of the workflow. | None |
**kwargs | | Other keyword arguments. | {} |
Returns:
Type | Description |
---|---|
ITemplateDAGInstance | An instance of the workflow. |
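The seed parameter suggests that the IDs of internal instances can be derived deterministically. A minimal sketch, assuming IDs are drawn from a random.Random instance: the same seed then yields the same UUIDs, which helps with reproducible tests. `derive_ids` is hypothetical; this page does not specify how Dagger uses the seed.

```python
import random
import uuid
from typing import List, Optional


def derive_ids(count: int, seed: Optional[random.Random] = None) -> List[uuid.UUID]:
    """Generate count version-4 UUIDs from seed (or fresh randomness if seed is None)."""
    rng = seed if seed is not None else random.Random()
    # version=4 overwrites the version and variant bits of the 128-bit integer.
    return [uuid.UUID(int=rng.getrandbits(128), version=4) for _ in range(count)]
```

Two `random.Random(7)` instances passed to `derive_ids` produce identical lists, while omitting the seed gives fresh IDs on every call.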
set_dynamic_builders_for_process_template(name, process_template_builders)
abstractmethod
Use these builders only when the processes of the workflow definition must be determined at runtime. They allow the processes in the workflow definition to be defined at runtime based on the workflow's runtime parameters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the dynamic process builder. | required |
process_template_builders | | The dynamic ProcessTemplateDagBuilder process builders. | required |
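When the processes cannot be fixed statically, a builder can choose them from the workflow's runtime parameters. The sketch below shows that idea with hypothetical names (`choose_processes`, the `requires_approval` and `warehouses` parameters, and the process names); it returns process names rather than real ProcessTemplateDagBuilder objects.

```python
from typing import Any, Dict, List


def choose_processes(runtime_parameters: Dict[str, Any]) -> List[str]:
    """Decide which processes to run from runtime parameters (hypothetical rules)."""
    processes = ["validate_order"]
    if runtime_parameters.get("requires_approval", False):
        processes.append("manual_approval")
    # One shipping process per warehouse supplied at runtime.
    for warehouse in runtime_parameters.get("warehouses", []):
        processes.append(f"ship_from_{warehouse}")
    return processes
```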
IProcessTemplateDAG
Skeleton class defining the attributes and functions of a process instance.
__init__(next_process_dag, app, name, process_type, root_task_dag, max_run_duration)
Constructor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
next_process_dag | List[IProcessTemplateDAG] | The list of next processes to execute in this workflow. | required |
app | Service | The Dagger instance. | required |
name | str | The name of the process. | required |
process_type | Type[ITask] | The type of the process to instantiate. | required |
root_task_dag | Optional[TaskTemplate] | The template of the first task to execute in this process. | required |
max_run_duration | int | The maximum time this process can run before it is marked as failed (timed out). | required |
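A minimal sketch of how a max_run_duration timeout can be evaluated, assuming the start and current times are Unix timestamps in seconds and max_run_duration is also in seconds; this page does not state the unit, and `is_timed_out` is a hypothetical helper.

```python
def is_timed_out(started_at: float, now: float, max_run_duration: int) -> bool:
    """Return True once the process has run longer than max_run_duration.

    Assumes every argument is expressed in seconds (not confirmed by this reference).
    """
    return now - started_at > max_run_duration
```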
create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs)
abstractmethod
async
Method for creating an instance of a process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id | UUID | The ID of the instance. | required |
parent_id | UUID | The ID of the parent of this process (the workflow instance). | required |
parent_name | str | The name of the workflow. | required |
partition_key_lookup | str | The Kafka partitioning key to look up. | required |
repartition | bool | If True, the instance is serialized and stored on the owner of the partition defined by the partitioning key. | True |
seed | random.Random | The seed used to create any child instances. | None |
workflow_instance | ITemplateDAGInstance | The workflow instance. | None |
**kwargs | | Other keyword arguments. | {} |
Returns:
Type | Description |
---|---|
IProcessTemplateDAGInstance | The instance of the process. |
set_dynamic_process_builders(process_template_builders)
abstractmethod
Sets the dynamic process builders, determined at runtime, on the process instance. Only used when the processes of the workflow cannot be determined statically and need to be defined at runtime.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
process_template_builders | | The list of ProcessTemplateDagBuilder objects. | required |
set_parallel_process_template_dags(parallel_process_templates)
abstractmethod
Method to set child_process_task_templates for parallel execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
parallel_process_templates | | List of parallel process templates. | required |
IDynamicProcessTemplateDAG
Bases: IProcessTemplateDAG
Skeleton class defining the attributes and functions of dynamic processes and task instances.
__init__(next_process_dags, app, name, max_run_duration)
Init method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
next_process_dags | List[IProcessTemplateDAG] | The next process to execute after the current process is complete. | required |
app | Service | The Dagger instance. | required |
name | str | The name of the process. | required |
max_run_duration | int | The timeout of the process. | required |
create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs)
async
Method for creating one or more dynamic instances of a template; returns the head of the list.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id | UUID | The ID of the instance to create. | required |
parent_id | UUID | The ID of the parent instance. | required |
parent_name | str | The name of the parent workflow. | required |
partition_key_lookup | str | The Kafka partitioning key used to serialize the storage of this instance. | required |
repartition | bool | If True, the instance is stored on the node owning the partition defined by the partitioning key. | True |
seed | random.Random | The seed used to create any child instances. | None |
workflow_instance | ITemplateDAGInstance | The workflow instance. | None |
**kwargs | | Other keyword arguments. | {} |
TaskTemplate
Skeleton class defining the attributes and functions of process and task instances.
create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs)
abstractmethod
async
Method for creating an instance of a template definition.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id | UUID | The ID of the instance. | required |
parent_id | UUID | The ID of the parent instance, if any. | required |
parent_name | str | The name of the parent instance. | required |
partition_key_lookup | str | The Kafka partitioning key. | required |
repartition | bool | If True, the instance is serialized and stored by the node owning the partition defined by the partitioning key. | True |
seed | random.Random | The seed used to create any child instances. | None |
workflow_instance | ITemplateDAGInstance | The workflow object. | None |
**kwargs | | Other keyword arguments. | {} |
DefaultTaskTemplate
Bases: TaskTemplate
Default implementation of task template.
__init__(app, type, name, task_dag_template, allow_skip_to, reprocess_on_message=False)
Init method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
app | Service | The Dagger instance. | required |
type | Type[ITask] | The type of the task defined within the workflow. | required |
name | str | The name of the task. | required |
task_dag_template | List[TaskTemplate] | The next task to execute after this task. | required |
allow_skip_to | bool | Set to True if processing of the workflow can skip to this task. | required |
reprocess_on_message | bool | If True, the task is executed when invoked, irrespective of its state. | False |
create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs)
async
Creates an instance of the task defined by this template.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id | UUID | The ID of the instance to create. | required |
parent_id | UUID | The ID of the parent of the task to be created. | required |
parent_name | str | The name of the parent of the task to be created. | required |
partition_key_lookup | str | The Kafka partitioning key, used if this instance needs to be repartitioned. | required |
repartition | bool | If True, the instance is stored on the node owning the partition defined by the partitioning key. | True |
seed | random.Random | The seed used to create any child instances. | None |
workflow_instance | ITemplateDAGInstance | The workflow object. | None |
**kwargs | Any | Other keyword arguments. | {} |
DefaultKafkaTaskTemplate
Bases: DefaultTaskTemplate
Default implementation of KafkaCommandTask.
__init__(app, type, name, topic, task_dag_templates, allow_skip_to, reprocess_on_message=False)
Init method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
app | Service | The Dagger instance. | required |
type | Type[KafkaCommandTask[KT, VT]] | The type of KafkaCommandTask. | required |
name | str | The name of the task. | required |
topic | Topic | The topic on which to send the command. | required |
task_dag_templates | List[TaskTemplate] | The next task to execute after this task. | required |
allow_skip_to | bool | If True, execution of the workflow can jump to this task. | required |
reprocess_on_message | bool | If True, the task is executed irrespective of its state. | False |
create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs)
async
Method for creating an instance of a KafkaCommandTask.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id | UUID | The ID of the instance. | required |
parent_id | UUID | The ID of the parent of this task. | required |
parent_name | str | The name of the parent instance. | required |
partition_key_lookup | str | The Kafka partitioning key to look up. | required |
repartition | bool | If True, the instance is serialized and stored on the owner of the partition defined by the partitioning key. | True |
seed | random.Random | The seed used to create any child instances. | None |
workflow_instance | ITemplateDAGInstance | The workflow instance. | None |
**kwargs | Any | Other keyword arguments. | {} |
Returns:
Type | Description |
---|---|
ITask | The instance of the task. |
DefaultTriggerTaskTemplate
Bases: DefaultTaskTemplate
Default implementation of TriggerTask.
__init__(app, type, name, time_to_execute_key, task_dag_templates, allow_skip_to)
Init method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
app | Service | The Dagger instance. | required |
type | Type[TriggerTask[KT, VT]] | The type of TriggerTask. | required |
name | str | The name of the task. | required |
time_to_execute_key | str | The key to look up in the runtime parameters to determine when the trigger executes. | required |
allow_skip_to | bool | Flag for skipping serial execution. | required |
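As an illustration, a periodic trigger loop can select tasks whose execution time (read from the runtime parameters via time_to_execute_key) has passed, capped per run. The sketch assumes that value is a Unix timestamp in seconds; `due_triggers` and the dictionary layout are hypothetical. The cap loosely mirrors the max_tasks_per_trigger constructor argument, whose exact semantics this page does not document.

```python
from typing import Any, Dict, List


def due_triggers(
    tasks: List[Dict[str, Any]],
    time_to_execute_key: str,
    now: float,
    max_tasks: int,
) -> List[Dict[str, Any]]:
    """Return up to max_tasks tasks whose execute time is <= now, earliest first."""
    due = [
        t for t in tasks
        if time_to_execute_key in t["runtime_parameters"]
        and t["runtime_parameters"][time_to_execute_key] <= now
    ]
    due.sort(key=lambda t: t["runtime_parameters"][time_to_execute_key])
    return due[:max_tasks]
```

Tasks missing the key are skipped rather than raising, so one malformed task cannot block the rest of the batch.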
create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs)
async
Method for creating an instance of a TriggerTask.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id | UUID | The ID of the instance. | required |
parent_id | UUID | The ID of the parent of this task. | required |
parent_name | str | The name of the parent instance. | required |
partition_key_lookup | str | The Kafka partitioning key to look up. | required |
repartition | bool | If True, the instance is serialized and stored on the owner of the partition defined by the partitioning key. | True |
seed | random.Random | The seed used to create any child instances. | None |
workflow_instance | ITemplateDAGInstance | The workflow instance. | None |
**kwargs | Any | Other keyword arguments. | {} |
Returns:
Type | Description |
---|---|
ITask | The instance of the task. |
DefaultIntervalTaskTemplate
Bases: DefaultTaskTemplate
Default implementation of IntervalTask.
__init__(app, type, name, time_to_execute_key, time_to_force_complete_key, interval_execute_period_key, task_dag_templates, allow_skip_to)
Init method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
app | Service | The Dagger instance. | required |
type | Type[IntervalTask[KT, VT]] | The type of IntervalTask. | required |
name | str | The name of the task. | required |
time_to_execute_key | str | The key to look up in the runtime parameters to trigger the task. | required |
time_to_force_complete_key | str | The key to look up in the runtime parameters to time out the task. | required |
interval_execute_period_key | str | The key to look up in the runtime parameters for the execution period: the task is re-executed at this interval from the trigger time until it succeeds or times out. | required |
task_dag_templates | List[TaskTemplate] | The next task to execute. | required |
allow_skip_to | bool | Flag to skip serial execution. | required |
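The three keys of an interval task combine into an execution schedule. A minimal sketch, assuming the looked-up values are a start timestamp, a force-complete timestamp, and a period, all in seconds; `interval_schedule` is hypothetical, and in practice re-execution stops as soon as the task succeeds.

```python
from typing import List


def interval_schedule(start: float, force_complete_at: float, period: float) -> List[float]:
    """List execution times from start (inclusive) up to force_complete_at (exclusive)."""
    if period <= 0:
        raise ValueError("period must be positive")
    times: List[float] = []
    t = start
    while t < force_complete_at:
        times.append(t)
        t += period
    return times
```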
create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs)
async
Method for creating an instance of IntervalTask.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id | UUID | The ID of the instance. | required |
parent_id | UUID | The ID of the parent of this task. | required |
parent_name | str | The name of the parent instance. | required |
partition_key_lookup | str | The Kafka partitioning key to look up. | required |
repartition | bool | If True, the instance is serialized and stored on the owner of the partition defined by the partitioning key. | True |
seed | random.Random | The seed used to create any child instances. | None |
workflow_instance | ITemplateDAGInstance | The workflow instance. | None |
**kwargs | | Other keyword arguments. | {} |
Returns:
Type | Description |
---|---|
ITask | The instance of the task. |
ParallelCompositeTaskTemplate
Bases: DefaultTaskTemplate
Template to define the structure of parallel tasks to execute within a workflow
__init__(app, type, name, task_dag_templates, child_task_templates, allow_skip_to, reprocess_on_message=False, parallel_operator_type=TaskOperator.JOIN_ALL)
Init method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
app | Service | The Dagger instance. | required |
type | Type[ITask] | The type of ParallelCompositeTask. | required |
name | str | The name of the ParallelCompositeTask. | required |
task_dag_templates | List[TaskTemplate] | The next task to execute after the parallel task completes. | required |
child_task_templates | List[TaskTemplate] | The set of parallel tasks to execute. | required |
allow_skip_to | bool | If set, serial execution of the workflow can skip to this task. | required |
reprocess_on_message | bool | Re-execute the task irrespective of its state. | False |
parallel_operator_type | TaskOperator | Whether to wait for all parallel tasks, or for just one, to complete before transitioning to the next task in the workflow. | TaskOperator.JOIN_ALL |
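The two waiting behaviours described for parallel_operator_type can be expressed as a completion check. In the sketch, `JoinPolicy` is a hypothetical local enum: JOIN_ALL mirrors TaskOperator.JOIN_ALL, while the name ATLEAST_ONE for the "just one" behaviour is an assumption, as are the string status values and the rule that an empty child list is never done.

```python
from enum import Enum
from typing import Iterable


class JoinPolicy(Enum):
    JOIN_ALL = "join_all"        # wait for every parallel child
    ATLEAST_ONE = "atleast_one"  # proceed once any child completes (assumed name)


def parallel_done(child_statuses: Iterable[str], policy: JoinPolicy) -> bool:
    """Decide whether the parallel task can transition to the next task."""
    completed = [status == "COMPLETED" for status in child_statuses]
    if not completed:
        return False  # assumption: with no children there is nothing to join
    if policy is JoinPolicy.JOIN_ALL:
        return all(completed)
    return any(completed)
```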
DefaultTaskTemplateBuilder
ParallelCompositeTaskTemplateBuilder
KafkaCommandTaskTemplateBuilder
DecisionTaskTemplateBuilder
TriggerTaskTemplateBuilder
IntervalTaskTemplateBuilder
KafkaListenerTaskTemplateBuilder
Bases: DefaultTaskTemplateBuilder
A type of DefaultTaskTemplateBuilder used to define KafkaListenerTasks.
set_reprocess_on_message(reprocess_on_message)
Set whether or not this task should reprocess on_message if a correlated message is reprocessed.
DefaultTemplateBuilder
Bases: ITemplateDAGBuilder
Default implementation of the template builder.
build()
Builds the ITemplateDAG defined by this builder.
Returns:
Type | Description |
---|---|
ITemplateDAG | The instance of ITemplateDAG built from the builder definition. |
set_name(name)
Sets the name of the template.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the template the builder is setting up. | required |
Returns:
Type | Description |
---|---|
ITemplateDAGBuilder | The updated ITemplateDAGBuilder. |
set_root(template)
Sets the root process of this template.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
template | IProcessTemplateDAG | The first process to execute. | required |
Returns:
Type | Description |
---|---|
ITemplateDAGBuilder | The instance of ITemplateDAGBuilder. |
set_type(template_type)
Sets the type of the workflow instance to create from this builder.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
template_type | Type[ITemplateDAGInstance] | The template type. | required |
Returns:
Type | Description |
---|---|
ITemplateDAGBuilder | The instance of ITemplateDAGBuilder. |
ProcessTemplateDAG
Bases: IProcessTemplateDAG
Class that encapsulates the definition of a process.
__init__(next_process_dag, app, name, process_type, root_task_dag, max_run_duration)
Constructor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
next_process_dag | List[IProcessTemplateDAG] | The list of next processes to execute in this workflow. | required |
app | Dagger | The Dagger instance. | required |
name | str | The name of the process. | required |
process_type | Type[ITask[KT, VT]] | The type of the process to instantiate. | required |
root_task_dag | Optional[TaskTemplate] | The template of the first task to execute in this process. | required |
max_run_duration | int | The maximum time this process can run before it is marked as failed (timed out). | required |
create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs)
async
Method for creating an instance of a process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id | UUID | The ID of the instance. | required |
parent_id | UUID | The ID of the parent of this process (the workflow instance). | required |
parent_name | str | The name of the workflow. | required |
partition_key_lookup | str | The Kafka partitioning key to look up. | required |
repartition | bool | If True, the instance is serialized and stored on the owner of the partition defined by the partitioning key. | True |
seed | random.Random | The seed used to create any child instances. | None |
workflow_instance | ITemplateDAGInstance | The workflow instance. | None |
**kwargs | Any | Other keyword arguments. | {} |
Returns:
Type | Description |
---|---|
IProcessTemplateDAGInstance[KT, VT] | The instance of the process. |
ParallelCompositeProcessTemplateDAG
Bases: ProcessTemplateDAG
A Process Template to define a set of parallel Processes within a Process
__init__(next_process_dag, app, name, process_type, child_process_task_templates, parallel_operator_type)
init method
Parameters:
Name | Type | Description | Default |
---|---|---|---|
next_process_dag |
List[IProcessTemplateDAG]
|
The next Process in the workflow definition |
required |
app |
Dagger
|
The Dagger instance |
required |
name |
str
|
The name of the ParallelCompositeProcessTask |
required |
process_type |
Type[ParallelCompositeTask[KT, VT]]
|
The type of ParallelCompositeTask to be instantiated from the definition |
required |
child_process_task_templates |
List[IProcessTemplateDAG]
|
The list of parallel processes to be created |
required |
parallel_operator_type |
TaskOperator
|
Whether to wait for all parallel processes or just one to complete before transitioning to the next process in the workflow |
required |
create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs)
async
Create a ParallelCompositeTask instance based on the template definition
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id |
UUID
|
The id of the instance |
required |
parent_id |
UUID
|
The id of the parent of this process (the workflow instance) |
required |
parent_name |
str
|
the name of the workflow |
required |
partition_key_lookup |
str
|
The Kafka partitioning key to look up |
required |
repartition |
bool
|
If True, the instance is serialized and stored on the node that owns the partition the partitioning key maps to |
True
|
seed |
random.Random
|
The seed to use to create any child instances |
None
|
workflow_instance |
ITemplateDAGInstance
|
the instance of the workflow |
None
|
**kwargs |
Any
|
Other keyword arguments |
{}
|
Returns:
Type | Description |
---|---|
ParallelCompositeTask[KT, VT]
|
the instance of the Process |
set_parallel_process_template_dags(parallel_process_templates)
Sets child_process_task_templates
Parameters:
Name | Type | Description | Default |
---|---|---|---|
parallel_process_templates |
List[IProcessTemplateDAG]
|
List of parallel process templates |
required |
TemplateDAG
Bases: ITemplateDAG
Default Implementation of ITemplateDAG
__init__(dag, name, app, template_type)
Constructor
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dag |
IProcessTemplateDAG
|
The definition of the first process to execute |
required |
name |
str
|
the name of the Workflow |
required |
app |
Dagger
|
the Dagger instance |
required |
template_type |
Type[ITemplateDAGInstance[KT, VT]]
|
the type of the Workflow to instantiate |
required |
create_instance(id, partition_key_lookup, *, repartition=True, seed=None, **kwargs)
async
Method for creating an instance of a workflow definition
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id |
UUID
|
The id of the workflow |
required |
partition_key_lookup |
str
|
Kafka topic partition key associated with the instance of the workflow. The key needs to be defined within the runtime parameters |
required |
repartition |
bool
|
Flag indicating if the creation of this instance needs to be stored on the current node or by the owner of the partition defined by the partition_key_lookup |
True
|
seed |
random.Random
|
the seed to use to create all internal instances of the workflow |
None
|
**kwargs |
Other keyword arguments |
{}
|
Returns:
Type | Description |
---|---|
ITemplateDAGInstance[KT, VT]
|
An instance of the workflow |
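The seed parameter is the source of randomness for all internal instances of the workflow. A likely benefit of passing a random.Random is reproducibility: the same seed yields the same child identifiers. The sketch assumes identifiers are drawn from the seeded generator. child_ids is a hypothetical helper, and the documentation does not state that Dagger derives IDs this way.

```python
import random
import uuid
from typing import List


def child_ids(seed: random.Random, count: int) -> List[uuid.UUID]:
    """Draw `count` version-4 UUIDs from a seeded generator (hypothetical helper)."""
    return [uuid.UUID(int=seed.getrandbits(128), version=4) for _ in range(count)]


first = child_ids(random.Random(1234), 3)
second = child_ids(random.Random(1234), 3)
print(first == second)  # True: the same seed reproduces the same IDs
```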
get_given_process(process_name)
Looks up a specific process template within a DAG
Parameters:
Name | Type | Description | Default |
---|---|---|---|
process_name |
str
|
Name of the process |
required |
Returns:
Type | Description |
---|---|
Optional[IProcessTemplateDAG]
|
Process Template if found, else None |
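A lookup by name has to walk the chain of process templates. This minimal sketch assumes each template exposes a name and a list of next templates, mirroring the next_process_dag parameter of the constructors in this section. ProcessNode and find_process are hypothetical stand-ins, not Dagger classes.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ProcessNode:
    """Hypothetical stand-in for an IProcessTemplateDAG."""
    name: str
    next_process_dag: List["ProcessNode"] = field(default_factory=list)


def find_process(root: ProcessNode, process_name: str) -> Optional[ProcessNode]:
    """Breadth-first search for a process by name; returns None if absent."""
    queue = deque([root])
    seen = set()
    while queue:
        node = queue.popleft()
        if id(node) in seen:  # guard against revisiting shared nodes
            continue
        seen.add(id(node))
        if node.name == process_name:
            return node
        queue.extend(node.next_process_dag)
    return None


shipping = ProcessNode("SHIPPING")
payment = ProcessNode("PAYMENT", [shipping])
orders = ProcessNode("ORDERS", [payment])
print(find_process(orders, "SHIPPING").name)  # SHIPPING
print(find_process(orders, "REFUNDS"))        # None
```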
set_given_num_of_parallel_processes_for_a_composite_process(no_of_processes, composite_process_name, parallel_template_builder)
Creates and sets N new parallel processes for a given process in a DAG
Parameters:
Name | Type | Description | Default |
---|---|---|---|
no_of_processes |
int
|
Number of parallel processes required |
required |
composite_process_name |
str
|
Name of the process within which the parallel processes must reside |
required |
parallel_template_builder |
ProcessTemplateDagBuilder
|
A process template builder to be cloned N times and executed in parallel |
required |
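The method clones one builder N times so that each copy runs as an independent parallel process. The sketch assumes cloning by deep copy and gives each clone an index suffix so the copies stay distinguishable. ParallelBuilder and the naming scheme are hypothetical, not necessarily what Dagger does.

```python
import copy
from dataclasses import dataclass
from typing import List


@dataclass
class ParallelBuilder:
    """Hypothetical stand-in for a ProcessTemplateDagBuilder."""
    process_name: str
    max_run_duration: int = 0


def clone_builders(template: ParallelBuilder, no_of_processes: int) -> List[ParallelBuilder]:
    """Deep-copy the template N times so each parallel process is independent."""
    clones = []
    for index in range(no_of_processes):
        clone = copy.deepcopy(template)
        clone.process_name = f"{template.process_name}_{index}"  # assumed naming scheme
        clones.append(clone)
    return clones


builders = clone_builders(ParallelBuilder("SHIP_ITEM", max_run_duration=300), 3)
print([b.process_name for b in builders])  # ['SHIP_ITEM_0', 'SHIP_ITEM_1', 'SHIP_ITEM_2']
```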
set_parallel_process_template_dags_for_a_composite_process(name, parallel_process_templates)
Sets new process builders within a given process in a DAG
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str
|
Name of the process within which the given process builders must reside |
required |
parallel_process_templates |
List[IProcessTemplateDAG]
|
List of process template builders |
required |
DynamicProcessTemplateDAG
Bases: IDynamicProcessTemplateDAG
A template to add Dynamic Processes to a workflow at runtime
__init__(next_process_dag, app, name, max_run_duration)
Initializes the template.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
next_process_dag |
List[IProcessTemplateDAG]
|
the next process in the workflow definition |
required |
app |
Dagger
|
The Dagger instance |
required |
name |
str
|
the name of the Dynamic Process |
required |
max_run_duration |
int
|
The timeout on the process to COMPLETE execution |
required |
create_instance(id, parent_id, parent_name, partition_key_lookup, *, repartition=True, seed=None, workflow_instance=None, **kwargs)
async
Creates a process instance based on the dynamic template definition
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id |
UUID
|
The id of the instance |
required |
parent_id |
UUID
|
the id of the parent of this process(The workflow instance) |
required |
parent_name |
str
|
the name of the workflow |
required |
partition_key_lookup |
str
|
The Kafka partitioning key to look up |
required |
repartition |
bool
|
If True, the instance is serialized and stored on the node that owns the partition the partitioning key maps to |
True
|
seed |
random.Random
|
The seed to use to create any child instances |
None
|
workflow_instance |
ITemplateDAGInstance
|
the instance of the workflow |
None
|
**kwargs |
Any
|
Other keyword arguments |
{}
|
Returns:
Type | Description |
---|---|
IProcessTemplateDAGInstance[KT, VT]
|
the instance of the Process |
ProcessTemplateDagBuilder
DynamicProcessTemplateDagBuilder
Bases: IProcessTemplateDAGBuilder
Skeleton builder class used to build dynamic process objects.
set_root_task(task)
Not implemented.
Raises:
Type | Description |
---|---|
NotImplementedError
|
Not implemented. |
set_type(process_type)
Not implemented.
Raises:
Type | Description |
---|---|
NotImplementedError
|
Not implemented. |
ParallelCompositeProcessTemplateDagBuilder
Bases: IProcessTemplateDAGBuilder
Skeleton builder class used to build parallel process objects.
set_max_run_duration(max_run_duration)
Not implemented.
Raises:
Type | Description |
---|---|
NotImplementedError
|
Not implemented. |
set_root_task(task)
Not implemented.
Raises:
Type | Description |
---|---|
NotImplementedError
|
Not implemented. |
TaskStatusEnum
Bases: Enum
Class indicating the state of a task
COMPLETED = 'COMPLETED'
class-attribute
instance-attribute
The Task COMPLETED Execution
EXECUTING = 'EXECUTING'
class-attribute
instance-attribute
The task is currently EXECUTING
FAILURE = 'FAILURE'
class-attribute
instance-attribute
The Task Failed during Execution
NOT_STARTED = 'NOT_STARTED'
class-attribute
instance-attribute
The Task has NOT STARTED EXECUTION
SKIPPED = 'SKIPPED'
class-attribute
instance-attribute
The Task Skipped Execution
STOPPED = 'STOPPED'
class-attribute
instance-attribute
The Task execution was STOPPED
SUBMITTED = 'SUBMITTED'
class-attribute
instance-attribute
The Task was SUBMITTED for Execution
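The default status in the on_complete signatures in this section is built as TaskStatus(code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value). The sketch mirrors that pattern with a local copy of the enum and a plain dataclass standing in for the Faust Record based TaskStatus. Both stand-in classes and the status_of helper are assumptions for illustration only.

```python
from dataclasses import dataclass
from enum import Enum


class TaskStatusEnum(Enum):
    """Local mirror of the documented members."""
    COMPLETED = "COMPLETED"
    EXECUTING = "EXECUTING"
    FAILURE = "FAILURE"
    NOT_STARTED = "NOT_STARTED"
    SKIPPED = "SKIPPED"
    STOPPED = "STOPPED"
    SUBMITTED = "SUBMITTED"


@dataclass
class TaskStatus:
    """Plain-dataclass stand-in for the Record-based TaskStatus."""
    code: str
    value: str


def status_of(member: TaskStatusEnum) -> TaskStatus:
    """Build a TaskStatus the same way the on_complete default does."""
    return TaskStatus(code=member.name, value=member.value)


completed = status_of(TaskStatusEnum.COMPLETED)
print(completed)  # TaskStatus(code='COMPLETED', value='COMPLETED')
```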
TaskType
Bases: Enum
The type of the Task
LEAF = 'LEAF'
class-attribute
instance-attribute
Task which has no children
PARALLEL_COMPOSITE = 'PARALLEL_COMPOSITE'
class-attribute
instance-attribute
A container for parallel tasks
ROOT = 'ROOT'
class-attribute
instance-attribute
The Root node of the workflow
SUB_DAG = 'SUB_DAG'
class-attribute
instance-attribute
A Process Task that consists of LEAF Tasks
TaskStatus
Bases: Record
Class to serialize the status of the Task
ITask
Bases: Record, Generic[KT, VT]
Class that every template, process, and task extends. Defines attributes and core functions that Dagger uses.
evaluate(**kwargs)
abstractmethod
async
Processes some inputs and determines the next ITask id.
Returns:
Type | Description |
---|---|
Optional[UUID]
|
The next ITask id. |
execute(runtime_parameters, workflow_instance=None)
abstractmethod
async
Executes the ITask.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
runtime_parameters |
Dict[str, str]
|
The runtime parameters of the task |
required |
workflow_instance |
ITask
|
The workflow object |
None
|
get_correlatable_key(payload)
abstractmethod
Gets the lookup key/value associated with the task. Deprecated: use get_correlatable_key_from_payload.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
payload |
Any
|
The lookup key/value. |
required |
Returns:
Type | Description |
---|---|
TaskLookupKey
|
Used to associate a task with a message. |
get_correlatable_key_from_payload(payload)
async
Gets the lookup key/value associated with the task. Deprecated: use get_correlatable_keys_from_payload.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
payload |
Any
|
The lookup key/value. |
required |
Returns:
Type | Description |
---|---|
TaskLookupKey
|
Used to associate a task with a message. |
get_correlatable_keys_from_payload(payload)
async
Gets a list of lookup key/value pairs associated with the task(s).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
payload |
Any
|
The lookup key/value. |
required |
Returns:
Type | Description |
---|---|
List[TaskLookupKey]
|
Used to associate tasks with a message. |
get_remaining_tasks(next_dag_id, workflow_instance, tasks=None, end_task_id=None)
async
Get the remaining tasks in the workflow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
next_dag_id |
UUID
|
Current ITask id. |
required |
tasks |
Optional[List[ITask]]
|
List of previous ITasks. Defaults to []. |
None
|
end_task_id |
UUID
|
The task id at which the function should stop and return. Defaults to None (the end of the DAG). |
None
|
workflow_instance |
ITemplateDAGInstance
|
The Workflow object |
required |
Returns:
Type | Description |
---|---|
Optional[List[ITask]]
|
List of remaining ITasks appended to the input list. |
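A minimal synchronous sketch of the documented behavior: starting at next_dag_id, collect tasks until the end of the chain or until end_task_id, appending to any list passed in. It assumes a linear chain in which each task stores a single next ID in a dict keyed by UUID, and it assumes the end task itself is included. Task, remaining_tasks, and the registry dict are hypothetical; the real method is async and works on the workflow instance.

```python
import uuid
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Task:
    """Hypothetical task node with a single successor."""
    id: uuid.UUID
    name: str
    next_id: Optional[uuid.UUID] = None


def remaining_tasks(
    next_dag_id: Optional[uuid.UUID],
    registry: Dict[uuid.UUID, Task],
    tasks: Optional[List[Task]] = None,
    end_task_id: Optional[uuid.UUID] = None,
) -> List[Task]:
    """Walk the chain from next_dag_id, appending each task to `tasks`."""
    tasks = [] if tasks is None else tasks  # avoid a shared mutable default
    current = next_dag_id
    while current is not None:
        task = registry[current]
        tasks.append(task)
        if current == end_task_id:  # assumption: the end task itself is included
            break
        current = task.next_id
    return tasks


c = Task(uuid.uuid4(), "C")
b = Task(uuid.uuid4(), "B", c.id)
a = Task(uuid.uuid4(), "A", b.id)
registry = {t.id: t for t in (a, b, c)}
print([t.name for t in remaining_tasks(a.id, registry)])                    # ['A', 'B', 'C']
print([t.name for t in remaining_tasks(a.id, registry, end_task_id=b.id)])  # ['A', 'B']
```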
notify(status, workflow_instance)
async
If the task has not completed, runs the steps required for completion by calling on_complete(). This is used to signal a task that it can now complete.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
status |
TaskStatus
|
The status to set on the task when it completes |
required |
workflow_instance |
Optional[ITemplateDAGInstance]
|
the Workflow object |
required |
on_complete(workflow_instance, status=TaskStatus(code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value), *, iterate=True)
async
Sets the status of the ITask to completed and starts the next ITask if there is one.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow_instance |
Optional[ITemplateDAGInstance]
|
The workflow object |
required |
status |
TaskStatus
|
The status to set on the task |
TaskStatus(code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value)
|
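notify runs completion only if the task has not already completed, and delegates the actual work to on_complete. This asyncio sketch shows that guard. SketchTask is hypothetical, its status strings mirror TaskStatusEnum values, and starting the next task (which the real on_complete does) is reduced here to a counter.

```python
import asyncio


class SketchTask:
    """Hypothetical task illustrating the notify -> on_complete guard."""

    def __init__(self) -> None:
        self.status = "EXECUTING"
        self.completions = 0

    async def on_complete(self, status: str = "COMPLETED") -> None:
        self.status = status
        self.completions += 1  # stands in for starting the next task

    async def notify(self, status: str) -> None:
        # Only complete once: later notifications are ignored.
        if self.status != "COMPLETED":
            await self.on_complete(status)


async def main() -> SketchTask:
    task = SketchTask()
    await task.notify("COMPLETED")
    await task.notify("COMPLETED")  # no effect, already completed
    return task


task = asyncio.run(main())
print(task.status, task.completions)  # COMPLETED 1
```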
on_message(runtime_parameters, *args, **kwargs)
abstractmethod
async
Defines what to do when the task receives a message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
runtime_parameters |
Dict[str, str]
|
The runtime parameters of the task |
required |
Returns:
Type | Description |
---|---|
bool
|
True if processing succeeds, False otherwise |
start(workflow_instance)
abstractmethod
async
Starts the ITask.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow_instance |
Optional[ITemplateDAGInstance]
|
The Workflow instance |
required |
stop(runtime_parameters, workflow_instance=None)
abstractmethod
async
Stops the ITask.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
runtime_parameters |
Dict[str, str]
|
The runtime parameters of the task |
required |
workflow_instance |
ITask
|
The workflow object |
None
|
MonitoringTask
Bases: TriggerTask[KT, VT], abc.ABC
A type of TriggerTask that executes at a specific time and checks on the monitored task to execute some domain-specific logic
process_monitored_task(monitored_task, workflow_instance)
abstractmethod
async
Callback invoked when business logic must be executed on the monitored task, based on the time condition
Parameters:
Name | Type | Description | Default |
---|---|---|---|
monitored_task |
ITask
|
the monitored task |
required |
workflow_instance |
Optional[ITemplateDAGInstance]
|
the workflow object |
required |
Returns:
Type | Description |
---|---|
None
|
None |
IntervalTask
Bases: TriggerTask[KT, VT], abc.ABC
A type of task that is triggered at a trigger time and executes repeatedly until execution completes. After the trigger time, the task is retried periodically until the timeout is reached.
interval_execute(runtime_parameters)
async
Task to run on an interval until either the trigger end time or until this method returns True.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
runtime_parameters |
Dict[str, VT]
|
The runtime parameters of the task |
required |
Returns:
Type | Description |
---|---|
bool
|
If True, finish this task. |
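interval_execute is called repeatedly until it returns True or the trigger end time passes. The asyncio sketch below shows that loop. run_on_interval, interval_seconds, end_time, and poll_payment are hypothetical, and Dagger's own scheduling of interval tasks is not reproduced here.

```python
import asyncio
import time
from typing import Awaitable, Callable, Dict


async def run_on_interval(
    interval_execute: Callable[[Dict[str, str]], Awaitable[bool]],
    runtime_parameters: Dict[str, str],
    interval_seconds: float,
    end_time: float,
) -> bool:
    """Call interval_execute until it returns True or end_time (monotonic clock) passes."""
    while time.monotonic() < end_time:
        if await interval_execute(runtime_parameters):
            return True  # finished before the timeout
        await asyncio.sleep(interval_seconds)
    return False  # timed out


attempts = {"count": 0}


async def poll_payment(params: Dict[str, str]) -> bool:
    """Hypothetical check that succeeds on the third attempt."""
    attempts["count"] += 1
    return attempts["count"] >= 3


async def main() -> bool:
    deadline = time.monotonic() + 5.0
    return await run_on_interval(poll_payment, {"order_id": "42"}, 0.01, deadline)


finished = asyncio.run(main())
print(finished, attempts["count"])  # True 3
```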
TriggerTask
Bases: ExecutorTask[KT, VT], abc.ABC
This task halts execution of the DAG until the current time is >= the trigger time on the task, and then invokes the execute method defined by the task.
ExecutorTask
Bases: ITask[KT, VT], abc.ABC
A simple ITask that executes some domain-specific logic
evaluate(**kwargs)
async
Not implemented.
Raises:
Type | Description |
---|---|
NotImplementedError
|
Not implemented. |
on_message(runtime_parameters, *args, **kwargs)
async
Not implemented.
Raises:
Type | Description |
---|---|
NotImplementedError
|
Not implemented. |
DefaultMonitoringTask
DecisionTask
Bases: ITask[KT, VT]
This type of task is similar to the switch/case statement in a programming language. It returns the next task to execute based on the execution logic. A decision task needs to implement
execute(runtime_parameters, workflow_instance=None)
async
Not implemented.
Raises:
Type | Description |
---|---|
NotImplementedError
|
Not implemented. |
on_message(runtime_parameters, *args, **kwargs)
async
Not implemented.
Raises:
Type | Description |
---|---|
NotImplementedError
|
Not implemented. |
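A decision task acts like a switch statement: evaluate returns the ID of the next task to run. The sketch assumes the branch task IDs and the runtime parameter name (payment_method) are known ahead of time. PaymentDecision is hypothetical and does not subclass the real DecisionTask.

```python
import asyncio
import uuid
from typing import Dict, Optional

CARD_TASK_ID = uuid.uuid4()
INVOICE_TASK_ID = uuid.uuid4()


class PaymentDecision:
    """Hypothetical decision task: maps a runtime parameter to the next task ID."""

    branches: Dict[str, uuid.UUID] = {"card": CARD_TASK_ID, "invoice": INVOICE_TASK_ID}

    async def evaluate(self, **kwargs) -> Optional[uuid.UUID]:
        # Like a switch statement: pick a branch, or None when no case matches.
        return self.branches.get(kwargs.get("payment_method", ""))


decision = PaymentDecision()
print(asyncio.run(decision.evaluate(payment_method="card")) == CARD_TASK_ID)  # True
print(asyncio.run(decision.evaluate(payment_method="cash")))                  # None
```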
SystemTask
Bases: ExecutorTask[str, str]
An internal Task for Dagger bookkeeping
evaluate(**kwargs)
async
Not implemented.
Raises:
Type | Description |
---|---|
NotImplementedError
|
Not implemented. |
get_correlatable_key(payload)
Not implemented.
Raises:
Type | Description |
---|---|
NotImplementedError
|
Not implemented. |
on_complete(workflow_instance, status=TaskStatus(code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value), iterate=True)
async
Not implemented.
Raises:
Type | Description |
---|---|
NotImplementedError
|
Not implemented. |
on_message(*args, **kwargs)
async
Not implemented.
Raises:
Type | Description |
---|---|
NotImplementedError
|
Not implemented. |
SystemTimerTask
SensorTask
Bases: ITask[KT, VT], abc.ABC
A type of task that halts execution of the workflow until a condition is met. When the condition is met, the on_message method on this task is invoked.
evaluate(**kwargs)
async
Not implemented.
Raises:
Type | Description |
---|---|
NotImplementedError
|
Not implemented. |
execute(runtime_parameters, workflow_instance=None)
async
Not implemented.
Raises:
Type | Description |
---|---|
NotImplementedError
|
Not implemented. |
IMonitoredTask
Abstract interface to enable monitoring of a task
get_monitoring_task_type()
abstractmethod
Get the TaskType to instantiate to monitor the current task
Returns:
Type | Description |
---|---|
Type[MonitoringTask]
|
The Type of MonitoringTask |
KafkaCommandTask
Bases: ExecutorTask[KT, VT], abc.ABC
This task is used to send a request/message on a Kafka topic defined using the template builder. This type of task is a child task in the execution graph and can be extended by implementing the method
KafkaListenerTask
Bases: SensorTask[KT, VT], abc.ABC
This task halts execution of the DAG until a message is received on the Kafka topic defined in the template definition. Each task created using the DAG builder defines a durable key used to correlate each message received on the topic with listener tasks. The engine handles the complexity of invoking the appropriate task instance based on the key in the payload.
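The engine routes each incoming message to the listener tasks whose correlatable keys appear in the payload. The sketch below shows that idea with an in-memory index from (key name, key value) pairs to task IDs, similar in spirit to CorrelatableMapValue and get_correlatable_keys_from_payload. CorrelationIndex, the order_id key, and the tuple shape of lookup keys are hypothetical; Dagger's TaskLookupKey and storage are not reproduced.

```python
import uuid
from collections import defaultdict
from typing import Dict, List, Set, Tuple

LookupKey = Tuple[str, str]  # (key name, key value), an assumed shape


class CorrelationIndex:
    """Hypothetical index from lookup keys to waiting listener task IDs."""

    def __init__(self) -> None:
        self._waiting: Dict[LookupKey, Set[uuid.UUID]] = defaultdict(set)

    def register(self, key: LookupKey, task_id: uuid.UUID) -> None:
        self._waiting[key].add(task_id)

    @staticmethod
    def keys_from_payload(payload: Dict[str, str]) -> List[LookupKey]:
        # Counterpart of get_correlatable_keys_from_payload: extract lookup keys.
        return [("order_id", payload["order_id"])] if "order_id" in payload else []

    def route(self, payload: Dict[str, str]) -> Set[uuid.UUID]:
        """Return the IDs of every task waiting on a key found in the payload."""
        matched: Set[uuid.UUID] = set()
        for key in self.keys_from_payload(payload):
            matched |= self._waiting.get(key, set())
        return matched


index = CorrelationIndex()
listener_id = uuid.uuid4()
index.register(("order_id", "42"), listener_id)
print(index.route({"order_id": "42", "status": "PAID"}) == {listener_id})  # True
print(index.route({"order_id": "99"}))  # set()
```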
INonLeafNodeTask
Bases: ITask[KT, VT], abc.ABC
An Abstract class for any Process/SUB_DAG node
stop(runtime_parameters, workflow_instance=None)
async
Stops the ITask.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
runtime_parameters |
Dict[str, str]
|
The runtime parameters of the task |
required |
workflow_instance |
ITask
|
The workflow object |
None
|
TaskOperator
Bases: Enum
An operator for Joining Parallel Tasks
ATLEAST_ONE = 'ATLEAST_ONE'
class-attribute
instance-attribute
Waits for at least one of the parallel tasks to reach a terminal state before beginning execution of the next task in the workflow definition
JOIN_ALL = 'JOIN_ALL'
class-attribute
instance-attribute
Waits for all the parallel tasks to reach a terminal state before beginning execution of the next task in the workflow definition
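JOIN_ALL waits for every parallel child to reach a terminal state, while ATLEAST_ONE proceeds as soon as one does. The sketch assumes COMPLETED, FAILURE, SKIPPED, and STOPPED are the terminal states, since the documentation does not list them explicitly. can_proceed is a hypothetical helper, not a Dagger API.

```python
from enum import Enum
from typing import Iterable


class TaskOperator(Enum):
    """Local mirror of the documented operator values."""
    ATLEAST_ONE = "ATLEAST_ONE"
    JOIN_ALL = "JOIN_ALL"


# Assumption: these statuses are treated as terminal.
TERMINAL = {"COMPLETED", "FAILURE", "SKIPPED", "STOPPED"}


def can_proceed(operator: TaskOperator, child_statuses: Iterable[str]) -> bool:
    """Decide whether the task after a parallel block may start."""
    done = [status in TERMINAL for status in child_statuses]
    if operator is TaskOperator.JOIN_ALL:
        return all(done)
    return any(done)


statuses = ["COMPLETED", "EXECUTING", "NOT_STARTED"]
print(can_proceed(TaskOperator.JOIN_ALL, statuses))     # False
print(can_proceed(TaskOperator.ATLEAST_ONE, statuses))  # True
```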
ParallelCompositeTask
Bases: ITask[KT, VT], abc.ABC
SUB-DAG task that executes parallel tasks and waits until all of them are in a terminal state before progressing to the next task. This task can be embedded as a child of the root node or a process node.
notify(status, workflow_instance=None)
async
If not completed, runs the steps required for completion by calling on_complete().
stop(runtime_parameters, workflow_instance=None)
async
Stops the ITask.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
runtime_parameters |
Dict[str, str]
|
The runtime parameters of the task |
required |
workflow_instance |
ITask
|
The workflow object |
None
|
IProcessTemplateDAGInstance
CorrelatableMapValue
Bases: Record
An internal class to store the correlatable keys and their associated values for SensorTask
ITemplateDAGInstance
Bases: INonLeafNodeTask[KT, VT], abc.ABC
A root node implementation of INonLeafNodeTask
stop(runtime_parameters, workflow_instance=None)
async
Stops the ITask.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
runtime_parameters |
Dict[str, str]
|
The runtime parameters of the task |
required |
workflow_instance |
ITask
|
The workflow object |
None
|
MonitoredProcessTemplateDAGInstance
Bases: DefaultProcessTemplateDAGInstance, IMonitoredTask
Default implementation of a Monitored ProcessTask
on_complete(workflow_instance, status=TaskStatus(code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value), iterate=True)
async
Sets the status of the ITask to completed and starts the next ITask if there is one.
Trigger
Bases: Record
Class to store trigger data, i.e. the time at which to execute a task
get_trigger_key()
The key to store for the trigger instance
Returns:
Type | Description |
---|---|
Tuple[Optional[UUID], Optional[UUID]]
|
The Key to store |
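A Trigger records when a task should run. The Dagger constructor exposes trigger_interval and max_tasks_per_trigger, which suggests periodic polling for due triggers with a cap per poll. The sketch illustrates that idea with a min-heap ordered by trigger time. It is an assumption, not Dagger's implementation, and TriggerQueue and pop_due are hypothetical.

```python
import heapq
import uuid
from typing import List, Tuple


class TriggerQueue:
    """Hypothetical min-heap of (trigger_time, task_id) entries."""

    def __init__(self) -> None:
        self._heap: List[Tuple[float, uuid.UUID]] = []

    def add(self, trigger_time: float, task_id: uuid.UUID) -> None:
        heapq.heappush(self._heap, (trigger_time, task_id))

    def pop_due(self, now: float, max_tasks: int) -> List[uuid.UUID]:
        """Pop at most max_tasks entries whose trigger time is <= now, earliest first."""
        due: List[uuid.UUID] = []
        while self._heap and self._heap[0][0] <= now and len(due) < max_tasks:
            due.append(heapq.heappop(self._heap)[1])
        return due


queue = TriggerQueue()
ids = [uuid.uuid4() for _ in range(3)]
queue.add(100.0, ids[0])
queue.add(50.0, ids[1])
queue.add(500.0, ids[2])
print(queue.pop_due(now=200.0, max_tasks=2000) == [ids[1], ids[0]])  # True
print(queue.pop_due(now=200.0, max_tasks=2000))  # []
```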
DAGBuilderHelper
Helper class to model and connect task definitions for DAGs
__init__(dagger_app)
build_and_link_processes(process_builder_list)
Helper function to link processes together in a DAG definition
Parameters:
Name | Type | Description | Default |
---|---|---|---|
process_builder_list |
List[Union[ProcessTemplateDagBuilder, IProcessTemplateDAGBuilder]]
|
The list of process definitions to link together |
required |
Returns:
Type | Description |
---|---|
IProcessTemplateDAG
|
The instance of IProcessTemplateDAG |
build_and_link_tasks(tasks_builder_list)
Links the task definitions together to define the order in which tasks execute
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks_builder_list |
List[TaskTemplateBuilder]
|
The list of TaskTemplateBuilders to link together, defining the chain of tasks |
required |
Returns:
Type | Description |
---|---|
TaskTemplate
|
An instance of the TaskTemplate |
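Linking a list of builders means each template points at the one after it, and the head of the chain is returned. The sketch builds from the tail backwards so every successor already exists when its predecessor is built. StepBuilder, its set_next and build methods, and build_and_link are hypothetical; the real TaskTemplateBuilder API may differ.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TaskTemplate:
    """Hypothetical built task definition."""
    name: str
    next: Optional["TaskTemplate"] = None


class StepBuilder:
    """Hypothetical builder with a settable successor."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._next: Optional[TaskTemplate] = None

    def set_next(self, template: TaskTemplate) -> None:
        self._next = template

    def build(self) -> TaskTemplate:
        return TaskTemplate(self.name, self._next)


def build_and_link(builders: List[StepBuilder]) -> TaskTemplate:
    """Build from the tail backwards so each template points at its successor."""
    if not builders:
        raise ValueError("builders must not be empty")
    next_template: Optional[TaskTemplate] = None
    for builder in reversed(builders):
        if next_template is not None:
            builder.set_next(next_template)
        next_template = builder.build()
    return next_template


head = build_and_link([StepBuilder("reserve"), StepBuilder("charge"), StepBuilder("ship")])
names = []
node: Optional[TaskTemplate] = head
while node is not None:
    names.append(node.name)
    node = node.next
print(names)  # ['reserve', 'charge', 'ship']
```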
generic_command_task_builder(*, topic, task_type, process_name, key_type=None, value_type=None, key_serializer=None, value_serializer=None)
Helper function to define a KafkaCommandTask
Parameters:
Name | Type | Description | Default |
---|---|---|---|
topic |
str
|
The name of the topic to write the command to |
required |
task_type |
Type[ITask]
|
The type of KafkaCommandTask |
required |
process_name |
str
|
the name of the process it belongs to |
required |
Returns:
Type | Description |
---|---|
KafkaCommandTaskTemplateBuilder
|
the instance of KafkaCommandTaskTemplateBuilder |
generic_dynamic_process_builder(*, name, max_run_duration=0)
Helper function to build a dynamic process, to be determined at runtime based on the runtime parameters
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str
|
The name of the dynamic process |
required |
max_run_duration |
int
|
The timeout on the process |
0
|
Returns:
Type | Description |
---|---|
DynamicProcessTemplateDagBuilder
|
The DynamicProcessTemplateDagBuilder |
generic_executor_task_builder(*, task_type, name, allow_skip_to=False)
Helper Function to define an executor task
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_type |
Type[ITask]
|
The type of the Task |
required |
name |
str
|
the Name of the Task |
required |
allow_skip_to |
bool
|
If true, the execution of the DAG can skip execution to this task out of order |
False
|
Returns:
Type | Description |
---|---|
DefaultTaskTemplateBuilder
|
The DefaultTaskTemplateBuilder instance |
generic_listener_task_builder(*, topic, task_type, process_name, allow_skip_to=False, concurrency=1, reprocess_on_message=False, key_type=None, value_type=None, key_serializer=None, value_serializer=None)
Helper function to define a KafkaListenerTaskTemplateBuilder
Parameters:
Name | Type | Description | Default |
---|---|---|---|
topic |
str
|
the topic name the listener listens to |
required |
task_type |
Type[ITask]
|
The type of KafkaListenerTask |
required |
process_name |
str
|
The name of the parent process this task belongs to |
required |
allow_skip_to |
bool
|
If true, the execution of the DAG can skip execution to this task out of order |
False
|
concurrency |
int
|
Check Concurrency |
1
|
reprocess_on_message |
bool
|
Re-executes the task when the message is received irrespective of the state of the task |
False
|
Returns:
Type | Description |
---|---|
KafkaListenerTaskTemplateBuilder
|
The instance of KafkaListenerTaskTemplateBuilder |
generic_process_builder(*, process_name, root_task, process_type=DefaultProcessTemplateDAGInstance, max_run_duration=0)
Helper function to build a Process Definition
Parameters:
Name | Type | Description | Default |
---|---|---|---|
process_name |
str
|
The name of the process, e.g. 'ORDERS' |
required |
root_task |
TaskTemplate
|
The first task to execute in this process |
required |
process_type |
Type[IProcessTemplateDAGInstance]
|
The class type of the Process |
DefaultProcessTemplateDAGInstance
|
max_run_duration |
int
|
the timeout on the process |
0
|
Returns:
Type | Description |
---|---|
ProcessTemplateDagBuilder
|
The ProcessTemplateDagBuilder |
generic_template(*, template_name, root_process, template_type=DefaultTemplateDAGInstance)
Helper function to define the Workflow Template
Parameters:
Name | Type | Description | Default |
---|---|---|---|
template_name |
str
|
The name of the workflow |
required |
root_process |
IProcessTemplateDAG
|
The first process definition to execute in the workflow |
required |
template_type |
Type[ITemplateDAGInstance]
|
The type of ITemplateDAGInstance |
DefaultTemplateDAGInstance
|
Returns:
Type | Description |
---|---|
ITemplateDAG
|
The instance of ITemplateDAG |
DaggerError
Bases: Exception
Base-class for all Dagger exceptions.
InvalidTaskType
Bases: DaggerError
Invalid Task Type.
TaskInvalidState
Bases: DaggerError
Invalid Task State.
TemplateDoesNotExist
Bases: DaggerError
Invalid Template Name
InvalidTriggerTimeForTask
Bases: DaggerError
Invalid trigger time