Skip to main content

Channels and Context

Learn inter-task communication, dependency injection, and prompt management.

Channel Backends

Graflow supports two backends for seamless local-to-distributed transition:

1. MemoryChannel (Default) - For local execution:

  • Fast: In-memory, minimal latency
  • Simple: No infrastructure required
  • Checkpoint-compatible: Auto-saved with checkpoints
  • Limitation: Single process only

2. RedisChannel - For distributed execution:

  • Distributed: Share state across workers/machines
  • Persistent: Redis persistence for fault tolerance
  • Scalable: Consistent data across many workers
  • Requires: Redis server

Switching backends:

# Local execution (default) - uses MemoryChannel
with workflow("local") as wf:
task_a >> task_b
wf.execute()

# Distributed execution - uses RedisChannel
from graflow.channels.factory import ChannelFactory, ChannelBackend

channel = ChannelFactory.create_channel(
backend=ChannelBackend.REDIS,
redis_client=redis_client
)

with workflow("distributed") as wf:
task_a >> task_b
wf.execute()

Working with Channels

Basic Channel: ctx.get_channel()

Access the basic channel for simple key-value storage:

@task(inject_context=True)
def producer(ctx: TaskExecutionContext):
"""Write data to channel."""
channel = ctx.get_channel()

# Store simple values
channel.set("user_id", "user_123")
channel.set("score", 95.5)
channel.set("active", True)

# Store complex objects
channel.set("user_profile", {
"name": "Alice",
"email": "alice@example.com",
"age": 30
})

@task(inject_context=True)
def consumer(ctx: TaskExecutionContext):
"""Read data from channel."""
channel = ctx.get_channel()

# Retrieve values
user_id = channel.get("user_id") # "user_123"
score = channel.get("score") # 95.5
active = channel.get("active") # True
profile = channel.get("user_profile") # dict

# With default value
setting = channel.get("setting", default="default_value")

Channel Methods:

MethodDescriptionExample
set(key, value)Store a valuechannel.set("count", 42)
set(key, value, ttl)Store with expiration (seconds)channel.set("temp", 100, ttl=300)
get(key)Retrieve a valuevalue = channel.get("count")
get(key, default)Retrieve with fallbackvalue = channel.get("count", default=0)
append(key, value)Append to listchannel.append("logs", "entry")
prepend(key, value)Prepend to listchannel.prepend("queue", "item")
delete(key)Remove a keychannel.delete("count")
exists(key)Check if key existsif channel.exists("count"):

List Operations

Channels support list operations for collecting multiple values:

@task(inject_context=True)
def collect_logs(ctx: TaskExecutionContext):
channel = ctx.get_channel()

# Append to end of list (FIFO queue)
channel.append("logs", "Log entry 1")
channel.append("logs", "Log entry 2")
channel.append("logs", "Log entry 3")

logs = channel.get("logs")
print(logs) # ["Log entry 1", "Log entry 2", "Log entry 3"]

@task(inject_context=True)
def use_stack(ctx: TaskExecutionContext):
channel = ctx.get_channel()

# Prepend to beginning of list (LIFO stack)
channel.prepend("stack", "First")
channel.prepend("stack", "Second")
channel.prepend("stack", "Third")

stack = channel.get("stack")
print(stack) # ["Third", "Second", "First"]

Time-to-Live (TTL)

Use TTL to automatically expire temporary data:

@task(inject_context=True)
def cache_data(ctx: TaskExecutionContext):
channel = ctx.get_channel()

# Cache for 5 minutes (300 seconds)
channel.set("api_response", {"data": "..."}, ttl=300)

# Temporary flag expires in 60 seconds
channel.set("processing", True, ttl=60)

TTL Behavior:

  • TTL is in seconds
  • Key expires and is automatically deleted after TTL
  • Calling get() on expired key returns None (or default value)

Type-Safe Channel

Use typed channels for compile-time type checking and IDE autocomplete:

from typing import TypedDict

class UserProfile(TypedDict):
user_id: str
name: str
email: str
age: int
premium: bool

@task(inject_context=True)
def collect_user_data(ctx: TaskExecutionContext):
typed_channel = ctx.get_typed_channel(UserProfile)

# IDE autocompletes fields!
user_profile: UserProfile = {
"user_id": "user_123",
"name": "Alice",
"email": "alice@example.com",
"age": 30,
"premium": True
}

typed_channel.set("current_user", user_profile)

Dependency Injection

Graflow provides three types of dependency injection:

1. Context Injection

@task(inject_context=True)
def my_task(ctx: TaskExecutionContext, value: int):
channel = ctx.get_channel()
channel.set("result", value * 2)
return value * 2

2. LLM Client Injection

from graflow.llm.client import LLMClient

@task(inject_llm_client=True)
def analyze_text(llm: LLMClient, text: str) -> str:
response = llm.completion_text(
messages=[{"role": "user", "content": f"Analyze: {text}"}],
model="gpt-4o-mini"
)
return response

3. LLM Agent Injection

from graflow.llm.agents.base import LLMAgent

# First, register the agent in workflow
context.register_llm_agent("supervisor", my_agent)

# Then inject into task
@task(inject_llm_agent="supervisor")
def supervise_task(agent: LLMAgent, query: str) -> str:
result = agent.run(query)
return result["output"]
Injection TypeParameterUse Case
inject_context=Truectx: TaskExecutionContextChannels, workflow control, results
inject_llm_client=Truellm: LLMClientSimple LLM API calls
inject_llm_agent="name"agent: LLMAgentComplex agent tasks with tools

Prompt Management

Use PromptManagerFactory to create a prompt manager:

from pathlib import Path
from graflow.prompts.factory import PromptManagerFactory

# Create YAML-based prompt manager
prompts_dir = Path(__file__).parent / "prompts"
pm = PromptManagerFactory.create("yaml", prompts_dir=str(prompts_dir))

# Or create Langfuse-based prompt manager
pm = PromptManagerFactory.create("langfuse")

# Pass to workflow
with workflow("my_workflow", prompt_manager=pm) as ctx:
...

Using prompts in tasks:

@task(inject_context=True)
def greet(ctx: TaskExecutionContext) -> str:
pm = ctx.prompt_manager

# Get text prompt and render
prompt = pm.get_text_prompt("greeting")
return prompt.render(name="Alice", product="Graflow")

@task(inject_context=True)
def generate_conversation(ctx: TaskExecutionContext) -> list:
pm = ctx.prompt_manager

# Get chat prompt for LLM APIs
prompt = pm.get_chat_prompt("assistant")
messages = prompt.render(domain="Python", task="debugging")
return messages