Skip to content

[KIP-848] Tests for dev_kip848_fix_fast_subscribe_or_unsubscribe #5026

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

emasab
Copy link
Contributor

@emasab emasab commented Apr 8, 2025

No description provided.

@emasab emasab requested a review from a team as a code owner April 8, 2025 18:21
@confluent-cla-assistant
Copy link

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Base automatically changed from dev_kip848_fix_fast_subscribe_or_unsubscribe to master April 8, 2025 19:09
@airlock-confluentinc airlock-confluentinc bot force-pushed the dev_kip848_fix_fast_subscribe_or_unsubscribe_tests branch from 0999c46 to 050b06c Compare April 9, 2025 09:37
TEST_SAY("Creating %" PRIusz " topics\n", RD_ARRAY_SIZE(topics));
TEST_CALL_ERR__(test_CreateTopics_simple(
rk, NULL, topics, RD_ARRAY_SIZE(topics), partition_cnt, NULL));
test_wait_topic_exists(rk, topics[0], 5000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to check for last topic.

RD_IF_FREE(expected_assignment,
rd_kafka_topic_partition_list_destroy);
expected_assignment = rd_kafka_topic_partition_list_new(
subscription_size * partition_cnt);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subscription_size * partition_cnt is constant throughout the loop. Extract it in a variable.

expected_assignment = rd_kafka_topic_partition_list_new(
subscription_size * partition_cnt);
for (j = i; j < (int)RD_MIN(i + subscription_size,
RD_ARRAY_SIZE(topics));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to use RD_ARRAY_SIZE(topics)). Directly extract the size in a define and use it even when initializing the array.

topic = topics[j];
rd_kafka_topic_partition_list_add(
subscription, topic, RD_KAFKA_PARTITION_UA);
/* Every 7 * 5 we unsubscribe and check that assignment
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/* Every 7 * 5 we unsubscribe and check that assignment
/* We unsubscribe every 7 iteration and we check assignments every 5 iteration so at 7 * 5 unsubscribe and check that assignment

char *group;
rd_kafka_t *rk;
rd_kafka_conf_t *conf;
const int subscription_size = 5, partition_cnt = 4;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const int subscription_size = 5, partition_cnt = 4;
const int max_subscription_size = 5, partition_cnt = 4;

tests/test.c Outdated
Comment on lines 3171 to 3181
do {
verified =
test_consumer_verify_assignment_topic_partition_list0(
func, line, rk, expected_assignment);
if (!verified) {
if (do_poll)
test_consumer_poll_once(rk, NULL, 100);
else
rd_usleep(100 * 1000, NULL);
}
} while (test_clock() < end && !verified);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for do-while loop. Normal while will work. We can remove checking !verified at 2 places.

Suggested change
do {
verified =
test_consumer_verify_assignment_topic_partition_list0(
func, line, rk, expected_assignment);
if (!verified) {
if (do_poll)
test_consumer_poll_once(rk, NULL, 100);
else
rd_usleep(100 * 1000, NULL);
}
} while (test_clock() < end && !verified);
while (test_clock() < end) {
verified =
test_consumer_verify_assignment_topic_partition_list0(
func, line, rk, expected_assignment);
if(verified)
break;
if (do_poll)
test_consumer_poll_once(rk, NULL, 100);
else
rd_usleep(100 * 1000, NULL);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That way we still need to call

                verified =
	                    test_consumer_verify_assignment_topic_partition_list0(
	                        func, line, rk, expected_assignment);

before of after the loop in case we call it with 0 timeout, I use the break but keep the do ... while

} while (test_clock() < end && !verified);

if (!verified) {
TEST_FAIL(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return after test fail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed, TEST_FAIL either calls thrd_exit or assert

tests/test.c Outdated
rd_bool_t verified = rd_false;

TEST_SAY("Verifying assignment\n");
rd_kafka_topic_partition_list_sort(expected_assignment, NULL, NULL);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't sort the list here. Its the responsibility of the test_consumer_verify_assignment_topic_partition_list0 on how to verify.

tests/test.c Outdated
Comment on lines 3122 to 3123
rd_kafka_topic_partition_list_sort(assignment, NULL, NULL);
rd_kafka_topic_partition_list_sort(expected_assignment, NULL, NULL);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we should not be sorting these lists. Caller will expect it to be as it is. I would prefer creating new list for expected_assignment, match that and then destroy that. assignment is local so we can do anything with it.

tests/test.c Outdated
Comment on lines 3125 to 3153
if (assignment->cnt != expected_assignment->cnt) {
ret = rd_false;
goto done;
} else {
for (i = 0; i < assignment->cnt; i++) {
if (strcmp(assignment->elems[i].topic,
expected_assignment->elems[i].topic) ||
assignment->elems[i].partition !=
expected_assignment->elems[i].partition) {
ret = rd_false;
goto done;
}
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

goto done or break. Whatever you want.

Suggested change
if (assignment->cnt != expected_assignment->cnt) {
ret = rd_false;
goto done;
} else {
for (i = 0; i < assignment->cnt; i++) {
if (strcmp(assignment->elems[i].topic,
expected_assignment->elems[i].topic) ||
assignment->elems[i].partition !=
expected_assignment->elems[i].partition) {
ret = rd_false;
goto done;
}
}
}
if (assignment->cnt != expected_assignment->cnt) {
ret = rd_false;
goto done;
}
for (i = 0; i < assignment->cnt; i++) {
if (strcmp(assignment->elems[i].topic,
expected_assignment->elems[i].topic) ||
assignment->elems[i].partition !=
expected_assignment->elems[i].partition) {
ret = rd_false;
break;
}
}

@@ -862,6 +950,12 @@ int main_0045_resubscribe_with_regex(int argc, char **argv) {
return 0;
}

int main_0045_many_updates(int argc, char **argv) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int main_0045_many_updates(int argc, char **argv) {
int main_0045_subscribe_many_updates(int argc, char **argv) {

* It needs to poll the consumer when awaiting for the
* assignment in this case.
*/
static void do_test_many_updates(rd_bool_t with_rebalance_cb) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
static void do_test_many_updates(rd_bool_t with_rebalance_cb) {
static void do_test_subscribe_many_updates(rd_bool_t with_rebalance_cb) {

@airlock-confluentinc airlock-confluentinc bot force-pushed the dev_kip848_fix_fast_subscribe_or_unsubscribe_tests branch from 050b06c to 93df6bf Compare June 26, 2025 10:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants