Skip to content

Commit 65c88ef

Browse files
committed
Move peer discovery from "services" to "peers-clear"
This is necessary to implement TLS as service only report the clear service port. It will be easy to expand the "services_name" to return peers-tls
1 parent 6ac1a06 commit 65c88ef

File tree

5 files changed

+146
-32
lines changed

5 files changed

+146
-32
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ error-chain = "0.12"
3535
parking_lot = "0.9"
3636
pwhash = "0.3"
3737
serde = { version = "1.0", features = ["derive"], optional = true }
38+
logos = "0.12.0"
3839

3940
[features]
4041
serialization = ["serde"]

src/cluster/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub mod node;
1717
pub mod node_validator;
1818
pub mod partition;
1919
pub mod partition_tokenizer;
20+
pub mod peers;
2021

2122
use std::collections::HashMap;
2223
use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};

src/cluster/node.rs

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ use crate::errors::{ErrorKind, Result, ResultExt};
3030
use crate::net::{ConnectionPool, Host, PooledConnection};
3131
use crate::policy::ClientPolicy;
3232

33+
use super::peers::parse_peers_info;
34+
3335
pub const PARTITIONS: usize = 4096;
3436

3537
#[derive(Debug)]
@@ -138,9 +140,9 @@ impl Node {
138140

139141
const fn services_name(&self) -> &'static str {
140142
if self.client_policy.use_services_alternate {
141-
"services-alternate"
143+
"peers-clear-alt"
142144
} else {
143-
"services"
145+
"peers-clear-std"
144146
}
145147
}
146148

@@ -197,35 +199,7 @@ impl Node {
197199
Some(friend_string) => friend_string,
198200
};
199201

200-
let friend_names = friend_string.split(';');
201-
for friend in friend_names {
202-
let mut friend_info = friend.split(':');
203-
if friend_info.clone().count() != 2 {
204-
error!(
205-
"Node info from asinfo:services is malformed. Expected HOST:PORT, but got \
206-
'{}'",
207-
friend
208-
);
209-
continue;
210-
}
211-
212-
let host = friend_info.next().unwrap();
213-
let port = u16::from_str(friend_info.next().unwrap())?;
214-
let alias = match self.client_policy.ip_map {
215-
Some(ref ip_map) if ip_map.contains_key(host) => {
216-
Host::new(ip_map.get(host).unwrap(), port)
217-
}
218-
_ => Host::new(host, port),
219-
};
220-
221-
if current_aliases.contains_key(&alias) {
222-
self.reference_count.fetch_add(1, Ordering::Relaxed);
223-
} else if !friends.contains(&alias) {
224-
friends.push(alias);
225-
}
226-
}
227-
228-
Ok(friends)
202+
parse_peers_info(friend_string)
229203
}
230204

231205
fn update_partitions(&self, info_map: &HashMap<String, String>) -> Result<()> {

src/cluster/peers.rs

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
use crate::errors::{ErrorKind, Result};
2+
use crate::net::{Host, ToHosts};
3+
use logos::{Lexer, Logos};
4+
5+
#[derive(Logos, Debug, PartialEq)]
6+
enum Token {
7+
#[token("[")]
8+
OpenBracket,
9+
10+
#[token("]")]
11+
CloseBracket,
12+
13+
#[regex("[0-9a-zA-Z-./_: ]+")]
14+
Text,
15+
16+
#[error]
17+
#[regex(r"[,]+", logos::skip)]
18+
Error,
19+
}
20+
21+
fn parse_error(lex: &Lexer<Token>, source: &str) -> String {
22+
format!(
23+
"Failed to parse peers: {}, at {:?} ({})",
24+
source,
25+
lex.span(),
26+
lex.slice()
27+
)
28+
}
29+
30+
pub fn parse_peers_info(info_peers: &str) -> Result<Vec<Host>> {
31+
let mut lex = Token::lexer(info_peers);
32+
33+
let _peer_gen = match lex.next() {
34+
Some(Token::Text) => lex.slice(),
35+
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
36+
};
37+
let default_port_str = match lex.next() {
38+
Some(Token::Text) => lex.slice(),
39+
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
40+
};
41+
42+
let default_port = match default_port_str.parse::<u16>() {
43+
Ok(port) => port,
44+
Err(_) => bail!(ErrorKind::BadResponse(format!(
45+
"Invalid default port: {}",
46+
default_port_str
47+
))),
48+
};
49+
50+
match lex.next() {
51+
Some(Token::OpenBracket) => parse_peers(info_peers, &mut lex, default_port),
52+
_ => Ok(Vec::new()),
53+
}
54+
}
55+
56+
fn parse_peers(info_peers: &str, lex: &mut Lexer<Token>, default_port: u16) -> Result<Vec<Host>> {
57+
let mut peers = Vec::new();
58+
loop {
59+
match lex.next() {
60+
Some(Token::OpenBracket) => peers.extend(parse_peer(info_peers, lex, default_port)?),
61+
Some(Token::CloseBracket) => return Ok(peers),
62+
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
63+
}
64+
lex.next(); // Close brackets
65+
}
66+
}
67+
68+
fn parse_peer(info_peers: &str, lex: &mut Lexer<Token>, default_port: u16) -> Result<Vec<Host>> {
69+
let _id = match lex.next() {
70+
Some(Token::Text) => lex.slice(),
71+
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
72+
};
73+
74+
let mut token = lex.next();
75+
if Some(Token::Text) == token {
76+
let _tls_hostname = lex.slice();
77+
token = lex.next();
78+
}
79+
80+
match token {
81+
Some(Token::OpenBracket) => (),
82+
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
83+
};
84+
85+
let hosts = match lex.next() {
86+
Some(Token::Text) => lex.slice(),
87+
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
88+
}
89+
.to_hosts_with_default_port(default_port)?;
90+
91+
lex.next(); // Close brackets
92+
Ok(hosts)
93+
}
94+
95+
#[cfg(test)]
96+
mod tests {
97+
use std::vec;
98+
99+
use super::*;
100+
101+
#[test]
102+
fn parse_peers_works() {
103+
let work = "6,3000,[[12A0,aerospike.com,[1.2.3.4:4333]],[BB9040011AC4202,,[10.11.12.13]],[11A1,,[localhost]]]";
104+
let fail = "6,3foobar,[[12A0,aerospike.com,[1.2.3.4:4333]],[11A1,,[10.11.12.13:4333]]]";
105+
let empty = "6,3000,[]";
106+
assert!(parse_peers_info(fail).is_err());
107+
let work = parse_peers_info(work).unwrap();
108+
println!("{:?}", work);
109+
assert!(
110+
work == vec![
111+
Host {
112+
name: "1.2.3.4".to_string(),
113+
port: 4333
114+
},
115+
Host {
116+
name: "10.11.12.13".to_string(),
117+
port: 3000
118+
},
119+
Host {
120+
name: "localhost".to_string(),
121+
port: 3000
122+
}
123+
]
124+
);
125+
let empty = parse_peers_info(empty).unwrap();
126+
assert!(empty == vec![]);
127+
}
128+
}

src/net/host.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,24 @@ pub trait ToHosts {
6767
///
6868
/// Any errors encountered during conversion will be returned as an `Err`.
6969
fn to_hosts(&self) -> Result<Vec<Host>>;
70+
fn to_hosts_with_default_port(&self, default_port: u16) -> Result<Vec<Host>>;
7071
}
7172

7273
impl ToHosts for Vec<Host> {
7374
fn to_hosts(&self) -> Result<Vec<Host>> {
7475
Ok(self.clone())
7576
}
77+
fn to_hosts_with_default_port(&self, default_port: u16) -> Result<Vec<Host>> {
78+
Ok(self.clone())
79+
}
7680
}
7781

7882
impl ToHosts for String {
7983
fn to_hosts(&self) -> Result<Vec<Host>> {
80-
let mut parser = Parser::new(self, 3000);
84+
self.to_hosts_with_default_port(3000)
85+
}
86+
fn to_hosts_with_default_port(&self, default_port: u16) -> Result<Vec<Host>> {
87+
let mut parser = Parser::new(self, default_port);
8188
parser
8289
.read_hosts()
8390
.chain_err(|| ErrorKind::InvalidArgument(format!("Invalid hosts list: '{}'", self)))
@@ -88,6 +95,9 @@ impl<'a> ToHosts for &'a str {
8895
fn to_hosts(&self) -> Result<Vec<Host>> {
8996
(*self).to_string().to_hosts()
9097
}
98+
fn to_hosts_with_default_port(&self, default_port: u16) -> Result<Vec<Host>> {
99+
(*self).to_string().to_hosts_with_default_port(default_port)
100+
}
91101
}
92102

93103
#[cfg(test)]

0 commit comments

Comments
 (0)