-
Notifications
You must be signed in to change notification settings - Fork 2
EOA Production Readiness #14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
cc4ff1d
81d56cc
67a7ab7
1c72974
0925946
1862517
d233439
1518cd8
6b70ab5
9a29963
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -403,6 +403,22 @@ impl EoaExecutorStore { | |||||||||||||||||||||||||||||||||||||||
None => format!("eoa_executor:health:{chain_id}:{eoa}"), | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
/// Name of the sorted set for completed transactions per EOA (for pruning) | ||||||||||||||||||||||||||||||||||||||||
fn completed_transactions_per_eoa_key_name(&self, eoa: Address, chain_id: u64) -> String { | ||||||||||||||||||||||||||||||||||||||||
match &self.namespace { | ||||||||||||||||||||||||||||||||||||||||
Some(ns) => format!("{ns}:eoa_executor:completed:{chain_id}:{eoa}"), | ||||||||||||||||||||||||||||||||||||||||
None => format!("eoa_executor:completed:{chain_id}:{eoa}"), | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
/// Name of the sorted set for completed transactions globally (for pruning) | ||||||||||||||||||||||||||||||||||||||||
fn completed_transactions_global_key_name(&self) -> String { | ||||||||||||||||||||||||||||||||||||||||
match &self.namespace { | ||||||||||||||||||||||||||||||||||||||||
Some(ns) => format!("{ns}:eoa_executor:completed_global"), | ||||||||||||||||||||||||||||||||||||||||
None => "eoa_executor:completed_global".to_string(), | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
#[derive(Debug, Clone, Serialize, Deserialize)] | ||||||||||||||||||||||||||||||||||||||||
|
@@ -1417,6 +1433,8 @@ impl EoaExecutorStore { | |||||||||||||||||||||||||||||||||||||||
let submitted_key = self.submitted_transactions_zset_name(eoa, chain_id); | ||||||||||||||||||||||||||||||||||||||||
let hash_to_id_key = self.transaction_hash_to_id_key_name(hash); | ||||||||||||||||||||||||||||||||||||||||
let tx_data_key = self.transaction_data_key_name(transaction_id); | ||||||||||||||||||||||||||||||||||||||||
let completed_eoa_key = self.completed_transactions_per_eoa_key_name(eoa, chain_id); | ||||||||||||||||||||||||||||||||||||||||
let completed_global_key = self.completed_transactions_global_key_name(); | ||||||||||||||||||||||||||||||||||||||||
let now = chrono::Utc::now().timestamp_millis().max(0) as u64; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Remove this hash:id from submitted | ||||||||||||||||||||||||||||||||||||||||
|
@@ -1430,6 +1448,10 @@ impl EoaExecutorStore { | |||||||||||||||||||||||||||||||||||||||
pipeline.hset(&tx_data_key, "completed_at", now); | ||||||||||||||||||||||||||||||||||||||||
pipeline.hset(&tx_data_key, "receipt", receipt); | ||||||||||||||||||||||||||||||||||||||||
pipeline.hset(&tx_data_key, "status", "confirmed"); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Add to completed transactions tracking for pruning | ||||||||||||||||||||||||||||||||||||||||
pipeline.zadd(&completed_eoa_key, transaction_id, now); | ||||||||||||||||||||||||||||||||||||||||
pipeline.zadd(&completed_global_key, transaction_id, now); | ||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||
.await | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
@@ -1484,7 +1506,7 @@ impl EoaExecutorStore { | |||||||||||||||||||||||||||||||||||||||
eoa: Address, | ||||||||||||||||||||||||||||||||||||||||
chain_id: u64, | ||||||||||||||||||||||||||||||||||||||||
worker_id: &str, | ||||||||||||||||||||||||||||||||||||||||
failures: Vec<crate::eoa::worker::TransactionFailure>, | ||||||||||||||||||||||||||||||||||||||||
failures: Vec<crate::eoa::worker::TransactionReplacement>, | ||||||||||||||||||||||||||||||||||||||||
) -> Result<(), TransactionStoreError> { | ||||||||||||||||||||||||||||||||||||||||
if failures.is_empty() { | ||||||||||||||||||||||||||||||||||||||||
return Ok(()); | ||||||||||||||||||||||||||||||||||||||||
|
@@ -1532,6 +1554,8 @@ impl EoaExecutorStore { | |||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
self.with_lock_check(eoa, chain_id, worker_id, |pipeline| { | ||||||||||||||||||||||||||||||||||||||||
let submitted_key = self.submitted_transactions_zset_name(eoa, chain_id); | ||||||||||||||||||||||||||||||||||||||||
let completed_eoa_key = self.completed_transactions_per_eoa_key_name(eoa, chain_id); | ||||||||||||||||||||||||||||||||||||||||
let completed_global_key = self.completed_transactions_global_key_name(); | ||||||||||||||||||||||||||||||||||||||||
let now = chrono::Utc::now().timestamp_millis().max(0) as u64; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
for success in &successes { | ||||||||||||||||||||||||||||||||||||||||
|
@@ -1548,6 +1572,10 @@ impl EoaExecutorStore { | |||||||||||||||||||||||||||||||||||||||
pipeline.hset(&tx_data_key, "completed_at", now); | ||||||||||||||||||||||||||||||||||||||||
pipeline.hset(&tx_data_key, "receipt", &success.receipt_data); | ||||||||||||||||||||||||||||||||||||||||
pipeline.hset(&tx_data_key, "status", "confirmed"); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Add to completed transactions tracking for pruning | ||||||||||||||||||||||||||||||||||||||||
pipeline.zadd(&completed_eoa_key, &success.transaction_id, now); | ||||||||||||||||||||||||||||||||||||||||
pipeline.zadd(&completed_global_key, &success.transaction_id, now); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||
.await | ||||||||||||||||||||||||||||||||||||||||
|
@@ -1713,6 +1741,197 @@ impl EoaExecutorStore { | |||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
Ok(()) | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
/// Get count of submitted transactions awaiting confirmation | ||||||||||||||||||||||||||||||||||||||||
pub async fn get_submitted_transactions_count( | ||||||||||||||||||||||||||||||||||||||||
&self, | ||||||||||||||||||||||||||||||||||||||||
eoa: Address, | ||||||||||||||||||||||||||||||||||||||||
chain_id: u64, | ||||||||||||||||||||||||||||||||||||||||
) -> Result<u64, TransactionStoreError> { | ||||||||||||||||||||||||||||||||||||||||
let submitted_key = self.submitted_transactions_zset_name(eoa, chain_id); | ||||||||||||||||||||||||||||||||||||||||
let mut conn = self.redis.clone(); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
let count: u64 = conn.zcard(&submitted_key).await?; | ||||||||||||||||||||||||||||||||||||||||
Ok(count) | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
/// Fail a transaction that's in the pending queue | ||||||||||||||||||||||||||||||||||||||||
pub async fn fail_pending_transaction( | ||||||||||||||||||||||||||||||||||||||||
&self, | ||||||||||||||||||||||||||||||||||||||||
eoa: Address, | ||||||||||||||||||||||||||||||||||||||||
chain_id: u64, | ||||||||||||||||||||||||||||||||||||||||
worker_id: &str, | ||||||||||||||||||||||||||||||||||||||||
transaction_id: &str, | ||||||||||||||||||||||||||||||||||||||||
failure_reason: &str, | ||||||||||||||||||||||||||||||||||||||||
) -> Result<(), TransactionStoreError> { | ||||||||||||||||||||||||||||||||||||||||
self.with_lock_check(eoa, chain_id, worker_id, |pipeline| { | ||||||||||||||||||||||||||||||||||||||||
let pending_key = self.pending_transactions_list_name(eoa, chain_id); | ||||||||||||||||||||||||||||||||||||||||
let tx_data_key = self.transaction_data_key_name(transaction_id); | ||||||||||||||||||||||||||||||||||||||||
let completed_eoa_key = self.completed_transactions_per_eoa_key_name(eoa, chain_id); | ||||||||||||||||||||||||||||||||||||||||
let completed_global_key = self.completed_transactions_global_key_name(); | ||||||||||||||||||||||||||||||||||||||||
let now = chrono::Utc::now().timestamp_millis().max(0) as u64; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Remove from pending queue | ||||||||||||||||||||||||||||||||||||||||
pipeline.lrem(&pending_key, 0, transaction_id); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Update transaction data with failure | ||||||||||||||||||||||||||||||||||||||||
pipeline.hset(&tx_data_key, "completed_at", now); | ||||||||||||||||||||||||||||||||||||||||
pipeline.hset(&tx_data_key, "failure_reason", failure_reason); | ||||||||||||||||||||||||||||||||||||||||
pipeline.hset(&tx_data_key, "status", "failed"); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Add to completed transactions tracking for pruning | ||||||||||||||||||||||||||||||||||||||||
pipeline.zadd(&completed_eoa_key, transaction_id, now); | ||||||||||||||||||||||||||||||||||||||||
pipeline.zadd(&completed_global_key, transaction_id, now); | ||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||
.await | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
/// Fail a transaction that's in the borrowed state (we know the nonce) | ||||||||||||||||||||||||||||||||||||||||
pub async fn fail_borrowed_transaction( | ||||||||||||||||||||||||||||||||||||||||
&self, | ||||||||||||||||||||||||||||||||||||||||
eoa: Address, | ||||||||||||||||||||||||||||||||||||||||
chain_id: u64, | ||||||||||||||||||||||||||||||||||||||||
worker_id: &str, | ||||||||||||||||||||||||||||||||||||||||
transaction_id: &str, | ||||||||||||||||||||||||||||||||||||||||
nonce: u64, | ||||||||||||||||||||||||||||||||||||||||
failure_reason: &str, | ||||||||||||||||||||||||||||||||||||||||
) -> Result<(), TransactionStoreError> { | ||||||||||||||||||||||||||||||||||||||||
self.with_lock_check(eoa, chain_id, worker_id, |pipeline| { | ||||||||||||||||||||||||||||||||||||||||
let borrowed_key = self.borrowed_transactions_hashmap_name(eoa, chain_id); | ||||||||||||||||||||||||||||||||||||||||
let tx_data_key = self.transaction_data_key_name(transaction_id); | ||||||||||||||||||||||||||||||||||||||||
let completed_eoa_key = self.completed_transactions_per_eoa_key_name(eoa, chain_id); | ||||||||||||||||||||||||||||||||||||||||
let completed_global_key = self.completed_transactions_global_key_name(); | ||||||||||||||||||||||||||||||||||||||||
let now = chrono::Utc::now().timestamp_millis().max(0) as u64; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Remove from borrowed state using the known nonce | ||||||||||||||||||||||||||||||||||||||||
pipeline.hdel(&borrowed_key, nonce.to_string()); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Update transaction data with failure | ||||||||||||||||||||||||||||||||||||||||
pipeline.hset(&tx_data_key, "completed_at", now); | ||||||||||||||||||||||||||||||||||||||||
pipeline.hset(&tx_data_key, "failure_reason", failure_reason); | ||||||||||||||||||||||||||||||||||||||||
pipeline.hset(&tx_data_key, "status", "failed"); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Add to completed transactions tracking for pruning | ||||||||||||||||||||||||||||||||||||||||
pipeline.zadd(&completed_eoa_key, transaction_id, now); | ||||||||||||||||||||||||||||||||||||||||
pipeline.zadd(&completed_global_key, transaction_id, now); | ||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||
.await | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
/// Prune old completed transactions for a specific EOA if it exceeds the cap | ||||||||||||||||||||||||||||||||||||||||
pub async fn prune_completed_transactions_for_eoa( | ||||||||||||||||||||||||||||||||||||||||
&self, | ||||||||||||||||||||||||||||||||||||||||
eoa: Address, | ||||||||||||||||||||||||||||||||||||||||
chain_id: u64, | ||||||||||||||||||||||||||||||||||||||||
max_per_eoa: u64, | ||||||||||||||||||||||||||||||||||||||||
batch_size: u64, | ||||||||||||||||||||||||||||||||||||||||
) -> Result<u64, TransactionStoreError> { | ||||||||||||||||||||||||||||||||||||||||
let completed_eoa_key = self.completed_transactions_per_eoa_key_name(eoa, chain_id); | ||||||||||||||||||||||||||||||||||||||||
let completed_global_key = self.completed_transactions_global_key_name(); | ||||||||||||||||||||||||||||||||||||||||
let mut conn = self.redis.clone(); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Check current count | ||||||||||||||||||||||||||||||||||||||||
let current_count: u64 = conn.zcard(&completed_eoa_key).await?; | ||||||||||||||||||||||||||||||||||||||||
if current_count <= max_per_eoa { | ||||||||||||||||||||||||||||||||||||||||
return Ok(0); // No pruning needed | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
let to_remove = current_count - max_per_eoa; | ||||||||||||||||||||||||||||||||||||||||
let batch_to_remove = to_remove.min(batch_size); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Get oldest transactions (lowest scores) | ||||||||||||||||||||||||||||||||||||||||
let oldest_transactions: Vec<String> = conn | ||||||||||||||||||||||||||||||||||||||||
.zrange(&completed_eoa_key, 0, (batch_to_remove - 1) as isize) | ||||||||||||||||||||||||||||||||||||||||
.await?; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
if oldest_transactions.is_empty() { | ||||||||||||||||||||||||||||||||||||||||
return Ok(0); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Remove transaction data and tracking | ||||||||||||||||||||||||||||||||||||||||
for transaction_id in &oldest_transactions { | ||||||||||||||||||||||||||||||||||||||||
let tx_data_key = self.transaction_data_key_name(transaction_id); | ||||||||||||||||||||||||||||||||||||||||
let _: () = conn.del(&tx_data_key).await?; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Remove from tracking sets | ||||||||||||||||||||||||||||||||||||||||
for transaction_id in &oldest_transactions { | ||||||||||||||||||||||||||||||||||||||||
let _: () = conn.zrem(&completed_eoa_key, transaction_id).await?; | ||||||||||||||||||||||||||||||||||||||||
let _: () = conn.zrem(&completed_global_key, transaction_id).await?; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
Ok(oldest_transactions.len() as u64) | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
/// Prune old completed transactions globally if it exceeds the global cap | ||||||||||||||||||||||||||||||||||||||||
pub async fn prune_completed_transactions_globally( | ||||||||||||||||||||||||||||||||||||||||
&self, | ||||||||||||||||||||||||||||||||||||||||
max_global: u64, | ||||||||||||||||||||||||||||||||||||||||
batch_size: u64, | ||||||||||||||||||||||||||||||||||||||||
) -> Result<u64, TransactionStoreError> { | ||||||||||||||||||||||||||||||||||||||||
let completed_global_key = self.completed_transactions_global_key_name(); | ||||||||||||||||||||||||||||||||||||||||
let mut conn = self.redis.clone(); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Check current count | ||||||||||||||||||||||||||||||||||||||||
let current_count: u64 = conn.zcard(&completed_global_key).await?; | ||||||||||||||||||||||||||||||||||||||||
if current_count <= max_global { | ||||||||||||||||||||||||||||||||||||||||
return Ok(0); // No pruning needed | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
let to_remove = current_count - max_global; | ||||||||||||||||||||||||||||||||||||||||
let batch_to_remove = to_remove.min(batch_size); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Get oldest transactions (lowest scores) | ||||||||||||||||||||||||||||||||||||||||
let oldest_transactions: Vec<String> = conn | ||||||||||||||||||||||||||||||||||||||||
.zrange(&completed_global_key, 0, (batch_to_remove - 1) as isize) | ||||||||||||||||||||||||||||||||||||||||
.await?; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
if oldest_transactions.is_empty() { | ||||||||||||||||||||||||||||||||||||||||
return Ok(0); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Remove transaction data | ||||||||||||||||||||||||||||||||||||||||
for transaction_id in &oldest_transactions { | ||||||||||||||||||||||||||||||||||||||||
let tx_data_key = self.transaction_data_key_name(transaction_id); | ||||||||||||||||||||||||||||||||||||||||
let _: () = conn.del(&tx_data_key).await?; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Remove from global tracking | ||||||||||||||||||||||||||||||||||||||||
for transaction_id in &oldest_transactions { | ||||||||||||||||||||||||||||||||||||||||
let _: () = conn.zrem(&completed_global_key, transaction_id).await?; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Also remove from per-EOA tracking sets (we need to scan for these) | ||||||||||||||||||||||||||||||||||||||||
// Note: This is less efficient but necessary to keep consistency | ||||||||||||||||||||||||||||||||||||||||
self.remove_transactions_from_all_eoa_tracking(&oldest_transactions) | ||||||||||||||||||||||||||||||||||||||||
.await?; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
Ok(oldest_transactions.len() as u64) | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
/// Helper to remove transactions from all EOA tracking sets | ||||||||||||||||||||||||||||||||||||||||
async fn remove_transactions_from_all_eoa_tracking( | ||||||||||||||||||||||||||||||||||||||||
&self, | ||||||||||||||||||||||||||||||||||||||||
transaction_ids: &[String], | ||||||||||||||||||||||||||||||||||||||||
) -> Result<(), TransactionStoreError> { | ||||||||||||||||||||||||||||||||||||||||
let mut conn = self.redis.clone(); | ||||||||||||||||||||||||||||||||||||||||
let pattern = match &self.namespace { | ||||||||||||||||||||||||||||||||||||||||
Some(ns) => format!("{ns}:eoa_executor:completed:*"), | ||||||||||||||||||||||||||||||||||||||||
None => "eoa_executor:completed:*".to_string(), | ||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Get all EOA-specific completed transaction keys | ||||||||||||||||||||||||||||||||||||||||
let keys: Vec<String> = conn.keys(&pattern).await?; | ||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replace KEYS command with SCAN to avoid blocking Redis. Using Replace the KEYS usage with SCAN: - // Get all EOA-specific completed transaction keys
- let keys: Vec<String> = conn.keys(&pattern).await?;
+ // Get all EOA-specific completed transaction keys using SCAN
+ let mut keys = Vec::new();
+ let mut cursor = 0u64;
+ loop {
+ let (new_cursor, batch): (u64, Vec<String>) = twmq::redis::cmd("SCAN")
+ .arg(cursor)
+ .arg("MATCH")
+ .arg(&pattern)
+ .arg("COUNT")
+ .arg(100)
+ .query_async(&mut conn)
+ .await?;
+ keys.extend(batch);
+ cursor = new_cursor;
+ if cursor == 0 {
+ break;
+ }
+ } 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Remove transactions from each EOA tracking set | ||||||||||||||||||||||||||||||||||||||||
for key in keys { | ||||||||||||||||||||||||||||||||||||||||
for transaction_id in transaction_ids { | ||||||||||||||||||||||||||||||||||||||||
let _: () = conn.zrem(&key, transaction_id).await?; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
Ok(()) | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Additional error types | ||||||||||||||||||||||||||||||||||||||||
|
@@ -1974,7 +2193,7 @@ impl<'a> ScopedEoaExecutorStore<'a> { | |||||||||||||||||||||||||||||||||||||||
/// Efficiently batch fail and requeue multiple transactions | ||||||||||||||||||||||||||||||||||||||||
pub async fn batch_fail_and_requeue_transactions( | ||||||||||||||||||||||||||||||||||||||||
&self, | ||||||||||||||||||||||||||||||||||||||||
failures: Vec<crate::eoa::worker::TransactionFailure>, | ||||||||||||||||||||||||||||||||||||||||
failures: Vec<crate::eoa::worker::TransactionReplacement>, | ||||||||||||||||||||||||||||||||||||||||
) -> Result<(), TransactionStoreError> { | ||||||||||||||||||||||||||||||||||||||||
self.store | ||||||||||||||||||||||||||||||||||||||||
.batch_fail_and_requeue_transactions(self.eoa, self.chain_id, &self.worker_id, failures) | ||||||||||||||||||||||||||||||||||||||||
|
@@ -2153,4 +2372,58 @@ impl<'a> ScopedEoaExecutorStore<'a> { | |||||||||||||||||||||||||||||||||||||||
) -> Result<Option<TransactionData>, TransactionStoreError> { | ||||||||||||||||||||||||||||||||||||||||
self.store.get_transaction_data(transaction_id).await | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
/// Get count of submitted transactions awaiting confirmation | ||||||||||||||||||||||||||||||||||||||||
pub async fn get_submitted_transactions_count(&self) -> Result<u64, TransactionStoreError> { | ||||||||||||||||||||||||||||||||||||||||
self.store | ||||||||||||||||||||||||||||||||||||||||
.get_submitted_transactions_count(self.eoa, self.chain_id) | ||||||||||||||||||||||||||||||||||||||||
.await | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
/// Fail a transaction that's in the pending queue | ||||||||||||||||||||||||||||||||||||||||
pub async fn fail_pending_transaction( | ||||||||||||||||||||||||||||||||||||||||
&self, | ||||||||||||||||||||||||||||||||||||||||
transaction_id: &str, | ||||||||||||||||||||||||||||||||||||||||
failure_reason: &str, | ||||||||||||||||||||||||||||||||||||||||
) -> Result<(), TransactionStoreError> { | ||||||||||||||||||||||||||||||||||||||||
self.store | ||||||||||||||||||||||||||||||||||||||||
.fail_pending_transaction( | ||||||||||||||||||||||||||||||||||||||||
self.eoa, | ||||||||||||||||||||||||||||||||||||||||
self.chain_id, | ||||||||||||||||||||||||||||||||||||||||
&self.worker_id, | ||||||||||||||||||||||||||||||||||||||||
transaction_id, | ||||||||||||||||||||||||||||||||||||||||
failure_reason, | ||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||
.await | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
/// Fail a transaction that's in the borrowed state (we know the nonce) | ||||||||||||||||||||||||||||||||||||||||
pub async fn fail_borrowed_transaction( | ||||||||||||||||||||||||||||||||||||||||
&self, | ||||||||||||||||||||||||||||||||||||||||
transaction_id: &str, | ||||||||||||||||||||||||||||||||||||||||
nonce: u64, | ||||||||||||||||||||||||||||||||||||||||
failure_reason: &str, | ||||||||||||||||||||||||||||||||||||||||
) -> Result<(), TransactionStoreError> { | ||||||||||||||||||||||||||||||||||||||||
self.store | ||||||||||||||||||||||||||||||||||||||||
.fail_borrowed_transaction( | ||||||||||||||||||||||||||||||||||||||||
self.eoa, | ||||||||||||||||||||||||||||||||||||||||
self.chain_id, | ||||||||||||||||||||||||||||||||||||||||
&self.worker_id, | ||||||||||||||||||||||||||||||||||||||||
transaction_id, | ||||||||||||||||||||||||||||||||||||||||
nonce, | ||||||||||||||||||||||||||||||||||||||||
failure_reason, | ||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||
.await | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
/// Prune old completed transactions for this EOA if it exceeds the cap | ||||||||||||||||||||||||||||||||||||||||
pub async fn prune_completed_transactions( | ||||||||||||||||||||||||||||||||||||||||
&self, | ||||||||||||||||||||||||||||||||||||||||
max_per_eoa: u64, | ||||||||||||||||||||||||||||||||||||||||
batch_size: u64, | ||||||||||||||||||||||||||||||||||||||||
) -> Result<u64, TransactionStoreError> { | ||||||||||||||||||||||||||||||||||||||||
self.store | ||||||||||||||||||||||||||||||||||||||||
.prune_completed_transactions_for_eoa(self.eoa, self.chain_id, max_per_eoa, batch_size) | ||||||||||||||||||||||||||||||||||||||||
.await | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Batch Redis operations in pruning methods for better performance.
The pruning methods execute individual
del
andzrem
operations in loops, which is inefficient. For better performance, use Redis pipelines to batch these operations.Apply this pattern to both pruning methods:
Also applies to: 1894-1902
🤖 Prompt for AI Agents