Skip to content

Commit dd421b3

Browse files
authored
[KIP-848] Tests for: Fix for a rapid unsubscribe while the member id is still not assigned (#4700)
local tests to check a previous segfault is avoided with fast subscribe/unsubscribe changes.
1 parent 733ef31 commit dd421b3

File tree

1 file changed

+75
-0
lines changed

1 file changed

+75
-0
lines changed

tests/0147-consumer_group_consumer_mock.c

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -858,6 +858,79 @@ static void do_test_adherence_to_hb_interval(void) {
858858
SUB_TEST_PASS();
859859
}
860860

861+
typedef enum do_test_quick_unsubscribe_variation_t {
862+
/* No mock cluster, no coordinator available. */
863+
DO_TEST_QUICK_UNSUBSCRIBE_VARIATION_NO_CLUSTER = 0,
864+
/* Mock cluster is ready */
865+
DO_TEST_QUICK_UNSUBSCRIBE_VARIATION_CLUSTER_READY = 1,
866+
DO_TEST_QUICK_UNSUBSCRIBE_VARIATION__CNT
867+
} do_test_quick_unsubscribe_variation_t;
868+
869+
/**
870+
* @brief A series of subscribe and unsubscribe call shouldn't cause
871+
* assert failures.
872+
*
873+
* @param variation Test variation.
874+
*
875+
* @sa `do_test_quick_unsubscribe_variation_t`
876+
*/
877+
static void
878+
do_test_quick_unsubscribe(do_test_quick_unsubscribe_variation_t variation) {
879+
int i;
880+
rd_kafka_t *c;
881+
rd_kafka_topic_partition_list_t *subscription;
882+
rd_kafka_mock_cluster_t *mcluster = NULL;
883+
const char *bootstraps = "localhost:9999";
884+
const char *topic = test_mk_topic_name(__FUNCTION__, 0);
885+
886+
SUB_TEST_QUICK(
887+
"%s", variation == DO_TEST_QUICK_UNSUBSCRIBE_VARIATION_NO_CLUSTER
888+
? "no cluster"
889+
: "mock cluster ready");
890+
891+
if (variation == DO_TEST_QUICK_UNSUBSCRIBE_VARIATION_NO_CLUSTER) {
892+
test_curr->is_fatal_cb = error_is_fatal_cb;
893+
allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
894+
} else if (variation ==
895+
DO_TEST_QUICK_UNSUBSCRIBE_VARIATION_CLUSTER_READY) {
896+
mcluster = test_mock_cluster_new(1, &bootstraps);
897+
rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
898+
}
899+
900+
c = create_consumer(bootstraps, topic, rd_true);
901+
902+
subscription = rd_kafka_topic_partition_list_new(1);
903+
rd_kafka_topic_partition_list_add(subscription, topic,
904+
RD_KAFKA_PARTITION_UA);
905+
906+
for (i = 0; i < 2; i++) {
907+
TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription));
908+
TEST_CALL_ERR__(rd_kafka_unsubscribe(c));
909+
}
910+
911+
rd_kafka_topic_partition_list_destroy(subscription);
912+
rd_kafka_destroy(c);
913+
RD_IF_FREE(mcluster, test_mock_cluster_destroy);
914+
915+
test_curr->is_fatal_cb = NULL;
916+
allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
917+
SUB_TEST_PASS();
918+
}
919+
920+
/**
921+
* @brief Test all `do_test_quick_unsubscribe` variations.
922+
*
923+
* @sa `do_test_quick_unsubscribe_variation_t`
924+
*/
925+
static void do_test_quick_unsubscribe_tests(void) {
926+
do_test_quick_unsubscribe_variation_t variation;
927+
for (variation = DO_TEST_QUICK_UNSUBSCRIBE_VARIATION_NO_CLUSTER;
928+
variation < DO_TEST_QUICK_UNSUBSCRIBE_VARIATION__CNT;
929+
variation++) {
930+
do_test_quick_unsubscribe(variation);
931+
}
932+
}
933+
861934
int main_0147_consumer_group_consumer_mock(int argc, char **argv) {
862935
TEST_SKIP_MOCK_CLUSTER(0);
863936

@@ -876,5 +949,7 @@ int main_0147_consumer_group_consumer_mock(int argc, char **argv) {
876949

877950
do_test_adherence_to_hb_interval();
878951

952+
do_test_quick_unsubscribe_tests();
953+
879954
return 0;
880955
}

0 commit comments

Comments
 (0)