Skip to content

Commit 4ec213a

Browse files
jsvdmashhurs
andauthored
Backport #229: Name netty threads with plugin id and their purpose (#230)
Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com>
1 parent 3c0c18a commit 4ec213a

File tree

6 files changed

+43
-8
lines changed

6 files changed

+43
-8
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 6.4.5
2+
- Name netty threads with plugin id and their purpose [229](https://github.com/logstash-plugins/logstash-input-tcp/pull/229)
3+
14
## 6.4.4
25
- update netty to 4.1.115 [#227](https://github.com/logstash-plugins/logstash-input-tcp/pull/227)
36

lib/logstash/inputs/tcp.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ def register
181181
validate_ssl_config!
182182

183183
if server?
184-
@loop = InputLoop.new(@host, @port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context)
184+
@loop = InputLoop.new(@id, @host, @port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context)
185185
end
186186
end
187187

logstash-input-tcp.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Gem::Specification.new do |s|
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
77
s.authors = ["Elastic"]
88
s.email = 'info@elastic.co'
9-
s.homepage = "http://www.elastic.co/guide/en/logstash/current/index.html"
9+
s.homepage = "https://elastic.co/logstash"
1010
s.platform = "java"
1111
s.require_paths = ["lib", "vendor/jar-dependencies"]
1212

src/main/java/org/logstash/tcp/InputLoop.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.io.IOException;
2222
import java.net.InetSocketAddress;
2323

24+
import static org.logstash.tcp.util.DaemonThreadFactory.daemonThreadFactory;
25+
2426
/**
2527
* Plain TCP Server Implementation.
2628
*/
@@ -66,13 +68,13 @@ public final class InputLoop implements Runnable, Closeable {
6668
* @param decoder {@link Decoder} provided by Jruby
6769
* @param keepAlive set to true to instruct the socket to issue TCP keep alive
6870
*/
69-
public InputLoop(final String host, final int port, final Decoder decoder, final boolean keepAlive,
71+
public InputLoop(final String id, final String host, final int port, final Decoder decoder, final boolean keepAlive,
7072
final SslContext sslContext) {
7173
this.sslContext = sslContext;
7274
this.host = host;
7375
this.port = port;
74-
worker = new NioEventLoopGroup();
75-
boss = new NioEventLoopGroup(1);
76+
boss = new NioEventLoopGroup(1, daemonThreadFactory(id + "-bossGroup"));
77+
worker = new NioEventLoopGroup(daemonThreadFactory(id + "-workGroup"));
7678
serverBootstrap = new ServerBootstrap().group(boss, worker)
7779
.channel(NioServerSocketChannel.class)
7880
.option(ChannelOption.SO_BACKLOG, 1024)
@@ -152,7 +154,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
152154
}
153155

154156
/**
155-
* Listeners that flushes the the JRuby supplied {@link Decoder} when the socket is closed.
157+
* Listeners that flushes the JRuby supplied {@link Decoder} when the socket is closed.
156158
*/
157159
private static final class FlushOnCloseListener implements GenericFutureListener<Future<Void>> {
158160

@@ -199,7 +201,7 @@ private static final class DecoderAdapter extends ChannelInboundHandlerAdapter {
199201
this.decoder = decoder;
200202
}
201203

202-
// 6.07 updated to pass in the full netty ChannelHandlerContext instead of the remoteaddress field
204+
// 6.07 updated to pass in the full netty ChannelHandlerContext instead of the remote address field
203205
// corresponding interface updated
204206
@Override
205207
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package org.logstash.tcp.util;
2+
3+
import java.util.concurrent.ThreadFactory;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
public class DaemonThreadFactory implements ThreadFactory {
7+
8+
final ThreadGroup group;
9+
final AtomicInteger threadNumber = new AtomicInteger(1);
10+
final String namePrefix;
11+
12+
DaemonThreadFactory(String namePrefix) {
13+
this.namePrefix = namePrefix;
14+
group = Thread.currentThread().getThreadGroup();
15+
}
16+
17+
@Override
18+
public Thread newThread(Runnable r) {
19+
Thread t = new Thread(group, r,
20+
namePrefix + "[T#" + threadNumber.getAndIncrement() + "]",
21+
0);
22+
t.setDaemon(true);
23+
return t;
24+
}
25+
26+
public static ThreadFactory daemonThreadFactory(String namePrefix) {
27+
return new DaemonThreadFactory(namePrefix);
28+
}
29+
30+
}

version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
6.4.4
1+
6.4.5

0 commit comments

Comments
 (0)