Skip to content

Commit 16957ca

Browse files
committed
Wait at most 30s for empty socket write buffer
Closes #83.
1 parent fdb67ba commit 16957ca

File tree

3 files changed

+25
-2
lines changed

3 files changed

+25
-2
lines changed

docs/config.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,16 @@ for easy modification.
267267
*Default*: ``5.0``
268268

269269

270+
``constants.SOCKET_CLOSE_WAIT_TIMEOUT``
271+
272+
Maximum time in seconds to wait for the socket's write buffer to get empty.
273+
Set to 0 to disable waiting for the socket write buffer to get empty.
274+
275+
*Type*: ``float``
276+
277+
*Default*: ``30.0``
278+
279+
270280
``constants.QUEUE_CHECK_INTERVAL``
271281

272282
Interval in seconds to check the internal queue for new messages

logstash_async/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ class Constants:
1313
"""
1414
# timeout in seconds for TCP connections
1515
SOCKET_TIMEOUT = 5.0
16+
# maximum time in seconds to wait for the socket's write buffer to get empty
17+
SOCKET_CLOSE_WAIT_TIMEOUT = 30.0
1618
# interval in seconds to check the internal queue for new messages to be cached in the database
1719
QUEUE_CHECK_INTERVAL = 2.0
1820
# interval in seconds to send cached events from the database to Logstash

logstash_async/transport.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import pylogbeat
1919
import requests
2020

21+
from logstash_async.constants import constants
2122
from logstash_async.utils import ichunked
2223

2324

@@ -126,12 +127,22 @@ def _convert_data_to_send(self, data):
126127
def _close(self, force=False):
127128
if not self._keep_connection or force:
128129
if self._sock:
129-
while not self._is_sock_write_buff_empty():
130-
time.sleep(0.05)
130+
self._wait_for_socket_buffer_empty()
131131
self._sock.shutdown(socket.SHUT_WR)
132132
self._sock.close()
133133
self._sock = None
134134

135+
# ----------------------------------------------------------------------
136+
def _wait_for_socket_buffer_empty(self):
137+
wait_timeout = constants.SOCKET_CLOSE_WAIT_TIMEOUT
138+
interval = 0.05
139+
time_waited = 0
140+
# wait until the socket's write buffer is empty
141+
# but do not wait longer than SOCKET_CLOSE_WAIT_TIMEOUT
142+
while not self._is_sock_write_buff_empty() and time_waited < wait_timeout:
143+
time_waited += interval
144+
time.sleep(interval)
145+
135146
# ----------------------------------------------------------------------
136147
def _is_sock_write_buff_empty(self):
137148
socket_fd = self._sock.fileno()

0 commit comments

Comments
 (0)