mptc/
server.rs

1use super::signals::SignalTracker;
2use super::worker::{Worker, WorkerInstance, WorkerState, WorkerStateEvent};
3use super::{Request, RequestStream};
4use std::collections::HashMap;
5use std::sync::mpsc;
6use std::thread;
7use std::time::Duration;
8use std::time::Instant;
9
/// Minimum number of seconds between thread-count log entries.
///
/// `Server::log_thread_counts` compares this against the whole seconds
/// elapsed since the last entry was written.
const LOG_THREAD_STATS_FREQUENCY: i32 = 5;

/// Only log thread stats if at least this many threads are active.
const LOG_THREAD_MIN_ACTIVE: usize = 5;

/// Sending half of the per-worker channel used to hand a request to a worker.
type RequestSendChannel = mpsc::Sender<Box<dyn Request>>;
/// Receiving half of the per-worker request channel; moved into the worker thread.
type RequestReceiveChannel = mpsc::Receiver<Box<dyn Request>>;

/// Sending half of the shared channel workers use to report state changes.
type StateEventSendChannel = mpsc::Sender<WorkerStateEvent>;
/// Receiving half of the worker state channel; read by the server.
type StateEventReceiveChannel = mpsc::Receiver<WorkerStateEvent>;
21
/// Accepts requests from a `RequestStream` and hands each one to a
/// worker thread, growing and pruning the set of workers as needed.
pub struct Server {
    /// Most recently issued worker ID; incremented for every new worker.
    worker_id_gen: u64,
    /// Every worker we have spawned and not yet removed, keyed by worker ID.
    workers: HashMap<u64, WorkerInstance>,

    /// Where the server reads state events reported by workers.
    to_parent_rx: StateEventReceiveChannel,
    /// Cloned into each new worker so it can report state events to us.
    to_parent_tx: StateEventSendChannel,

    /// Workers are spawned at startup (and after a removal with respawn)
    /// until at least this many exist.
    min_workers: usize,
    /// Upper bound consulted before spawning additional workers.
    max_workers: usize,
    /// Idle maintenance spawns a worker when fewer than this many are idle.
    /// A value of 0 disables idle maintenance.
    min_idle_workers: usize,
    /// Passed to each worker at creation.
    /// NOTE(review): presumably the number of requests a worker handles
    /// before exiting -- confirm against `Worker`.
    max_worker_requests: usize,

    /// Polled for reload and shutdown requests; a clone is given to each worker.
    sig_tracker: SignalTracker,

    /// All inbound requests arrive via this stream.
    stream: Box<dyn RequestStream>,
}
39
40impl Server {
41    pub fn new(stream: Box<dyn RequestStream>) -> Server {
42        let (tx, rx): (StateEventSendChannel, StateEventReceiveChannel) = mpsc::channel();
43
44        Server {
45            stream,
46            workers: HashMap::new(),
47            sig_tracker: SignalTracker::new(),
48            worker_id_gen: 0,
49            to_parent_tx: tx,
50            to_parent_rx: rx,
51            min_workers: super::DEFAULT_MIN_WORKERS,
52            min_idle_workers: super::DEFAULT_MIN_IDLE_WORKERS,
53            max_workers: super::DEFAULT_MAX_WORKERS,
54            max_worker_requests: super::DEFAULT_MAX_WORKER_REQUESTS,
55        }
56    }
57
    /// Sets the number of workers that `start_workers` keeps in existence.
    pub fn set_min_workers(&mut self, v: usize) {
        self.min_workers = v;
    }
    /// Sets the idle-worker count below which idle maintenance spawns
    /// another worker.  A value of 0 disables idle maintenance.
    pub fn set_min_idle_workers(&mut self, v: usize) {
        self.min_idle_workers = v;
    }
    /// Sets the upper bound checked before additional workers are spawned.
    pub fn set_max_workers(&mut self, v: usize) {
        self.max_workers = v;
    }
    /// Sets the request limit handed to each worker created from now on.
    /// NOTE(review): presumably the number of requests a worker serves
    /// before exiting -- confirm against `Worker`.
    pub fn set_max_worker_requests(&mut self, v: usize) {
        self.max_worker_requests = v;
    }
70
71    fn next_worker_id(&mut self) -> u64 {
72        self.worker_id_gen += 1;
73        self.worker_id_gen
74    }
75
76    fn start_workers(&mut self) {
77        while self.workers.len() < self.min_workers {
78            self.start_one_worker();
79        }
80    }
81
82    fn stop_workers(&mut self) {
83        while let Some(id) = self.workers.keys().next().copied() {
84            log::debug!("Server cleaning up worker {}", id);
85            self.remove_worker(&id, false);
86        }
87    }
88
89    fn start_one_worker(&mut self) -> u64 {
90        let worker_id = self.next_worker_id();
91        let to_parent_tx = self.to_parent_tx.clone();
92        let max_reqs = self.max_worker_requests;
93        let handler = self.stream.new_handler();
94        let sig_tracker = self.sig_tracker.clone();
95
96        log::debug!(
97            "Starting worker with idle={} active={}",
98            self.idle_worker_count(),
99            self.active_worker_count(),
100        );
101
102        let (tx, rx): (RequestSendChannel, RequestReceiveChannel) = mpsc::channel();
103
104        let handle = thread::spawn(move || {
105            let mut w = Worker::new(worker_id, max_reqs, sig_tracker, to_parent_tx, rx, handler);
106            w.run();
107        });
108
109        let instance = WorkerInstance {
110            worker_id,
111            state: WorkerState::Idle,
112            join_handle: handle,
113            to_worker_tx: tx,
114        };
115
116        self.workers.insert(worker_id, instance);
117
118        worker_id
119    }
120
121    /// Add additional idle workers if needed.
122    ///
123    /// Spawn at most one worker per maintenance cycle.
124    fn perform_idle_worker_maint(&mut self) {
125        let idle_workers = self.idle_worker_count();
126
127        if self.min_idle_workers > 0
128            && self.workers.len() < self.max_workers
129            && idle_workers < self.min_idle_workers
130        {
131            self.start_one_worker();
132            log::debug!("Sawned idle worker; idle={idle_workers}");
133        }
134    }
135
136    fn active_worker_count(&self) -> usize {
137        self.workers
138            .values()
139            .filter(|v| v.state == WorkerState::Active)
140            .count()
141    }
142
143    fn idle_worker_count(&self) -> usize {
144        self.workers
145            .values()
146            .filter(|v| v.state == WorkerState::Idle)
147            .count()
148    }
149
150    fn remove_worker(&mut self, worker_id: &u64, respawn: bool) {
151        log::debug!("server: removing worker {}", worker_id);
152
153        if let Some(worker) = self.workers.remove(worker_id) {
154            if let Err(e) = worker.join_handle.join() {
155                log::error!("Worker join failed with: {e:?}");
156            }
157        }
158        if respawn {
159            self.start_workers();
160        }
161    }
162
163    /// Set the state of our thread worker based on the state reported
164    /// to us by the thread.
165    fn handle_worker_event(&mut self, evt: &WorkerStateEvent) {
166        log::trace!("server received WorkerStateEvent: {evt}");
167
168        let worker_id = evt.worker_id();
169
170        let worker = match self.workers.get_mut(&worker_id) {
171            Some(w) => w,
172            None => {
173                log::error!("No worker found with id {worker_id}");
174                return;
175            }
176        };
177
178        if evt.state() == &WorkerState::Done {
179            // Worker is done -- remove it and fire up new ones as needed.
180            self.remove_worker(&worker_id, true);
181        } else {
182            log::trace!("Updating thread state for worker: {}", worker_id);
183            worker.state = evt.state().clone();
184        }
185
186        let idle = self.idle_worker_count();
187        let active = self.active_worker_count();
188
189        log::trace!("Workers idle={idle} active={active}");
190
191        if idle == 0 {
192            // Try to keep at least one spare worker.
193            if active < self.max_workers {
194                self.start_one_worker();
195            } else {
196                log::warn!("server: reached max workers.  Cannot create spare worker");
197            }
198        }
199    }
200
201    // Check for threads that panic!ed and were unable to send any
202    // worker state info to us.
203    fn check_failed_threads(&mut self) {
204        let failed: Vec<u64> = self
205            .workers
206            .iter()
207            .filter(|(_, v)| v.join_handle.is_finished())
208            .map(|(k, _)| *k) // k is a &u64
209            .collect();
210
211        for worker_id in failed {
212            log::debug!("Found a thread that exited ungracefully: {worker_id}");
213            self.remove_worker(&worker_id, true);
214        }
215    }
216
    /// Performs one or more rounds of server maintenance: signal
    /// handling, worker state updates, reaping of finished threads, and
    /// idle worker maintenance.
    ///
    /// Returns true if it's time to shut down, i.e. a shutdown was
    /// requested or a requested reload failed.  Returns false otherwise.
    ///
    /// * `block` - Continue performing housekeeping until an idle worker
    ///   becomes available.  When false, a single round is performed.
    fn housekeeping(&mut self, block: bool) -> bool {
        loop {
            if self.sig_tracker.reload_requested() {
                log::info!("Reload request received.");
                self.sig_tracker.handle_reload_requested();

                if let Err(e) = self.stream.reload() {
                    log::error!("Reload command failed, exiting. {e}");
                    return true;
                }
            }

            // Checked on every round, before any worker events are
            // processed, so a blocked caller still sees shutdown requests.
            if self.sig_tracker.any_shutdown_requested() {
                log::info!("Shutdown request received.");
                self.stream.shutdown();
                return true;
            }

            if block {
                log::debug!("Waiting for a worker to become available...");

                // Wait up to 1 second for a worker state event, then
                // resume housekeeping, looping back around and trying
                // again if necessary.
                if let Ok(evt) = self.to_parent_rx.recv_timeout(Duration::from_secs(1)) {
                    self.handle_worker_event(&evt);
                }
            }

            // Pull all state events from the channel without blocking.
            while let Ok(evt) = self.to_parent_rx.try_recv() {
                self.handle_worker_event(&evt);
            }

            // Finally clean up any threads that panic!ed before they
            // could send a state event.
            self.check_failed_threads();

            self.perform_idle_worker_maint();

            // Non-blocking callers get exactly one round; blocking
            // callers loop until some worker is idle.
            if !block || self.idle_worker_count() > 0 {
                return false;
            }
        }
    }
266
267    pub fn run(&mut self) {
268        self.sig_tracker.track_graceful_shutdown();
269        self.sig_tracker.track_fast_shutdown();
270        self.sig_tracker.track_reload();
271
272        log::debug!(
273            "server: starting workers min-workers={} mid-idle-workers={} max-workers={} max-worker-requests={}",
274            self.min_workers,
275            self.min_idle_workers,
276            self.max_workers,
277            self.max_worker_requests,
278        );
279
280        self.start_workers();
281
282        let mut log_timer = Instant::now();
283
284        loop {
285            match self.stream.next() {
286                Ok(req_op) => {
287                    if let Some(req) = req_op {
288                        self.dispatch_request(req);
289                    }
290                }
291                Err(e) => {
292                    log::error!("Exiting on stream error: {e}");
293                    break;
294                }
295            }
296
297            if self.housekeeping(false) {
298                break;
299            }
300
301            self.log_thread_counts(&mut log_timer);
302        }
303
304        self.stop_workers();
305    }
306
307    /// Periodically report our active/idle thread disposition
308    /// so monitoring tools can keep track.
309    ///
310    /// Nothing is logged if all threads are idle.
311    ///
312    /// You can also do things via command line like: $ ps huH p $pid
313    fn log_thread_counts(&self, timer: &mut Instant) {
314        let elapsed = timer.elapsed().as_secs() as i32;
315
316        if LOG_THREAD_STATS_FREQUENCY - elapsed > 0 {
317            return;
318        }
319
320        let active_count = self.active_worker_count();
321
322        if active_count < LOG_THREAD_MIN_ACTIVE {
323            return;
324        }
325
326        log::info!(
327            "MPTC max-threads={} active-threads={} idle-threads={}",
328            self.max_workers,
329            active_count,
330            self.idle_worker_count(),
331        );
332
333        *timer = Instant::now();
334    }
335
336    fn dispatch_request(&mut self, request: Box<dyn Request>) {
337        let wid = self.next_idle_worker();
338        if let Some(worker) = self.workers.get_mut(&wid) {
339            worker.state = WorkerState::Active;
340
341            if let Err(e) = worker.to_worker_tx.send(request) {
342                // If sending to the worker fails, which really should
343                // not happen, since this worker was just verified idle,
344                // then the request as a whole is dropped.  We could
345                // handle this in a more robust way, but the assumption
346                // this should in effect never happen.  The logs will tell.
347                log::error!("Error sending data to worker: {e}");
348            }
349        }
350    }
351
352    fn next_idle_worker(&mut self) -> u64 {
353        // 1. Find an idle worker
354        if let Some((k, _)) = self
355            .workers
356            .iter()
357            .find(|(_, w)| w.state() == &WorkerState::Idle)
358        {
359            return *k; // &u64
360        }
361
362        // 2. Create an idle worker if we can
363        if self.workers.len() < self.max_workers {
364            return self.start_one_worker();
365        }
366
367        log::warn!("Max workers reached.  Cannot spawn new worker");
368
369        loop {
370            // 3. Wait for a worker to become idle.
371            self.housekeeping(true);
372
373            if let Some((k, _)) = self
374                .workers
375                .iter()
376                .find(|(_, w)| w.state() == &WorkerState::Idle)
377            {
378                return *k; // &u64
379            }
380        }
381    }
382}