Skip to content

EOA Production Readiness #14

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jul 15, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 275 additions & 2 deletions executors/src/eoa/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,22 @@ impl EoaExecutorStore {
None => format!("eoa_executor:health:{chain_id}:{eoa}"),
}
}

/// Name of the sorted set for completed transactions per EOA (for pruning)
fn completed_transactions_per_eoa_key_name(&self, eoa: Address, chain_id: u64) -> String {
match &self.namespace {
Some(ns) => format!("{ns}:eoa_executor:completed:{chain_id}:{eoa}"),
None => format!("eoa_executor:completed:{chain_id}:{eoa}"),
}
}

/// Name of the sorted set for completed transactions globally (for pruning)
fn completed_transactions_global_key_name(&self) -> String {
match &self.namespace {
Some(ns) => format!("{ns}:eoa_executor:completed_global"),
None => "eoa_executor:completed_global".to_string(),
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -1417,6 +1433,8 @@ impl EoaExecutorStore {
let submitted_key = self.submitted_transactions_zset_name(eoa, chain_id);
let hash_to_id_key = self.transaction_hash_to_id_key_name(hash);
let tx_data_key = self.transaction_data_key_name(transaction_id);
let completed_eoa_key = self.completed_transactions_per_eoa_key_name(eoa, chain_id);
let completed_global_key = self.completed_transactions_global_key_name();
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;

// Remove this hash:id from submitted
Expand All @@ -1430,6 +1448,10 @@ impl EoaExecutorStore {
pipeline.hset(&tx_data_key, "completed_at", now);
pipeline.hset(&tx_data_key, "receipt", receipt);
pipeline.hset(&tx_data_key, "status", "confirmed");

// Add to completed transactions tracking for pruning
pipeline.zadd(&completed_eoa_key, transaction_id, now);
pipeline.zadd(&completed_global_key, transaction_id, now);
})
.await
}
Expand Down Expand Up @@ -1484,7 +1506,7 @@ impl EoaExecutorStore {
eoa: Address,
chain_id: u64,
worker_id: &str,
failures: Vec<crate::eoa::worker::TransactionFailure>,
failures: Vec<crate::eoa::worker::TransactionReplacement>,
) -> Result<(), TransactionStoreError> {
if failures.is_empty() {
return Ok(());
Expand Down Expand Up @@ -1532,6 +1554,8 @@ impl EoaExecutorStore {

self.with_lock_check(eoa, chain_id, worker_id, |pipeline| {
let submitted_key = self.submitted_transactions_zset_name(eoa, chain_id);
let completed_eoa_key = self.completed_transactions_per_eoa_key_name(eoa, chain_id);
let completed_global_key = self.completed_transactions_global_key_name();
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;

for success in &successes {
Expand All @@ -1548,6 +1572,10 @@ impl EoaExecutorStore {
pipeline.hset(&tx_data_key, "completed_at", now);
pipeline.hset(&tx_data_key, "receipt", &success.receipt_data);
pipeline.hset(&tx_data_key, "status", "confirmed");

// Add to completed transactions tracking for pruning
pipeline.zadd(&completed_eoa_key, &success.transaction_id, now);
pipeline.zadd(&completed_global_key, &success.transaction_id, now);
}
})
.await
Expand Down Expand Up @@ -1713,6 +1741,197 @@ impl EoaExecutorStore {

Ok(())
}

/// Get count of submitted transactions awaiting confirmation
pub async fn get_submitted_transactions_count(
&self,
eoa: Address,
chain_id: u64,
) -> Result<u64, TransactionStoreError> {
let submitted_key = self.submitted_transactions_zset_name(eoa, chain_id);
let mut conn = self.redis.clone();

let count: u64 = conn.zcard(&submitted_key).await?;
Ok(count)
}

/// Fail a transaction that's in the pending queue
pub async fn fail_pending_transaction(
&self,
eoa: Address,
chain_id: u64,
worker_id: &str,
transaction_id: &str,
failure_reason: &str,
) -> Result<(), TransactionStoreError> {
self.with_lock_check(eoa, chain_id, worker_id, |pipeline| {
let pending_key = self.pending_transactions_list_name(eoa, chain_id);
let tx_data_key = self.transaction_data_key_name(transaction_id);
let completed_eoa_key = self.completed_transactions_per_eoa_key_name(eoa, chain_id);
let completed_global_key = self.completed_transactions_global_key_name();
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;

// Remove from pending queue
pipeline.lrem(&pending_key, 0, transaction_id);

// Update transaction data with failure
pipeline.hset(&tx_data_key, "completed_at", now);
pipeline.hset(&tx_data_key, "failure_reason", failure_reason);
pipeline.hset(&tx_data_key, "status", "failed");

// Add to completed transactions tracking for pruning
pipeline.zadd(&completed_eoa_key, transaction_id, now);
pipeline.zadd(&completed_global_key, transaction_id, now);
})
.await
}

/// Fail a transaction that's in the borrowed state (we know the nonce)
pub async fn fail_borrowed_transaction(
&self,
eoa: Address,
chain_id: u64,
worker_id: &str,
transaction_id: &str,
nonce: u64,
failure_reason: &str,
) -> Result<(), TransactionStoreError> {
self.with_lock_check(eoa, chain_id, worker_id, |pipeline| {
let borrowed_key = self.borrowed_transactions_hashmap_name(eoa, chain_id);
let tx_data_key = self.transaction_data_key_name(transaction_id);
let completed_eoa_key = self.completed_transactions_per_eoa_key_name(eoa, chain_id);
let completed_global_key = self.completed_transactions_global_key_name();
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;

// Remove from borrowed state using the known nonce
pipeline.hdel(&borrowed_key, nonce.to_string());

// Update transaction data with failure
pipeline.hset(&tx_data_key, "completed_at", now);
pipeline.hset(&tx_data_key, "failure_reason", failure_reason);
pipeline.hset(&tx_data_key, "status", "failed");

// Add to completed transactions tracking for pruning
pipeline.zadd(&completed_eoa_key, transaction_id, now);
pipeline.zadd(&completed_global_key, transaction_id, now);
})
.await
}

/// Prune old completed transactions for a specific EOA if it exceeds the cap
pub async fn prune_completed_transactions_for_eoa(
&self,
eoa: Address,
chain_id: u64,
max_per_eoa: u64,
batch_size: u64,
) -> Result<u64, TransactionStoreError> {
let completed_eoa_key = self.completed_transactions_per_eoa_key_name(eoa, chain_id);
let completed_global_key = self.completed_transactions_global_key_name();
let mut conn = self.redis.clone();

// Check current count
let current_count: u64 = conn.zcard(&completed_eoa_key).await?;
if current_count <= max_per_eoa {
return Ok(0); // No pruning needed
}

let to_remove = current_count - max_per_eoa;
let batch_to_remove = to_remove.min(batch_size);

// Get oldest transactions (lowest scores)
let oldest_transactions: Vec<String> = conn
.zrange(&completed_eoa_key, 0, (batch_to_remove - 1) as isize)
.await?;

if oldest_transactions.is_empty() {
return Ok(0);
}

// Remove transaction data and tracking
for transaction_id in &oldest_transactions {
let tx_data_key = self.transaction_data_key_name(transaction_id);
let _: () = conn.del(&tx_data_key).await?;
}

// Remove from tracking sets
for transaction_id in &oldest_transactions {
let _: () = conn.zrem(&completed_eoa_key, transaction_id).await?;
let _: () = conn.zrem(&completed_global_key, transaction_id).await?;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Batch Redis operations in pruning methods for better performance.

The pruning methods execute individual del and zrem operations in loops, which is inefficient. For better performance, use Redis pipelines to batch these operations.

Apply this pattern to both pruning methods:

-        // Remove transaction data and tracking
-        for transaction_id in &oldest_transactions {
-            let tx_data_key = self.transaction_data_key_name(transaction_id);
-            let _: () = conn.del(&tx_data_key).await?;
-        }
-
-        // Remove from tracking sets
-        for transaction_id in &oldest_transactions {
-            let _: () = conn.zrem(&completed_eoa_key, transaction_id).await?;
-            let _: () = conn.zrem(&completed_global_key, transaction_id).await?;
-        }
+        // Batch remove transaction data and tracking
+        let mut pipeline = twmq::redis::pipe();
+        for transaction_id in &oldest_transactions {
+            let tx_data_key = self.transaction_data_key_name(transaction_id);
+            pipeline.del(&tx_data_key);
+            pipeline.zrem(&completed_eoa_key, transaction_id);
+            pipeline.zrem(&completed_global_key, transaction_id);
+        }
+        let _: Vec<twmq::redis::Value> = pipeline.query_async(&mut conn).await?;

Also applies to: 1894-1902

🤖 Prompt for AI Agents
In executors/src/eoa/store.rs around lines 1852 to 1861 and also 1894 to 1902,
the code performs individual Redis `del` and `zrem` operations inside loops,
which is inefficient. Refactor these loops to use Redis pipelines by batching
all `del` commands together and all `zrem` commands together in a single
pipeline execution. This will reduce the number of round trips to Redis and
improve performance.


Ok(oldest_transactions.len() as u64)
}

/// Prune old completed transactions globally if it exceeds the global cap
pub async fn prune_completed_transactions_globally(
&self,
max_global: u64,
batch_size: u64,
) -> Result<u64, TransactionStoreError> {
let completed_global_key = self.completed_transactions_global_key_name();
let mut conn = self.redis.clone();

// Check current count
let current_count: u64 = conn.zcard(&completed_global_key).await?;
if current_count <= max_global {
return Ok(0); // No pruning needed
}

let to_remove = current_count - max_global;
let batch_to_remove = to_remove.min(batch_size);

// Get oldest transactions (lowest scores)
let oldest_transactions: Vec<String> = conn
.zrange(&completed_global_key, 0, (batch_to_remove - 1) as isize)
.await?;

if oldest_transactions.is_empty() {
return Ok(0);
}

// Remove transaction data
for transaction_id in &oldest_transactions {
let tx_data_key = self.transaction_data_key_name(transaction_id);
let _: () = conn.del(&tx_data_key).await?;
}

// Remove from global tracking
for transaction_id in &oldest_transactions {
let _: () = conn.zrem(&completed_global_key, transaction_id).await?;
}

// Also remove from per-EOA tracking sets (we need to scan for these)
// Note: This is less efficient but necessary to keep consistency
self.remove_transactions_from_all_eoa_tracking(&oldest_transactions)
.await?;

Ok(oldest_transactions.len() as u64)
}

/// Helper to remove transactions from all EOA tracking sets
async fn remove_transactions_from_all_eoa_tracking(
&self,
transaction_ids: &[String],
) -> Result<(), TransactionStoreError> {
let mut conn = self.redis.clone();
let pattern = match &self.namespace {
Some(ns) => format!("{ns}:eoa_executor:completed:*"),
None => "eoa_executor:completed:*".to_string(),
};

// Get all EOA-specific completed transaction keys
let keys: Vec<String> = conn.keys(&pattern).await?;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Replace KEYS command with SCAN to avoid blocking Redis.

Using keys() can block Redis in production environments with many keys. Use SCAN for non-blocking iteration.

Replace the KEYS usage with SCAN:

-        // Get all EOA-specific completed transaction keys
-        let keys: Vec<String> = conn.keys(&pattern).await?;
+        // Get all EOA-specific completed transaction keys using SCAN
+        let mut keys = Vec::new();
+        let mut cursor = 0u64;
+        loop {
+            let (new_cursor, batch): (u64, Vec<String>) = twmq::redis::cmd("SCAN")
+                .arg(cursor)
+                .arg("MATCH")
+                .arg(&pattern)
+                .arg("COUNT")
+                .arg(100)
+                .query_async(&mut conn)
+                .await?;
+            keys.extend(batch);
+            cursor = new_cursor;
+            if cursor == 0 {
+                break;
+            }
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let keys: Vec<String> = conn.keys(&pattern).await?;
// Get all EOA-specific completed transaction keys using SCAN
let mut keys = Vec::new();
let mut cursor = 0u64;
loop {
let (new_cursor, batch): (u64, Vec<String>) = twmq::redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg(&pattern)
.arg("COUNT")
.arg(100)
.query_async(&mut conn)
.await?;
keys.extend(batch);
cursor = new_cursor;
if cursor == 0 {
break;
}
}
🤖 Prompt for AI Agents
In executors/src/eoa/store.rs at line 1924, replace the blocking Redis KEYS
command with the non-blocking SCAN command. Modify the code to asynchronously
iterate over keys matching the pattern using SCAN, collecting them into a
Vec<String> without blocking Redis, ensuring efficient and safe key retrieval in
production environments.


// Remove transactions from each EOA tracking set
for key in keys {
for transaction_id in transaction_ids {
let _: () = conn.zrem(&key, transaction_id).await?;
}
}

Ok(())
}
}

// Additional error types
Expand Down Expand Up @@ -1974,7 +2193,7 @@ impl<'a> ScopedEoaExecutorStore<'a> {
/// Efficiently batch fail and requeue multiple transactions
pub async fn batch_fail_and_requeue_transactions(
&self,
failures: Vec<crate::eoa::worker::TransactionFailure>,
failures: Vec<crate::eoa::worker::TransactionReplacement>,
) -> Result<(), TransactionStoreError> {
self.store
.batch_fail_and_requeue_transactions(self.eoa, self.chain_id, &self.worker_id, failures)
Expand Down Expand Up @@ -2153,4 +2372,58 @@ impl<'a> ScopedEoaExecutorStore<'a> {
) -> Result<Option<TransactionData>, TransactionStoreError> {
self.store.get_transaction_data(transaction_id).await
}

/// Get count of submitted transactions awaiting confirmation
pub async fn get_submitted_transactions_count(&self) -> Result<u64, TransactionStoreError> {
self.store
.get_submitted_transactions_count(self.eoa, self.chain_id)
.await
}

/// Fail a transaction that's in the pending queue
pub async fn fail_pending_transaction(
&self,
transaction_id: &str,
failure_reason: &str,
) -> Result<(), TransactionStoreError> {
self.store
.fail_pending_transaction(
self.eoa,
self.chain_id,
&self.worker_id,
transaction_id,
failure_reason,
)
.await
}

/// Fail a transaction that's in the borrowed state (we know the nonce)
pub async fn fail_borrowed_transaction(
&self,
transaction_id: &str,
nonce: u64,
failure_reason: &str,
) -> Result<(), TransactionStoreError> {
self.store
.fail_borrowed_transaction(
self.eoa,
self.chain_id,
&self.worker_id,
transaction_id,
nonce,
failure_reason,
)
.await
}

/// Prune old completed transactions for this EOA if it exceeds the cap
pub async fn prune_completed_transactions(
&self,
max_per_eoa: u64,
batch_size: u64,
) -> Result<u64, TransactionStoreError> {
self.store
.prune_completed_transactions_for_eoa(self.eoa, self.chain_id, max_per_eoa, batch_size)
.await
}
}
Loading