@@ -18,6 +18,7 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
1818 let Arguments {
1919 key,
2020 runtime_opts,
21+ promise_metrics,
2122 timing,
2223 mut memory_limit_rx,
2324 cpu_timer,
@@ -55,11 +56,13 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
5556 let is_wall_clock_limit_disabled = wall_clock_limit_ms == 0 ;
5657 let mut is_worker_entered = false ;
5758 let mut is_wall_clock_beforeunload_armed = false ;
59+ let mut is_cpu_time_soft_limit_reached = false ;
60+ let mut is_termination_requested = false ;
61+ let mut have_all_reqs_been_acknowledged = false ;
5862
5963 let mut cpu_usage_metrics_rx = cpu_usage_metrics_rx. unwrap ( ) ;
6064 let mut cpu_usage_ms = 0i64 ;
6165
62- let mut cpu_time_soft_limit_reached = false ;
6366 let mut wall_clock_alerts = 0 ;
6467 let mut req_ack_count = 0usize ;
6568
@@ -113,20 +116,23 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
113116 tokio:: pin!( wall_clock_duration_alert) ;
114117 tokio:: pin!( wall_clock_beforeunload_alert) ;
115118
116- loop {
119+ let result = ' scope : loop {
117120 tokio:: select! {
118121 _ = supervise. cancelled( ) => {
119- return ( ShutdownReason :: TerminationRequested , cpu_usage_ms) ;
122+ break ' scope ( ShutdownReason :: TerminationRequested , cpu_usage_ms) ;
120123 }
121124
122125 _ = async {
123126 match termination. as_ref( ) {
124127 Some ( token) => token. inbound. cancelled( ) . await ,
125128 None => pending( ) . await ,
126129 }
127- } => {
128- terminate_fn( ) ;
129- return ( ShutdownReason :: TerminationRequested , cpu_usage_ms) ;
130+ } , if !is_termination_requested => {
131+ is_termination_requested = true ;
132+ if promise_metrics. have_all_promises_been_resolved( ) {
133+ terminate_fn( ) ;
134+ break ' scope ( ShutdownReason :: TerminationRequested , cpu_usage_ms) ;
135+ }
130136 }
131137
132138 Some ( metrics) = cpu_usage_metrics_rx. recv( ) => {
@@ -160,17 +166,28 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
160166 if cpu_usage_ms >= hard_limit_ms as i64 {
161167 terminate_fn( ) ;
162168 error!( "CPU time hard limit reached: isolate: {:?}" , key) ;
163- return ( ShutdownReason :: CPUTime , cpu_usage_ms) ;
164- } else if cpu_usage_ms >= soft_limit_ms as i64 && !cpu_time_soft_limit_reached {
169+ break ' scope ( ShutdownReason :: CPUTime , cpu_usage_ms) ;
170+ } else if cpu_usage_ms >= soft_limit_ms as i64 && !is_cpu_time_soft_limit_reached {
165171 early_retire_fn( ) ;
166172 error!( "CPU time soft limit reached: isolate: {:?}" , key) ;
167- cpu_time_soft_limit_reached = true ;
168173
169- if req_ack_count == demand. load( Ordering :: Acquire ) {
174+ is_cpu_time_soft_limit_reached = true ;
175+ have_all_reqs_been_acknowledged = req_ack_count == demand. load( Ordering :: Acquire ) ;
176+
177+ if have_all_reqs_been_acknowledged
178+ && promise_metrics. have_all_promises_been_resolved( )
179+ {
170180 terminate_fn( ) ;
171181 error!( "early termination due to the last request being completed: isolate: {:?}" , key) ;
172- return ( ShutdownReason :: EarlyDrop , cpu_usage_ms) ;
182+ break ' scope ( ShutdownReason :: EarlyDrop , cpu_usage_ms) ;
173183 }
184+
185+ } else if is_cpu_time_soft_limit_reached
186+ && have_all_reqs_been_acknowledged
187+ && promise_metrics. have_all_promises_been_resolved( )
188+ {
189+ terminate_fn( ) ;
190+ break ' scope ( ShutdownReason :: EarlyDrop , cpu_usage_ms) ;
174191 }
175192 }
176193 }
@@ -179,42 +196,50 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
179196
180197 Some ( _) = wait_cpu_alarm( cpu_alarms_rx. as_mut( ) ) => {
181198 if is_worker_entered {
182- if !cpu_time_soft_limit_reached {
199+ if !is_cpu_time_soft_limit_reached {
183200 early_retire_fn( ) ;
184201 error!( "CPU time soft limit reached: isolate: {:?}" , key) ;
185- cpu_time_soft_limit_reached = true ;
186202
187- if req_ack_count == demand. load( Ordering :: Acquire ) {
203+ is_cpu_time_soft_limit_reached = true ;
204+ have_all_reqs_been_acknowledged = req_ack_count == demand. load( Ordering :: Acquire ) ;
205+
206+ if have_all_reqs_been_acknowledged
207+ && promise_metrics. have_all_promises_been_resolved( )
208+ {
188209 terminate_fn( ) ;
189210 error!( "early termination due to the last request being completed: isolate: {:?}" , key) ;
190- return ( ShutdownReason :: EarlyDrop , cpu_usage_ms) ;
211+ break ' scope ( ShutdownReason :: EarlyDrop , cpu_usage_ms) ;
191212 }
192213 } else {
193214 terminate_fn( ) ;
194215 error!( "CPU time hard limit reached: isolate: {:?}" , key) ;
195- return ( ShutdownReason :: CPUTime , cpu_usage_ms) ;
216+ break ' scope ( ShutdownReason :: CPUTime , cpu_usage_ms) ;
196217 }
197218 }
198219 }
199220
200221 Some ( _) = req_end_rx. recv( ) => {
201222 req_ack_count += 1 ;
223+ have_all_reqs_been_acknowledged = req_ack_count == demand. load( Ordering :: Acquire ) ;
202224
203- if !cpu_time_soft_limit_reached {
225+ if !is_cpu_time_soft_limit_reached {
204226 if let Some ( tx) = pool_msg_tx. clone( ) {
205227 if tx. send( UserWorkerMsgs :: Idle ( key) ) . is_err( ) {
206228 error!( "failed to send idle msg to pool: {:?}" , key) ;
207229 }
208230 }
209231 }
210232
211- if !cpu_time_soft_limit_reached || req_ack_count != demand. load( Ordering :: Acquire ) {
233+ if !is_cpu_time_soft_limit_reached
234+ || !have_all_reqs_been_acknowledged
235+ || !promise_metrics. have_all_promises_been_resolved( )
236+ {
212237 continue ;
213238 }
214239
215240 terminate_fn( ) ;
216241 error!( "early termination due to the last request being completed: isolate: {:?}" , key) ;
217- return ( ShutdownReason :: EarlyDrop , cpu_usage_ms) ;
242+ break ' scope ( ShutdownReason :: EarlyDrop , cpu_usage_ms) ;
218243 }
219244
220245 _ = wall_clock_duration_alert. tick( ) , if !is_wall_clock_limit_disabled => {
@@ -229,10 +254,8 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
229254 let is_in_flight_req_exists = req_ack_count != demand. load( Ordering :: Acquire ) ;
230255
231256 terminate_fn( ) ;
232-
233257 error!( "wall clock duration reached: isolate: {:?} (in_flight_req_exists = {})" , key, is_in_flight_req_exists) ;
234-
235- return ( ShutdownReason :: WallClockTime , cpu_usage_ms) ;
258+ break ' scope ( ShutdownReason :: WallClockTime , cpu_usage_ms) ;
236259 }
237260 }
238261
@@ -252,8 +275,16 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
252275 Some ( _) = memory_limit_rx. recv( ) => {
253276 terminate_fn( ) ;
254277 error!( "memory limit reached for the worker: isolate: {:?}" , key) ;
255- return ( ShutdownReason :: Memory , cpu_usage_ms) ;
278+ break ' scope ( ShutdownReason :: Memory , cpu_usage_ms) ;
256279 }
257280 }
281+ } ;
282+
283+ match result {
284+ ( ShutdownReason :: EarlyDrop , cpu_usage_ms) if is_termination_requested => {
285+ ( ShutdownReason :: TerminationRequested , cpu_usage_ms)
286+ }
287+
288+ result => result,
258289 }
259290}
0 commit comments