1use super::signals::SignalTracker;
2use super::worker::{Worker, WorkerInstance, WorkerState, WorkerStateEvent};
3use super::{Request, RequestStream};
4use std::collections::HashMap;
5use std::sync::mpsc;
6use std::thread;
7use std::time::Duration;
8use std::time::Instant;
9
10const LOG_THREAD_STATS_FREQUENCY: i32 = 5;
12
13const LOG_THREAD_MIN_ACTIVE: usize = 5;
15
16type RequestSendChannel = mpsc::Sender<Box<dyn Request>>;
17type RequestReceiveChannel = mpsc::Receiver<Box<dyn Request>>;
18
19type StateEventSendChannel = mpsc::Sender<WorkerStateEvent>;
20type StateEventReceiveChannel = mpsc::Receiver<WorkerStateEvent>;
21
22pub struct Server {
23 worker_id_gen: u64,
24 workers: HashMap<u64, WorkerInstance>,
25
26 to_parent_rx: StateEventReceiveChannel,
27 to_parent_tx: StateEventSendChannel,
28
29 min_workers: usize,
30 max_workers: usize,
31 min_idle_workers: usize,
32 max_worker_requests: usize,
33
34 sig_tracker: SignalTracker,
35
36 stream: Box<dyn RequestStream>,
38}
39
40impl Server {
41 pub fn new(stream: Box<dyn RequestStream>) -> Server {
42 let (tx, rx): (StateEventSendChannel, StateEventReceiveChannel) = mpsc::channel();
43
44 Server {
45 stream,
46 workers: HashMap::new(),
47 sig_tracker: SignalTracker::new(),
48 worker_id_gen: 0,
49 to_parent_tx: tx,
50 to_parent_rx: rx,
51 min_workers: super::DEFAULT_MIN_WORKERS,
52 min_idle_workers: super::DEFAULT_MIN_IDLE_WORKERS,
53 max_workers: super::DEFAULT_MAX_WORKERS,
54 max_worker_requests: super::DEFAULT_MAX_WORKER_REQUESTS,
55 }
56 }
57
58 pub fn set_min_workers(&mut self, v: usize) {
59 self.min_workers = v;
60 }
61 pub fn set_min_idle_workers(&mut self, v: usize) {
62 self.min_idle_workers = v;
63 }
64 pub fn set_max_workers(&mut self, v: usize) {
65 self.max_workers = v;
66 }
67 pub fn set_max_worker_requests(&mut self, v: usize) {
68 self.max_worker_requests = v;
69 }
70
71 fn next_worker_id(&mut self) -> u64 {
72 self.worker_id_gen += 1;
73 self.worker_id_gen
74 }
75
76 fn start_workers(&mut self) {
77 while self.workers.len() < self.min_workers {
78 self.start_one_worker();
79 }
80 }
81
82 fn stop_workers(&mut self) {
83 while let Some(id) = self.workers.keys().next().copied() {
84 log::debug!("Server cleaning up worker {}", id);
85 self.remove_worker(&id, false);
86 }
87 }
88
89 fn start_one_worker(&mut self) -> u64 {
90 let worker_id = self.next_worker_id();
91 let to_parent_tx = self.to_parent_tx.clone();
92 let max_reqs = self.max_worker_requests;
93 let handler = self.stream.new_handler();
94 let sig_tracker = self.sig_tracker.clone();
95
96 log::debug!(
97 "Starting worker with idle={} active={}",
98 self.idle_worker_count(),
99 self.active_worker_count(),
100 );
101
102 let (tx, rx): (RequestSendChannel, RequestReceiveChannel) = mpsc::channel();
103
104 let handle = thread::spawn(move || {
105 let mut w = Worker::new(worker_id, max_reqs, sig_tracker, to_parent_tx, rx, handler);
106 w.run();
107 });
108
109 let instance = WorkerInstance {
110 worker_id,
111 state: WorkerState::Idle,
112 join_handle: handle,
113 to_worker_tx: tx,
114 };
115
116 self.workers.insert(worker_id, instance);
117
118 worker_id
119 }
120
121 fn perform_idle_worker_maint(&mut self) {
125 let idle_workers = self.idle_worker_count();
126
127 if self.min_idle_workers > 0
128 && self.workers.len() < self.max_workers
129 && idle_workers < self.min_idle_workers
130 {
131 self.start_one_worker();
132 log::debug!("Sawned idle worker; idle={idle_workers}");
133 }
134 }
135
136 fn active_worker_count(&self) -> usize {
137 self.workers
138 .values()
139 .filter(|v| v.state == WorkerState::Active)
140 .count()
141 }
142
143 fn idle_worker_count(&self) -> usize {
144 self.workers
145 .values()
146 .filter(|v| v.state == WorkerState::Idle)
147 .count()
148 }
149
150 fn remove_worker(&mut self, worker_id: &u64, respawn: bool) {
151 log::debug!("server: removing worker {}", worker_id);
152
153 if let Some(worker) = self.workers.remove(worker_id) {
154 if let Err(e) = worker.join_handle.join() {
155 log::error!("Worker join failed with: {e:?}");
156 }
157 }
158 if respawn {
159 self.start_workers();
160 }
161 }
162
163 fn handle_worker_event(&mut self, evt: &WorkerStateEvent) {
166 log::trace!("server received WorkerStateEvent: {evt}");
167
168 let worker_id = evt.worker_id();
169
170 let worker = match self.workers.get_mut(&worker_id) {
171 Some(w) => w,
172 None => {
173 log::error!("No worker found with id {worker_id}");
174 return;
175 }
176 };
177
178 if evt.state() == &WorkerState::Done {
179 self.remove_worker(&worker_id, true);
181 } else {
182 log::trace!("Updating thread state for worker: {}", worker_id);
183 worker.state = evt.state().clone();
184 }
185
186 let idle = self.idle_worker_count();
187 let active = self.active_worker_count();
188
189 log::trace!("Workers idle={idle} active={active}");
190
191 if idle == 0 {
192 if active < self.max_workers {
194 self.start_one_worker();
195 } else {
196 log::warn!("server: reached max workers. Cannot create spare worker");
197 }
198 }
199 }
200
201 fn check_failed_threads(&mut self) {
204 let failed: Vec<u64> = self
205 .workers
206 .iter()
207 .filter(|(_, v)| v.join_handle.is_finished())
208 .map(|(k, _)| *k) .collect();
210
211 for worker_id in failed {
212 log::debug!("Found a thread that exited ungracefully: {worker_id}");
213 self.remove_worker(&worker_id, true);
214 }
215 }
216
217 fn housekeeping(&mut self, block: bool) -> bool {
222 loop {
223 if self.sig_tracker.reload_requested() {
224 log::info!("Reload request received.");
225 self.sig_tracker.handle_reload_requested();
226
227 if let Err(e) = self.stream.reload() {
228 log::error!("Reload command failed, exiting. {e}");
229 return true;
230 }
231 }
232
233 if self.sig_tracker.any_shutdown_requested() {
234 log::info!("Shutdown request received.");
235 self.stream.shutdown();
236 return true;
237 }
238
239 if block {
240 log::debug!("Waiting for a worker to become available...");
241
242 if let Ok(evt) = self.to_parent_rx.recv_timeout(Duration::from_secs(1)) {
246 self.handle_worker_event(&evt);
247 }
248 }
249
250 while let Ok(evt) = self.to_parent_rx.try_recv() {
252 self.handle_worker_event(&evt);
253 }
254
255 self.check_failed_threads();
258
259 self.perform_idle_worker_maint();
260
261 if !block || self.idle_worker_count() > 0 {
262 return false;
263 }
264 }
265 }
266
267 pub fn run(&mut self) {
268 self.sig_tracker.track_graceful_shutdown();
269 self.sig_tracker.track_fast_shutdown();
270 self.sig_tracker.track_reload();
271
272 log::debug!(
273 "server: starting workers min-workers={} mid-idle-workers={} max-workers={} max-worker-requests={}",
274 self.min_workers,
275 self.min_idle_workers,
276 self.max_workers,
277 self.max_worker_requests,
278 );
279
280 self.start_workers();
281
282 let mut log_timer = Instant::now();
283
284 loop {
285 match self.stream.next() {
286 Ok(req_op) => {
287 if let Some(req) = req_op {
288 self.dispatch_request(req);
289 }
290 }
291 Err(e) => {
292 log::error!("Exiting on stream error: {e}");
293 break;
294 }
295 }
296
297 if self.housekeeping(false) {
298 break;
299 }
300
301 self.log_thread_counts(&mut log_timer);
302 }
303
304 self.stop_workers();
305 }
306
307 fn log_thread_counts(&self, timer: &mut Instant) {
314 let elapsed = timer.elapsed().as_secs() as i32;
315
316 if LOG_THREAD_STATS_FREQUENCY - elapsed > 0 {
317 return;
318 }
319
320 let active_count = self.active_worker_count();
321
322 if active_count < LOG_THREAD_MIN_ACTIVE {
323 return;
324 }
325
326 log::info!(
327 "MPTC max-threads={} active-threads={} idle-threads={}",
328 self.max_workers,
329 active_count,
330 self.idle_worker_count(),
331 );
332
333 *timer = Instant::now();
334 }
335
336 fn dispatch_request(&mut self, request: Box<dyn Request>) {
337 let wid = self.next_idle_worker();
338 if let Some(worker) = self.workers.get_mut(&wid) {
339 worker.state = WorkerState::Active;
340
341 if let Err(e) = worker.to_worker_tx.send(request) {
342 log::error!("Error sending data to worker: {e}");
348 }
349 }
350 }
351
352 fn next_idle_worker(&mut self) -> u64 {
353 if let Some((k, _)) = self
355 .workers
356 .iter()
357 .find(|(_, w)| w.state() == &WorkerState::Idle)
358 {
359 return *k; }
361
362 if self.workers.len() < self.max_workers {
364 return self.start_one_worker();
365 }
366
367 log::warn!("Max workers reached. Cannot spawn new worker");
368
369 loop {
370 self.housekeeping(true);
372
373 if let Some((k, _)) = self
374 .workers
375 .iter()
376 .find(|(_, w)| w.state() == &WorkerState::Idle)
377 {
378 return *k; }
380 }
381 }
382}