@Higuchi Kokoro
@FukasawaYusuke
Checkpoints Are Not Durable Execution
LangGraph … チェックポイントをどのようにマネージするかは実装した本人が気をつけるしかない(Checkpointers That Require You to Be the Orchestrator)
config = {"configurable": {"thread_id": "workflow-123"}}
try:
result = graph.invoke({"query": "process this"}, config=config)
except Exception as e:
# You detected the failure. Now you resume manually.
result = graph.invoke(None, config=config)
CrewAI … ただし「自律でない」エージェントだけ、しかも復旧処理は自前実装が前提
リプレイ機能あり
crewai replay -t <task_id>
二重実行などを防止する機能あり(SQLite に成功した状態を保存する)
@persist
class MyFlow(Flow[MyState]):
@start()
def step_one(self):
self.state.counter = 1
@listen(step_one)
def step_two(self):
self.state.intermediate_results["step2"] = "result"
human in the loop もできる
flow = MyFlow.from_pending(flow_id="abc123")
result = flow.resume(feedback="approved")
何が問題か?
@persist ****は自動復帰機能ではない
各ステップ成功後に状態は保存されますが、プロセスがクラッシュしてもフローを再起動してくれる存在はいない。
フレームワーク側が自動で完了済みステップをスキップしてくれるわけではなく、すべてのメソッドに条件分岐ロジックを追加しなければいけないのはあなた!!
@listen(step_one)
def step_two(self):
if self.state.step_completed >= 2:
return # このスキップ処理は自分で書かないといけない
# ... 実際の処理 ...
分散実行なし
google-adk … Event Sourcing Without the Orchestrator(嫌な予感が…)
まず前提として… adk は一番洗練されている!✨️ とのこと
Google ADK's session management is the most architecturally sophisticated of the three, built on an event-sourcing model. Every interaction
1.14 から resume 機能が追加。復帰させたい id を指定すればオッケー(?)
from google.adk.app import App
from google.adk.runtime import ResumabilityConfig
app = App(
name='my_resumable_agent',
root_agent=root_agent,
resumability_config=ResumabilityConfig(is_resumable=True),
)
runner.run_async(
user_id='u_123',
session_id='s_abc',
invocation_id='invocation-123'
)
何が問題か?
開発者は、ただの直線的な通常コードとしてワークフローを書くだけ
def order_processing_workflow(ctx, order):
# Each await is automatically a durable checkpoint
inventory = yield ctx.call_activity(check_inventory, input=order)
payment = yield ctx.call_activity(process_payment, input=order)
shipment = yield ctx.call_activity(arrange_shipping, input=order)
return shipment
どのワークフローステップを実行する前にも、ランタイムは「耐久リマインダ」を作成。プロセスや Dapr、さらにはクラスター全体がクラッシュしても、そのリマインダが自動的にワークフローを再アクティブ化し、人手も外部システムも介さずに無期限にリトライする(それはそれで怖くない?)