Skip to content

Commit 050b06c

Browse files
committed
[KIP-848] Tests for dev_kip848_fix_fast_subscribe_or_unsubscribe
1 parent 8e83ba2 commit 050b06c

File tree

4 files changed

+215
-0
lines changed

4 files changed

+215
-0
lines changed

src/rd.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,12 @@ static RD_INLINE RD_UNUSED char *rd_strndup(const char *s, size_t len) {
226226
#define RD_ARRAYSIZE(A) RD_ARRAY_SIZE(A)
227227
#define RD_SIZEOF(TYPE, MEMBER) sizeof(((TYPE *)NULL)->MEMBER)
228228
#define RD_OFFSETOF(TYPE, MEMBER) ((size_t) & (((TYPE *)NULL)->MEMBER))
229+
/** Array foreach */
230+
#define RD_ARRAY_FOREACH(ELEM, ARRAY, INDEX) \
231+
for ((INDEX = 0, (ELEM) = (ARRAY)[INDEX]); \
232+
INDEX < RD_ARRAY_SIZE(ARRAY); \
233+
(ELEM) = \
234+
(++INDEX < RD_ARRAY_SIZE(ARRAY) ? (ARRAY)[INDEX] : (ELEM)))
229235

230236
/**
231237
* Returns the 'I'th array element from static sized array 'A'

tests/0045-subscribe_update.c

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -819,6 +819,94 @@ static void do_test_resubscribe_with_regex() {
819819
SUB_TEST_PASS();
820820
}
821821

822+
/**
823+
* @brief Create many topics and apply several subscription
824+
* updates, unsubscribing and re-subscribing too.
825+
* After changing some subscriptions verifies that the assignment
826+
* corresponds to last one.
827+
*
828+
* @param with_rebalance_cb Use a rebalance callback to perform the assignment.
829+
* It needs to poll the consumer when awaiting for the
830+
* assignment in this case.
831+
*/
832+
static void do_test_many_updates(rd_bool_t with_rebalance_cb) {
833+
char *topics[100] = {0};
834+
char *topic;
835+
size_t i;
836+
char *group;
837+
rd_kafka_t *rk;
838+
rd_kafka_conf_t *conf;
839+
const int subscription_size = 5, partition_cnt = 4;
840+
rd_kafka_topic_partition_list_t *expected_assignment = NULL;
841+
842+
SUB_TEST("%s", with_rebalance_cb ? "with rebalance callback"
843+
: "without rebalance callback");
844+
845+
RD_ARRAY_FOREACH(topic, topics, i) {
846+
topics[i] = rd_strdup(test_mk_topic_name("topic", 1));
847+
}
848+
group = topics[0];
849+
850+
test_conf_init(&conf, NULL, 60);
851+
if (with_rebalance_cb)
852+
rd_kafka_conf_set_rebalance_cb(conf, test_rebalance_cb);
853+
rk = test_create_consumer(group, NULL, conf, NULL);
854+
855+
TEST_SAY("Creating %" PRIusz " topics\n", RD_ARRAY_SIZE(topics));
856+
TEST_CALL_ERR__(test_CreateTopics_simple(
857+
rk, NULL, topics, RD_ARRAY_SIZE(topics), partition_cnt, NULL));
858+
test_wait_topic_exists(rk, topics[0], 5000);
859+
/* Give the cluster some more time to propagate metadata
860+
* for 100 topics */
861+
rd_sleep(1);
862+
863+
RD_ARRAY_FOREACH(topic, topics, i) {
864+
int j, k;
865+
rd_kafka_topic_partition_list_t *subscription =
866+
rd_kafka_topic_partition_list_new(subscription_size);
867+
RD_IF_FREE(expected_assignment,
868+
rd_kafka_topic_partition_list_destroy);
869+
expected_assignment = rd_kafka_topic_partition_list_new(
870+
subscription_size * partition_cnt);
871+
for (j = i; j < (int)RD_MIN(i + subscription_size,
872+
RD_ARRAY_SIZE(topics));
873+
j++) {
874+
topic = topics[j];
875+
rd_kafka_topic_partition_list_add(
876+
subscription, topic, RD_KAFKA_PARTITION_UA);
877+
/* Every 7 * 5 we unsubscribe and check that assignment
878+
* is empty. */
879+
if (i % (7 * 5) != 0)
880+
for (k = 0; k < partition_cnt; k++)
881+
rd_kafka_topic_partition_list_add(
882+
expected_assignment, topic, k);
883+
}
884+
TEST_CALL_ERR__(rd_kafka_subscribe(rk, subscription));
885+
rd_kafka_topic_partition_list_destroy(subscription);
886+
887+
if (i % 7 == 0)
888+
TEST_CALL_ERR__(rd_kafka_unsubscribe(rk));
889+
if (i % 5 == 0)
890+
test_consumer_wait_assignment_topic_partition_list(
891+
rk,
892+
/* poll when we have a rebalance callback */
893+
with_rebalance_cb, expected_assignment, 10000);
894+
}
895+
test_consumer_wait_assignment_topic_partition_list(
896+
rk, with_rebalance_cb, expected_assignment, 10000);
897+
898+
TEST_CALL_ERR__(test_DeleteTopics_simple(rk, NULL, topics,
899+
RD_ARRAY_SIZE(topics), NULL));
900+
RD_ARRAY_FOREACH(topic, topics, i) {
901+
rd_free(topics[i]);
902+
}
903+
904+
test_consumer_close(rk);
905+
rd_kafka_destroy(rk);
906+
907+
SUB_TEST_PASS();
908+
}
909+
822910
int main_0045_subscribe_update(int argc, char **argv) {
823911

824912
if (!test_can_create_topics(1))
@@ -862,6 +950,12 @@ int main_0045_resubscribe_with_regex(int argc, char **argv) {
862950
return 0;
863951
}
864952

953+
int main_0045_many_updates(int argc, char **argv) {
954+
do_test_many_updates(rd_false);
955+
do_test_many_updates(rd_true);
956+
return 0;
957+
}
958+
865959
int main_0045_subscribe_update_racks_mock(int argc, char **argv) {
866960
int use_replica_rack = 0;
867961
int use_client_rack = 0;

tests/test.c

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ _TEST_DECL(0045_subscribe_update_non_exist_and_partchange);
158158
_TEST_DECL(0045_subscribe_update_mock);
159159
_TEST_DECL(0045_subscribe_update_racks_mock);
160160
_TEST_DECL(0045_resubscribe_with_regex);
161+
_TEST_DECL(0045_many_updates);
161162
_TEST_DECL(0046_rkt_cache);
162163
_TEST_DECL(0047_partial_buf_tmout);
163164
_TEST_DECL(0048_partitioner);
@@ -383,6 +384,7 @@ struct test tests[] = {
383384
_TEST(0045_subscribe_update_mock, TEST_F_LOCAL),
384385
_TEST(0045_subscribe_update_racks_mock, TEST_F_LOCAL),
385386
_TEST(0045_resubscribe_with_regex, 0, TEST_BRKVER(0, 9, 0, 0)),
387+
_TEST(0045_many_updates, 0, TEST_BRKVER(0, 10, 2, 0)),
386388
_TEST(0046_rkt_cache, TEST_F_LOCAL),
387389
_TEST(0047_partial_buf_tmout, TEST_F_KNOWN_ISSUE),
388390
_TEST(0048_partitioner,
@@ -3093,6 +3095,99 @@ void test_consumer_verify_assignment0(const char *func,
30933095
rd_kafka_topic_partition_list_destroy(assignment);
30943096
}
30953097

3098+
/**
3099+
* @brief Verify that the consumer's assignment matches the expected assignment.
3100+
* passed as a topic partition list in \p expected_assignment .
3101+
*/
3102+
rd_bool_t test_consumer_verify_assignment_topic_partition_list0(
3103+
const char *func,
3104+
int line,
3105+
rd_kafka_t *rk,
3106+
rd_kafka_topic_partition_list_t *expected_assignment) {
3107+
rd_kafka_topic_partition_list_t *assignment;
3108+
rd_kafka_resp_err_t err;
3109+
int i;
3110+
rd_bool_t ret = rd_true;
3111+
3112+
if ((err = rd_kafka_assignment(rk, &assignment)))
3113+
TEST_FAIL("%s:%d: Failed to get assignment for %s: %s", func,
3114+
line, rd_kafka_name(rk), rd_kafka_err2str(err));
3115+
3116+
TEST_SAY("%s assignment (%d partition(s)):\n", rd_kafka_name(rk),
3117+
assignment->cnt);
3118+
for (i = 0; i < assignment->cnt; i++)
3119+
TEST_SAY(" %s [%" PRId32 "]\n", assignment->elems[i].topic,
3120+
assignment->elems[i].partition);
3121+
3122+
rd_kafka_topic_partition_list_sort(assignment, NULL, NULL);
3123+
rd_kafka_topic_partition_list_sort(expected_assignment, NULL, NULL);
3124+
3125+
if (assignment->cnt != expected_assignment->cnt) {
3126+
ret = rd_false;
3127+
goto done;
3128+
} else {
3129+
for (i = 0; i < assignment->cnt; i++) {
3130+
if (strcmp(assignment->elems[i].topic,
3131+
expected_assignment->elems[i].topic) ||
3132+
assignment->elems[i].partition !=
3133+
expected_assignment->elems[i].partition) {
3134+
ret = rd_false;
3135+
goto done;
3136+
}
3137+
}
3138+
}
3139+
3140+
done:
3141+
rd_kafka_topic_partition_list_destroy(assignment);
3142+
return ret;
3143+
}
3144+
3145+
/**
3146+
* @brief Wait until the consumer's assignment matches the expected assignment.
3147+
* passed as a topic partition list in \p expected_assignment .
3148+
* Polling if \p do_poll is true, otherwise sleeps.
3149+
* Until \p timeout_ms milliseconds.
3150+
*/
3151+
void test_consumer_wait_assignment_topic_partition_list0(
3152+
const char *func,
3153+
int line,
3154+
rd_kafka_t *rk,
3155+
rd_bool_t do_poll,
3156+
rd_kafka_topic_partition_list_t *expected_assignment,
3157+
int timeout_ms) {
3158+
int i;
3159+
rd_ts_t end = test_clock() + timeout_ms * 1000;
3160+
rd_bool_t verified = rd_false;
3161+
3162+
TEST_SAY("Verifying assignment\n");
3163+
rd_kafka_topic_partition_list_sort(expected_assignment, NULL, NULL);
3164+
TEST_SAY("%s expected assignment (%d partition(s)):\n",
3165+
rd_kafka_name(rk), expected_assignment->cnt);
3166+
for (i = 0; i < expected_assignment->cnt; i++)
3167+
TEST_SAY(" %s [%" PRId32 "]\n",
3168+
expected_assignment->elems[i].topic,
3169+
expected_assignment->elems[i].partition);
3170+
3171+
do {
3172+
verified =
3173+
test_consumer_verify_assignment_topic_partition_list0(
3174+
func, line, rk, expected_assignment);
3175+
if (!verified) {
3176+
if (do_poll)
3177+
test_consumer_poll_once(rk, NULL, 100);
3178+
else
3179+
rd_usleep(100 * 1000, NULL);
3180+
}
3181+
} while (test_clock() < end && !verified);
3182+
3183+
if (!verified) {
3184+
TEST_FAIL(
3185+
"%s:%d: Expected assignment not found in %s's "
3186+
"assignment",
3187+
func, line, rd_kafka_name(rk));
3188+
}
3189+
TEST_SAY("Verified assignment\n");
3190+
}
30963191

30973192

30983193
/**

tests/test.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,26 @@ void test_consumer_verify_assignment0(const char *func,
676676
test_consumer_verify_assignment0(__FUNCTION__, __LINE__, rk, \
677677
fail_immediately, __VA_ARGS__)
678678

679+
rd_bool_t test_consumer_verify_assignment_topic_partition_list0(
680+
const char *func,
681+
int line,
682+
rd_kafka_t *rk,
683+
rd_kafka_topic_partition_list_t *parts);
684+
#define test_consumer_verify_assignment_topic_partition_list(rk, parts) \
685+
test_consumer_verify_assignment_topic_partition_list0( \
686+
__FUNCTION__, __LINE__, rk, parts)
687+
688+
void test_consumer_wait_assignment_topic_partition_list0(
689+
const char *func,
690+
int line,
691+
rd_kafka_t *rk,
692+
rd_bool_t do_poll,
693+
rd_kafka_topic_partition_list_t *expected_assignment,
694+
int timeout_ms);
695+
#define test_consumer_wait_assignment_topic_partition_list(rk, do_poll, parts, \
696+
timeout_ms) \
697+
test_consumer_wait_assignment_topic_partition_list0( \
698+
__FUNCTION__, __LINE__, rk, do_poll, parts, timeout_ms)
679699

680700
void test_consumer_assign(const char *what,
681701
rd_kafka_t *rk,

0 commit comments

Comments
 (0)