Execution Patterns
Learn how to get results and control workflow execution.
Understanding Task Results
When tasks return values, Graflow stores them in the channel using the task's task_id:
# Auto-generated task_id (function name)
@task
def calculate():
return 42
# Stored as: channel.set("calculate.__result__", 42)
# Access: ctx.get_result("calculate") → 42
# Custom task_id
task1 = calculate(task_id="calc1")
task2 = calculate(task_id="calc2")
# Stored as: channel.set("calc1.__result__", 42)
# channel.set("calc2.__result__", 42)
# Access: ctx.get_result("calc1"), ctx.get_result("calc2")
Result storage format: {task_id}.__result__
Pattern 1: Get Final Result
with workflow("simple") as wf:
@task
def compute():
return 42
result = wf.execute()
print(result) # 42 (last task's return value)
Pattern 2: Get All Results
Get results from all tasks using execution context:
with workflow("all_results") as wf:
@task
def task_a():
return "A"
@task
def task_b():
return "B"
task_a >> task_b
# Get execution context to access all results
_, ctx = wf.execute(ret_context=True)
# Access individual task results
print(ctx.get_result("task_a")) # Output: A
print(ctx.get_result("task_b")) # Output: B
Key Points:
ret_context=Truereturns tuple:(final_result, execution_context)- Use
ctx.get_result(task_id)to get any task's result - Results are automatically stored when tasks return values
Pattern 3: Start from Specific Task
Auto-Detection (No argument)
When you call wf.execute() without arguments, Graflow automatically finds the start node:
with workflow("auto_start") as wf:
@task
def step1():
print("Step 1")
@task
def step2():
print("Step 2")
step1 >> step2
# Auto-detects step1 (node with no predecessors)
wf.execute()
How auto-detection works:
- Finds all nodes with no incoming edges (no predecessors)
- If exactly one node found → use it as start node
- If none found → raises
GraphCompilationError - If multiple found → raises
GraphCompilationError
Multiple Entry Points
with workflow("ambiguous") as wf:
@task
def task_a():
print("A")
@task
def task_b():
print("B")
@task
def task_c():
print("C")
# Two separate chains - two entry points!
task_a >> task_c
task_b >> task_c
# ERROR: Multiple start nodes found (task_a and task_b)
# wf.execute() # Raises GraphCompilationError
# Solution: Specify start node explicitly
wf.execute(start_node="task_a")
Manual Start Node
Skip earlier tasks by specifying a start node:
with workflow("skip") as wf:
@task
def step1():
print("Step 1")
@task
def step2():
print("Step 2")
@task
def step3():
print("Step 3")
step1 >> step2 >> step3
# Start from step2 (skip step1)
wf.execute(start_node="step2")
Output:
Step 2
Step 3
Key Takeaways:
wf.execute()auto-detects start node (node with no predecessors)- Raises error if zero or multiple start nodes found
wf.execute(start_node="task_id")explicitly sets start pointwf.execute(ret_context=True)returns(result, context)- Use
ctx.get_result(task_id)to get specific task results