Review the following changes in direct dependencies. Learn more about Socket for GitHub.
Stack: Batch Revoke — Thermos side · PR 2 of 3
We're adding batch-revoke support to the platform: the ability to revoke every connected account in a scope (org / project / auth config / connected account) asynchronously, plus opt-in revoke-on-delete on the public DELETE endpoints. This introduces new batch-revoke APIs and revoke-on-delete semantics across the platform. The work runs as a Temporal workflow on Thermos that calls Apollo's single-connection revoke one account at a time, recording each per-connection outcome durably.
This PR is part of a 3-PR stack landing the Thermos side of that machinery
(the Apollo side — jobs table, public endpoints, DELETE cascade — follows
separately).
Stack (merge bottom-up):
sarthak/batch-revoke-1-workerdb — revoke_job_item workerdb table + storesarthak/batch-revoke-2-workflow — RevokeBatchWorkflow + activities ← this PRsarthak/batch-revoke-3-endpoints — enqueue/poll HTTP endpointsBase:
sarthak/batch-revoke-1-workerdb— review/merge that first.
Thermos owns execution of an async scoped revocation job: given a list of
connected-account ids, it revokes each via Apollo's single-connection admin endpoint
and records every per-connection outcome durably in workerdb so the job status can
be polled to completion. This PR adds the Temporal workflow that does the draining,
plus the Apollo-HTTP mocking infrastructure its unit tests rely on.
The workflow (workers/revoke_batch.go) — one workflow, three activities:
MaterializeWorkList expands the staged id list (read from the object store) into
pending revoke_job_item rows.RevokePendingItems drains them through a bounded-concurrency pool, recording 2xx
as success and 4xx as a terminal failed, while leaving 5xx/transport rows
pending for the next attempt.FinalizePending sweeps any leftovers to failed once retries are exhausted, so
the job always completes with a whole ledger rather than failing the run.Supporting pieces:
lib/batchrevoke/ids.go: the thermos_job_id = encode(workflow_id, run_id) codec
and the object-store StagingKey, shared with the enqueue/poll handlers (PR 3).lib/awsutils/providers.go: an ObjectStore fx provider gated on the existing
LARGE_PAYLOAD_S3_BUCKET (reused, not a new bucket; fail-closed — the worker skips
when unset).go.uber.org/mock added as a direct
dependency, a //go:generate mockgen directive next to the HTTPClient interface
in utils/http.go with the generated MockHTTPClient committed under
utils/mocks/, and utils/mocking.md documenting the pattern. This is what lets
the workflow's per-connection unit be tested without a live Apollo.Key design decisions:
thermos_job_id from workflow.GetInfo because
run_id only exists after start, so the enqueue side stages the raw id list under
workflow_id and the workflow materializes its own rows.apps/thermos/workers/revoke_batch_test.go +
apps/thermos/service_isolation/batch_revoke/workflow_test.go (real workerdb,
Apollo mocked via the uber/mock MockHTTPClient): revokeConnectionAndPersist
across 2xx → success, 4xx → terminal failed, 5xx/transport → row stays pending
with the last error persisted; the FinalizePending exhaustion sweep.ON CONFLICT DO NOTHING); ErrObjectNotFound with rows present → no-op success,
with no rows → error.testsuite run drives a staged blob through
materialize → drain → completed; a forced-failure drain finalizes to completed
with failed rows (not a failed workflow).apps/thermos/service_isolation/run_service_isolation_tests.sh; mocks
regenerated with go generate ./utils/....MaterializeWorkList reads the staged id-list blob from an object store. Rather than a
new bucket, this reuses the existing LARGE_PAYLOAD_S3_BUCKET (already used for
Mercury large-payload offloading), so the bucket infra is already in place — staging keys
are namespaced under StagingKey(workflow_id) so they never collide with the offloading
objects:
LARGE_PAYLOAD_S3_BUCKET; ProvideObjectStore is gated on it (returns nil when unset) and
the worker fails closed, so no run materializes without a staged list. No new bucket, no new
env var.Get/Put — already granted on the bucket.ObjectStore interface has no Delete (GC = overwrite-per-workflow_id
completed).RevokeBatchWorkflow and its three activities
(materialize / drain / finalize) on startup.Towards DASH-814
Review the following changes in direct dependencies. Learn more about Socket for GitHub.
auth configs / projects