-
Notifications
You must be signed in to change notification settings - Fork 3.2k
[KIP-848] Tests for dev_kip848_fix_fast_subscribe_or_unsubscribe #5026
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
🎉 All Contributor License Agreements have been signed. Ready to merge. |
0999c46
to
050b06c
Compare
tests/0045-subscribe_update.c
Outdated
TEST_SAY("Creating %" PRIusz " topics\n", RD_ARRAY_SIZE(topics)); | ||
TEST_CALL_ERR__(test_CreateTopics_simple( | ||
rk, NULL, topics, RD_ARRAY_SIZE(topics), partition_cnt, NULL)); | ||
test_wait_topic_exists(rk, topics[0], 5000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to check for last topic.
tests/0045-subscribe_update.c
Outdated
RD_IF_FREE(expected_assignment, | ||
rd_kafka_topic_partition_list_destroy); | ||
expected_assignment = rd_kafka_topic_partition_list_new( | ||
subscription_size * partition_cnt); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
subscription_size * partition_cnt
is constant throughout the loop. Extract it in a variable.
tests/0045-subscribe_update.c
Outdated
expected_assignment = rd_kafka_topic_partition_list_new( | ||
subscription_size * partition_cnt); | ||
for (j = i; j < (int)RD_MIN(i + subscription_size, | ||
RD_ARRAY_SIZE(topics)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to use RD_ARRAY_SIZE(topics))
. Directly extract the size in a define and use it even when initializing the array.
tests/0045-subscribe_update.c
Outdated
topic = topics[j]; | ||
rd_kafka_topic_partition_list_add( | ||
subscription, topic, RD_KAFKA_PARTITION_UA); | ||
/* Every 7 * 5 we unsubscribe and check that assignment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/* Every 7 * 5 we unsubscribe and check that assignment | |
/* We unsubscribe every 7 iteration and we check assignments every 5 iteration so at 7 * 5 unsubscribe and check that assignment |
tests/0045-subscribe_update.c
Outdated
char *group; | ||
rd_kafka_t *rk; | ||
rd_kafka_conf_t *conf; | ||
const int subscription_size = 5, partition_cnt = 4; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const int subscription_size = 5, partition_cnt = 4; | |
const int max_subscription_size = 5, partition_cnt = 4; |
tests/test.c
Outdated
do { | ||
verified = | ||
test_consumer_verify_assignment_topic_partition_list0( | ||
func, line, rk, expected_assignment); | ||
if (!verified) { | ||
if (do_poll) | ||
test_consumer_poll_once(rk, NULL, 100); | ||
else | ||
rd_usleep(100 * 1000, NULL); | ||
} | ||
} while (test_clock() < end && !verified); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for do-while loop. Normal while will work. We can remove checking !verified
at 2 places.
do { | |
verified = | |
test_consumer_verify_assignment_topic_partition_list0( | |
func, line, rk, expected_assignment); | |
if (!verified) { | |
if (do_poll) | |
test_consumer_poll_once(rk, NULL, 100); | |
else | |
rd_usleep(100 * 1000, NULL); | |
} | |
} while (test_clock() < end && !verified); | |
while (test_clock() < end) { | |
verified = | |
test_consumer_verify_assignment_topic_partition_list0( | |
func, line, rk, expected_assignment); | |
if(verified) | |
break; | |
if (do_poll) | |
test_consumer_poll_once(rk, NULL, 100); | |
else | |
rd_usleep(100 * 1000, NULL); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That way we still need to call
verified =
test_consumer_verify_assignment_topic_partition_list0(
func, line, rk, expected_assignment);
before of after the loop in case we call it with 0 timeout, I use the break but keep the do ... while
} while (test_clock() < end && !verified); | ||
|
||
if (!verified) { | ||
TEST_FAIL( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return after test fail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not needed, TEST_FAIL
either calls thrd_exit
or assert
tests/test.c
Outdated
rd_bool_t verified = rd_false; | ||
|
||
TEST_SAY("Verifying assignment\n"); | ||
rd_kafka_topic_partition_list_sort(expected_assignment, NULL, NULL); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't sort the list here. Its the responsibility of the test_consumer_verify_assignment_topic_partition_list0
on how to verify.
tests/test.c
Outdated
rd_kafka_topic_partition_list_sort(assignment, NULL, NULL); | ||
rd_kafka_topic_partition_list_sort(expected_assignment, NULL, NULL); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally we should not be sorting these lists. Caller will expect it to be as it is. I would prefer creating new list for expected_assignment
, match that and then destroy that. assignment
is local so we can do anything with it.
tests/test.c
Outdated
if (assignment->cnt != expected_assignment->cnt) { | ||
ret = rd_false; | ||
goto done; | ||
} else { | ||
for (i = 0; i < assignment->cnt; i++) { | ||
if (strcmp(assignment->elems[i].topic, | ||
expected_assignment->elems[i].topic) || | ||
assignment->elems[i].partition != | ||
expected_assignment->elems[i].partition) { | ||
ret = rd_false; | ||
goto done; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
goto done
or break
. Whatever you want.
if (assignment->cnt != expected_assignment->cnt) { | |
ret = rd_false; | |
goto done; | |
} else { | |
for (i = 0; i < assignment->cnt; i++) { | |
if (strcmp(assignment->elems[i].topic, | |
expected_assignment->elems[i].topic) || | |
assignment->elems[i].partition != | |
expected_assignment->elems[i].partition) { | |
ret = rd_false; | |
goto done; | |
} | |
} | |
} | |
if (assignment->cnt != expected_assignment->cnt) { | |
ret = rd_false; | |
goto done; | |
} | |
for (i = 0; i < assignment->cnt; i++) { | |
if (strcmp(assignment->elems[i].topic, | |
expected_assignment->elems[i].topic) || | |
assignment->elems[i].partition != | |
expected_assignment->elems[i].partition) { | |
ret = rd_false; | |
break; | |
} | |
} |
tests/0045-subscribe_update.c
Outdated
@@ -862,6 +950,12 @@ int main_0045_resubscribe_with_regex(int argc, char **argv) { | |||
return 0; | |||
} | |||
|
|||
int main_0045_many_updates(int argc, char **argv) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
int main_0045_many_updates(int argc, char **argv) { | |
int main_0045_subscribe_many_updates(int argc, char **argv) { |
tests/0045-subscribe_update.c
Outdated
* It needs to poll the consumer when awaiting for the | ||
* assignment in this case. | ||
*/ | ||
static void do_test_many_updates(rd_bool_t with_rebalance_cb) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static void do_test_many_updates(rd_bool_t with_rebalance_cb) { | |
static void do_test_subscribe_many_updates(rd_bool_t with_rebalance_cb) { |
050b06c
to
93df6bf
Compare
No description provided.