Open
72 commits
3051548
LangGraph: Add Phase 1 prototype structure
mfateev Dec 25, 2025
56394b0
LangGraph: Implement Pregel loop submit injection prototype
mfateev Dec 25, 2025
8d1409f
LangGraph: Use internal constant import to avoid deprecation warning
mfateev Dec 25, 2025
1ac7158
Fix write capture test for PregelExecutableTask type
mfateev Dec 25, 2025
c5c9896
Add task interface prototype and tests
mfateev Dec 25, 2025
4e078c7
Add serialization prototype using Temporal data converters
mfateev Dec 25, 2025
5521520
LangGraph: Add Phase 1 validation prototypes before cleanup
mfateev Dec 25, 2025
b12a24e
LangGraph: Remove prototype files after Phase 2 implementation
mfateev Dec 25, 2025
7a29282
LangGraph: Implement Phase 3 and 4 - Activity write capture and per-n…
mfateev Dec 25, 2025
6d72b9c
LangGraph: Fix conditional edge routing and use AsyncPregelLoop
mfateev Dec 25, 2025
5feff61
LangGraph: Remove prototype test files
mfateev Dec 25, 2025
0f555e1
LangGraph: Execute tasks in parallel within each tick
mfateev Dec 25, 2025
5ed4b86
LangGraph: Implement native interrupt API with comprehensive tests
mfateev Dec 25, 2025
13ef23c
LangGraph: Fix multi-interrupt handling and add e2e tests
mfateev Dec 25, 2025
60ed486
LangGraph: Add checkpoint and should_continue APIs for continue-as-new
mfateev Dec 25, 2025
4e4301f
LangGraph: Add Store support for cross-node persistence
mfateev Dec 26, 2025
83e90e5
LangGraph: Add e2e tests for Store functionality
mfateev Dec 26, 2025
1f49320
LangGraph: Add Send API support and validation tests
mfateev Dec 26, 2025
2d5738e
LangGraph: Add design docs for interrupt and store APIs
mfateev Dec 26, 2025
3f7c044
LangGraph: Remove design docs (preserved in git history)
mfateev Dec 26, 2025
535851a
LangGraph: Add user-facing README documentation
mfateev Dec 26, 2025
5b0231e
LangGraph: Add temporal_node_metadata() helper for typed activity opt…
mfateev Dec 26, 2025
3d54aba
LangGraph: Use temporal_node_metadata() for compile() defaults
mfateev Dec 26, 2025
788e5cc
LangGraph: Rename API for consistency
mfateev Dec 26, 2025
904c566
LangGraph: Add plugin-level activity options
mfateev Dec 26, 2025
cf45004
LangGraph: Add temporal_tool() and temporal_model() for durable agent…
mfateev Dec 26, 2025
b518249
LangGraph: Add create_agent support and temporal_node_metadata() helper
mfateev Dec 26, 2025
bda9006
LangGraph: Reorganize tests and fix sandbox graph building
mfateev Dec 26, 2025
be1d36e
LangGraph: Add experimental warnings and improve documentation
mfateev Dec 27, 2025
afbd452
LangGraph: Document internal API usage with detailed rationale
mfateev Dec 27, 2025
87f6b9b
LangGraph: Add logging infrastructure
mfateev Dec 27, 2025
6bf1456
LangGraph: Remove warning suppression by using internal imports directly
mfateev Dec 27, 2025
ec44b14
LangGraph: Add domain-specific exceptions with ApplicationError
mfateev Dec 27, 2025
ea808c4
LangGraph: Tidy docstrings to be precise and concise
mfateev Dec 27, 2025
7e98835
LangGraph: Remove enable_workflow_execution compile parameter
mfateev Dec 27, 2025
720667e
LangGraph: Rename activities and add meaningful summaries for UI
mfateev Dec 27, 2025
ed1da9a
LangGraph: Run __start__ node inline in workflow
mfateev Dec 27, 2025
7a3121a
LangGraph: Use ClientConfig for example connection setup
mfateev Dec 27, 2025
02eb922
LangGraph: Add sandbox passthrough for pydantic_core and langchain_core
mfateev Dec 27, 2025
dc1d2e3
LangGraph: Align with SDK style conventions
mfateev Dec 27, 2025
386d9b1
LangGraph: Remove example.py from module
mfateev Dec 27, 2025
a86d651
LangGraph: Implement bind_tools for temporal_model
mfateev Dec 27, 2025
406acc9
LangGraph: Document bind_tools support in README
mfateev Dec 27, 2025
bf072c4
LangGraph: Add summary to temporal_tool activity
mfateev Dec 27, 2025
8952549
LangGraph: Show tool names in activity summary for tools node
mfateev Dec 27, 2025
1f010d9
LangGraph: Simplify activity names and add metadata description support
mfateev Dec 27, 2025
54ee95f
LangGraph: Add create_durable_agent and create_durable_react_agent fu…
mfateev Dec 27, 2025
01216c6
LangGraph: Document create_durable_agent and create_durable_react_agent
mfateev Dec 27, 2025
f69c100
LangGraph: Rename node_activity_options to activity_options
mfateev Dec 27, 2025
ee7dbd0
LangGraph: Fix temporal_model deepcopy issue with HTTP clients
mfateev Dec 27, 2025
68d7370
LangGraph: Remove temporal_model and temporal_tool wrappers
mfateev Dec 27, 2025
212a80a
LangGraph: Update README to reflect simplified architecture
mfateev Dec 27, 2025
b53c66a
LangGraph: Document create_agent as preferred over deprecated create_…
mfateev Dec 27, 2025
c97eede
LangGraph: Add langchain test dependency and use create_react_agent i…
mfateev Dec 27, 2025
6921021
LangGraph: Fix state reading for create_agent routing edges
mfateev Dec 28, 2025
1e87448
LangGraph: Improve activity summaries for model nodes
mfateev Dec 28, 2025
dc2dd26
LangGraph: Execute subgraph inner nodes as separate activities
mfateev Dec 28, 2025
c1edae9
LangGraph: Fix subgraph routing and add unit tests
mfateev Dec 28, 2025
62b245c
LangGraph: Fix node execution to always use ainvoke
mfateev Dec 28, 2025
1f1c209
LangGraph: Handle ParentCommand for supervisor multi-agent routing
mfateev Dec 29, 2025
8c0f3ae
LangGraph: Update README to reference create_agent
mfateev Dec 29, 2025
517c01d
LangGraph: Classify node errors as retryable or non-retryable
mfateev Dec 29, 2025
97e4312
LangGraph: Execute Send packets in parallel using asyncio.gather
mfateev Dec 29, 2025
82f0725
LangGraph: Extract query from input state for activity summaries
mfateev Dec 29, 2025
2193a41
Add .mypy_cache to .gitignore
mfateev Dec 29, 2025
63dcfdf
LangGraph: Refactor ainvoke and _execute_subgraph into smaller methods
mfateev Dec 29, 2025
fb46f18
LangGraph: Group instance variables into state dataclasses
mfateev Dec 29, 2025
5576183
LangGraph: Extract magic strings to constants module
mfateev Dec 29, 2025
dbb1821
LangGraph: Extract nested functions from _execute_node_impl
mfateev Dec 29, 2025
4cc4ff8
Update CODE_REVIEW.md to mark completed refactoring items
mfateev Dec 29, 2025
470d916
LangGraph: Fix linting issues (import order and formatting)
mfateev Dec 30, 2025
d80affd
Fix lint errors: add type annotations and docstrings
mfateev Dec 30, 2025
1 change: 1 addition & 0 deletions .gitignore
@@ -1,5 +1,6 @@
.venv
__pycache__
.mypy_cache
/build
/dist
temporalio/bridge/target/
277 changes: 277 additions & 0 deletions CODE_REVIEW.md
@@ -0,0 +1,277 @@
# LangGraph Temporal Plugin - Code Review

**Date:** 2025-12-29
**Reviewer:** Claude Code
**Scope:** Full codebase review of `temporalio/contrib/langgraph/`

---

## Executive Summary

The LangGraph plugin is a **well-designed integration** that maps LangGraph's computational model onto Temporal's durable execution model. The architecture is sound with clear separation of concerns. The implementation successfully supports most LangGraph features including interrupts, Store API, Send API, Command API, and subgraphs.

**Overall Rating:** Good with minor improvements recommended

---

## Architecture Overview

### Module Structure

| Module | Purpose | Lines | Assessment |
|--------|---------|-------|------------|
| `_plugin.py` | Plugin registration, worker setup | ~100 | Clean |
| `_graph_registry.py` | Graph storage, lookup | ~130 | Clean |
| `_runner.py` | Main orchestration logic | ~1200 | Complex but necessary |
| `_activities.py` | Node execution activities | ~430 | Well-structured |
| `_models.py` | Data transfer objects | ~320 | Good dataclass usage |
| `_exceptions.py` | Error classification | ~170 | Comprehensive |
| `_store.py` | Activity-local store | ~100 | Simple, effective |
| `__init__.py` | Public API | ~190 | Well-documented |

### Key Design Decisions

1. **Activities as Node Executors**: Each graph node runs as a Temporal activity, providing durability and retry semantics. This is the correct architectural choice.

2. **AsyncPregelLoop Integration**: The runner uses LangGraph's internal `AsyncPregelLoop` for graph traversal, ensuring compatibility with native LangGraph behavior.

3. **Plugin-based Registration**: Graphs are registered via `LangGraphPlugin` and stored in a global registry, allowing compile-time lookup within workflows.

4. **Store Snapshot Pattern**: Store data is snapshotted before each activity, and writes made during the activity are tracked and merged back afterward. This enables cross-node persistence without shared state.
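
The snapshot pattern in item 4 can be illustrated with a minimal sketch. The names `SnapshotStore`, `merge_writes`, and `StoreKey` are hypothetical and are not taken from the plugin; the actual `ActivityLocalStore` may track and merge writes differently. The sketch assumes a delete is recorded as a write whose value is `None`.

```python
from __future__ import annotations

from typing import Any

# Hypothetical key shape: (namespace tuple, item key).
StoreKey = tuple[tuple[str, ...], str]


class SnapshotStore:
    """Activity-local view over a store snapshot that records every write."""

    def __init__(self, snapshot: dict[StoreKey, dict[str, Any]]) -> None:
        # Shallow copy so the caller's snapshot dict is never mutated.
        self._data = dict(snapshot)
        # Ordered log of writes; a value of None marks a delete (assumption).
        self.writes: list[tuple[StoreKey, dict[str, Any] | None]] = []

    def get(self, namespace: tuple[str, ...], key: str) -> dict[str, Any] | None:
        return self._data.get((namespace, key))

    def put(self, namespace: tuple[str, ...], key: str, value: dict[str, Any]) -> None:
        self._data[(namespace, key)] = value
        self.writes.append(((namespace, key), value))

    def delete(self, namespace: tuple[str, ...], key: str) -> None:
        self._data.pop((namespace, key), None)
        self.writes.append(((namespace, key), None))


def merge_writes(
    state: dict[StoreKey, dict[str, Any]],
    writes: list[tuple[StoreKey, dict[str, Any] | None]],
) -> dict[StoreKey, dict[str, Any]]:
    """Apply a recorded write log to the workflow-side state, in order."""
    merged = dict(state)
    for store_key, value in writes:
        if value is None:
            merged.pop(store_key, None)
        else:
            merged[store_key] = value
    return merged
```

In this sketch the workflow side would pass its current state as the snapshot, run the node, and then call `merge_writes(state, store.writes)` on the returned write log.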

---

## Strengths

### 1. Clean Separation of Concerns
- `_plugin.py` handles Temporal integration (activities, data converter, sandbox)
- `_runner.py` handles workflow-side orchestration
- `_activities.py` handles activity-side execution
- `_models.py` defines serializable DTOs

### 2. Comprehensive Error Classification (`_exceptions.py:13-97`)
```python
def is_non_retryable_error(exc: BaseException) -> bool:
    ...  # body omitted in this review
```
The error classifier correctly identifies:
- Non-retryable: `TypeError`, `ValueError`, `AuthenticationError`, 4xx HTTP errors other than 429
- Retryable: Rate limits (429), network errors, 5xx server errors

This ensures proper retry behavior for different failure modes.
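
A minimal sketch of the classification rules listed above, for illustration only. The function name `classify_non_retryable`, the `HttpError` class, and the `status_code` attribute lookup are assumptions; the plugin's `is_non_retryable_error` may inspect exceptions differently. Treating unknown errors as retryable is also an assumption of this sketch.

```python
class HttpError(Exception):
    """Hypothetical exception carrying an HTTP status code."""

    def __init__(self, status_code: int) -> None:
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


def classify_non_retryable(exc: BaseException) -> bool:
    """Return True when retrying the same call cannot succeed."""
    # Programming or input errors fail the same way on every attempt.
    if isinstance(exc, (TypeError, ValueError)):
        return True

    # Assumption: HTTP-style errors expose an integer `status_code`.
    status = getattr(exc, "status_code", None)
    if isinstance(status, int):
        if status == 429:
            return False  # rate limit: retry after backoff
        if 400 <= status < 500:
            return True  # other client errors will not fix themselves
        return False  # 5xx and anything else: treat as transient

    # Network failures and unknown errors default to retryable here.
    return False
```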

### 3. Rich Activity Summaries (`_runner.py:~64-185`)
Activity summaries extract meaningful context:
- Tool calls from messages
- Model names from chat models
- Last human query for context
- Node descriptions from metadata

This significantly improves workflow observability in the Temporal UI.

### 4. Robust Interrupt Handling
The interrupt/resume flow is well-implemented:
- `_pending_interrupt` tracks interrupt state
- `_interrupted_node_name` enables targeted resume
- `_completed_nodes_in_cycle` prevents re-execution after resume
- Resume values flow through `PregelScratchpad`

### 5. Parallel Send Execution (`_runner.py:866-999`)
Send packets now execute in parallel using `asyncio.gather`, with proper phase separation:
1. Prepare all activity inputs (deterministic step counter assignment)
2. Execute all activities in parallel
3. Process results sequentially (handle interrupts, parent commands)
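
The three phases can be sketched as follows. This is an illustrative outline, not the runner's code: `SendInput`, `run_node`, and `execute_sends` are hypothetical names, and `run_node` stands in for a real Temporal activity call.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class SendInput:
    step: int
    node: str
    payload: Any


async def run_node(inp: SendInput) -> dict[str, Any]:
    # Stand-in for an activity invocation (assumption for this sketch).
    await asyncio.sleep(0)
    return {"step": inp.step, "node": inp.node, "result": inp.payload * 2}


async def execute_sends(
    packets: list[tuple[str, Any]], step_counter: int
) -> tuple[list[dict[str, Any]], int]:
    # Phase 1: assign step numbers sequentially so they are deterministic.
    inputs: list[SendInput] = []
    for node, payload in packets:
        step_counter += 1
        inputs.append(SendInput(step=step_counter, node=node, payload=payload))

    # Phase 2: run every packet concurrently; gather returns results in
    # the same order as its arguments, regardless of completion order.
    results = await asyncio.gather(*(run_node(inp) for inp in inputs))

    # Phase 3: handle results one at a time, in input order. A real runner
    # would check for interrupts and parent commands at this point.
    outputs = [result for result in results]
    return outputs, step_counter
```

Assigning step numbers before any task starts keeps the numbering independent of which packet finishes first.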

### 6. Comprehensive Feature Support
The integration supports:
- Interrupts/resume via `interrupt()` and `Command(resume=...)`
- Store API via `ActivityLocalStore`
- Send API for dynamic parallelism
- Command API for navigation
- Subgraphs with automatic flattening
- Continue-as-new via `get_state()`/checkpoint

---

## Areas for Improvement

### 1. ~~Long Methods in `_runner.py`~~ ✅ COMPLETED

**Issue:** `ainvoke()` was ~215 lines, `_execute_subgraph()` was ~175 lines.

**Resolution:** Refactored into smaller methods:
- `_prepare_resume_input()` - Handle resume/Command input
- `_create_pregel_loop()` - Create and configure the Pregel loop
- `_execute_loop()` - Main execution loop with tick processing
- `_process_tick_tasks()` - Process tasks from a single tick
- `_execute_regular_tasks()` - Execute regular node tasks
- `_execute_send_packets()` - Execute Send packet tasks in parallel
- `_finalize_output()` - Prepare final output with interrupt/checkpoint handling

### 2. ~~Many Instance Variables in `TemporalLangGraphRunner`~~ ✅ COMPLETED

**Issue:** The class had ~20 instance variables tracking various state.

**Resolution:** Grouped into two dataclasses in `_runner.py`:
```python
from dataclasses import dataclass, field
from typing import Any

# InterruptValue is a plugin type defined outside this snippet.

@dataclass
class InterruptState:
    interrupted_state: dict[str, Any] | None = None
    interrupted_node_name: str | None = None
    resume_value: Any | None = None
    resume_used: bool = False
    is_resume_invocation: bool = False
    pending_interrupt: InterruptValue | None = None

@dataclass
class ExecutionState:
    step_counter: int = 0
    invocation_counter: int = 0
    completed_nodes_in_cycle: set[str] = field(default_factory=set)
    resumed_node_writes: dict[str, list[tuple[str, Any]]] = field(default_factory=dict)
    last_output: dict[str, Any] | None = None
    pending_parent_command: Any | None = None
    store_state: dict[tuple[tuple[str, ...], str], dict[str, Any]] = field(default_factory=dict)
```

Now accessed via `self._interrupt.*` and `self._execution.*`.

### 3. ~~Magic Strings Could Be Constants~~ ✅ COMPLETED

**Issue:** String literals like `"__start__"`, `"tools"`, `"__interrupt__"`, `"__checkpoint__"` appeared throughout.

**Resolution:** Created `_constants.py` with:
```python
START_NODE = "__start__"
TOOLS_NODE = "tools"
INTERRUPT_KEY = "__interrupt__"
CHECKPOINT_KEY = "__checkpoint__"
BRANCH_PREFIX = "branch:"
MODEL_NODE_NAMES = frozenset({"agent", "model", "llm", "chatbot", "chat_model"})
MODEL_NAME_ATTRS = ("model_name", "model")
```

### 4. ~~Nested Functions in `_execute_node_impl`~~ ✅ COMPLETED

**Issue:** `_execute_node_impl` contained 5 nested functions.

**Resolution:** Extracted to module level in `_activities.py`:
- `_convert_messages_if_needed()` - Module-level pure function
- `_merge_channel_value()` - Module-level pure function
- `StateReader` class - Encapsulates state reading logic
- `_get_null_resume()` - Module-level function

Only `_interrupt_counter()` remains nested (requires mutable state capture).
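
For readers unfamiliar with why a function might need to stay nested, here is a generic sketch of mutable state capture through a closure. It is not the plugin's `_interrupt_counter()`; the name `make_interrupt_counter` and the behavior of returning a zero-based index are assumptions for illustration.

```python
from typing import Callable


def make_interrupt_counter() -> Callable[[], int]:
    """Return a function that yields 0, 1, 2, ... on successive calls."""
    count = 0  # state private to this one invocation

    def interrupt_counter() -> int:
        nonlocal count  # rebind the enclosing variable, not a new local
        index = count
        count += 1
        return index

    return interrupt_counter
```

Each call to `make_interrupt_counter()` gets its own independent count, which is harder to express with a module-level function unless the state is passed in explicitly.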

### 5. Type Annotations Could Be More Specific

**Issue:** Some `Any` types could be narrowed:
```python
per_node_activity_options: dict[str, dict[str, Any]] # inner dict structure is known
checkpoint: dict | None # could be StateSnapshot | dict | None
```

**Recommendation:** Use more specific types or TypedDict where the structure is known.
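
One way the recommendation might look in practice is a `TypedDict` for the inner options dict. The class name `ActivityOptions` and its field names are hypothetical; the real option keys should come from the plugin's activity options.

```python
from typing import TypedDict


class ActivityOptions(TypedDict, total=False):
    """Hypothetical shape for per-node options; every key is optional."""

    start_to_close_timeout_seconds: float
    task_queue: str
    max_attempts: int


# The outer mapping stays keyed by node name, but the inner structure
# is now visible to a type checker instead of being dict[str, Any].
per_node_activity_options: dict[str, ActivityOptions] = {
    "agent": {"start_to_close_timeout_seconds": 60.0, "max_attempts": 3},
    "tools": {"task_queue": "tools-queue"},
}
```

With this shape, a type checker such as mypy can flag a misspelled key or a wrongly typed value at the call site.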

---

## Test Coverage Assessment

### Current Tests

| Test File | Tests | Coverage |
|-----------|-------|----------|
| `test_e2e.py` | 14 | Basic execution, interrupts, store, advanced features, agents |
| `test_runner.py` | 39 | Activity summary, model extraction, compile, error retryability, parallel sends |
| `test_activities.py` | ~10 | Node execution, interrupts, parent commands |
| `test_models.py` | ~15 | Data model serialization |
| `test_store.py` | ~10 | Store operations |
| `test_plugin.py` | ~5 | Plugin registration |
| `test_registry.py` | ~5 | Graph registry |

### Coverage Gaps

1. **Edge Cases:**
   - Workflow cancellation during activity execution
   - Very large state serialization
   - Deep subgraph nesting (>3 levels)

2. **Error Scenarios:**
   - Activity timeout during interrupt
   - Store write conflicts
   - Graph definition changes between invocations

3. **Performance:**
   - No load tests for high-parallelism Send patterns
   - No benchmarks for large state checkpointing

---

## Security Considerations

### Positive

1. **Sandbox passthrough is limited:** Only `pydantic_core`, `langchain_core`, `annotated_types` are passed through.

2. **Config filtering:** Internal LangGraph keys (`__pregel_*`, `__lg_*`) are stripped before serialization.

3. **No arbitrary code execution:** Node functions are registered at plugin init, not deserialized.
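
The config filtering in item 2 can be sketched as a prefix filter. The function name `strip_internal_keys` is hypothetical, and the sketch assumes the internal keys sit at the top level of a flat dict; the plugin may filter a nested section of the config instead.

```python
from typing import Any

# Prefixes taken from the review text above.
INTERNAL_PREFIXES = ("__pregel_", "__lg_")


def strip_internal_keys(config: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of `config` without LangGraph-internal keys."""
    return {
        key: value
        for key, value in config.items()
        # str.startswith accepts a tuple and matches any of its prefixes.
        if not key.startswith(INTERNAL_PREFIXES)
    }
```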

### Recommendations

1. **Input validation:** Consider validating `graph_id` format in `compile()` to prevent injection attacks via workflow inputs.

2. **State size limits:** Consider adding configurable limits on serialized state size to prevent memory issues.
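
Both recommendations could be approached along these lines. This is a sketch under assumptions: the allowed `graph_id` character set, the 128-character cap, the function names, and the use of JSON to estimate serialized size are all illustrative choices, not the plugin's behavior.

```python
import json
import re
from typing import Any

# Hypothetical allow-list: letters, digits, underscore, dot, hyphen.
GRAPH_ID_PATTERN = re.compile(r"[A-Za-z0-9_.-]{1,128}")


def validate_graph_id(graph_id: str) -> str:
    """Reject graph IDs containing characters outside the allow-list."""
    if not GRAPH_ID_PATTERN.fullmatch(graph_id):
        raise ValueError(f"invalid graph_id: {graph_id!r}")
    return graph_id


def check_state_size(state: dict[str, Any], max_bytes: int) -> int:
    """Return the JSON-encoded size of `state`, raising if over the limit."""
    size = len(json.dumps(state).encode("utf-8"))
    if size > max_bytes:
        raise ValueError(f"state is {size} bytes, limit is {max_bytes}")
    return size
```

An allow-list is used here rather than a block-list because it fails closed when an unexpected character appears.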

---

## Documentation Quality

### Strengths

- Comprehensive README with examples
- Good docstrings on public API (`__init__.py`)
- MISSING_FEATURES.md provides clear status tracking
- Experimental warnings are clearly noted

### Gaps

- Internal architecture documentation could be added (class diagrams, sequence diagrams)
- Contributing guidelines not present
- Changelog/versioning not formalized

---

## Recommendations Summary

### High Priority

1. ~~**Refactor `ainvoke` and `_execute_subgraph`** into smaller, testable methods~~ ✅ DONE
2. ~~**Group instance variables** into state dataclasses for better organization~~ ✅ DONE

### Medium Priority

3. ~~**Extract magic strings** to a constants module~~ ✅ DONE
4. **Add integration tests** for cancellation and timeout scenarios
5. **Add more specific type annotations**

### Low Priority

6. ~~**Extract nested functions** from `_execute_node_impl`~~ ✅ DONE
7. **Add architecture documentation** with diagrams
8. **Add load/performance tests** for Send API patterns

---

## Conclusion

The LangGraph plugin is a solid implementation that correctly integrates LangGraph's graph execution model with Temporal's durable execution. The code is functional, well-tested for core scenarios, and provides good observability.

**Update (2025-12-29):** The major code organization improvements have been completed:
- ✅ Long methods refactored into smaller, testable functions
- ✅ Instance variables grouped into `InterruptState` and `ExecutionState` dataclasses
- ✅ Magic strings extracted to `_constants.py` module
- ✅ Nested functions extracted from `_execute_node_impl`

Remaining items are lower priority (integration tests, type annotations, documentation).

**Verdict:** Ready for experimental use with improved maintainability.
5 changes: 5 additions & 0 deletions pyproject.toml
@@ -60,6 +60,9 @@ dev = [
"openai-agents[litellm]>=0.3,<0.7; python_version < '3.14'",
"googleapis-common-protos==1.70.0",
"pytest-rerunfailures>=16.1",
# LangGraph integration tests
"langchain>=1.2.0,<2",
"langgraph>=1.0.0,<2",
]

[tool.poe.tasks]
@@ -136,6 +139,8 @@ exclude = [
# Ignore generated code
'temporalio/api',
'temporalio/bridge/proto',
# Ignore separate repos/worktrees
'crew-ai',
]

[tool.pydocstyle]