Tasks

Tasks are the building blocks of Graflow workflows: ordinary Python functions decorated with @task.

Defining Tasks

The @task Decorator

Convert any Python function into a Graflow task:

from graflow.core.decorators import task

@task
def hello():
    """A simple task."""
    print("Hello, Graflow!")
    return "success"

What happens:

  • @task converts a regular function into a Graflow task
  • The task can be used in workflows or executed directly
  • The function name becomes the default task ID
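To make these points concrete, here is a toy, pure-Python sketch of how a decorator of this kind could wrap a function. It is not Graflow's implementation: `ToyTask` and `toy_task` are hypothetical names, and only two behaviors are modeled, the function name as the default ID and direct execution.

```python
from typing import Any, Callable, Optional


class ToyTask:
    """Hypothetical stand-in for a task object; not Graflow's real class."""

    def __init__(self, func: Callable[..., Any], task_id: Optional[str] = None):
        self.func = func
        # Fall back to the function name when no ID is given.
        self.task_id = task_id or func.__name__

    def run(self, *args: Any, **kwargs: Any) -> Any:
        # Execute the wrapped function directly.
        return self.func(*args, **kwargs)


def toy_task(func: Optional[Callable[..., Any]] = None, *, task_id: Optional[str] = None):
    """Usable both as @toy_task and as @toy_task(task_id="...")."""
    if func is not None:
        return ToyTask(func)
    return lambda f: ToyTask(f, task_id=task_id)


@toy_task
def hello() -> str:
    return "success"


@toy_task(task_id="greeting_task")
def greet() -> str:
    return "hi"


print(hello.task_id)  # hello
print(greet.task_id)  # greeting_task
print(hello.run())    # success
```

The real decorator does considerably more (workflow registration, injection), so treat this only as a mental model for the naming rule.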

Custom Task IDs

Specify a custom ID for the task:

# Default: task_id is "hello"
@task
def hello():
    print("Hello!")

# Custom: task_id is "greeting_task"
@task(task_id="greeting_task")
def hello():
    print("Hello!")

Type Hints

Always use type hints for better IDE support and documentation:

@task
def calculate(x: int, y: int) -> int:
    """Add two numbers."""
    return x + y

Task Instances

Create multiple instances from one task definition with different parameters.

The Problem

Without task instances, you need to duplicate code:

# Repetitive - avoid this
@task
def fetch_tokyo():
    return fetch("Tokyo")

@task
def fetch_paris():
    return fetch("Paris")

The Solution

Create task instances with bound parameters:

from graflow.core.workflow import workflow

@task
def fetch_weather(city: str) -> str:
    return f"Weather for {city}"

# Create instances with different parameters
tokyo = fetch_weather(task_id="tokyo", city="Tokyo")
paris = fetch_weather(task_id="paris", city="Paris")
london = fetch_weather(task_id="london", city="London")

with workflow("weather") as wf:
    # Use instances in workflow
    tokyo >> paris >> london
    wf.execute()

Auto-Generated Task IDs

Omit task_id for auto-generated unique IDs:

@task
def process(value: int) -> int:
    return value * 2

# Auto-generated IDs: process_{random_uuid}
task1 = process(value=10) # task_id: process_a3f2b9c1
task2 = process(value=20) # task_id: process_b7e8f4d2
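The exact ID format is Graflow's own. The comments above suggest a function name followed by a short random suffix. A minimal sketch of generating IDs of that shape, assuming an 8-character hex suffix taken from a UUID (the helper `make_task_id` is hypothetical and not part of Graflow):

```python
import uuid


def make_task_id(func_name: str) -> str:
    """Hypothetical helper: build '<func_name>_<8 hex chars>'."""
    return f"{func_name}_{uuid.uuid4().hex[:8]}"


first = make_task_id("process")
second = make_task_id("process")

# The suffixes are random, so the two IDs differ with overwhelming probability.
print(first)
print(second)
```

Random suffixes mean the IDs change between runs, so pass an explicit task_id whenever you need to refer to a task by a stable name.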

Unique Task ID Requirement

Each task instance must have a unique task_id:

# Good: Unique task_ids
tokyo = fetch_weather(task_id="tokyo", city="Tokyo")
paris = fetch_weather(task_id="paris", city="Paris")

# Bad: Duplicate task_ids cause conflicts
task1 = fetch_weather(task_id="fetch", city="Tokyo")
task2 = fetch_weather(task_id="fetch", city="Paris") # ERROR!

# Good: Auto-generated IDs are always unique
task1 = fetch_weather(city="Tokyo") # Auto: fetch_weather_a3f2b9c1
task2 = fetch_weather(city="Paris") # Auto: fetch_weather_b7e8f4d2
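How Graflow reports a duplicate ID is not shown here. To illustrate why duplicates conflict, the toy registry below keys tasks by ID and refuses a second registration under the same key. `ToyRegistry` is a hypothetical name, not a Graflow class, and the `ValueError` is an assumption of this sketch.

```python
class ToyRegistry:
    """Hypothetical registry that stores one entry per task_id."""

    def __init__(self) -> None:
        self._tasks: dict[str, str] = {}

    def register(self, task_id: str, description: str) -> None:
        # A second task under the same ID would silently shadow the first,
        # so reject it instead.
        if task_id in self._tasks:
            raise ValueError(f"duplicate task_id: {task_id!r}")
        self._tasks[task_id] = description


registry = ToyRegistry()
registry.register("tokyo", "fetch_weather(city='Tokyo')")
registry.register("paris", "fetch_weather(city='Paris')")

try:
    registry.register("tokyo", "fetch_weather(city='Osaka')")
    duplicate_rejected = False
except ValueError as exc:
    duplicate_rejected = True
    print(exc)  # duplicate task_id: 'tokyo'
```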

Testing Tasks

Direct Execution with .run()

Test tasks in isolation using .run():

@task
def calculate(x: int, y: int) -> int:
    """Add two numbers."""
    return x + y

# Test the task directly
result = calculate.run(x=5, y=3)
print(result) # Output: 8

When to use .run():

  • Unit testing individual tasks
  • Quick verification of task logic
  • Debugging task behavior
  • Avoid it in production workflows; run those through workflow.execute() instead

Testing with Parameters

@task
def process_data(data: list[int], multiplier: int = 2) -> list[int]:
    """Process data with a multiplier."""
    return [x * multiplier for x in data]

# Test with different parameters
result1 = process_data.run(data=[1, 2, 3])
print(result1) # [2, 4, 6]

result2 = process_data.run(data=[1, 2, 3], multiplier=3)
print(result2) # [3, 6, 9]

Dependency Injection

Graflow provides automatic dependency injection for common resources.

Context Injection

Access channels, results, and workflow control:

from graflow.core.context import TaskExecutionContext

@task(inject_context=True)
def my_task(ctx: TaskExecutionContext, value: int):
    # Access channel
    channel = ctx.get_channel()
    channel.set("result", value * 2)

    # Access session info
    print(f"Session: {ctx.session_id}")

    # Access other task results
    previous = ctx.get_result("previous_task")

    return value * 2

LLM Client Injection

Direct LLM API calls without agent loops:

from graflow.llm.client import LLMClient

@task(inject_llm_client=True)
def analyze_text(llm: LLMClient, text: str) -> str:
    response = llm.completion_text(
        messages=[{"role": "user", "content": f"Analyze: {text}"}],
        model="gpt-4o-mini",
    )
    return response

LLM Agent Injection

Full-featured agent with ReAct loops and tools:

from graflow.llm.agents.base import LLMAgent

# Register agent first
context.register_llm_agent("supervisor", my_agent)

@task(inject_llm_agent="supervisor")
def supervise_task(agent: LLMAgent, query: str) -> str:
    result = agent.run(query)
    return result["output"]

Injection Summary

Injection Type             Parameter                   Use Case
inject_context=True        ctx: TaskExecutionContext   Channels, workflow control, results
inject_llm_client=True     llm: LLMClient              Simple LLM API calls
inject_llm_agent="name"    agent: LLMAgent             Complex agent tasks with tools

Parameter Resolution

When resolving parameters, Graflow uses this priority (highest wins):

Injection  >  Bound      >  Channel
(ctx)         (task_id)     (initial_channel)

Example:

@task
def calculate(value: int, multiplier: int) -> int:
    return value * multiplier

# Bind value=10; multiplier comes from the channel.
# The instance is named calc so it does not shadow the @task decorator.
calc = calculate(task_id="calc", value=10)

wf.execute(initial_channel={"value": 100, "multiplier": 5})
# Result: 10 × 5 = 50 (bound value=10 beats channel value=100)
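The priority rule can be modeled with plain dictionaries. The sketch below is not Graflow's resolver: `resolve_params` is a hypothetical helper that looks each parameter up in the injected values first, then the bound values, then the channel, and takes the first match.

```python
from typing import Any, Mapping


def resolve_params(
    names: list[str],
    injected: Mapping[str, Any],
    bound: Mapping[str, Any],
    channel: Mapping[str, Any],
) -> dict[str, Any]:
    """Resolve each name using the first source that defines it."""
    resolved: dict[str, Any] = {}
    for name in names:
        for source in (injected, bound, channel):  # highest priority first
            if name in source:
                resolved[name] = source[name]
                break
        else:
            raise KeyError(f"no value for parameter {name!r}")
    return resolved


params = resolve_params(
    names=["value", "multiplier"],
    injected={},
    bound={"value": 10},
    channel={"value": 100, "multiplier": 5},
)
print(params)                                  # {'value': 10, 'multiplier': 5}
print(params["value"] * params["multiplier"])  # 50
```

As in the example above, the bound value=10 shadows the channel's value=100, while multiplier falls through to the channel because nothing with higher priority defines it.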

Best Practices

Use Task Instances for Reusability

# Good - Reusable task definition
@task
def fetch_data(source: str):
    return fetch(source)

api = fetch_data(task_id="api", source="api")
db = fetch_data(task_id="db", source="database")

# Avoid - Duplicated definitions
@task
def fetch_api():
    return fetch("api")

@task
def fetch_db():
    return fetch("database")

Inject Context Only When Needed

# Simple computation - no context needed
@task
def add(x: int, y: int) -> int:
    return x + y

# Inter-task communication - needs context
@task(inject_context=True)
def share_data(ctx: TaskExecutionContext, value: int):
    ctx.get_channel().set("shared", value)

Use Descriptive Task IDs

# Good - Clear and descriptive
fetch_user_profile = fetch(task_id="fetch_user_profile")
validate_email = validate(task_id="validate_email")

# Avoid - Generic names
task1 = fetch(task_id="t1")
task2 = validate(task_id="t2")

Next Steps

  • Learn about Channels for inter-task communication
  • Explore Execution patterns for dynamic control flow