Skip to content

Commit 4fb2574

Browse files
fubhyclaude
andcommitted
node, chain: Add extensible compression support for RPC requests
- Replace boolean compression_enabled with Compression enum (None, Gzip) - Support per-provider compression configuration via "compression" field - Add placeholders for future compression methods (Brotli, Deflate) - Update transport layer to handle compression enum with match statement - Add comprehensive unit tests for compression configuration parsing - Update example configuration and documentation Configuration examples: compression = "gzip" # Enable gzip compression compression = "none" # Disable compression (default) Addresses issue #5671 with future-extensible design. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent d4ddfaf commit 4fb2574

File tree

8 files changed

+229
-7
lines changed

8 files changed

+229
-7
lines changed

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chain/ethereum/src/transport.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use graph::components::network_provider::ProviderName;
2-
use graph::endpoint::{EndpointMetrics, RequestLabels};
2+
use graph::endpoint::{Compression, EndpointMetrics, RequestLabels};
33
use jsonrpc_core::types::Call;
44
use jsonrpc_core::Value;
55

@@ -54,12 +54,25 @@ impl Transport {
5454
headers: graph::http::HeaderMap,
5555
metrics: Arc<EndpointMetrics>,
5656
provider: impl AsRef<str>,
57+
compression: Compression,
5758
) -> Self {
5859
// Unwrap: This only fails if something is wrong with the system's TLS config.
59-
let client = reqwest::Client::builder()
60-
.default_headers(headers)
61-
.build()
62-
.unwrap();
60+
let mut client_builder = reqwest::Client::builder().default_headers(headers);
61+
62+
match compression {
63+
Compression::Gzip => {
64+
// Enable gzip compression/decompression for requests and responses
65+
client_builder = client_builder.gzip(true);
66+
}
67+
Compression::None => {
68+
// No compression
69+
} // Future compression methods can be handled here:
70+
// Compression::Brotli => {
71+
// client_builder = client_builder.brotli(true);
72+
// }
73+
}
74+
75+
let client = client_builder.build().unwrap();
6376

6477
Transport::RPC {
6578
client: http::Http::with_client(client, rpc),

graph/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ chrono = "0.4.41"
2626
envconfig = "0.11.0"
2727
Inflector = "0.11.3"
2828
atty = "0.2"
29-
reqwest = { version = "0.12.15", features = ["json", "stream", "multipart"] }
29+
reqwest = { version = "0.12.15", features = ["json", "stream", "multipart", "gzip"] }
3030
ethabi = "17.2"
3131
hex = "0.4.3"
3232
http0 = { version = "0", package = "http" }

graph/src/endpoint.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
};
88

99
use prometheus::IntCounterVec;
10+
use serde::{Deserialize, Serialize};
1011
use slog::{warn, Logger};
1112

1213
use crate::components::network_provider::ProviderName;
@@ -17,6 +18,26 @@ use crate::{components::metrics::MetricsRegistry, data::value::Word};
1718
/// avoid locking since we don't need to modify the entire struture.
1819
type ProviderCount = Arc<HashMap<ProviderName, AtomicU64>>;
1920

21+
/// Compression methods for RPC transports
22+
#[derive(Copy, Clone, Debug, Deserialize, Serialize, PartialEq)]
23+
pub enum Compression {
24+
#[serde(rename = "none")]
25+
None,
26+
#[serde(rename = "gzip")]
27+
Gzip,
28+
// Future compression methods can be added here:
29+
// #[serde(rename = "brotli")]
30+
// Brotli,
31+
// #[serde(rename = "deflate")]
32+
// Deflate,
33+
}
34+
35+
impl Default for Compression {
36+
fn default() -> Self {
37+
Compression::None
38+
}
39+
}
40+
2041
/// This struct represents all the current labels except for the result
2142
/// which is added separately. If any new labels are necessary they should
2243
/// remain in the same order as added in [`EndpointMetrics::new`]
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
# Plan: Implement Extensible Compression for RPC Requests
2+
3+
## Overview
4+
Add extensible compression support for Graph Node's outgoing RPC requests to upstream providers, configurable on a per-provider basis with future compression methods in mind.
5+
6+
## Implementation Steps (COMPLETED)
7+
8+
### 1. ✅ Create Compression Enum (`node/src/config.rs`)
9+
- Added `Compression` enum with `None` and `Gzip` variants
10+
- Commented placeholders for future compression methods (Brotli, Deflate)
11+
- Default implementation returns `Compression::None`
12+
13+
### 2. ✅ Update Configuration Structure (`node/src/config.rs`)
14+
- Replaced `compression_enabled: bool` with `compression: Compression` field in `Web3Provider` struct
15+
- Updated all existing code to use new enum
16+
- Added unit tests for both "gzip" and "none" compression options
17+
18+
### 3. ✅ Modify HTTP Transport (`chain/ethereum/src/transport.rs`)
19+
- Updated `Transport::new_rpc()` to accept `Compression` enum parameter
20+
- Implemented match statement for different compression types
21+
- Added comments showing where future compression methods can be added
22+
- Uses reqwest's `.gzip(true)` for automatic compression/decompression
23+
24+
### 4. ✅ Update Transport Creation (`node/src/chain.rs`)
25+
- Pass compression enum from config to transport
26+
- Updated logging to show compression method using debug format
27+
28+
### 5. ✅ Update Dependencies (`graph/Cargo.toml`)
29+
- Added "gzip" feature to reqwest dependency
30+
31+
### 6. ✅ Update Test Configuration
32+
- Updated `full_config.toml` example to use new enum format
33+
- Added comprehensive unit tests for compression parsing
34+
35+
## Configuration Examples
36+
37+
### Gzip Compression
38+
```toml
39+
[chains.mainnet]
40+
provider = [
41+
{
42+
label = "mainnet-rpc",
43+
details = {
44+
type = "web3",
45+
url = "http://rpc.example.com",
46+
features = ["archive"],
47+
compression = "gzip"
48+
}
49+
}
50+
]
51+
```
52+
53+
### No Compression (Default)
54+
```toml
55+
[chains.mainnet]
56+
provider = [
57+
{
58+
label = "mainnet-rpc",
59+
details = {
60+
type = "web3",
61+
url = "http://rpc.example.com",
62+
features = ["archive"],
63+
compression = "none" # or omit entirely
64+
}
65+
}
66+
]
67+
```
68+
69+
### Future Extension Example
70+
```rust
71+
// Future compression methods can be easily added:
72+
#[derive(Copy, Clone, Debug, Deserialize, Serialize, PartialEq)]
73+
pub enum Compression {
74+
#[serde(rename = "none")]
75+
None,
76+
#[serde(rename = "gzip")]
77+
Gzip,
78+
#[serde(rename = "brotli")]
79+
Brotli,
80+
#[serde(rename = "deflate")]
81+
Deflate,
82+
}
83+
84+
// And handled in transport:
85+
match compression {
86+
Compression::Gzip => client_builder = client_builder.gzip(true),
87+
Compression::Brotli => client_builder = client_builder.brotli(true),
88+
Compression::Deflate => client_builder = client_builder.deflate(true),
89+
Compression::None => {} // No compression
90+
}
91+
```
92+
93+
## Benefits of This Implementation
94+
- **Extensible**: Easy to add new compression methods without breaking changes
95+
- **Backward Compatible**: Defaults to no compression, existing configs work unchanged
96+
- **Type Safe**: Enum prevents invalid compression method strings
97+
- **Future Proof**: Clear pattern for adding Brotli, Deflate, etc.
98+
- **Per-Provider**: Each RPC provider can have different compression settings

node/resources/tests/full_config.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ shard = "primary"
4848
provider = [
4949
{ label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"] },
5050
{ label = "mainnet-1", details = { type = "web3call", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }},
51+
{ label = "mainnet-2", details = { type = "web3", url = "http://rpc.mainnet.io", features = ["archive"], compression = "gzip" }},
5152
{ label = "firehose", details = { type = "firehose", url = "http://localhost:9000", features = [] }},
5253
{ label = "substreams", details = { type = "substreams", url = "http://localhost:9000", features = [] }},
5354
]

node/src/chain.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,8 @@ pub async fn create_ethereum_networks_for_chain(
282282
logger,
283283
"Creating transport";
284284
"url" => &web3.url,
285-
"capabilities" => capabilities
285+
"capabilities" => capabilities,
286+
"compression" => ?web3.compression
286287
);
287288

288289
use crate::config::Transport::*;
@@ -293,6 +294,7 @@ pub async fn create_ethereum_networks_for_chain(
293294
web3.headers.clone(),
294295
endpoint_metrics.cheap_clone(),
295296
&provider.label,
297+
web3.compression,
296298
),
297299
Ipc => Transport::new_ipc(&web3.url).await,
298300
Ws => Transport::new_ws(&web3.url).await,

node/src/config.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use graph::{
22
anyhow::Error,
33
blockchain::BlockchainKind,
44
components::network_provider::ChainName,
5+
endpoint::Compression,
56
env::ENV_VARS,
67
firehose::{SubgraphLimit, SUBGRAPHS_PER_CONN},
78
itertools::Itertools,
@@ -502,6 +503,7 @@ impl ChainSection {
502503
features,
503504
headers: Default::default(),
504505
rules: vec![],
506+
compression: Compression::None,
505507
}),
506508
};
507509
let entry = chains.entry(name.to_string()).or_insert_with(|| Chain {
@@ -705,6 +707,10 @@ pub struct Web3Provider {
705707

706708
#[serde(default, rename = "match")]
707709
rules: Vec<Web3Rule>,
710+
711+
/// Compression method for RPC requests and responses
712+
#[serde(default)]
713+
pub compression: Compression,
708714
}
709715

710716
impl Web3Provider {
@@ -901,6 +907,7 @@ impl<'de> Deserialize<'de> for Provider {
901907
.ok_or_else(|| serde::de::Error::missing_field("features"))?,
902908
headers: headers.unwrap_or_else(HeaderMap::new),
903909
rules: nodes,
910+
compression: Compression::None,
904911
}),
905912
};
906913

@@ -944,6 +951,7 @@ pub enum Transport {
944951
Ipc,
945952
}
946953

954+
947955
impl Default for Transport {
948956
fn default() -> Self {
949957
Self::Rpc
@@ -1307,6 +1315,7 @@ mod tests {
13071315
features: BTreeSet::new(),
13081316
headers: HeaderMap::new(),
13091317
rules: Vec::new(),
1318+
compression: Compression::None,
13101319
}),
13111320
},
13121321
actual
@@ -1333,6 +1342,7 @@ mod tests {
13331342
features: BTreeSet::new(),
13341343
headers: HeaderMap::new(),
13351344
rules: Vec::new(),
1345+
compression: Compression::None,
13361346
}),
13371347
},
13381348
actual
@@ -1440,6 +1450,7 @@ mod tests {
14401450
features,
14411451
headers,
14421452
rules: Vec::new(),
1453+
compression: Compression::None,
14431454
}),
14441455
},
14451456
actual
@@ -1465,6 +1476,7 @@ mod tests {
14651476
features: BTreeSet::new(),
14661477
headers: HeaderMap::new(),
14671478
rules: Vec::new(),
1479+
compression: Compression::None,
14681480
}),
14691481
},
14701482
actual
@@ -1834,6 +1846,7 @@ mod tests {
18341846
features: BTreeSet::new(),
18351847
headers: HeaderMap::new(),
18361848
rules: Vec::new(),
1849+
compression: Compression::None,
18371850
}),
18381851
},
18391852
actual
@@ -1846,6 +1859,66 @@ mod tests {
18461859
assert!(SubgraphLimit::Limit(10) > SubgraphLimit::Disabled);
18471860
}
18481861

1862+
#[test]
1863+
fn it_parses_web3_provider_with_compression() {
1864+
let actual = toml::from_str(
1865+
r#"
1866+
label = "compressed"
1867+
details = { type = "web3", url = "http://localhost:8545", features = ["archive"], compression = "gzip" }
1868+
"#,
1869+
)
1870+
.unwrap();
1871+
1872+
assert_eq!(
1873+
Provider {
1874+
label: "compressed".to_owned(),
1875+
details: ProviderDetails::Web3(Web3Provider {
1876+
transport: Transport::Rpc,
1877+
url: "http://localhost:8545".to_owned(),
1878+
features: {
1879+
let mut features = BTreeSet::new();
1880+
features.insert("archive".to_string());
1881+
features
1882+
},
1883+
headers: HeaderMap::new(),
1884+
rules: Vec::new(),
1885+
compression: Compression::Gzip,
1886+
}),
1887+
},
1888+
actual
1889+
);
1890+
}
1891+
1892+
#[test]
1893+
fn it_parses_web3_provider_with_no_compression() {
1894+
let actual = toml::from_str(
1895+
r#"
1896+
label = "uncompressed"
1897+
details = { type = "web3", url = "http://localhost:8545", features = ["archive"], compression = "none" }
1898+
"#,
1899+
)
1900+
.unwrap();
1901+
1902+
assert_eq!(
1903+
Provider {
1904+
label: "uncompressed".to_owned(),
1905+
details: ProviderDetails::Web3(Web3Provider {
1906+
transport: Transport::Rpc,
1907+
url: "http://localhost:8545".to_owned(),
1908+
features: {
1909+
let mut features = BTreeSet::new();
1910+
features.insert("archive".to_string());
1911+
features
1912+
},
1913+
headers: HeaderMap::new(),
1914+
rules: Vec::new(),
1915+
compression: Compression::None,
1916+
}),
1917+
},
1918+
actual
1919+
);
1920+
}
1921+
18491922
#[test]
18501923
fn duplicated_labels_are_not_allowed_within_chain() {
18511924
let mut actual = toml::from_str::<ChainSection>(

0 commit comments

Comments
 (0)