evergreen/osrf/
server.rs

1use crate::init;
2use crate::osrf::app;
3use crate::osrf::client::Client;
4use crate::osrf::conf;
5use crate::osrf::message;
6use crate::osrf::method;
7use crate::osrf::sclient::HostSettings;
8use crate::osrf::session;
9use crate::osrf::worker::{Worker, WorkerState, WorkerStateEvent};
10use crate::util;
11use crate::EgResult;
12use mptc::signals::SignalTracker;
13use std::collections::HashMap;
14use std::env;
15use std::sync::mpsc;
16use std::sync::OnceLock;
17use std::thread;
18use std::time;
19use std::time::Duration;
20use std::time::{SystemTime, UNIX_EPOCH};
21
// Reminder to self: this code does not leverage MPTC, because MPTC
// operates as a top-down request/connection handler.  OpenSRF workers,
// however, operate autonomously and report their availability in a way
// MPTC is not designed to handle.  In other words, using MPTC, an
// OpenSRF worker would appear to be busy from the moment the worker
// was spawned until it exited, as opposed to being busy only when it's
// handling a request.
29
30static REGISTERED_METHODS: OnceLock<HashMap<String, method::MethodDef>> = OnceLock::new();
31
32/// Warn when there are fewer than this many idle threads
33const IDLE_THREAD_WARN_THRESHOLD: usize = 1;
34/// How often do we wake to check for shutdown, etc. signals when
35/// no other activity is occurring.
36const IDLE_WAKE_TIME: u64 = 3;
37/// Max time in seconds to allow active workers to finish their tasks.
38const SHUTDOWN_MAX_WAIT: u64 = 30;
39const DEFAULT_MIN_WORKERS: usize = 3;
40const DEFAULT_MAX_WORKERS: usize = 30;
41const DEFAULT_MAX_WORKER_REQUESTS: usize = 1000;
42const DEFAULT_WORKER_KEEPALIVE: usize = 5;
43const DEFAULT_MIN_IDLE_WORKERS: usize = 1;
44
45/// How often do we log our idle/active thread counts.
46const LOG_THREAD_STATS_FREQUENCY: u64 = 3;
47
48/// Only log thread stats if at least this many threads are active.
49const LOG_THREAD_MIN_ACTIVE: usize = 5;
50
/// Tracks one spawned worker thread: the last state it reported over
/// the to-parent channel, plus the join handle used to reap it after
/// it exits.
#[derive(Debug)]
pub struct WorkerThread {
    /// Last state reported by the worker (Idle / Active / Exiting).
    pub state: WorkerState,
    /// Handle used to join the thread once it has finished.
    pub join_handle: thread::JoinHandle<()>,
}
56
/// Manages the worker thread pool for a single OpenSRF application.
///
/// Spawns workers, tracks the state each worker reports over the
/// to-parent channel, reaps exited threads, and keeps the pool size
/// within the configured min/max bounds.
pub struct Server {
    application: Box<dyn app::Application>,
    /// Bus client; clones are handed to the application for init and
    /// method registration, and it is used for router (un)registration.
    client: Client,
    /// Worker threads keyed by their numeric worker id.
    workers: HashMap<u64, WorkerThread>,
    // Each thread gets a simple numeric ID.
    worker_id_gen: u64,
    /// Cloned once per spawned worker so threads can report state to us.
    to_parent_tx: mpsc::SyncSender<WorkerStateEvent>,
    /// Receiving end of the worker-state channel; drained in listen().
    to_parent_rx: mpsc::Receiver<WorkerStateEvent>,
    min_workers: usize,
    max_workers: usize,
    /// Per-worker request cap, passed through to Worker::new().
    max_worker_requests: usize,
    /// Keepalive value passed through to Worker::new().
    worker_keepalive: usize,

    /// Workers that have been removed from `workers` but whose threads
    /// may not have been joined yet.
    exited_workers: HashMap<u64, WorkerThread>,

    sig_tracker: SignalTracker,

    /// Minimum number of idle workers.  Note we don't support
    /// max_idle_workers at this time -- it would require adding an
    /// additional mpsc channel for every thread to deliver the shutdown
    /// request to individual threads.  Hardly seems worth it -- maybe.
    /// For comparison, the OSRF C code has no min/max idle support
    /// either.
    min_idle_workers: usize,
}
83
84impl Server {
85    pub fn start(application: Box<dyn app::Application>) -> EgResult<()> {
86        let service = application.name();
87
88        let mut options = init::InitOptions::new();
89        options.appname = Some(service.to_string());
90
91        let client = init::osrf_init(&options)?;
92
93        let mut min_workers =
94            HostSettings::get(&format!("apps/{service}/unix_config/min_children"))?
95                .as_usize()
96                .unwrap_or(DEFAULT_MIN_WORKERS);
97
98        let mut min_idle_workers =
99            HostSettings::get(&format!("apps/{service}/unix_config/min_spare_children"))?
100                .as_usize()
101                .unwrap_or(DEFAULT_MIN_IDLE_WORKERS);
102
103        let mut max_workers =
104            HostSettings::get(&format!("apps/{service}/unix_config/max_children"))?
105                .as_usize()
106                .unwrap_or(DEFAULT_MAX_WORKERS);
107
108        let mut max_worker_requests =
109            HostSettings::get(&format!("apps/{service}/unix_config/max_requests"))?
110                .as_usize()
111                .unwrap_or(DEFAULT_MAX_WORKER_REQUESTS);
112
113        let mut worker_keepalive =
114            HostSettings::get(&format!("apps/{service}/unix_config/keepalive"))?
115                .as_usize()
116                .unwrap_or(DEFAULT_WORKER_KEEPALIVE);
117
118        // Environment vars override values from opensrf.settings
119
120        if let Ok(num) = env::var("OSRF_SERVER_MIN_WORKERS") {
121            if let Ok(num) = num.parse::<usize>() {
122                min_workers = num;
123            }
124        }
125
126        if let Ok(num) = env::var("OSRF_SERVER_MIN_IDLE_WORKERS") {
127            if let Ok(num) = num.parse::<usize>() {
128                min_idle_workers = num;
129            }
130        }
131
132        if let Ok(num) = env::var("OSRF_SERVER_MAX_WORKERS") {
133            if let Ok(num) = num.parse::<usize>() {
134                max_workers = num;
135            }
136        }
137
138        if let Ok(num) = env::var("OSRF_SERVER_MAX_WORKER_REQUESTS") {
139            if let Ok(num) = num.parse::<usize>() {
140                max_worker_requests = num;
141            }
142        }
143
144        if let Ok(num) = env::var("OSRF_SERVER_WORKER_KEEPALIVE") {
145            if let Ok(num) = num.parse::<usize>() {
146                worker_keepalive = num;
147            }
148        }
149
150        log::info!(
151            "Starting service {} with min-workers={} min-idle={} max-workers={} max-requests={} keepalive={}",
152            service,
153            min_workers,
154            min_idle_workers,
155            max_workers,
156            max_worker_requests,
157            worker_keepalive,
158        );
159
160        // We have a single to-parent channel whose trasmitter is cloned
161        // per thread.  Communication from worker threads to the parent
162        // are synchronous so the parent always knows exactly how many
163        // threads are active.  With a sync_channel queue size of 0,
164        // workers will block after posting their state events to
165        // the server until the server receives the event.
166        let (tx, rx): (
167            mpsc::SyncSender<WorkerStateEvent>,
168            mpsc::Receiver<WorkerStateEvent>,
169        ) = mpsc::sync_channel(0);
170
171        let mut server = Server {
172            client,
173            application,
174            min_workers,
175            max_workers,
176            max_worker_requests,
177            worker_keepalive,
178            min_idle_workers,
179            worker_id_gen: 0,
180            to_parent_tx: tx,
181            to_parent_rx: rx,
182            workers: HashMap::new(),
183            exited_workers: HashMap::new(),
184            sig_tracker: SignalTracker::new(),
185        };
186
187        server.listen()
188    }
189
    /// Mutable access to the boxed application implementation.
    fn app_mut(&mut self) -> &mut Box<dyn app::Application> {
        &mut self.application
    }
193
    /// Name of the service this server hosts.
    fn service(&self) -> &str {
        self.application.name()
    }
197
198    fn next_worker_id(&mut self) -> u64 {
199        self.worker_id_gen += 1;
200        self.worker_id_gen
201    }
202
203    fn spawn_threads(&mut self) {
204        if self.sig_tracker.any_shutdown_requested() {
205            return;
206        }
207        while self.workers.len() < self.min_workers {
208            self.spawn_one_thread();
209        }
210    }
211
    /// Spawn a single worker thread and track it (initially Idle)
    /// in the workers map.
    fn spawn_one_thread(&mut self) {
        let worker_id = self.next_worker_id();

        // Everything the thread closure needs must be owned or cloned
        // before the move.
        let to_parent_tx = self.to_parent_tx.clone();
        let service = self.service().to_string();
        let factory = self.application.worker_factory();
        let sig_tracker = self.sig_tracker.clone();
        let max_worker_requests = self.max_worker_requests;
        let worker_keepalive = self.worker_keepalive;

        log::trace!("server: spawning a new worker {worker_id}");

        let handle = thread::spawn(move || {
            Server::start_worker_thread(
                sig_tracker,
                factory,
                service,
                worker_id,
                to_parent_tx,
                max_worker_requests,
                worker_keepalive,
            );
        });

        self.workers.insert(
            worker_id,
            WorkerThread {
                state: WorkerState::Idle,
                join_handle: handle,
            },
        );
    }
243
    /// Body of each worker thread: build a Worker and run its listen
    /// loop until it exits.
    ///
    /// Runs on the spawned thread, not on the server's main thread.
    fn start_worker_thread(
        sig_tracker: SignalTracker,
        factory: app::ApplicationWorkerFactory,
        service: String,
        worker_id: u64,
        to_parent_tx: mpsc::SyncSender<WorkerStateEvent>,
        max_worker_requests: usize,
        worker_keepalive: usize,
    ) {
        log::debug!("{service} Creating new worker {worker_id}");

        let worker_result = Worker::new(
            service,
            worker_id,
            sig_tracker,
            to_parent_tx,
            max_worker_requests,
            worker_keepalive,
        );

        let mut worker = match worker_result {
            Ok(w) => w,
            Err(e) => {
                log::error!("Cannot create worker: {e}. Exiting.");

                // If a worker dies during creation, likely they all
                // will.  Add a sleep here to avoid a storm of new
                // worker threads spinning up and failing.
                thread::sleep(Duration::from_secs(5));
                return;
            }
        };

        log::trace!("Worker {worker_id} going into listen()");

        if let Err(e) = worker.listen(factory) {
            // Failure here likely means a systemic issue that could
            // result in a lot of thread churn.  Sleep for a sec to keep
            // things from getting too chaotic.
            thread::sleep(time::Duration::from_secs(1));

            // This code is running within the worker thread.  A failure
            // may mean the worker was unable to communicate its status
            // to the main thread.  Panic here to force a cleanup.
            panic!("{worker} failed; forcing an exit: {e}");
        }
    }
291
292    /// List of domains where our service is allowed to run and
293    /// therefore whose routers with whom our presence should be registered.
294    fn hosting_domains(&self) -> Vec<(String, String)> {
295        let mut domains: Vec<(String, String)> = Vec::new();
296        for router in conf::config().client().routers() {
297            match router.services() {
298                Some(services) => {
299                    if services.iter().any(|s| s.eq(self.service())) {
300                        domains.push((router.username().to_string(), router.domain().to_string()));
301                    }
302                }
303                None => {
304                    // A domain with no specific set of hosted services
305                    // hosts all services
306                    domains.push((router.username().to_string(), router.domain().to_string()));
307                }
308            }
309        }
310
311        domains
312    }
313
314    fn register_routers(&mut self) -> EgResult<()> {
315        for (username, domain) in self.hosting_domains().iter() {
316            log::info!("server: registering with router at {domain}");
317
318            self.client
319                .send_router_command(username, domain, "register", Some(self.service()))?;
320        }
321
322        Ok(())
323    }
324
325    fn unregister_routers(&mut self) -> EgResult<()> {
326        for (username, domain) in self.hosting_domains().iter() {
327            log::info!("server: un-registering with router at {domain}");
328
329            self.client.send_router_command(
330                username,
331                domain,
332                "unregister",
333                Some(self.service()),
334            )?;
335        }
336        Ok(())
337    }
338
    /// Give the application a chance to initialize itself before any
    /// workers are spawned or methods registered.
    fn service_init(&mut self) -> EgResult<()> {
        // Clone first: app_mut() takes &mut self, so the client clone
        // cannot be created inline in the call expression.
        let client = self.client.clone();
        self.app_mut().init(client)
    }
343
344    pub fn methods() -> &'static HashMap<String, method::MethodDef> {
345        if let Some(h) = REGISTERED_METHODS.get() {
346            h
347        } else {
348            log::error!("Cannot call methods() prior to registration");
349            panic!("Cannot call methods() prior to registration");
350        }
351    }
352
353    fn register_methods(&mut self) -> EgResult<()> {
354        let client = self.client.clone();
355        let list = self.app_mut().register_methods(client)?;
356        let mut hash: HashMap<String, method::MethodDef> = HashMap::new();
357        for m in list {
358            hash.insert(m.name().to_string(), m);
359        }
360        self.add_system_methods(&mut hash);
361
362        if REGISTERED_METHODS.set(hash).is_err() {
363            return Err("Cannot call register_methods() more than once".into());
364        }
365
366        Ok(())
367    }
368
369    fn add_system_methods(&self, hash: &mut HashMap<String, method::MethodDef>) {
370        let name = "opensrf.system.echo";
371        let mut method = method::MethodDef::new(name, method::ParamCount::Any, system_method_echo);
372        method.set_desc("Echo back any values sent");
373        hash.insert(name.to_string(), method);
374
375        let name = "opensrf.system.time";
376        let mut method = method::MethodDef::new(name, method::ParamCount::Zero, system_method_time);
377        method.set_desc("Respond with system time in epoch seconds");
378        hash.insert(name.to_string(), method);
379
380        let name = "opensrf.system.method.all";
381        let mut method = method::MethodDef::new(
382            name,
383            method::ParamCount::Range(0, 1),
384            system_method_introspect,
385        );
386        method.set_desc("List published API definitions");
387
388        method.add_param(method::Param {
389            name: String::from("prefix"),
390            datatype: method::ParamDataType::String,
391            desc: Some(String::from("API name prefix filter")),
392        });
393
394        hash.insert(name.to_string(), method);
395
396        let name = "opensrf.system.method.all.summary";
397        let mut method = method::MethodDef::new(
398            name,
399            method::ParamCount::Range(0, 1),
400            system_method_introspect,
401        );
402        method.set_desc("Summary list published API definitions");
403
404        method.add_param(method::Param {
405            name: String::from("prefix"),
406            datatype: method::ParamDataType::String,
407            desc: Some(String::from("API name prefix filter")),
408        });
409
410        hash.insert(name.to_string(), method);
411    }
412
    /// Main server loop.
    ///
    /// Initializes the application, registers methods and routers,
    /// spawns the initial worker pool, then processes worker state
    /// events until a shutdown signal arrives.  On exit, unregisters
    /// from the routers and calls shutdown(), which ends the process.
    pub fn listen(&mut self) -> EgResult<()> {
        self.service_init()?;
        self.register_methods()?;
        self.register_routers()?;
        self.spawn_threads();
        // Install signal handlers only after setup has succeeded.
        self.sig_tracker.track_graceful_shutdown();
        self.sig_tracker.track_fast_shutdown();
        self.sig_tracker.track_reload();

        let duration = Duration::from_secs(IDLE_WAKE_TIME);
        let mut log_timer = util::Timer::new(LOG_THREAD_STATS_FREQUENCY);

        loop {
            // Wait for worker thread state updates

            let mut work_performed = false;

            // Wait up to 'duration' seconds before looping around and
            // trying again.  This leaves room for other potential
            // housekeeping between recv calls.
            //
            // This will return an Err on timeout or a
            // failed/disconnected thread.
            if let Ok(evt) = self.to_parent_rx.recv_timeout(duration) {
                self.handle_worker_event(&evt);
                work_performed = true;
            }

            // Always check for failed threads.
            work_performed = self.check_failed_threads() || work_performed;

            if self.sig_tracker.any_shutdown_requested() {
                log::info!("We received a stop signal, exiting");
                break;
            }

            if !work_performed {
                // Only perform idle worker maintenance if no other
                // tasks were performed during this loop iter.
                self.perform_idle_worker_maint();
            }

            self.clean_exited_workers(false);
            self.log_thread_counts(&mut log_timer);
        }

        self.unregister_routers()?;
        self.shutdown();

        Ok(())
    }
464
465    /// Periodically report our active/idle thread disposition
466    /// so monitoring tools can keep track.
467    ///
468    /// Nothing is logged if all threads are idle.
469    ///
470    /// You can also do things via command line like: $ ps huH p $pid
471    fn log_thread_counts(&self, timer: &mut util::Timer) {
472        if !timer.done() {
473            return;
474        }
475
476        let active_count = self.active_thread_count();
477
478        if active_count < LOG_THREAD_MIN_ACTIVE {
479            return;
480        }
481
482        log::info!(
483            "Service {} max-threads={} active-threads={} idle-threads={} exited-threads={}",
484            self.application.name(),
485            self.max_workers,
486            active_count,
487            self.idle_thread_count(),
488            self.exited_workers.len(),
489        );
490
491        timer.reset();
492    }
493
494    /// Add additional idle workers if needed.
495    ///
496    /// Spawn at most one worker per maintenance cycle.
497    fn perform_idle_worker_maint(&mut self) {
498        let idle_workers = self.idle_thread_count();
499
500        if self.min_idle_workers > 0
501            && self.workers.len() < self.max_workers
502            && idle_workers < self.min_idle_workers
503        {
504            self.spawn_one_thread();
505            log::debug!("Sawned idle worker; idle={idle_workers}");
506        }
507    }
508
    /// Wind the server down, giving active workers up to
    /// SHUTDOWN_MAX_WAIT seconds to finish, then exit the process.
    fn shutdown(&mut self) {
        let timer = util::Timer::new(SHUTDOWN_MAX_WAIT);
        let duration = Duration::from_secs(1);

        while !timer.done() && (!self.workers.is_empty() || !self.exited_workers.is_empty()) {
            let info = format!(
                "{} shutdown: {} threads; {} active; time remaining {}",
                self.application.name(),
                self.workers.len(),
                self.active_thread_count(),
                timer.remaining(),
            );

            // Nod to anyone control-C'ing from the command line.
            println!("{info}...");

            log::info!("{info}");

            // Keep draining state events so workers blocked on the
            // zero-capacity sync channel can proceed to exit.
            if let Ok(evt) = self.to_parent_rx.recv_timeout(duration) {
                self.handle_worker_event(&evt);
            }

            self.check_failed_threads();
            self.clean_exited_workers(true);
        }

        // Timer may have completed before all working threads reported
        // as finished.  Force-kill all of our threads at this point.
        std::process::exit(0);
    }
539
540    /// Check for threads that panic!ed and were unable to send any
541    /// worker state info to us.
542    ///
543    /// Returns true if work was done.
544    fn check_failed_threads(&mut self) -> bool {
545        let failed: Vec<u64> = self
546            .workers
547            .iter()
548            .filter(|(_, v)| v.join_handle.is_finished())
549            .map(|(k, _)| *k) // k is a &u64
550            .collect();
551
552        let mut handled = false;
553        for worker_id in failed {
554            handled = true;
555            log::info!("Found a thread that exited ungracefully: {worker_id}");
556            self.remove_thread(&worker_id);
557        }
558
559        handled
560    }
561
    /// Move the thread into the exited_workers list for future cleanup
    /// and spawn new threads to fill the gap as needed.
    fn remove_thread(&mut self, worker_id: &u64) {
        log::debug!("server: removing thread {}", worker_id);

        // The worker signal may have arrived before its thread fully
        // exited.  Track the thread as being done so we can go
        // back and cleanup.
        if let Some(worker) = self.workers.remove(worker_id) {
            self.exited_workers.insert(*worker_id, worker);
        }
        self.spawn_threads();
    }
575
576    fn clean_exited_workers(&mut self, block: bool) {
577        let mut keep = HashMap::new();
578
579        log::debug!("server: exited thread count {}", self.exited_workers.len());
580
581        for (worker_id, worker) in self.exited_workers.drain() {
582            if block || worker.join_handle.is_finished() {
583                log::debug!("server: joining worker: {worker_id}");
584
585                let _ = worker
586                    .join_handle
587                    .join()
588                    .inspect_err(|e| log::error!("server: failure joining worker thread: {e:?}"));
589            } else {
590                log::debug!("server: worker {worker_id} has not finished");
591                keep.insert(worker_id, worker);
592            }
593        }
594
595        self.exited_workers = keep;
596    }
597
    /// Set the state of our thread worker based on the state reported
    /// to us by the thread.
    ///
    /// Also reacts to the new pool shape: removes exiting workers,
    /// spawns a replacement when no workers are idle, and warns when
    /// the idle count falls below the configured threshold.
    fn handle_worker_event(&mut self, evt: &WorkerStateEvent) {
        log::trace!("server received WorkerStateEvent: {:?}", evt);

        let worker_id = evt.worker_id();

        let worker: &mut WorkerThread = match self.workers.get_mut(&worker_id) {
            Some(w) => w,
            None => {
                // Nothing to update if the worker is no longer tracked.
                log::error!("No worker found with id {worker_id}");
                return;
            }
        };

        if evt.state() == WorkerState::Exiting {
            // Worker is done -- remove it and fire up new ones as needed.
            self.remove_thread(&worker_id);
        } else {
            log::trace!("server: updating thread state: {:?}", worker_id);
            worker.state = evt.state();
        }

        let idle = self.idle_thread_count();
        let active = self.active_thread_count();

        log::debug!("server: workers idle={idle} active={active}");

        // Don't grow the pool while shutting down.
        if self.sig_tracker.any_shutdown_requested() {
            return;
        }

        if idle == 0 {
            if active < self.max_workers {
                self.spawn_one_thread();
            } else {
                log::warn!("server: reached max workers!");
            }
        }

        // Only warn when the configured idle minimum exceeds the
        // warning threshold; otherwise a low idle count is expected.
        if self.min_idle_workers > IDLE_THREAD_WARN_THRESHOLD && idle < IDLE_THREAD_WARN_THRESHOLD {
            log::warn!(
                "server: idle thread count={} is below warning threshold={}",
                idle,
                IDLE_THREAD_WARN_THRESHOLD
            );
        }
    }
646
647    fn active_thread_count(&self) -> usize {
648        self.workers
649            .values()
650            .filter(|v| v.state == WorkerState::Active)
651            .count()
652    }
653
654    fn idle_thread_count(&self) -> usize {
655        self.workers
656            .values()
657            .filter(|v| v.state == WorkerState::Idle)
658            .count()
659    }
660}
661
662// Toss our system method handlers down here.
663fn system_method_echo(
664    _worker: &mut Box<dyn app::ApplicationWorker>,
665    session: &mut session::ServerSession,
666    method: message::MethodCall,
667) -> EgResult<()> {
668    let count = method.params().len();
669    for (idx, val) in method.params().iter().enumerate() {
670        if idx == count - 1 {
671            // Package the final response and the COMPLETE message
672            // into the same transport message for consistency
673            // with the Perl code for load testing, etc. comparisons.
674            session.respond_complete(val.clone())?;
675        } else {
676            session.respond(val.clone())?;
677        }
678    }
679    Ok(())
680}
681
682fn system_method_time(
683    _worker: &mut Box<dyn app::ApplicationWorker>,
684    session: &mut session::ServerSession,
685    _method: message::MethodCall,
686) -> EgResult<()> {
687    match SystemTime::now().duration_since(UNIX_EPOCH) {
688        Ok(t) => session.respond_complete(t.as_secs()),
689        Err(e) => Err(format!("System time error: {e}").into()),
690    }
691}
692
693fn system_method_introspect(
694    _worker: &mut Box<dyn app::ApplicationWorker>,
695    session: &mut session::ServerSession,
696    method: message::MethodCall,
697) -> EgResult<()> {
698    let prefix = match method.params().first() {
699        Some(p) => p.as_str(),
700        None => None,
701    };
702
703    // Collect the names first so we can sort them
704    let mut names: Vec<&str> = match prefix {
705        // If a prefix string is provided, only return methods whose
706        // name starts with the provided prefix.
707        Some(pfx) => Server::methods()
708            .keys()
709            .filter(|n| n.starts_with(pfx))
710            .map(|n| n.as_str())
711            .collect(),
712        None => Server::methods().keys().map(|n| n.as_str()).collect(),
713    };
714
715    names.sort();
716
717    for name in names {
718        if let Some(meth) = Server::methods().get(name) {
719            if method.method().contains("summary") {
720                session.respond(meth.to_summary_string())?;
721            } else {
722                session.respond(meth.to_eg_value())?;
723            }
724        }
725    }
726
727    Ok(())
728}