@@ -5868,6 +5868,8 @@ static void rd_kafka_cgrp_handle_assign_op(rd_kafka_cgrp_t *rkcg,
5868
5868
rkcg -> rkcg_rebalance_incr_assignment = NULL ;
5869
5869
}
5870
5870
5871
+ rkcg -> rkcg_rebalance_rejoin = rd_false ;
5872
+
5871
5873
rko -> rko_u .assign .method = RD_KAFKA_ASSIGN_METHOD_ASSIGN ;
5872
5874
5873
5875
if (rkcg -> rkcg_join_state ==
@@ -6152,11 +6154,94 @@ void rd_kafka_cgrp_consumer_group_heartbeat(rd_kafka_cgrp_t *rkcg,
6152
6154
rd_kafka_cgrp_handle_ConsumerGroupHeartbeat , NULL );
6153
6155
}
6154
6156
6157
+ #define rd_kafka_cgrp_consumer_check_invariant_mutually_exclusive ( \
6158
+ FIELD , FLAG_A , FLAG_B ) \
6159
+ (((FIELD) & (FLAG_A | FLAG_B)) != (FLAG_A | FLAG_B))
6160
+
6161
+ /**
6162
+ * @brief Verify that the consumer group never enters an invalid state.
6163
+ * below is a list of allowed join states plus the global and
6164
+ * state-dependent invariants.
6165
+ *
6166
+ * @remark Must only be called when devel mode is enabled
6167
+ *
6168
+ * @locality main thread
6169
+ * @locks none
6170
+ */
6171
+ static void rd_kafka_cgrp_consumer_check_invariants (rd_kafka_cgrp_t * rkcg ) {
6172
+ /* CONSUMER_F_SEND_NEW_SUBSCRIPTION and
6173
+ * CONSUMER_F_SENDING_NEW_SUBSCRIPTION aren't always mutually exclusive
6174
+ * given you can ask to send a new subscription when the previous
6175
+ * one is being sent. */
6176
+
6177
+ /* F_SUBSCRIPTION iff all subscription fields are non-NULL. */
6178
+ rd_dassert (((rkcg -> rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION ) > 0 ) ==
6179
+ (rkcg -> rkcg_subscription != NULL &&
6180
+ rkcg -> rkcg_subscription_topics != NULL &&
6181
+ rkcg -> rkcg_subscription_regex != NULL ));
6182
+
6183
+ /* F_WILDCARD_SUBSCRIPTION iff F_SUBSCRIPTION and regex is not empty. */
6184
+ rd_dassert (
6185
+ ((rkcg -> rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION ) > 0 ) ==
6186
+ (rkcg -> rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION &&
6187
+ RD_KAFKAP_STR_LEN (rkcg -> rkcg_subscription_regex ) > 0 ));
6188
+
6189
+ /* CONSUMER_F_WAIT_ACK and
6190
+ * CONSUMER_F_SENDING_ACK aren't always mutually exclusive.
6191
+ * given when you receive a HB response in STEADY state and
6192
+ * you were waiting for an ACK you need to verify if the HB was
6193
+ * for sending the ACK, as it could have started in previous states. */
6194
+
6195
+ /* CONSUMER_F_WAIT_REJOIN and CONSUMER_F_WAIT_REJOIN_TO_COMPLETE
6196
+ * are mutually exclusive. */
6197
+ rd_dassert (rd_kafka_cgrp_consumer_check_invariant_mutually_exclusive (
6198
+ rkcg -> rkcg_consumer_flags , RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN ,
6199
+ RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE ));
6200
+
6201
+ /* F_LEAVE_ON_UNASSIGN_DONE and F_WAIT_LEAVE
6202
+ * are mutually exclusive. */
6203
+ rd_dassert (rd_kafka_cgrp_consumer_check_invariant_mutually_exclusive (
6204
+ rkcg -> rkcg_flags , RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE ,
6205
+ RD_KAFKA_CGRP_F_WAIT_LEAVE ));
6206
+
6207
+ switch (rkcg -> rkcg_join_state ) {
6208
+ case RD_KAFKA_CGRP_JOIN_STATE_INIT :
6209
+ rd_dassert (rkcg -> rkcg_current_assignment -> cnt == 0 );
6210
+ rd_dassert (!(rkcg -> rkcg_consumer_flags &
6211
+ RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK ));
6212
+
6213
+ /* Generation id can be greater than 0 in this state
6214
+ * if target assignment is empty
6215
+ * a reconciliation isn't started. */
6216
+ /* FALLTHRU */
6217
+ case RD_KAFKA_CGRP_JOIN_STATE_STEADY :
6218
+ rd_dassert (!rkcg -> rkcg_rebalance_incr_assignment );
6219
+ rd_dassert (!rkcg -> rkcg_rebalance_rejoin );
6220
+ rd_dassert (!(rkcg -> rkcg_flags &
6221
+ RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE ));
6222
+ rd_dassert (!(rkcg -> rkcg_consumer_flags &
6223
+ RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE ));
6224
+ break ;
6225
+ case RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL :
6226
+ case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL :
6227
+ case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE :
6228
+ case RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE :
6229
+ break ;
6230
+ default :
6231
+ rd_kafka_log (
6232
+ rkcg -> rkcg_rk , LOG_EMERG , "CGRP" ,
6233
+ "Invalid state detected: %s" ,
6234
+ rd_kafka_cgrp_join_state_names [rkcg -> rkcg_join_state ]);
6235
+ rd_dassert (!* "invalid state" );
6236
+ break ;
6237
+ }
6238
+ }
6239
+
6155
6240
static rd_bool_t
6156
6241
rd_kafka_cgrp_consumer_heartbeat_preconditions_met (rd_kafka_cgrp_t * rkcg ) {
6157
- rd_dassert (
6158
- ! (rkcg -> rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT &&
6159
- rkcg -> rkcg_flags & RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE ));
6242
+ #if ENABLE_DEVEL == 1
6243
+ rd_kafka_cgrp_consumer_check_invariants (rkcg );
6244
+ #endif
6160
6245
6161
6246
if (!(rkcg -> rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION ))
6162
6247
return rd_false ;
0 commit comments