From a36e1215270fd06bf448ed7194ac42808def11e9 Mon Sep 17 00:00:00 2001 From: Prithvish Baidya Date: Wed, 16 Jul 2025 12:04:03 +0530 Subject: [PATCH 1/2] update docs --- HOW_IT_WORKS.md | 313 -------------------------------------- README.md | 31 ++++ README_EOA.md | 388 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 419 insertions(+), 313 deletions(-) delete mode 100644 HOW_IT_WORKS.md create mode 100644 README_EOA.md diff --git a/HOW_IT_WORKS.md b/HOW_IT_WORKS.md deleted file mode 100644 index d0ff143..0000000 --- a/HOW_IT_WORKS.md +++ /dev/null @@ -1,313 +0,0 @@ -# How It Works: EOA Worker Transaction Processing - -## Overview - -The EOA (Externally Owned Account) Worker is a single worker per EOA:chain combination that processes all transactions for that specific EOA. It manages transaction lifecycle from queuing to confirmation, with robust error handling and nonce management. - -## Core Architecture - -### Data Structures - -The worker maintains several key data structures: - -- **`pending_txns`**: Queue of transaction IDs waiting to be sent -- **`success_txns`**: Sorted set mapping nonces to transaction hashes for sent transactions -- **`hash_to_id`**: Hash map from transaction hash to transaction ID -- **`tx_data`**: Hash map from transaction ID to full transaction JSON data -- **`borrowed_txns`**: Hash map for crash recovery of prepared transactions -- **`recycled_nonces`**: Sorted set of available recycled nonces -- **`optimistic_nonce`**: Next nonce to use for new transactions -- **`last_chain_nonce`**: Cached chain nonce for comparison -- **`eoa_health`**: Health status including funding state and last check timestamp - -### Main Worker Loop - -The worker runs in a continuous loop with three main phases: - -1. **Recovery Phase**: Recovers any borrowed transactions from crashes -2. **Confirmation Phase**: Checks for mined transactions and handles failures -3. 
**Send Phase**: Sends new transactions while managing nonce allocation - -## Transaction Flow Diagram - -```mermaid -flowchart TD - A["🚀 EOA Worker Start"] --> B["Main Worker Loop"] - B --> C["1. Recover Borrowed State"] - B --> D["2. Confirm Flow"] - B --> E["3. Send Flow"] - B --> F["💤 Sleep(WORKER_CYCLE_DELAY)"] - F --> B - - %% Recovery Flow - C --> C1["Check borrowed_txns"] - C1 --> C2{Any borrowed
transactions?} - C2 -->|Yes| C3["For each borrowed tx:
Rebroadcast prepared_tx"] - C2 -->|No| D - C3 --> C4["RPC Send Transaction"] - C4 --> C5{Result Type?} - C5 -->|Deterministic Failure| C6["❌ Requeue to pending_txns
Add nonce to recycled_nonces"] - C5 -->|Success/Indeterminate| C7["✅ Add to success_txns
Update hash_to_id"] - C6 --> C8["Remove from borrowed_txns"] - C7 --> C8 - C8 --> C9{More borrowed
transactions?} - C9 -->|Yes| C3 - C9 -->|No| D - - %% Confirmation Flow - D --> D1["Get current_chain_nonce"] - D1 --> D2{Chain nonce
changed?} - D2 -->|No| E - D2 -->|Yes| D3["Get pending hashes for
nonces < current_chain_nonce"] - D3 --> D4["For each pending hash:
Get transaction receipt"] - D4 --> D5{Receipt exists?} - D5 -->|Yes| D6["✅ Transaction mined
Add to confirmed_tx_ids
Cleanup success_txns"] - D5 -->|No| D7["❌ Transaction failed
Add to failed_tx_ids"] - D6 --> D8{More hashes
to check?} - D7 --> D8 - D8 -->|Yes| D4 - D8 -->|No| D9["Requeue failed transactions
to pending_txns"] - D9 --> D10["Update last_chain_nonce"] - D10 --> E - - %% Send Flow - E --> E1["Check EOA Health"] - E1 --> E2{EOA funded?} - E2 -->|No| B - E2 -->|Yes| E3["Process Recycled Nonces"] - E3 --> E4["Check in-flight count"] - E4 --> E5{Too many
in-flight?} - E5 -->|Yes| B - E5 -->|No| E6["Process New Transactions"] - E6 --> B - - %% Process Recycled Nonces - E3 --> E3A{recycled_nonces
> MAX_RECYCLED?} - E3A -->|Yes| E3B["🧹 Clear all recycled nonces"] - E3A -->|No| E3C{recycled_nonces > 0
AND pending_txns > 0?} - E3B --> E4 - E3C -->|Yes| E3D["Pop min nonce
Dequeue tx_id"] - E3C -->|No| E3E{Still recycled
nonces?} - E3D --> E3F["Send transaction with nonce"] - E3F --> E3C - E3E -->|Yes| E3G["Send no-op transaction"] - E3E -->|No| E4 - E3G --> E3E - - %% Process New Transactions - E6 --> E6A{sent_count < max_count
AND pending_txns > 0?} - E6A -->|Yes| E6B["Dequeue tx_id
Get next nonce"] - E6A -->|No| B - E6B --> E6C["Send transaction with nonce"] - E6C --> E6D["Increment sent_count"] - E6D --> E6A - - %% Send Transaction with Nonce - E3F --> ST1["Get transaction data"] - E6C --> ST1 - E3G --> ST1 - ST1 --> ST2["Prepare complete transaction"] - ST2 --> ST3["Store in borrowed_txns"] - ST3 --> ST4["RPC Send Transaction"] - ST4 --> ST5{Result Type?} - ST5 -->|Deterministic Failure| ST6["❌ Requeue tx_id
Add nonce to recycled
Mark EOA unfunded"] - ST5 -->|Success/Indeterminate| ST7["✅ Add to success_txns
Update hash_to_id"] - ST6 --> ST8["Remove from borrowed_txns"] - ST7 --> ST8 - ST8 --> ST9["Return to caller"] - - %% Health Check - E1 --> E1A{Time since last
check > threshold?} - E1A -->|Yes| E1B["Get EOA balance"] - E1A -->|No| E2 - E1B --> E1C["Update eoa_health.funded
Update last_check"] - E1C --> E2 - - %% Styling - classDef startEnd fill:#e1f5fe,stroke:#01579b,stroke-width:2px - classDef process fill:#f3e5f5,stroke:#4a148c,stroke-width:2px - classDef decision fill:#fff3e0,stroke:#e65100,stroke-width:2px - classDef success fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px - classDef failure fill:#ffebee,stroke:#c62828,stroke-width:2px - classDef cleanup fill:#f1f8e9,stroke:#558b2f,stroke-width:2px - - class A,B,F startEnd - class C,D,E,E1,E3,E4,E6,C1,C3,C4,D1,D3,D4,D9,D10,E3D,E3F,E6B,E6C,E6D,ST1,ST2,ST3,ST4,E1B,E1C process - class C2,C5,C9,D2,D5,D8,E2,E5,E3A,E3C,E3E,E6A,ST5,E1A decision - class C7,D6,ST7 success - class C6,D7,ST6 failure - class C8,D9,E3B,ST8,ST9 cleanup -``` - -The above diagram illustrates the complete transaction processing flow. The worker operates in a continuous loop, processing three main phases sequentially. - -## Detailed Phase Breakdown - -### 1. Recovery Phase (`recover_borrowed_state`) - -**Purpose**: Recover from crashes by handling any transactions that were prepared but not fully processed. - -**Process**: - -- Iterates through all transactions in `borrowed_txns` -- Rebroadcasts each prepared transaction to the RPC -- Classifies results: - - **Deterministic failures**: Requeues transaction and recycles nonce - - **Success/Indeterminate**: Assumes sent and adds to success tracking -- Cleans up borrowed state - -**Key Insight**: This provides crash resilience by ensuring no prepared transactions are lost. - -### 2. Confirmation Phase (`confirm_flow`) - -**Purpose**: Identify completed transactions and handle failures. 
- -**Process**: - -- Compares current chain nonce with cached `last_chain_nonce` -- If unchanged, skips confirmation (no progress on chain) -- For progressed nonces, checks transaction receipts -- Categorizes results: - - **Mined transactions**: Removes from tracking, adds to confirmed set - - **Failed/Dropped transactions**: Adds to failed set for requeuing -- Requeues failed transactions (deduplicated against confirmed ones) -- Updates cached chain nonce - -**Key Insight**: Uses nonce progression to efficiently identify which transactions need confirmation checks. - -### 3. Send Phase (`send_flow`) - -**Purpose**: Send new transactions while managing nonce allocation and capacity. - -**Components**: - -#### A. Health Check - -- Periodically checks EOA balance -- Skips sending if insufficient funds -- Prevents wasteful RPC calls when EOA is unfunded - -#### B. Recycled Nonce Processing - -- **Overflow Protection**: Clears all recycled nonces if too many accumulate -- **Reuse Priority**: Fills recycled nonces before using fresh ones -- **No-op Transactions**: Sends empty transactions for unused recycled nonces - -#### C. 
New Transaction Processing - -- **Capacity Management**: Limits in-flight transactions to `MAX_IN_FLIGHT` -- **Fresh Nonce Allocation**: Uses optimistic nonce counter for new transactions -- **Batch Processing**: Sends multiple transactions up to available capacity - -## Error Classification System - -### Deterministic Failures - -- Invalid signature -- Malformed transaction -- Invalid transaction format -- **Action**: Immediate requeue + nonce recycling - -### Success Cases - -- Explicit success response -- "already known" (duplicate) -- "nonce too low" (already mined) -- **Action**: Add to success tracking - -### Indeterminate Cases - -- Network timeouts -- Temporary RPC failures -- Unknown errors -- **Action**: Assume sent (optimistic approach) - -## Nonce Management Strategy - -### Optimistic Nonce Counter - -- Maintains local counter independent of chain state -- Increments immediately when sending transactions -- Allows parallel transaction preparation - -### Recycled Nonce Pool - -- Reuses nonces from failed transactions -- Prevents nonce gaps in the sequence -- Bounded size to prevent memory leaks - -### Chain Nonce Synchronization - -- Periodically syncs with actual chain state -- Used for confirmation and capacity calculations -- Handles chain reorganizations gracefully - -## Key Design Decisions - -### 1. Single Worker Per EOA:Chain - -- **Benefit**: Eliminates nonce conflicts between workers -- **Trade-off**: Limits parallelism but ensures consistency - -### 2. Optimistic Sending - -- **Benefit**: Higher throughput by not waiting for confirmations -- **Trade-off**: Requires robust error handling and recovery - -### 3. Borrowed Transaction Pattern - -- **Benefit**: Crash resilience without complex state management -- **Trade-off**: Slight overhead for state tracking - -### 4. Bounded In-Flight Transactions - -- **Benefit**: Prevents memory leaks and excessive RPC usage -- **Trade-off**: May limit throughput during high-volume periods - -### 5. 
Recycled Nonce Cleanup - -- **Benefit**: Prevents unbounded memory growth -- **Trade-off**: May create temporary nonce gaps - -## Configuration Parameters - -- **`MAX_IN_FLIGHT`**: Maximum concurrent unconfirmed transactions -- **`MAX_RECYCLED_NONCES`**: Maximum recycled nonces before cleanup -- **`WORKER_CYCLE_DELAY`**: Sleep time between worker iterations -- **`HEALTH_CHECK_INTERVAL`**: Frequency of EOA balance checks -- **`MIN_BALANCE_THRESHOLD`**: Minimum balance to consider EOA funded - -## Monitoring and Observability - -The worker exposes several metrics for monitoring: - -- **Queue Depth**: Size of `pending_txns` queue -- **In-Flight Count**: `optimistic_nonce - last_chain_nonce` -- **Success Rate**: Ratio of confirmed to sent transactions -- **Recycled Nonce Count**: Size of recycled nonce pool -- **Health Status**: EOA funding state and last check time - -## Failure Modes and Recovery - -### Common Failure Scenarios - -1. **EOA Runs Out of Funds** - - - **Detection**: Balance check during health verification - - **Recovery**: Automatic retry once funds are restored - -2. **Network Partitions** - - - **Detection**: RPC call failures during any phase - - **Recovery**: Continues processing with cached state until network restored - -3. **Worker Crashes** - - - **Detection**: Restart detection during recovery phase - - **Recovery**: Borrowed transaction rebroadcast ensures no loss - -4. **Chain Reorganizations** - - **Detection**: Chain nonce inconsistencies - - **Recovery**: Confirmation phase handles dropped transactions - -This architecture provides a robust, scalable solution for managing EOA transactions with strong consistency guarantees and graceful failure handling. 
diff --git a/README.md b/README.md index 2f74bed..41631d5 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,31 @@ Advanced Redis-backed job queue with enterprise features: - **Webhook Delivery**: Reliable HTTP webhook notifications with configurable retries - **Transaction Confirmation**: Block confirmation tracking with reorganization handling - **External Bundler Integration**: UserOperation submission and status monitoring +- **EOA Transaction Processing**: Production-grade EOA (Externally Owned Account) transaction management with advanced nonce handling, crash recovery, and optimal throughput + +##### EOA Executor Deep Dive + +The EOA executor implements a sophisticated single-worker-per-EOA architecture that ensures transaction consistency while maximizing throughput: + +**Key Features:** +- **Crash-Resilient Recovery**: Borrowed transaction pattern prevents loss during worker restarts +- **Intelligent Nonce Management**: Optimistic nonce allocation with recycling for failed transactions +- **Three-Phase Processing**: Recovery โ†’ Confirmation โ†’ Send phases ensure complete transaction lifecycle management +- **Adaptive Capacity Control**: Dynamic in-flight transaction limits based on network conditions +- **Health Monitoring**: Automatic EOA balance checking with funding state awareness + +**Transaction Flow:** +1. **Recovery Phase**: Rebroadcasts any prepared transactions from crashes +2. **Confirmation Phase**: Efficiently tracks transaction confirmations using nonce progression +3. 
**Send Phase**: Processes new transactions with recycled nonce prioritization and capacity management + +**Error Classification:** +- **Deterministic Failures**: Immediate requeue with nonce recycling (invalid signatures, malformed transactions) +- **Success Cases**: Transaction tracking for known/duplicate transactions +- **Indeterminate Cases**: Optimistic handling for network timeouts and unknown errors + +This architecture provides strong consistency guarantees while handling high-volume transaction processing with graceful degradation under network stress. +For more details, see [README_EOA.md](README_EOA.md). ### Thirdweb Service Integration (`thirdweb-core/`) **Purpose**: First-party service integrations @@ -261,6 +286,11 @@ cargo nextest run -p twmq --profile ci โ”‚ โ””โ”€โ”€ src/ โ”‚ โ”œโ”€โ”€ webhook/ # Webhook delivery โ”‚ โ””โ”€โ”€ external_bundler/ # AA bundler integration +โ”‚ โ””โ”€โ”€ eoa/ # EOA transaction processing +โ”‚ โ”œโ”€โ”€ worker/ # Main worker logic +โ”‚ โ”œโ”€โ”€ store/ # Redis-backed state management +โ”‚ โ”œโ”€โ”€ events.rs # Transaction lifecycle events +โ”‚ โ””โ”€โ”€ error_classifier.rs # Error categorization โ””โ”€โ”€ thirdweb-core/ # Thirdweb integrations ``` @@ -377,3 +407,4 @@ For issues and questions: --- **Built with โค๏ธ by the Thirdweb team** + diff --git a/README_EOA.md b/README_EOA.md new file mode 100644 index 0000000..189e1b2 --- /dev/null +++ b/README_EOA.md @@ -0,0 +1,388 @@ +# How It Works: EOA Worker Transaction Processing + +## Overview + +The EOA (Externally Owned Account) Worker is a single worker per EOA:chain combination that processes all transactions for that specific EOA. It manages transaction lifecycle from queuing to confirmation, with robust error handling, nonce management, gas bumping, and webhook notifications. 
+ +## Core Architecture + +### Data Structures + +The worker maintains several key data structures in Redis: + +- **`pending_transactions`**: ZSET of transaction IDs waiting to be sent, scored by queued timestamp +- **`submitted_transactions`**: ZSET mapping nonces to "hash:transaction_id" pairs for sent transactions +- **`transaction_hash_to_id`**: Hash map from transaction hash to transaction ID +- **`transaction_data`**: Hash maps storing full transaction data with user requests and receipts +- **`transaction_attempts`**: Lists storing all attempts (including gas bumps) for each transaction +- **`borrowed_transactions`**: Hash map for crash recovery of prepared transactions +- **`recycled_nonces`**: ZSET of available recycled nonces from failed transactions +- **`optimistic_transaction_count`**: Counter for next nonce to use for new transactions +- **`last_transaction_count`**: Cached chain transaction count for comparison +- **`eoa_health`**: Health status including balance, thresholds, and movement tracking + +### Main Worker Execution + +The worker runs as a TWMQ job with three main phases executed once per job: + +1. **Recovery Phase**: Recovers any borrowed transactions from crashes +2. **Confirmation Phase**: Checks for mined transactions, handles gas bumping, and manages nonce synchronization +3. **Send Phase**: Sends new transactions while managing nonce allocation and capacity + +Worker execution is scheduled by the TWMQ job queue system. TWMQ acts like a "distributed async scheduler" for each `eoa:chain` "green thread". These threads suspend themselves when there is no work to do, and ask to be rescheduled when more work is available. + +## Transaction Flow Diagram + +```mermaid +flowchart TD + A["๐Ÿš€ EOA Worker Job Start"] --> B["Acquire EOA Lock Aggressively"] + B --> C["1. Recover Borrowed State"] + C --> D["2. Confirm Flow"] + D --> E["3. 
Send Flow"] + E --> F["Check Work Remaining"] + F --> G{Work Remaining?} + G -->|Yes| H["⏰ Requeue Job (2s delay)"] + G -->|No| I["✅ Job Complete"] + H --> J["Release Lock"] + I --> J + J --> K["🏁 Job End"] + + %% Recovery Flow + C --> C1["Get borrowed_transactions"] + C1 --> C2{Any borrowed
transactions?} + C2 -->|Yes| C3["For each borrowed tx:
Rebroadcast signed_transaction"] + C2 -->|No| D + C3 --> C4["RPC Send Transaction"] + C4 --> C5{Result Type?} + C5 -->|Deterministic Failure| C6["❌ Move to pending
Add nonce to recycled"] + C5 -->|Success/Indeterminate| C7["✅ Move to submitted
Update hash mappings"] + C6 --> C8["Remove from borrowed"] + C7 --> C8 + C8 --> C9{More borrowed
transactions?} + C9 -->|Yes| C3 + C9 -->|No| D + + %% Confirmation Flow + D --> D1["Get current chain tx_count"] + D1 --> D2{Chain tx_count
progressed?} + D2 -->|No| D2A["Check if stalled (5min timeout)"] + D2A --> D2B{Nonce stalled
& submitted > 0?} + D2B -->|Yes| D2C["🚀 Attempt Gas Bump"] + D2B -->|No| E + D2C --> E + D2 -->|Yes| D3["Get submitted txs below tx_count"] + D3 --> D4["For each submitted tx:
Get transaction receipt"] + D4 --> D5{Receipt exists?} + D5 -->|Yes| D6["✅ Transaction confirmed
Queue webhook
Cleanup submitted"] + D5 -->|No| D7["❌ Transaction replaced
Move to pending"] + D6 --> D8{More txs
to check?} + D7 --> D8 + D8 -->|Yes| D4 + D8 -->|No| D9["Update cached tx_count
Update health timestamps"] + D9 --> E + + %% Send Flow + E --> E1["Check EOA Health"] + E1 --> E2{Balance >
threshold?} + E2 -->|No| E2A["Update balance if stale"] + E2A --> E2B{Still insufficient?} + E2B -->|Yes| F + E2B -->|No| E3 + E2 -->|Yes| E3["Process Recycled Nonces"] + E3 --> E4["Check remaining recycled"] + E4 --> E5{All recycled
processed?} + E5 -->|No| F + E5 -->|Yes| E6["Process New Transactions"] + E6 --> F + + %% Process Recycled Nonces + E3 --> E3A["Clean recycled nonces
(remove if > MAX_RECYCLED_THRESHOLD)"] + E3A --> E3B{recycled_nonces > 0
AND pending > 0?} + E3B -->|Yes| E3C["Get pending txs matching count"] + E3B -->|No| E3D{Still have
recycled nonces?} + E3C --> E3E["Build & sign transactions"] + E3E --> E3F["Move to borrowed atomically"] + E3F --> E3G["Send transactions via RPC"] + E3G --> E3H["Process send results"] + E3H --> E3A + E3D -->|Yes| E3I["Send noop transactions
for unused nonces"] + E3D -->|No| E4 + E3I --> E4 + + %% Process New Transactions + E6 --> E6A["Get inflight budget"] + E6A --> E6B{Budget > 0
AND pending > 0?} + E6B -->|Yes| E6C["Get pending txs up to budget"] + E6B -->|No| F + E6C --> E6D["Build & sign with sequential nonces"] + E6D --> E6E["Move to borrowed with incremented nonces"] + E6E --> E6F["Send transactions via RPC"] + E6F --> E6G["Process send results"] + E6G --> E6A + + %% Gas Bump Flow + D2C --> GB1["Get submitted txs for stalled nonce"] + GB1 --> GB2{Any txs found
for nonce?} + GB2 -->|No| GB3["Send noop transaction"] + GB2 -->|Yes| GB4["Find newest transaction"] + GB4 --> GB5["Rebuild with 20% gas increase"] + GB5 --> GB6["Sign bumped transaction"] + GB6 --> GB7["Add gas bump attempt"] + GB7 --> GB8["Send bumped transaction"] + GB3 --> E + GB8 --> E + + %% Health Check Details + E1 --> E1A["Get cached health data"] + E1A --> E1B{Health data
exists?} + E1B -->|No| E1C["Initialize health from chain balance"] + E1B -->|Yes| E2 + E1C --> E1D["Save initial health data"] + E1D --> E2 + + %% Styling + classDef startEnd fill:#e1f5fe,stroke:#01579b,stroke-width:2px + classDef process fill:#f3e5f5,stroke:#4a148c,stroke-width:2px + classDef decision fill:#fff3e0,stroke:#e65100,stroke-width:2px + classDef success fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px + classDef failure fill:#ffebee,stroke:#c62828,stroke-width:2px + classDef cleanup fill:#f1f8e9,stroke:#558b2f,stroke-width:2px + classDef gasbump fill:#e3f2fd,stroke:#1565c0,stroke-width:2px + + class A,I,J,K startEnd + class B,C,D,E,F,C1,C3,C4,D1,D3,D4,D9,E3A,E3C,E3E,E3F,E3G,E3H,E6C,E6D,E6E,E6F,E6G,GB4,GB5,GB6,GB7,GB8,E1A,E1C,E1D,E2A process + class C2,C5,C9,D2,D2B,D5,D8,E2,E2B,E5,E3B,E3D,E6B,GB2,E1B decision + class C7,D6 success + class C6,D7 failure + class C8,H,E3I cleanup + class D2C,GB1,GB3 gasbump +``` + +The above diagram illustrates the complete transaction processing flow within a single job execution. The worker operates using the TWMQ job queue system, processing available work and requeueing itself if more work remains. + +## Detailed Phase Breakdown + +### 1. Recovery Phase (`recover_borrowed_state`) + +**Purpose**: Recover from crashes by handling any transactions that were prepared but not fully processed. + +**Process**: + +- Retrieves all transactions from `borrowed_transactions` hashmap +- Sorts by nonce to ensure proper ordering during rebroadcast +- Rebroadcasts each signed transaction in parallel to the RPC +- Classifies results using the sophisticated error classification system +- Uses batch processing to atomically move transactions to final states +- Queues webhook notifications for state changes + +**Key Insight**: Provides crash resilience while maintaining transaction ordering and proper state transitions. "Borrowed transactions" are functionally a write-ahead-log for the worker. + +### 2. 
Confirmation Phase (`confirm_flow`) + +**Purpose**: Identify completed transactions, handle gas bumping for stalled nonces, and maintain nonce synchronization. + +**Process**: + +- Fetches current chain transaction count and compares with cached value +- **No Progress Path**: If no nonce advancement, checks for stalled transactions + - If a nonce is stalled for >5 minutes with submitted transactions, attempts gas bumping + - Finds the newest transaction for the stalled nonce and rebuilds with 20% gas increase + - If no transactions exist for the nonce, sends a noop transaction +- **Progress Path**: If nonces advanced, processes confirmations + - Fetches receipts for all submitted transactions below current transaction count + - Categorizes as confirmed (has receipt) or replaced (no receipt) + - Atomically cleans confirmed transactions and updates mappings + - Updates cached transaction count and health timestamps +- Queues webhook notifications for all state changes + +**Key Insight**: Combines confirmation checking with proactive gas bumping to prevent transaction stalls. + +### 3. Send Phase (`send_flow`) + +**Purpose**: Send new transactions while managing nonce allocation, capacity limits, and EOA health. + +**Components**: + +#### A. Health Check and Balance Management + +- Retrieves or initializes EOA health data with balance, thresholds, and timestamps +- Updates balance if stale (>5 minutes since last check) +- Compares balance against dynamic threshold (updated based on transaction failures) +- Skips sending if balance is insufficient + +#### B. 
Recycled Nonce Processing + +- **Overflow Protection**: Cleans recycled nonces if count exceeds `MAX_RECYCLED_THRESHOLD` (50) +- **Batch Processing**: Matches recycled nonces with pending transactions +- **Parallel Preparation**: Builds and signs multiple transactions concurrently +- **Error Handling**: Filters preparation failures and handles balance threshold updates +- **Atomic State Transitions**: Uses WATCH/MULTI/EXEC for race-free state changes +- **Noop Handling**: Sends empty transactions for unused recycled nonces + +#### C. New Transaction Processing + +- **Budget Calculation**: Determines available capacity using `MAX_INFLIGHT_PER_EOA` (100) +- **Sequential Nonce Assignment**: Assigns consecutive nonces starting from optimistic counter +- **Batch Processing**: Processes multiple transactions up to available budget +- **Atomic Nonce Management**: Atomically increments optimistic counter during state transitions +- **Retry Logic**: Fixed iteration limit prevents infinite loops during high failure rates + +## Advanced Features + +### Gas Bumping System + +**Trigger Conditions**: +- Nonce hasn't moved for >5 minutes (`NONCE_STALL_TIMEOUT`) +- Submitted transactions exist for the stalled nonce + +**Process**: +1. Identifies newest transaction for the stalled nonce +2. Rebuilds transaction with 20% gas price increase +3. Signs and records as new attempt in transaction history +4. Broadcasts bumped transaction +5. 
Falls back to noop transaction if no transactions exist + +### Atomic State Management + +**Lock Acquisition**: +- Aggressive lock takeover for stalled workers +- Redis-based distributed locking per EOA:chain combination + +**Transaction Safety**: +- All state changes use Redis WATCH/MULTI/EXEC for atomicity +- Retry logic with exponential backoff for contention +- Lock ownership validation before every operation + +### Webhook Integration + +**Event Types**: +- Transaction submitted +- Transaction confirmed +- Transaction failed +- Transaction replaced + +**Delivery**: +- Queued via TWMQ for reliable delivery +- Supports multiple webhook endpoints per transaction +- Includes full transaction data and receipts + +## Error Classification System + +### Deterministic Failures +- Invalid transaction parameters +- Insufficient balance (below threshold) +- Transaction simulation failures +- **Action**: Immediate failure + webhook notification + +### Success Cases +- Explicit RPC success +- "already known" (duplicate submission) +- "nonce too low" (already mined) +- **Action**: Move to submitted state + +### Indeterminate Cases +- Network timeouts +- Temporary RPC failures +- Unknown RPC errors +- **Action**: Assume sent (optimistic approach) + +### Balance Threshold Management +- Dynamically updated based on transaction failures +- Prevents wasteful RPC calls when EOA lacks funds +- Automatically refreshed when transactions fail due to balance + +## Nonce Management Strategy + +### Optimistic Transaction Count +- Tracks next available nonce independent of chain state +- Atomically incremented when moving transactions to borrowed state +- Enables parallel transaction preparation + +### Cached Transaction Count +- Periodically synced with actual chain state during confirmation +- Used for inflight budget calculations and confirmation checks +- Updated atomically with health timestamps + +### Recycled Nonce Pool +- Reuses nonces from definitively failed transactions +- 
Bounded size with automatic cleanup at `MAX_RECYCLED_THRESHOLD` +- Priority processing before new nonces + +### Nonce Reset Protection +- Automatic reset when sync issues detected +- Tracks reset history in health data for monitoring +- Prevents state corruption during chain reorganizations + +## Key Design Decisions + +### 1. Job-Based Execution +- **Benefit**: Natural backpressure and resource management via TWMQ +- **Trade-off**: Latency depends on queue processing speed + +### 2. Aggressive Lock Acquisition +- **Benefit**: Handles worker crashes and stalls gracefully +- **Trade-off**: Potential work duplication during handoffs + +### 3. Atomic State Transitions +- **Benefit**: Strong consistency guarantees even during failures +- **Trade-off**: Increased complexity and potential retry overhead + +### 4. Batch Processing +- **Benefit**: High throughput via parallel RPC calls and atomic state updates +- **Trade-off**: More complex error handling and state management + +### 5. Gas Bumping Integration +- **Benefit**: Proactive handling of network congestion +- **Trade-off**: Additional RPC overhead and complexity + +### 6. 
Dynamic Balance Thresholds +- **Benefit**: Adapts to changing gas prices and network conditions +- **Trade-off**: Potential for false positives during price volatility + +## Configuration Parameters + +- **`MAX_INFLIGHT_PER_EOA`**: 100 - Maximum concurrent unconfirmed transactions +- **`MAX_RECYCLED_THRESHOLD`**: 50 - Maximum recycled nonces before cleanup +- **`TARGET_TRANSACTIONS_PER_EOA`**: 10 - Fleet management target +- **`MIN_TRANSACTIONS_PER_EOA`**: 1 - Fleet management minimum +- **`NONCE_STALL_TIMEOUT`**: 300,000ms (5 minutes) - Gas bump trigger +- **`HEALTH_CHECK_INTERVAL`**: 300s (5 minutes) - Balance refresh interval + +## Monitoring and Observability + +The worker exposes several metrics through the job result: + +- **Queue Metrics**: Pending, borrowed, and submitted transaction counts +- **Nonce Metrics**: Recycled nonce count and optimistic nonce position +- **Processing Metrics**: Transactions recovered, confirmed, failed, and sent per job +- **Health Metrics**: Balance status, last check timestamp, nonce reset history + +## Failure Modes and Recovery + +### Common Failure Scenarios + +1. **EOA Runs Out of Funds** + - **Detection**: Balance check against dynamic threshold + - **Recovery**: Automatic retry when balance threshold is met + +2. **Network Partitions** + - **Detection**: RPC call failures during any phase + - **Recovery**: Job requeue with exponential backoff + +3. **Worker Crashes** + - **Detection**: Lock timeout and aggressive takeover + - **Recovery**: Borrowed transaction rebroadcast ensures no loss + +4. **Nonce Stalls** + - **Detection**: Time-based stall detection (5-minute timeout) + - **Recovery**: Automatic gas bumping or noop transactions + +5. **Chain Reorganizations** + - **Detection**: Chain transaction count inconsistencies + - **Recovery**: Confirmation phase handles dropped transactions + +6. 
**State Corruption** + - **Detection**: Optimistic nonce validation failures + - **Recovery**: Automatic nonce reset to chain state + +This architecture provides a robust, scalable solution for managing EOA transactions with strong consistency guarantees, proactive congestion handling, and comprehensive failure recovery mechanisms. From 242e11b0fc09b6733dc81f42f524908037dc9e40 Mon Sep 17 00:00:00 2001 From: Prithvish Baidya Date: Wed, 16 Jul 2025 12:04:23 +0530 Subject: [PATCH 2/2] webhook metadata + eoa retry changes --- aa-types/src/userop.rs | 59 ++++++++++++++------ core/src/execution_options/mod.rs | 71 +++++++++++++++++++++++++ executors/src/eoa/store/hydrate.rs | 2 +- executors/src/eoa/worker/error.rs | 47 +++++++++++++--- executors/src/eoa/worker/mod.rs | 28 ++++++---- executors/src/eoa/worker/send.rs | 5 +- executors/src/eoa/worker/transaction.rs | 1 - executors/src/webhook/envelope.rs | 9 ++++ executors/src/webhook/mod.rs | 2 +- twmq/benches/throughput.rs | 4 +- 10 files changed, 188 insertions(+), 40 deletions(-) diff --git a/aa-types/src/userop.rs b/aa-types/src/userop.rs index e164082..7e51133 100644 --- a/aa-types/src/userop.rs +++ b/aa-types/src/userop.rs @@ -1,6 +1,6 @@ use alloy::{ core::sol_types::SolValue, - primitives::{keccak256, Address, ChainId, Bytes, U256, B256}, + primitives::{Address, B256, Bytes, ChainId, U256, keccak256}, rpc::types::{PackedUserOperation, UserOperation}, }; use serde::{Deserialize, Serialize}; @@ -14,7 +14,15 @@ pub enum VersionedUserOp { } /// Error type for UserOp operations -#[derive(Debug, Clone, thiserror::Error, serde::Serialize, serde::Deserialize, schemars::JsonSchema, utoipa::ToSchema)] +#[derive( + Debug, + Clone, + thiserror::Error, + serde::Serialize, + serde::Deserialize, + schemars::JsonSchema, + utoipa::ToSchema, +)] #[serde(tag = "type", rename_all = "SCREAMING_SNAKE_CASE")] pub enum UserOpError { #[error("Unexpected error: {0}")] @@ -68,7 +76,12 @@ pub fn compute_user_op_v07_hash( // Construct initCode 
from factory and factoryData let init_code: Bytes = if let Some(factory) = op.factory { if factory != Address::ZERO { - [&factory[..], &op.factory_data.clone().unwrap_or_default()[..]].concat().into() + [ + &factory[..], + &op.factory_data.clone().unwrap_or_default()[..], + ] + .concat() + .into() } else { op.factory_data.clone().unwrap_or_default() } @@ -80,9 +93,10 @@ pub fn compute_user_op_v07_hash( let vgl_u128: u128 = op.verification_gas_limit.try_into().map_err(|_| { UserOpError::UnexpectedError("verification_gas_limit too large".to_string()) })?; - let cgl_u128: u128 = op.call_gas_limit.try_into().map_err(|_| { - UserOpError::UnexpectedError("call_gas_limit too large".to_string()) - })?; + let cgl_u128: u128 = op + .call_gas_limit + .try_into() + .map_err(|_| UserOpError::UnexpectedError("call_gas_limit too large".to_string()))?; let mut account_gas_limits_bytes = [0u8; 32]; account_gas_limits_bytes[0..16].copy_from_slice(&vgl_u128.to_be_bytes()); @@ -93,9 +107,10 @@ pub fn compute_user_op_v07_hash( let mpfpg_u128: u128 = op.max_priority_fee_per_gas.try_into().map_err(|_| { UserOpError::UnexpectedError("max_priority_fee_per_gas too large".to_string()) })?; - let mfpg_u128: u128 = op.max_fee_per_gas.try_into().map_err(|_| { - UserOpError::UnexpectedError("max_fee_per_gas too large".to_string()) - })?; + let mfpg_u128: u128 = op + .max_fee_per_gas + .try_into() + .map_err(|_| UserOpError::UnexpectedError("max_fee_per_gas too large".to_string()))?; let mut gas_fees_bytes = [0u8; 32]; gas_fees_bytes[0..16].copy_from_slice(&mpfpg_u128.to_be_bytes()); @@ -105,12 +120,24 @@ pub fn compute_user_op_v07_hash( // Construct paymasterAndData let paymaster_and_data: Bytes = if let Some(paymaster) = op.paymaster { if paymaster != Address::ZERO { - let pm_vgl_u128: u128 = op.paymaster_verification_gas_limit.unwrap_or_default().try_into().map_err(|_| { - UserOpError::UnexpectedError("paymaster_verification_gas_limit too large".to_string()) - })?; - let pm_pogl_u128: u128 = 
op.paymaster_post_op_gas_limit.unwrap_or_default().try_into().map_err(|_| { - UserOpError::UnexpectedError("paymaster_post_op_gas_limit too large".to_string()) - })?; + let pm_vgl_u128: u128 = op + .paymaster_verification_gas_limit + .unwrap_or_default() + .try_into() + .map_err(|_| { + UserOpError::UnexpectedError( + "paymaster_verification_gas_limit too large".to_string(), + ) + })?; + let pm_pogl_u128: u128 = op + .paymaster_post_op_gas_limit + .unwrap_or_default() + .try_into() + .map_err(|_| { + UserOpError::UnexpectedError( + "paymaster_post_op_gas_limit too large".to_string(), + ) + })?; [ &paymaster[..], &pm_vgl_u128.to_be_bytes()[..], @@ -154,4 +181,4 @@ pub fn compute_user_op_v07_hash( let outer_encoded = outer_tuple.abi_encode(); let final_hash = keccak256(&outer_encoded); Ok(final_hash) -} \ No newline at end of file +} diff --git a/core/src/execution_options/mod.rs b/core/src/execution_options/mod.rs index 48fd2d9..58a637d 100644 --- a/core/src/execution_options/mod.rs +++ b/core/src/execution_options/mod.rs @@ -76,10 +76,42 @@ pub struct ExecutionOptions { pub specific: SpecificExecutionOptions, } +const MAX_USER_METADATA_SIZE: usize = 4096; // 4KB limit + +fn validate_user_metadata_size(metadata: &Option<String>) -> Result<(), String> { + if let Some(meta) = metadata { + if meta.len() > MAX_USER_METADATA_SIZE { + return Err(format!( + "User metadata exceeds maximum size of {} bytes (provided: {} bytes)", + MAX_USER_METADATA_SIZE, + meta.len() + )); + } + } + Ok(()) +} + +fn deserialize_and_validate_user_metadata<'de, D>( + deserializer: D, +) -> Result<Option<String>, D::Error> +where + D: Deserializer<'de>, +{ + let metadata: Option<String> = Option::deserialize(deserializer)?; + validate_user_metadata_size(&metadata).map_err(D::Error::custom)?; + Ok(metadata) +} + #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] pub struct WebhookOptions { pub url: String, pub secret: Option<String>, + /// Custom metadata provided by the
user to be included in webhook notifications. + /// Limited to 4KB (4096 bytes) to prevent abuse. + #[serde(default, skip_serializing_if = "Option::is_none")] + #[serde(deserialize_with = "deserialize_and_validate_user_metadata")] + pub user_metadata: Option<String>, } /// Incoming transaction request, parsed into InnerTransaction @@ -152,3 +184,42 @@ impl ExecutionOptions { &self.base.idempotency_key } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_webhook_options_user_metadata_validation() { + // Test valid metadata + let valid_json = r#"{ + "url": "https://example.com/webhook", + "secret": "test_secret", + "userMetadata": "test metadata" + }"#; + + let webhook_options: Result<WebhookOptions, _> = serde_json::from_str(valid_json); + assert!(webhook_options.is_ok()); + assert_eq!(webhook_options.unwrap().user_metadata, Some("test metadata".to_string())); + + // Test metadata that's too large (over 4KB) + let large_metadata = "x".repeat(5000); // 5KB string + let invalid_json = format!(r#"{{ + "url": "https://example.com/webhook", + "secret": "test_secret", + "userMetadata": "{}" + }}"#, large_metadata); + + let webhook_options: Result<WebhookOptions, _> = serde_json::from_str(&invalid_json); + assert!(webhook_options.is_err()); + + // Test missing metadata (should default to None) + let minimal_json = r#"{ + "url": "https://example.com/webhook" + }"#; + + let webhook_options: Result<WebhookOptions, _> = serde_json::from_str(minimal_json); + assert!(webhook_options.is_ok()); + assert_eq!(webhook_options.unwrap().user_metadata, None); + } +} diff --git a/executors/src/eoa/store/hydrate.rs b/executors/src/eoa/store/hydrate.rs index 74f113a..7449ab5 100644 --- a/executors/src/eoa/store/hydrate.rs +++ b/executors/src/eoa/store/hydrate.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::HashMap; use crate::eoa::{ EoaExecutorStore, EoaTransactionRequest, diff --git
a/executors/src/eoa/worker/error.rs +++ b/executors/src/eoa/worker/error.rs @@ -1,18 +1,23 @@ -use alloy::transports::{RpcError, TransportErrorKind}; +use std::time::Duration; + +use alloy::{ + primitives::U256, + transports::{RpcError, TransportErrorKind}, +}; use engine_core::{ chain::Chain, error::{AlloyRpcErrorToEngineError, EngineError, RpcErrorKind}, }; use serde::{Deserialize, Serialize}; use thirdweb_core::iaw::IAWError; -use twmq::{UserCancellable, error::TwmqError}; +use twmq::{ + UserCancellable, + error::TwmqError, + job::{JobError, RequeuePosition}, +}; use crate::eoa::{ - EoaTransactionRequest, - store::{ - BorrowedTransaction, BorrowedTransactionData, SubmissionResult, SubmissionResultType, - SubmittedTransaction, TransactionStoreError, - }, + store::{BorrowedTransaction, SubmissionResult, SubmissionResultType, TransactionStoreError}, worker::EoaExecutorWorkerResult, }; @@ -64,6 +69,12 @@ pub enum EoaExecutorWorkerError { #[error("Work still remaining: {result:?}")] WorkRemaining { result: EoaExecutorWorkerResult }, + #[error("EOA out of funds: {balance} < {balance_threshold} wei")] + EoaOutOfFunds { + balance: U256, + balance_threshold: U256, + }, + #[error("Internal error: {message}")] InternalError { message: String }, @@ -71,6 +82,27 @@ pub enum EoaExecutorWorkerError { UserCancelled, } +impl EoaExecutorWorkerError { + pub fn handle(self) -> JobError<EoaExecutorWorkerError> { + match &self { + EoaExecutorWorkerError::EoaOutOfFunds { .. } => JobError::Nack { + error: self, + delay: Some(Duration::from_secs(60)), + position: RequeuePosition::Last, + }, + EoaExecutorWorkerError::StoreError { + inner_error: TransactionStoreError::LockLost { .. }, + ..
+ } => JobError::Fail(self), + _ => JobError::Nack { + error: self, + delay: Some(Duration::from_secs(10)), + position: RequeuePosition::Last, + }, + } + } +} + impl From<TwmqError> for EoaExecutorWorkerError { fn from(error: TwmqError) -> Self { EoaExecutorWorkerError::InternalError { @@ -220,6 +252,7 @@ pub fn is_retryable_preparation_error(error: &EoaExecutorWorkerError) -> bool { EoaExecutorWorkerError::TransactionSendError { .. } => false, // Different context EoaExecutorWorkerError::SignatureParsingFailed { .. } => false, // Deterministic EoaExecutorWorkerError::WorkRemaining { .. } => false, // Different context + EoaExecutorWorkerError::EoaOutOfFunds { .. } => false, // Deterministic } } diff --git a/executors/src/eoa/worker/mod.rs b/executors/src/eoa/worker/mod.rs index f73ad36..68de7ce 100644 --- a/executors/src/eoa/worker/mod.rs +++ b/executors/src/eoa/worker/mod.rs @@ -153,7 +153,7 @@ where ) .acquire_eoa_lock_aggressively(&job.lease_token) .await - .map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)?; + .map_err(|e| Into::<EoaExecutorWorkerError>::into(e).handle())?; let worker = EoaExecutorWorker { store: scoped, @@ -169,7 +169,9 @@ where }; let result = worker.execute_main_workflow().await?; - worker.release_eoa_lock().await; + if let Err(e) = worker.release_eoa_lock().await { + tracing::error!("Error releasing EOA lock: {}", e); + } if result.is_work_remaining() { Err(EoaExecutorWorkerError::WorkRemaining { result }) @@ -267,7 +269,7 @@ impl EoaExecutorWorker { tracing::error!("Error in recover_borrowed_state: {}", e); e }) - .map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)?; + .map_err(|e| e.handle())?; // 2. CONFIRM FLOW let confirmations_report = self @@ -277,7 +279,7 @@ impl EoaExecutorWorker { tracing::error!("Error in confirm flow: {}", e); e }) - .map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)?; + .map_err(|e| e.handle())?; // 3.
SEND FLOW let sent = self @@ -287,7 +289,7 @@ impl EoaExecutorWorker { tracing::error!("Error in send_flow: {}", e); e }) - .map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)?; + .map_err(|e| e.handle())?; // 4. CHECK FOR REMAINING WORK let pending_count = self @@ -298,8 +300,9 @@ impl EoaExecutorWorker { tracing::error!("Error in peek_pending_transactions: {}", e); e }) - .map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)? + .map_err(|e| Into::<EoaExecutorWorkerError>::into(e).handle())? .len(); + let borrowed_count = self .store .peek_borrowed_transactions() @@ -308,8 +311,9 @@ impl EoaExecutorWorker { tracing::error!("Error in peek_borrowed_transactions: {}", e); e }) - .map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)? + .map_err(|e| Into::<EoaExecutorWorkerError>::into(e).handle())? .len(); + let recycled_count = self .store .peek_recycled_nonces() @@ -318,8 +322,9 @@ impl EoaExecutorWorker { tracing::error!("Error in peek_recycled_nonces: {}", e); e }) - .map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)? + .map_err(|e| Into::<EoaExecutorWorkerError>::into(e).handle())?
.len(); + let submitted_count = self .store .get_submitted_transactions_count() @@ -328,7 +333,7 @@ impl EoaExecutorWorker { tracing::error!("Error in get_submitted_transactions_count: {}", e); e }) - .map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)?; + .map_err(|e| Into::<EoaExecutorWorkerError>::into(e).handle())?; Ok(EoaExecutorWorkerResult { recovered_transactions: recovered, @@ -493,7 +498,8 @@ impl EoaExecutorWorker { Ok(()) } - async fn release_eoa_lock(self) { - self.store.release_eoa_lock().await; + async fn release_eoa_lock(self) -> Result<(), EoaExecutorWorkerError> { + self.store.release_eoa_lock().await?; + Ok(()) } } diff --git a/executors/src/eoa/worker/send.rs b/executors/src/eoa/worker/send.rs index eb4c384..19b7b88 100644 --- a/executors/src/eoa/worker/send.rs +++ b/executors/src/eoa/worker/send.rs @@ -49,7 +49,10 @@ impl EoaExecutorWorker { "EOA has insufficient balance (<= {} wei), skipping send flow", health.balance_threshold ); - return Ok(0); + return Err(EoaExecutorWorkerError::EoaOutOfFunds { + balance: health.balance, + balance_threshold: health.balance_threshold, + }); } } diff --git a/executors/src/eoa/worker/transaction.rs b/executors/src/eoa/worker/transaction.rs index 12cd2e7..e442367 100644 --- a/executors/src/eoa/worker/transaction.rs +++ b/executors/src/eoa/worker/transaction.rs @@ -23,7 +23,6 @@ use crate::eoa::{ EoaTransactionRequest, store::{ BorrowedTransaction, BorrowedTransactionData, PendingTransaction, SubmittedNoopTransaction, - TransactionData, }, worker::{ EoaExecutorWorker, diff --git a/executors/src/webhook/envelope.rs b/executors/src/webhook/envelope.rs index d351862..0422f16 100644 --- a/executors/src/webhook/envelope.rs +++ b/executors/src/webhook/envelope.rs @@ -39,6 +39,10 @@ pub struct WebhookNotificationEnvelope { #[serde(skip_serializing_if = "Option::is_none")] pub delivery_target_url: Option<String>, + + /// Custom metadata provided by the user + #[serde(skip_serializing_if = "Option::is_none")] + pub user_metadata:
Option<String>, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -56,6 +60,7 @@ impl BareWebhookNotificationEnvelope { self, timestamp: u64, delivery_target_url: String, + user_metadata: Option<String>, ) -> WebhookNotificationEnvelope { WebhookNotificationEnvelope { notification_id: Uuid::new_v4().to_string(), @@ -66,6 +71,7 @@ impl BareWebhookNotificationEnvelope { event_type: self.event_type, payload: self.payload, delivery_target_url: Some(delivery_target_url), + user_metadata, } } } @@ -145,6 +151,7 @@ pub trait WebhookCapable: DurableExecution + ExecutorStage { result: success_data.result.clone(), }, delivery_target_url: Some(w.url.clone()), + user_metadata: w.user_metadata.clone(), }; self.queue_webhook_envelope(envelope, w, job, tx)? @@ -184,6 +191,7 @@ pub trait WebhookCapable: DurableExecution + ExecutorStage { next_retry_at, }, delivery_target_url: Some(w.url.clone()), + user_metadata: w.user_metadata.clone(), }; self.queue_webhook_envelope(envelope, w, job, tx)?; @@ -215,6 +223,7 @@ pub trait WebhookCapable: DurableExecution + ExecutorStage { final_attempt_number: job.job.attempts, }, delivery_target_url: Some(w.url.clone()), + user_metadata: w.user_metadata.clone(), }; self.queue_webhook_envelope(envelope, w, job, tx)?; diff --git a/executors/src/webhook/mod.rs b/executors/src/webhook/mod.rs index 271c4b6..559f94c 100644 --- a/executors/src/webhook/mod.rs +++ b/executors/src/webhook/mod.rs @@ -443,7 +443,7 @@ pub fn queue_webhook_envelopes( .map(|webhook_option| { let webhook_notification_envelope = envelope .clone() - .into_webhook_notification_envelope(now, webhook_option.url.clone()); + .into_webhook_notification_envelope(now, webhook_option.url.clone(), webhook_option.user_metadata.clone()); let serialised_envelope = serde_json::to_string(&webhook_notification_envelope)?; Ok(( serialised_envelope, diff --git a/twmq/benches/throughput.rs b/twmq/benches/throughput.rs index 4056fa0..6921f7b 100644 --- a/twmq/benches/throughput.rs +++ b/twmq/benches/throughput.rs
@@ -294,7 +294,7 @@ async fn load_test_throughput( ); println!(" Jobs pushed: {}", jobs_pushed); println!(" Jobs processed: {}", total_processed); - println!(" Success rate: {:.1}%", success_rate * 100.0); + println!(" Simulated success rate: {:.1}%", success_rate * 100.0); println!(" Avg processing time: {:.2}ms", avg_processing_time); println!(" Max queue depth: {}", max_depth); println!(" Final backlog: {}", final_pending + final_active); @@ -325,7 +325,7 @@ fn find_max_throughput(c: &mut Criterion) { group.sample_size(10); // Test different throughput levels to find the limit - let throughput_levels = vec![50_000, 100_000, 120_000, 150_000, 200_000, 250_000]; + let throughput_levels = vec![150_000, 200_000, 250_000]; for &jobs_per_second in &throughput_levels { group.throughput(Throughput::Elements(jobs_per_second as u64));