Skip to content

Commit 4bf44c7

Browse files
committed
Merge tag '2.16.2'
version 2.16.2
2 parents 2f746d5 + 90df2c9 commit 4bf44c7

27 files changed

+455
-314
lines changed

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
2.16.2
2+
===========
3+
4+
Bug Fixes
5+
--------
6+
* [CPP-946] Core dump on unclean event loop shutdown
7+
* [PR #513] Fix SNI events
8+
* [PR #518] Replace deprecated function for OpenSSL >= 3.0
9+
110
2.16.1
211
===========
312

include/cassandra.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353

5454
#define CASS_VERSION_MAJOR 2
5555
#define CASS_VERSION_MINOR 16
56-
#define CASS_VERSION_PATCH 1
56+
#define CASS_VERSION_PATCH 2
5757
#define CASS_VERSION_SUFFIX ""
5858

5959
#ifdef __cplusplus

src/address_factory.cpp

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
using namespace datastax::internal::core;
2222

23-
bool DefaultAddressFactory::create(const Row* peers_row, const Host::Ptr& connected_host,
24-
Address* output) {
23+
bool AddressFactory::create(const Row* peers_row, const Host::Ptr& connected_host,
24+
Address* output) {
2525
Address connected_address = connected_host->address();
2626
const Value* peer_value = peers_row->get_by_name("peer");
2727
const Value* rpc_value = peers_row->get_by_name("rpc_address");
@@ -59,6 +59,12 @@ bool DefaultAddressFactory::create(const Row* peers_row, const Host::Ptr& connec
5959
return true;
6060
}
6161

62+
bool AddressFactory::is_peer(const Row* peers_row, const Host::Ptr& connected_host,
63+
const Address& expected) {
64+
Address address;
65+
return create(peers_row, connected_host, &address) && address == expected;
66+
}
67+
6268
bool SniAddressFactory::create(const Row* peers_row, const Host::Ptr& connected_host,
6369
Address* output) {
6470
CassUuid host_id;
@@ -78,3 +84,14 @@ bool SniAddressFactory::create(const Row* peers_row, const Host::Ptr& connected_
7884
connected_host->address().port(), to_string(host_id));
7985
return true;
8086
}
87+
88+
bool SniAddressFactory::is_peer(const Row* peers_row, const Host::Ptr& connected_host,
89+
const Address& expected) {
90+
const Value* value = peers_row->get_by_name("rpc_address");
91+
Address rpc_address;
92+
if (!value ||
93+
!value->decoder().as_inet(value->size(), connected_host->address().port(), &rpc_address)) {
94+
return false;
95+
}
96+
return rpc_address == expected;
97+
}

src/address_factory.hpp

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,15 @@ namespace datastax { namespace internal { namespace core {
2626
class Row;
2727

2828
/**
29-
* An interface for constructing `Address` from `system.local`/`system.peers` row data.
29+
* An address factory that creates `Address` using the `rpc_address` column.
3030
*/
3131
class AddressFactory : public RefCounted<AddressFactory> {
3232
public:
3333
typedef SharedRefPtr<AddressFactory> Ptr;
3434
virtual ~AddressFactory() {}
35-
virtual bool create(const Row* peers_row, const Host::Ptr& connected_host, Address* output) = 0;
36-
};
37-
38-
/**
39-
* An address factory that creates `Address` using the `rpc_address` column.
40-
*/
41-
class DefaultAddressFactory : public AddressFactory {
4235
virtual bool create(const Row* peers_row, const Host::Ptr& connected_host, Address* output);
36+
virtual bool is_peer(const Row* peers_row, const Host::Ptr& connected_host,
37+
const Address& expected);
4338
};
4439

4540
/**
@@ -48,13 +43,15 @@ class DefaultAddressFactory : public AddressFactory {
4843
*/
4944
class SniAddressFactory : public AddressFactory {
5045
virtual bool create(const Row* peers_row, const Host::Ptr& connected_host, Address* output);
46+
virtual bool is_peer(const Row* peers_row, const Host::Ptr& connected_host,
47+
const Address& expected);
5148
};
5249

5350
inline AddressFactory* create_address_factory_from_config(const Config& config) {
5451
if (config.cloud_secure_connection_config().is_loaded()) {
5552
return new SniAddressFactory();
5653
} else {
57-
return new DefaultAddressFactory();
54+
return new AddressFactory();
5855
}
5956
}
6057

src/cluster.cpp

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,17 @@ LockedHostMap::LockedHostMap(const HostMap& hosts)
171171
LockedHostMap::~LockedHostMap() { uv_mutex_destroy(&mutex_); }
172172

173173
LockedHostMap::const_iterator LockedHostMap::find(const Address& address) const {
174-
return hosts_.find(address);
174+
HostMap::const_iterator it = hosts_.find(address);
175+
if (it == hosts_.end()) {
176+
// If this is from an event (not SNI) and we're using SNI addresses then fallback to using the
177+
// "rpc_address" to compare.
178+
for (HostMap::const_iterator i = hosts_.begin(), end = hosts_.end(); i != end; ++i) {
179+
if (i->second->rpc_address() == address) {
180+
return i;
181+
}
182+
}
183+
}
184+
return it;
175185
}
176186

177187
Host::Ptr LockedHostMap::get(const Address& address) const {
@@ -470,9 +480,10 @@ void Cluster::on_reconnect(ControlConnector* connector) {
470480

471481
void Cluster::internal_close() {
472482
is_closing_ = true;
483+
bool was_timer_running = timer_.is_running();
484+
timer_.stop();
473485
monitor_reporting_timer_.stop();
474-
if (timer_.is_running()) {
475-
timer_.stop();
486+
if (was_timer_running) {
476487
handle_close();
477488
} else if (reconnector_) {
478489
reconnector_->cancel();
@@ -648,7 +659,7 @@ void Cluster::notify_host_remove(const Address& address) {
648659
notify_or_record(ClusterEvent(ClusterEvent::HOST_DOWN, host));
649660
}
650661

651-
hosts_.erase(address);
662+
hosts_.erase(it->first);
652663
for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
653664
end = load_balancing_policies_.end();
654665
it != end; ++it) {

src/collection_iterator.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,23 @@ bool CollectionIterator::next() {
2727
}
2828

2929
bool CollectionIterator::decode_value() {
30-
DataType::ConstPtr data_type;
3130
if (collection_->value_type() == CASS_VALUE_TYPE_MAP) {
32-
data_type =
31+
const DataType::ConstPtr& data_type =
3332
(index_ % 2 == 0) ? collection_->primary_data_type() : collection_->secondary_data_type();
33+
value_ = decoder_.decode_value(data_type);
3434
} else {
35-
data_type = collection_->primary_data_type();
35+
value_ = decoder_.decode_value(collection_->primary_data_type());
3636
}
3737

38-
return decoder_.decode_value(data_type, value_, true);
38+
return value_.is_valid();
3939
}
4040

4141
bool TupleIterator::next() {
4242
if (next_ == end_) {
4343
return false;
4444
}
4545
current_ = next_++;
46-
return decoder_.decode_value(*current_, value_);
46+
47+
value_ = decoder_.decode_value(*current_);
48+
return value_.is_valid();
4749
}

src/control_connection.cpp

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ static NopControlConnectionListener nop_listener__;
307307
ControlConnectionSettings::ControlConnectionSettings()
308308
: use_schema(CASS_DEFAULT_USE_SCHEMA)
309309
, use_token_aware_routing(CASS_DEFAULT_USE_TOKEN_AWARE_ROUTING)
310-
, address_factory(new DefaultAddressFactory()) {}
310+
, address_factory(new AddressFactory()) {}
311311

312312
ControlConnectionSettings::ControlConnectionSettings(const Config& config)
313313
: connection_settings(config)
@@ -395,17 +395,16 @@ void ControlConnection::handle_refresh_node(RefreshNodeCallback* callback) {
395395
const Row* row = NULL;
396396
ResultIterator rows(callback->result().get());
397397

398-
while (rows.next() && !found_host) {
399-
row = rows.row();
400-
if (callback->is_all_peers) {
401-
Address address;
402-
bool is_valid_address = settings_.address_factory->create(row, connection_->host(), &address);
403-
if (is_valid_address && callback->address == address) {
398+
if (callback->is_all_peers) {
399+
while (!found_host && rows.next()) {
400+
row = rows.row();
401+
if (settings_.address_factory->is_peer(row, connection_->host(), callback->address)) {
404402
found_host = true;
405403
}
406-
} else {
407-
found_host = true;
408404
}
405+
} else if (rows.next()) {
406+
row = rows.row();
407+
found_host = true;
409408
}
410409

411410
if (!found_host) {

src/decoder.cpp

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -138,33 +138,39 @@ bool Decoder::decode_warnings(WarningVec& output) {
138138
return true;
139139
}
140140

141-
bool Decoder::decode_value(const DataType::ConstPtr& data_type, Value& output,
142-
bool is_inside_collection /*= false*/) {
143-
const char* buffer = NULL;
141+
Value Decoder::decode_value(const DataType::ConstPtr& data_type) {
144142
int32_t size = 0;
145-
146-
if (!decode_int32(size)) {
147-
return false;
148-
}
143+
if (!decode_int32(size)) return Value();
149144

150145
if (size >= 0) {
151-
buffer = input_;
146+
Decoder decoder(input_, size, protocol_version_);
152147
input_ += size;
153148
remaining_ -= size;
154-
Decoder decoder(buffer, size, protocol_version_);
155-
156-
if (data_type->is_collection()) {
157-
int32_t count;
158-
if (!decoder.decode_int32(count)) return false;
159-
output = Value(data_type, count, decoder);
160-
} else {
161-
output = Value(data_type, decoder);
149+
150+
int32_t count = 0;
151+
if (!data_type->is_collection()) {
152+
return Value(data_type, decoder);
153+
} else if (decoder.decode_int32(count)) {
154+
return Value(data_type, count, decoder);
162155
}
163-
} else { // null value
164-
output = Value(data_type);
156+
return Value();
165157
}
158+
return Value(data_type);
159+
}
166160

167-
return true;
161+
bool Decoder::update_value(Value& value) {
162+
int32_t size = 0;
163+
if (decode_int32(size)) {
164+
if (size >= 0) {
165+
Decoder decoder(input_, size, protocol_version_);
166+
input_ += size;
167+
remaining_ -= size;
168+
return value.update(decoder);
169+
}
170+
Decoder decoder;
171+
return value.update(decoder);
172+
}
173+
return false;
168174
}
169175

170176
void Decoder::notify_error(const char* detail, size_t bytes) const {

src/decoder.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -558,8 +558,10 @@ class Decoder {
558558
bool decode_write_type(CassWriteType& output);
559559
bool decode_warnings(WarningVec& output);
560560

561-
bool decode_value(const DataType::ConstPtr& data_type, Value& output,
562-
bool is_inside_collection = false);
561+
Value decode_value(const DataType::ConstPtr& data_type);
562+
bool update_value(Value& value);
563+
564+
bool is_null() const { return input_ == NULL; }
563565

564566
protected:
565567
// Testing only

src/host.cpp

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,6 @@ void Host::set(const Row* row, bool use_tokens) {
140140
if (dse_server_version_ < VersionNumber(6, 7, 0)) {
141141
server_version_ = VersionNumber(3, 11, 0);
142142
}
143-
} else {
144-
LOG_WARN("Invalid DSE version string \"%s\" on host %s", dse_version_str.c_str(),
145-
address().to_string().c_str());
146143
}
147144
}
148145

@@ -170,6 +167,18 @@ void Host::set(const Row* row, bool use_tokens) {
170167
"If this is incorrect you should configure a specific interface for rpc_address on "
171168
"the server.",
172169
address_string_.c_str());
170+
v = row->get_by_name("listen_address"); // Available in system.local
171+
if (v && !v->is_null()) {
172+
v->decoder().as_inet(v->size(), address_.port(), &rpc_address_);
173+
} else {
174+
v = row->get_by_name("peer"); // Available in system.peers
175+
if (v && !v->is_null()) {
176+
v->decoder().as_inet(v->size(), address_.port(), &rpc_address_);
177+
}
178+
}
179+
if (!rpc_address_.is_valid()) {
180+
LOG_WARN("Unable to set rpc_address from either listen_address or peer");
181+
}
173182
}
174183
} else {
175184
LOG_WARN("No rpc_address for host %s in system.local or system.peers.",
@@ -213,30 +222,32 @@ void Host::close_unpooled_connections(uv_loop_t *loop) {
213222
}
214223
}
215224

225+
static CassInet to_inet(const Host::Ptr& host) {
226+
CassInet address;
227+
if (host->address().is_resolved()) {
228+
address.address_length = host->address().to_inet(address.address);
229+
} else {
230+
address.address_length = host->rpc_address().to_inet(&address.address);
231+
}
232+
return address;
233+
}
234+
216235
ExternalHostListener::ExternalHostListener(const CassHostListenerCallback callback, void* data)
217236
: callback_(callback)
218237
, data_(data) {}
219238

220239
void ExternalHostListener::on_host_up(const Host::Ptr& host) {
221-
CassInet address;
222-
address.address_length = host->address().to_inet(address.address);
223-
callback_(CASS_HOST_LISTENER_EVENT_UP, address, data_);
240+
callback_(CASS_HOST_LISTENER_EVENT_UP, to_inet(host), data_);
224241
}
225242

226243
void ExternalHostListener::on_host_down(const Host::Ptr& host) {
227-
CassInet address;
228-
address.address_length = host->address().to_inet(address.address);
229-
callback_(CASS_HOST_LISTENER_EVENT_DOWN, address, data_);
244+
callback_(CASS_HOST_LISTENER_EVENT_DOWN, to_inet(host), data_);
230245
}
231246

232247
void ExternalHostListener::on_host_added(const Host::Ptr& host) {
233-
CassInet address;
234-
address.address_length = host->address().to_inet(address.address);
235-
callback_(CASS_HOST_LISTENER_EVENT_ADD, address, data_);
248+
callback_(CASS_HOST_LISTENER_EVENT_ADD, to_inet(host), data_);
236249
}
237250

238251
void ExternalHostListener::on_host_removed(const Host::Ptr& host) {
239-
CassInet address;
240-
address.address_length = host->address().to_inet(address.address);
241-
callback_(CASS_HOST_LISTENER_EVENT_REMOVE, address, data_);
252+
callback_(CASS_HOST_LISTENER_EVENT_REMOVE, to_inet(host), data_);
242253
}

0 commit comments

Comments
 (0)