You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: src/RdKafka/FFI/Methods.php
+41-57Lines changed: 41 additions & 57 deletions
Original file line number
Diff line number
Diff line change
@@ -565,22 +565,16 @@ public static function rd_kafka_message_status(?\FFI\CData $rkmessage): int
565
565
566
566
/**
567
567
* <p>Create configuration object. </p>
568
-
* <p>When providing your own configuration to the <code>rd_kafka_*_new_*</code>() calls the rd_kafka_conf_t objects needs to be created with this function which will set up the defaults. I.e.: </p><div class="fragment"><div class="line">rd_kafka_conf_t *myconf;</div>
* <div class="ttc" id="ardkafka_8h_html_a63d5cd86ab1f77772b2be170e1c09c24"><div class="ttname">rd_kafka_new</div><div class="ttdeci">RD_EXPORT rd_kafka_t * rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)</div><div class="ttdoc">Creates a new Kafka handle and starts its operation according to the specified type (RD_KAFKA_CONSUME...</div></div>
* </div><!-- fragment --><p>Please see CONFIGURATION.md for the default settings or use rd_kafka_conf_properties_show() to provide the information at runtime.</p>
568
+
* <p>When providing your own configuration to the <code>rd_kafka_*_new_*</code>() calls the rd_kafka_conf_t objects needs to be created with this function which will set up the defaults. I.e.: </p><div><pre><code>rd_kafka_conf_t *myconf;
569
+
* rd_kafka_conf_res_t res;
570
+
*
571
+
* myconf = rd_kafka_conf_new();
572
+
* res = rd_kafka_conf_set(myconf, "socket.timeout.ms", "600",
573
+
* errstr, sizeof(errstr));
574
+
* if (res != RD_KAFKA_CONF_OK)
575
+
* die("%s\n", errstr);
576
+
*
577
+
* rk = rd_kafka_new(..., myconf);</code></pre></div><!-- fragment --><p>Please see CONFIGURATION.md for the default settings or use rd_kafka_conf_properties_show() to provide the information at runtime.</p>
584
578
* <p>The properties are identical to the Apache Kafka configuration properties whenever possible.</p>
585
579
* <dl class="section remark"><dt>Remarks</dt><dd>A successful call to rd_kafka_new() will assume ownership of the conf object and rd_kafka_conf_destroy() must not be called.</dd></dl>
586
580
*
@@ -748,47 +742,37 @@ public static function rd_kafka_conf_set_consume_cb(?\FFI\CData $conf, $consume_
748
742
* rd_kafka_assignment_lost() </dd>
749
743
* <dd>
750
744
* rd_kafka_rebalance_protocol()</dd></dl>
751
-
* <p>The following example shows the application's responsibilities: </p><div class="fragment"><div class="line"><span class="keyword">static</span> <span class="keywordtype">void</span> rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,</div>
* <div class="ttc" id="ardkafka_8h_html_a0566419eff2001f8371e3b50aa7d26e9"><div class="ttname">rd_kafka_assign</div><div class="ttdeci">RD_EXPORT rd_kafka_resp_err_t rd_kafka_assign(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *partitions)</div><div class="ttdoc">Atomic assignment of partitions to consume.</div></div>
786
-
* <div class="ttc" id="ardkafka_8h_html_a27f7bd18e42ed44f33932c2f9b6a4192"><div class="ttname">rd_kafka_incremental_unassign</div><div class="ttdeci">RD_EXPORT rd_kafka_error_t * rd_kafka_incremental_unassign(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *partitions)</div><div class="ttdoc">Incrementally remove partitions from the current assignment.</div></div>
787
-
* <div class="ttc" id="ardkafka_8h_html_a3bd9f42cf76b2a8cf2f4a4343abe8556"><div class="ttname">rd_kafka_incremental_assign</div><div class="ttdeci">RD_EXPORT rd_kafka_error_t * rd_kafka_incremental_assign(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *partitions)</div><div class="ttdoc">Incrementally add partitions to the current assignment.</div></div>
788
-
* <div class="ttc" id="ardkafka_8h_html_a57d367712406848d59cdaae97ab29354"><div class="ttname">rd_kafka_rebalance_protocol</div><div class="ttdeci">RD_EXPORT const char * rd_kafka_rebalance_protocol(rd_kafka_t *rk)</div><div class="ttdoc">The rebalance protocol currently in use. This will be "NONE" if the consumer has not (yet) joined a g...</div></div>
789
-
* <div class="ttc" id="ardkafka_8h_html_ab96539928328f14c3c9177ea0c896c87"><div class="ttname">rd_kafka_commit</div><div class="ttdeci">RD_EXPORT rd_kafka_resp_err_t rd_kafka_commit(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, int async)</div><div class="ttdoc">Commit offsets on broker for the provided list of partitions.</div></div>
790
-
* <div class="ttc" id="astructrd__kafka__topic__partition__list__t_html"><div class="ttname">rd_kafka_topic_partition_list_t</div><div class="ttdoc">A growable list of Topic+Partitions.</div><div class="ttdef"><b>Definition:</b> rdkafka.h:917</div></div>
791
-
* </div><!-- fragment --><dl class="section remark"><dt>Remarks</dt><dd>The above example lacks error handling for assign calls, see the examples/ directory. </dd></dl>
745
+
* <p>The following example shows the application's responsibilities: </p><div><pre><code>static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
746
+
* rd_kafka_topic_partition_list_t *partitions,
747
+
* void *opaque) {
748
+
*
749
+
* switch (err)
750
+
* {
751
+
* case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
752
+
* // application may load offets from arbitrary external
753
+
* // storage here and update \p partitions
754
+
* if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE"))
755
+
* rd_kafka_incremental_assign(rk, partitions);
756
+
* else // EAGER
757
+
* rd_kafka_assign(rk, partitions);
758
+
* break;
759
+
*
760
+
* case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
761
+
* if (manual_commits) // Optional explicit manual commit
* if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE"))
765
+
* rd_kafka_incremental_unassign(rk, partitions);
766
+
* else // EAGER
767
+
* rd_kafka_assign(rk, NULL);
768
+
* break;
769
+
*
770
+
* default:
771
+
* handle_unlikely_error(err);
772
+
* rd_kafka_assign(rk, NULL); // sync state
773
+
* break;
774
+
* }
775
+
* }</code></pre></div><!-- fragment --><dl class="section remark"><dt>Remarks</dt><dd>The above example lacks error handling for assign calls, see the examples/ directory. </dd></dl>
0 commit comments