@@ -5799,6 +5799,8 @@ static void rd_kafka_cgrp_handle_assign_op(rd_kafka_cgrp_t *rkcg,
5799
5799
rkcg -> rkcg_rebalance_incr_assignment = NULL ;
5800
5800
}
5801
5801
5802
+ rkcg -> rkcg_rebalance_rejoin = rd_false ;
5803
+
5802
5804
rko -> rko_u .assign .method = RD_KAFKA_ASSIGN_METHOD_ASSIGN ;
5803
5805
5804
5806
if (rkcg -> rkcg_join_state ==
@@ -6083,11 +6085,94 @@ void rd_kafka_cgrp_consumer_group_heartbeat(rd_kafka_cgrp_t *rkcg,
6083
6085
rd_kafka_cgrp_handle_ConsumerGroupHeartbeat , NULL );
6084
6086
}
6085
6087
6088
+ #define rd_kafka_cgrp_consumer_check_invariant_mutually_exclusive ( \
6089
+ FIELD , FLAG_A , FLAG_B ) \
6090
+ (((FIELD) & (FLAG_A | FLAG_B)) != (FLAG_A | FLAG_B))
6091
+
6092
+ /**
6093
+ * @brief Verify that the consumer group never enters an invalid state.
6094
+ * below is a list of allowed join states plus the global and
6095
+ * state-dependent invariants.
6096
+ *
6097
+ * @remark Must only be called when devel mode is enabled
6098
+ *
6099
+ * @locality main thread
6100
+ * @locks none
6101
+ */
6102
+ static void rd_kafka_cgrp_consumer_check_invariants (rd_kafka_cgrp_t * rkcg ) {
6103
+ /* CONSUMER_F_SEND_NEW_SUBSCRIPTION and
6104
+ * CONSUMER_F_SENDING_NEW_SUBSCRIPTION aren't always mutually exclusive
6105
+ * given you can ask to send a new subscription when the previous
6106
+ * one is being sent. */
6107
+
6108
+ /* F_SUBSCRIPTION iff all subscription fields are non-NULL. */
6109
+ rd_dassert (((rkcg -> rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION ) > 0 ) ==
6110
+ (rkcg -> rkcg_subscription != NULL &&
6111
+ rkcg -> rkcg_subscription_topics != NULL &&
6112
+ rkcg -> rkcg_subscription_regex != NULL ));
6113
+
6114
+ /* F_WILDCARD_SUBSCRIPTION iff F_SUBSCRIPTION and regex is not empty. */
6115
+ rd_dassert (
6116
+ ((rkcg -> rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION ) > 0 ) ==
6117
+ (rkcg -> rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION &&
6118
+ RD_KAFKAP_STR_LEN (rkcg -> rkcg_subscription_regex ) > 0 ));
6119
+
6120
+ /* CONSUMER_F_WAIT_ACK and
6121
+ * CONSUMER_F_SENDING_ACK aren't always mutually exclusive.
6122
+ * given when you receive a HB response in STEADY state and
6123
+ * you were waiting for an ACK you need to verify if the HB was
6124
+ * for sending the ACK, as it could have started in previous states. */
6125
+
6126
+ /* CONSUMER_F_WAIT_REJOIN and CONSUMER_F_WAIT_REJOIN_TO_COMPLETE
6127
+ * are mutually exclusive. */
6128
+ rd_dassert (rd_kafka_cgrp_consumer_check_invariant_mutually_exclusive (
6129
+ rkcg -> rkcg_consumer_flags , RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN ,
6130
+ RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE ));
6131
+
6132
+ /* F_LEAVE_ON_UNASSIGN_DONE and F_WAIT_LEAVE
6133
+ * are mutually exclusive. */
6134
+ rd_dassert (rd_kafka_cgrp_consumer_check_invariant_mutually_exclusive (
6135
+ rkcg -> rkcg_flags , RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE ,
6136
+ RD_KAFKA_CGRP_F_WAIT_LEAVE ));
6137
+
6138
+ switch (rkcg -> rkcg_join_state ) {
6139
+ case RD_KAFKA_CGRP_JOIN_STATE_INIT :
6140
+ rd_dassert (rkcg -> rkcg_current_assignment -> cnt == 0 );
6141
+ rd_dassert (!(rkcg -> rkcg_consumer_flags &
6142
+ RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK ));
6143
+
6144
+ /* Generation id can be greater than 0 in this state
6145
+ * if target assignment is empty
6146
+ * a reconciliation isn't started. */
6147
+ /* FALLTHRU */
6148
+ case RD_KAFKA_CGRP_JOIN_STATE_STEADY :
6149
+ rd_dassert (!rkcg -> rkcg_rebalance_incr_assignment );
6150
+ rd_dassert (!rkcg -> rkcg_rebalance_rejoin );
6151
+ rd_dassert (!(rkcg -> rkcg_flags &
6152
+ RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE ));
6153
+ rd_dassert (!(rkcg -> rkcg_consumer_flags &
6154
+ RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE ));
6155
+ break ;
6156
+ case RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL :
6157
+ case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL :
6158
+ case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE :
6159
+ case RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE :
6160
+ break ;
6161
+ default :
6162
+ rd_kafka_log (
6163
+ rkcg -> rkcg_rk , LOG_EMERG , "CGRP" ,
6164
+ "Invalid state detected: %s" ,
6165
+ rd_kafka_cgrp_join_state_names [rkcg -> rkcg_join_state ]);
6166
+ rd_dassert (!* "invalid state" );
6167
+ break ;
6168
+ }
6169
+ }
6170
+
6086
6171
static rd_bool_t
6087
6172
rd_kafka_cgrp_consumer_heartbeat_preconditions_met (rd_kafka_cgrp_t * rkcg ) {
6088
- rd_dassert (
6089
- ! (rkcg -> rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT &&
6090
- rkcg -> rkcg_flags & RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE ));
6173
+ #if ENABLE_DEVEL == 1
6174
+ rd_kafka_cgrp_consumer_check_invariants (rkcg );
6175
+ #endif
6091
6176
6092
6177
if (!(rkcg -> rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION ))
6093
6178
return rd_false ;
0 commit comments