Description
Every ingestion plan-group / unmatched-cluster failure in Datadog over the last 24h has logged:
❌ Plan group processing failed with exception: Failed to resolve toolkit for tool slug 'X' (API error)
❌ Task failed for item=<unknown>: Failed to resolve toolkit for tool slug 'X' (API error)
The <unknown> is wrong — it appears for every failure regardless of input.
Root cause. run_with_concurrency_limit in app_tester/rube_learning/ingestion_pipeline/ingestion_utils.py was tracking which item produced which task via a {asyncio.Task: item} dict and looking it up from the iterator returned by asyncio.as_completed(tasks). From Python 3.10+, asyncio.as_completed yields wrapper coroutines, not the original Task objects, so task_to_item.get(fut) always returns None and the log line falls back to <unknown>. Verified locally:
fut type: <class 'coroutine'>
fut in dict: False
Fix. Carry the originating item alongside the result/exception inside the worker, then unpack it after as_completed yields. External behavior is preserved: the function still returns a list of values + Exception objects in completion order; only the failure log line changes from item=<unknown> to item=<actual repr>.
This is a pure observability fix — it does not prevent the failures themselves. The matching retry/backoff fix is PR #1405 (already approved by @dhawal1, awaiting merge).
How did I test this PR
- Repro confirmed locally with a 5-line script using
asyncio.as_completed and a {Task: item} dict — the iterator yields coroutines and the lookup always fails.
- Added
app_tester/rube_learning/ingestion_pipeline/tests/test_run_with_concurrency_limit.py covering: real item repr appears in failure logs (no <unknown>), long item reprs are truncated, log_exceptions=False silences output, all-success path returns values only, and the semaphore caps concurrency.
pytest app_tester/rube_learning/ingestion_pipeline/tests/test_run_with_concurrency_limit.py -v → 5 passed.
- Full ingestion-pipeline test suite (non-integration) → 50 passed, 13 deselected.
ruff check and ruff format --check clean for both touched files.
Origin: cron-5d55c321e47a / zen-cron-2a3909fd3817
Triggered by: dhawal@composio.dev | Source: cron
Session: https://zen-api-production-4c98.up.railway.app/dashboard/#/chat/zen-cron-2a3909fd3817