1919
2020package org .apache .thrift .server ;
2121
22- import java .util .Random ;
23- import java .util .WeakHashMap ;
22+ import java .util .Optional ;
2423import java .util .concurrent .ExecutorService ;
2524import java .util .concurrent .RejectedExecutionException ;
2625import java .util .concurrent .SynchronousQueue ;
26+ import java .util .concurrent .ThreadFactory ;
2727import java .util .concurrent .ThreadPoolExecutor ;
2828import java .util .concurrent .TimeUnit ;
2929
4141 * a worker pool that deals with client connections in blocking way.
4242 */
4343public class TThreadPoolServer extends TServer {
44- private static final Logger LOGGER = LoggerFactory .getLogger (TThreadPoolServer .class . getName () );
44+ private static final Logger LOGGER = LoggerFactory .getLogger (TThreadPoolServer .class );
4545
4646 public static class Args extends AbstractServerArgs <Args > {
4747 public int minWorkerThreads = 5 ;
4848 public int maxWorkerThreads = Integer .MAX_VALUE ;
4949 public ExecutorService executorService ;
5050 public int stopTimeoutVal = 60 ;
5151 public TimeUnit stopTimeoutUnit = TimeUnit .SECONDS ;
52- public int requestTimeout = 20 ;
53- public TimeUnit requestTimeoutUnit = TimeUnit .SECONDS ;
54- public int beBackoffSlotLength = 100 ;
55- public TimeUnit beBackoffSlotLengthUnit = TimeUnit .MILLISECONDS ;
5652
5753 public Args (TServerTransport transport ) {
5854 super (transport );
@@ -78,27 +74,6 @@ public Args stopTimeoutUnit(TimeUnit tu) {
7874 return this ;
7975 }
8076
81- public Args requestTimeout (int n ) {
82- requestTimeout = n ;
83- return this ;
84- }
85-
86- public Args requestTimeoutUnit (TimeUnit tu ) {
87- requestTimeoutUnit = tu ;
88- return this ;
89- }
90- //Binary exponential backoff slot length
91- public Args beBackoffSlotLength (int n ) {
92- beBackoffSlotLength = n ;
93- return this ;
94- }
95-
96- //Binary exponential backoff slot time unit
97- public Args beBackoffSlotLengthUnit (TimeUnit tu ) {
98- beBackoffSlotLengthUnit = tu ;
99- return this ;
100- }
101-
10277 public Args executorService (ExecutorService executorService ) {
10378 this .executorService = executorService ;
10479 return this ;
@@ -107,49 +82,40 @@ public Args executorService(ExecutorService executorService) {
10782
10883 // Executor service for handling client connections
10984 private ExecutorService executorService_ ;
110- private WeakHashMap <WorkerProcess , Boolean > activeWorkers = new WeakHashMap <>();
11185
11286 private final TimeUnit stopTimeoutUnit ;
11387
11488 private final long stopTimeoutVal ;
11589
116- private final TimeUnit requestTimeoutUnit ;
117-
118- private final long requestTimeout ;
119-
120- private final long beBackoffSlotInMillis ;
121-
122- private Random random = new Random (System .currentTimeMillis ());
123-
12490 public TThreadPoolServer (Args args ) {
12591 super (args );
12692
12793 stopTimeoutUnit = args .stopTimeoutUnit ;
12894 stopTimeoutVal = args .stopTimeoutVal ;
129- requestTimeoutUnit = args .requestTimeoutUnit ;
130- requestTimeout = args .requestTimeout ;
131- beBackoffSlotInMillis = args .beBackoffSlotLengthUnit .toMillis (args .beBackoffSlotLength );
13295
13396 executorService_ = args .executorService != null ?
13497 args .executorService : createDefaultExecutorService (args );
13598 }
13699
137100 private static ExecutorService createDefaultExecutorService (Args args ) {
138- SynchronousQueue <Runnable > executorQueue =
139- new SynchronousQueue <Runnable >();
140- return new ThreadPoolExecutor (args .minWorkerThreads ,
141- args .maxWorkerThreads ,
142- args .stopTimeoutVal ,
143- args .stopTimeoutUnit ,
144- executorQueue );
101+ return new ThreadPoolExecutor (args .minWorkerThreads , args .maxWorkerThreads , 60L , TimeUnit .SECONDS ,
102+ new SynchronousQueue <>(), new ThreadFactory () {
103+ @ Override
104+ public Thread newThread (Runnable r ) {
105+ Thread thread = new Thread (r );
106+ thread .setDaemon (true );
107+ thread .setName ("TThreadPoolServer WorkerProcess-%d" );
108+ return thread ;
109+ }
110+ });
145111 }
146112
147113 protected ExecutorService getExecutorService () {
148114 return executorService_ ;
149115 }
150116
151117 protected boolean preServe () {
152- try {
118+ try {
153119 serverTransport_ .listen ();
154120 } catch (TTransportException ttx ) {
155121 LOGGER .error ("Error occurred during listening." , ttx );
@@ -166,13 +132,16 @@ protected boolean preServe() {
166132 }
167133
168134 public void serve () {
169- if (!preServe ()) {
170- return ;
171- }
135+ if (!preServe ()) {
136+ return ;
137+ }
138+
139+ execute ();
140+
141+ executorService_ .shutdownNow ();
172142
173- execute ();
174143 if (!waitForShutdown ()) {
175- LOGGER .error ("Shutdown is not done after " + stopTimeoutVal + stopTimeoutUnit );
144+ LOGGER .error ("Shutdown is not done after " + stopTimeoutVal + stopTimeoutUnit );
176145 }
177146
178147 setServing (false );
@@ -182,51 +151,17 @@ protected void execute() {
182151 while (!stopped_ ) {
183152 try {
184153 TTransport client = serverTransport_ .accept ();
185- WorkerProcess wp = new WorkerProcess (client );
186-
187- int retryCount = 0 ;
188- long remainTimeInMillis = requestTimeoutUnit .toMillis (requestTimeout );
189- while (true ) {
190- try {
191- executorService_ .execute (wp );
192- activeWorkers .put (wp , Boolean .TRUE );
193- break ;
194- } catch (Throwable t ) {
195- if (t instanceof RejectedExecutionException ) {
196- retryCount ++;
197- try {
198- if (remainTimeInMillis > 0 ) {
199- //do a truncated 20 binary exponential backoff sleep
200- long sleepTimeInMillis = ((long ) (random .nextDouble () *
201- (1L << Math .min (retryCount , 20 )))) * beBackoffSlotInMillis ;
202- sleepTimeInMillis = Math .min (sleepTimeInMillis , remainTimeInMillis );
203- TimeUnit .MILLISECONDS .sleep (sleepTimeInMillis );
204- remainTimeInMillis = remainTimeInMillis - sleepTimeInMillis ;
205- } else {
206- client .close ();
207- wp = null ;
208- LOGGER .warn ("Task has been rejected by ExecutorService " + retryCount
209- + " times till timedout, reason: " + t );
210- break ;
211- }
212- } catch (InterruptedException e ) {
213- LOGGER .warn ("Interrupted while waiting to place client on executor queue." );
214- Thread .currentThread ().interrupt ();
215- break ;
216- }
217- } else if (t instanceof Error ) {
218- LOGGER .error ("ExecutorService threw error: " + t , t );
219- throw (Error )t ;
220- } else {
221- //for other possible runtime errors from ExecutorService, should also not kill serve
222- LOGGER .warn ("ExecutorService threw error: " + t , t );
223- break ;
224- }
154+ try {
155+ executorService_ .execute (new WorkerProcess (client ));
156+ } catch (RejectedExecutionException ree ) {
157+ if (!stopped_ ) {
158+ LOGGER .warn ("ThreadPool is saturated with incoming requests. Closing latest connection." );
225159 }
160+ client .close ();
226161 }
227162 } catch (TTransportException ttx ) {
228163 if (!stopped_ ) {
229- LOGGER .warn ("Transport error occurred during acceptance of message. " , ttx );
164+ LOGGER .warn ("Transport error occurred during acceptance of message" , ttx );
230165 }
231166 }
232167 }
@@ -241,8 +176,7 @@ protected boolean waitForShutdown() {
241176 long now = System .currentTimeMillis ();
242177 while (timeoutMS >= 0 ) {
243178 try {
244- executorService_ .awaitTermination (timeoutMS , TimeUnit .MILLISECONDS );
245- return true ;
179+ return executorService_ .awaitTermination (timeoutMS , TimeUnit .MILLISECONDS );
246180 } catch (InterruptedException ix ) {
247181 long newnow = System .currentTimeMillis ();
248182 timeoutMS -= (newnow - now );
@@ -255,10 +189,6 @@ protected boolean waitForShutdown() {
255189 public void stop () {
256190 stopped_ = true ;
257191 serverTransport_ .interrupt ();
258- executorService_ .shutdown ();
259- for (WorkerProcess wp : activeWorkers .keySet ()) {
260- wp .stop ();
261- }
262192 }
263193
264194 private class WorkerProcess implements Runnable {
@@ -287,7 +217,7 @@ public void run() {
287217 TProtocol inputProtocol = null ;
288218 TProtocol outputProtocol = null ;
289219
290- TServerEventHandler eventHandler = null ;
220+ Optional < TServerEventHandler > eventHandler = Optional . empty () ;
291221 ServerContext connectionContext = null ;
292222
293223 try {
@@ -297,22 +227,25 @@ public void run() {
297227 inputProtocol = inputProtocolFactory_ .getProtocol (inputTransport );
298228 outputProtocol = outputProtocolFactory_ .getProtocol (outputTransport );
299229
300- eventHandler = getEventHandler ();
301- if (eventHandler != null ) {
302- connectionContext = eventHandler .createContext (inputProtocol , outputProtocol );
303- }
304- // we check stopped_ first to make sure we're not supposed to be shutting
305- // down. this is necessary for graceful shutdown.
306- while (true ) {
230+ eventHandler = Optional .ofNullable (getEventHandler ());
307231
308- if (eventHandler != null ) {
309- eventHandler .processContext ( connectionContext , inputTransport , outputTransport );
310- }
232+ if (eventHandler . isPresent () ) {
233+ connectionContext = eventHandler .get (). createContext ( inputProtocol , outputProtocol );
234+ }
311235
312- if (stopped_ ) {
313- break ;
314- }
315- processor .process (inputProtocol , outputProtocol );
236+ while (true ) {
237+ if (Thread .currentThread ().isInterrupted ()) {
238+ LOGGER .debug ("WorkerProcess requested to shutdown" );
239+ break ;
240+ }
241+ if (eventHandler .isPresent ()) {
242+ eventHandler .get ().processContext (connectionContext , inputTransport , outputTransport );
243+ }
244+ // This process cannot be interrupted by Interrupting the Thread. This
245+ // will return once a message has been processed or the socket timeout
246+ // has elapsed, at which point it will return and check the interrupt
247+ // state of the thread.
248+ processor .process (inputProtocol , outputProtocol );
316249 }
317250 } catch (Exception x ) {
318251 LOGGER .debug ("Error processing request" , x );
@@ -322,11 +255,11 @@ public void run() {
322255 // Ignore err-logging all transport-level/type exceptions
323256 if (!isIgnorableException (x )) {
324257 // Log the exception at error level and continue
325- LOGGER .error ((x instanceof TException ? "Thrift " : "" ) + "Error occurred during processing of message." , x );
258+ LOGGER .error ((x instanceof TException ? "Thrift " : "" ) + "Error occurred during processing of message." , x );
326259 }
327260 } finally {
328- if (eventHandler != null ) {
329- eventHandler .deleteContext (connectionContext , inputProtocol , outputProtocol );
261+ if (eventHandler . isPresent () ) {
262+ eventHandler .get (). deleteContext (connectionContext , inputProtocol , outputProtocol );
330263 }
331264 if (inputTransport != null ) {
332265 inputTransport .close ();
@@ -344,10 +277,9 @@ private boolean isIgnorableException(Exception x) {
344277 TTransportException tTransportException = null ;
345278
346279 if (x instanceof TTransportException ) {
347- tTransportException = (TTransportException )x ;
348- }
349- else if (x .getCause () instanceof TTransportException ) {
350- tTransportException = (TTransportException )x .getCause ();
280+ tTransportException = (TTransportException ) x ;
281+ } else if (x .getCause () instanceof TTransportException ) {
282+ tTransportException = (TTransportException ) x .getCause ();
351283 }
352284
353285 if (tTransportException != null ) {
@@ -359,9 +291,5 @@ else if (x.getCause() instanceof TTransportException) {
359291 }
360292 return false ;
361293 }
362-
363- private void stop () {
364- client_ .close ();
365- }
366294 }
367295}
0 commit comments