// evergreen/osrf/microsvc.rs

1use crate::init;
2use crate::osrf::addr::BusAddress;
3use crate::osrf::app;
4use crate::osrf::client::{Client, ClientSingleton};
5use crate::osrf::conf;
6use crate::osrf::logging::Logger;
7use crate::osrf::message;
8use crate::osrf::message::Message;
9use crate::osrf::message::MessageStatus;
10use crate::osrf::message::MessageType;
11use crate::osrf::message::Payload;
12use crate::osrf::message::TransportMessage;
13use crate::osrf::method;
14use crate::osrf::method::ParamCount;
15use crate::osrf::sclient::HostSettings;
16use crate::osrf::session::ServerSession;
17use crate::util;
18use crate::EgResult;
19use mptc::signals::SignalTracker;
20use std::cell::RefMut;
21use std::collections::HashMap;
22use std::fmt;
23use std::sync::OnceLock;
24use std::thread;
25use std::time::{Duration, SystemTime, UNIX_EPOCH};
26
27static REGISTERED_METHODS: OnceLock<HashMap<String, method::MethodDef>> = OnceLock::new();
28
29// How often each worker wakes to check for shutdown signals, etc.
30const IDLE_WAKE_TIME: u64 = 5;
31
32pub struct Microservice {
33    application: Box<dyn app::Application>,
34
35    /// Watches for signals
36    sig_tracker: SignalTracker,
37
38    /// OpenSRF bus connection
39    client: Client,
40
41    /// True if the caller has requested a stateful conversation.
42    connected: bool,
43
44    /// Currently active session.
45    /// A worker can only have one active session at a time.
46    /// For stateless requests, each new thread results in a new session.
47    /// Starting a new thread/session in a stateful conversation
48    /// results in an error.
49    session: Option<ServerSession>,
50}
51
52impl fmt::Display for Microservice {
53    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
54        write!(f, "Micro")
55    }
56}
57
58impl Microservice {
59    pub fn start(application: Box<dyn app::Application>) -> EgResult<()> {
60        let mut options = init::InitOptions::new();
61        options.appname = Some(application.name().to_string());
62
63        let client = init::osrf_init(&options)?;
64
65        let mut tracker = SignalTracker::new();
66
67        tracker.track_graceful_shutdown();
68        tracker.track_fast_shutdown();
69        tracker.track_reload();
70
71        let mut service = Microservice {
72            application,
73            client,
74            sig_tracker: tracker,
75            connected: false,
76            session: None,
77        };
78
79        let client = service.client.clone();
80
81        service.application.init(client)?;
82
83        service.register_methods()?;
84
85        service.register_routers()?;
86
87        service.listen();
88
89        service.unregister_routers()?;
90
91        Ok(())
92    }
93
94    /// Mark ourselves as currently idle.
95    ///
96    /// This is a NO-OP for now, but may be useful in the future.
97    fn set_idle(&mut self) -> EgResult<()> {
98        Ok(())
99    }
100
101    /// Mark ourselves as currently active.
102    ///
103    /// This is a NO-OP for now, but may be useful in the future.
104    fn set_active(&mut self) -> EgResult<()> {
105        Ok(())
106    }
107
108    fn methods() -> &'static HashMap<String, method::MethodDef> {
109        if let Some(h) = REGISTERED_METHODS.get() {
110            h
111        } else {
112            log::error!("Cannot call methods() prior to registration");
113            panic!("Cannot call methods() prior to registration");
114        }
115    }
116
117    /// Mutable Ref to our under-the-covers client singleton.
118    fn client_internal_mut(&self) -> RefMut<ClientSingleton> {
119        self.client.singleton().borrow_mut()
120    }
121
122    /// Current session
123    ///
124    /// Panics of session on None.
125    fn session(&self) -> &ServerSession {
126        self.session.as_ref().unwrap()
127    }
128
129    fn session_mut(&mut self) -> &mut ServerSession {
130        self.session.as_mut().unwrap()
131    }
132
133    /// Wait for and process inbound API calls.
134    fn listen(&mut self) {
135        let factory = self.application.worker_factory();
136        let mut app_worker = (factory)();
137
138        if let Err(e) = app_worker.worker_start(self.client.clone()) {
139            log::error!("worker_start failed {e}.  Exiting");
140            return;
141        }
142
143        let max_requests: usize = HostSettings::get(&format!(
144            "apps/{}/unix_config/max_requests",
145            self.application.name()
146        ))
147        .expect("Host Settings Not Retrieved")
148        .as_usize()
149        .unwrap_or(5000);
150
151        let keepalive = HostSettings::get(&format!(
152            "apps/{}/unix_config/keepalive",
153            self.application.name()
154        ))
155        .expect("Host Settings Not Retrieved")
156        .as_u64()
157        .unwrap_or(5);
158
159        let mut requests: usize = 0;
160
161        // We listen for API calls at an addressed scoped to our
162        // username and domain.
163        let username = self.client.address().username();
164        let domain = self.client.address().domain();
165
166        let service_addr = BusAddress::for_service(username, domain, self.application.name());
167        let service_addr = service_addr.as_str().to_string();
168
169        let my_addr = self.client.address().as_str().to_string();
170
171        while requests < max_requests {
172            let timeout: u64;
173            let sent_to: &str;
174
175            if self.connected {
176                // We're in the middle of a stateful conversation.
177                // Listen for messages sent specifically to our bus
178                // address and only wait up to keeplive seconds for
179                // subsequent messages.
180                sent_to = &my_addr;
181                timeout = keepalive;
182            } else {
183                // If we are not within a stateful conversation, clear
184                // our bus data and message backlogs since any remaining
185                // data is no longer relevant.
186                if let Err(e) = self.reset() {
187                    log::error!("could not reset {e}.  Exiting");
188                    break;
189                }
190
191                sent_to = &service_addr;
192                timeout = IDLE_WAKE_TIME;
193            }
194
195            // work_occurred will be true if we handled a message or
196            // had to address a stateful session timeout.
197            let (work_occurred, msg_handled) =
198                match self.handle_recv(&mut app_worker, timeout, sent_to) {
199                    Ok(w) => w,
200                    Err(e) => {
201                        log::error!("Error in main loop error: {e}");
202                        break;
203                    }
204                };
205
206            // If we are connected, we remain Active and avoid counting
207            // subsequent requests within this stateful converstation
208            // toward our overall request count.
209            if self.connected {
210                continue;
211            }
212
213            if work_occurred {
214                // also true if msg_handled
215
216                // If we performed any work and we are outside of a
217                // keepalive loop, let our worker know a stateless
218                // request or stateful conversation has just completed.
219                if let Err(e) = app_worker.end_session() {
220                    log::error!("end_session() returned an error: {e}");
221                    break;
222                }
223
224                if self.set_idle().is_err() {
225                    break;
226                }
227
228                if msg_handled {
229                    // Increment our message handled count.
230                    // Each connected session counts as 1 "request".
231                    requests += 1;
232
233                    // An inbound message may have modified our
234                    // thread-scoped locale.  Reset our locale back
235                    // to the default so the previous locale does not
236                    // affect future messages.
237                    message::reset_thread_locale();
238                }
239            } else {
240                // Let the worker know we woke up and nothing interesting
241                // happened.
242                if let Err(e) = app_worker.worker_idle_wake(self.connected) {
243                    log::error!("worker_idle_wake() returned an error: {e}");
244                    break;
245                }
246            }
247
248            // Did we get a shutdown signal?  Check this after
249            // "end_session()" so we don't interrupt a conversation to
250            // shutdown.
251            if self.sig_tracker.any_shutdown_requested() {
252                log::info!("received a stop signal");
253                break;
254            }
255        }
256
257        log::debug!("{self} exiting listen loop and cleaning up");
258
259        if let Err(e) = app_worker.worker_end() {
260            log::error!("worker_end failed {e}");
261        }
262
263        // Clear our worker-specific bus address of any lingering data.
264        self.reset().ok();
265    }
266
267    /// Call recv() on our message bus and process the response.
268    ///
269    /// Return value consists of (work_occurred, msg_handled).
270    fn handle_recv(
271        &mut self,
272        app_worker: &mut Box<dyn app::ApplicationWorker>,
273        timeout: u64,
274        sent_to: &str,
275    ) -> EgResult<(bool, bool)> {
276        let selfstr = format!("{self}");
277
278        let recv_result = self
279            .client_internal_mut()
280            .bus_mut()
281            .recv(timeout, Some(sent_to));
282
283        let msg_op = match recv_result {
284            Ok(o) => o,
285            Err(e) => {
286                // There's a good chance an error in recv() means the
287                // thread/system is unusable, so let the worker exit.
288                //
289                // Avoid a tight thread respawn loop with a short pause.
290                thread::sleep(Duration::from_secs(1));
291                Err(e)?
292            }
293        };
294
295        let tmsg = match msg_op {
296            Some(v) => v,
297            None => {
298                if !self.connected {
299                    // No new message to handle and no timeout to address.
300                    return Ok((false, false));
301                }
302
303                self.set_active()?;
304
305                // Caller failed to send a message within the keepliave interval.
306                log::warn!("{selfstr} timeout waiting on request while connected");
307
308                if let Err(e) = self.reply_with_status(MessageStatus::Timeout, "Timeout") {
309                    Err(format!("server: could not reply with Timeout message: {e}"))?;
310                }
311
312                return Ok((true, false)); // work occurred
313            }
314        };
315
316        self.set_active()?;
317
318        if !self.connected {
319            // Any message received in a non-connected state represents
320            // the start of a session.  For stateful convos, the
321            // current message will be a CONNECT.  Otherwise, it will
322            // be a one-off request.
323            app_worker.start_session()?;
324        }
325
326        if let Err(e) = self.handle_transport_message(tmsg, app_worker) {
327            // An error within our worker's method handler is not enough
328            // to shut down the worker.  Log, force a disconnect on the
329            // session (if applicable) and move on.
330            log::error!("{selfstr} error handling message: {e}");
331            self.connected = false;
332        }
333
334        Ok((true, true)) // work occurred, message handled
335    }
336
337    fn handle_transport_message(
338        &mut self,
339        mut tmsg: message::TransportMessage,
340        app_worker: &mut Box<dyn app::ApplicationWorker>,
341    ) -> EgResult<()> {
342        // Always adopt the log trace of an inbound API call.
343        Logger::set_log_trace(tmsg.osrf_xid());
344
345        if self.session.is_none() || self.session().thread().ne(tmsg.thread()) {
346            log::trace!("server: creating new server session for {}", tmsg.thread());
347
348            self.session = Some(ServerSession::new(
349                self.client.clone(),
350                self.application.name(),
351                tmsg.thread(),
352                0, // thread trace -- updated later as needed
353                BusAddress::parse_str(tmsg.from())?,
354            ));
355        }
356
357        for msg in tmsg.body_mut().drain(..) {
358            self.handle_message(msg, app_worker)?;
359        }
360
361        Ok(())
362    }
363
364    fn handle_message(
365        &mut self,
366        msg: message::Message,
367        app_worker: &mut Box<dyn app::ApplicationWorker>,
368    ) -> EgResult<()> {
369        self.session_mut().set_last_thread_trace(msg.thread_trace());
370        self.session_mut().clear_responded_complete();
371
372        log::trace!("{self} received message of type {:?}", msg.mtype());
373
374        match msg.mtype() {
375            message::MessageType::Disconnect => {
376                log::trace!("{self} received a DISCONNECT");
377                self.reset()?;
378                Ok(())
379            }
380
381            message::MessageType::Connect => {
382                log::trace!("{self} received a CONNECT");
383
384                if self.connected {
385                    return self.reply_bad_request("Worker is already connected");
386                }
387
388                self.connected = true;
389                self.reply_with_status(MessageStatus::Ok, "OK")
390            }
391
392            message::MessageType::Request => {
393                log::trace!("{self} received a REQUEST");
394                self.handle_request(msg, app_worker)
395            }
396
397            _ => self.reply_bad_request("Unexpected message type"),
398        }
399    }
400
401    fn reply_with_status(&mut self, stat: MessageStatus, stat_text: &str) -> EgResult<()> {
402        let tmsg = TransportMessage::with_body(
403            self.session().sender().as_str(),
404            self.client.address().as_str(),
405            self.session().thread(),
406            Message::new(
407                MessageType::Status,
408                self.session().last_thread_trace(),
409                Payload::Status(message::Status::new(stat, stat_text, "osrfStatus")),
410            ),
411        );
412
413        self.client_internal_mut()
414            .get_domain_bus(self.session().sender().domain())?
415            .send(tmsg)
416    }
417
418    fn handle_request(
419        &mut self,
420        mut msg: message::Message,
421        app_worker: &mut Box<dyn app::ApplicationWorker>,
422    ) -> EgResult<()> {
423        let method_call = match msg.take_payload() {
424            message::Payload::Method(m) => m,
425            _ => return self.reply_bad_request("Request sent without a MethoCall payload"),
426        };
427
428        let param_count = method_call.params().len();
429        let api_name = method_call.method().to_string();
430
431        let log_params = util::stringify_params(
432            &api_name,
433            method_call.params(),
434            conf::config().log_protect(),
435        );
436
437        // Log the API call
438        log::info!("CALL: {} {}", api_name, log_params);
439
440        // Before we begin processing a service-level request, clear our
441        // local message bus to avoid encountering any stale messages
442        // lingering from the previous conversation.
443        if !self.connected {
444            self.client.clear()?;
445        }
446
447        // Clone the method since we have mutable borrows below.  Note
448        // this is the method definition, not the param-laden request.
449        let mut method_def = Microservice::methods().get(&api_name).cloned();
450
451        if method_def.is_none() {
452            // Atomic methods are not registered/published in advance
453            // since every method has an atomic variant.
454            // Find the root method and use it.
455            if api_name.ends_with(".atomic") {
456                let meth = api_name.replace(".atomic", "");
457                if let Some(m) = Microservice::methods().get(&meth) {
458                    method_def = Some(m.clone());
459
460                    // Creating a new queue tells our session to treat
461                    // this as an atomic request.
462                    self.session_mut().new_atomic_resp_queue();
463                }
464            }
465        }
466
467        if method_def.is_none() {
468            log::warn!("Method not found: {}", api_name);
469
470            return self.reply_with_status(
471                MessageStatus::MethodNotFound,
472                &format!("Method not found: {}", api_name),
473            );
474        }
475
476        let method_def = method_def.unwrap();
477        let pcount = method_def.param_count();
478
479        // Make sure the number of params sent by the caller matches the
480        // parameter count for the method.
481        if !ParamCount::matches(pcount, param_count as u8) {
482            return self.reply_bad_request(&format!(
483                "Invalid param count sent: method={} sent={} needed={}",
484                api_name, param_count, &pcount,
485            ));
486        }
487
488        // Verify paramter types are correct, at least superficially.
489        // Do this after deserialization.
490        if let Some(param_defs) = method_def.params() {
491            for (idx, param_def) in param_defs.iter().enumerate() {
492                // There may be more param defs than parameters if
493                // some param are optional.
494                if let Some(param_val) = method_call.params().get(idx) {
495                    if idx >= pcount.minimum() as usize && param_val.is_null() {
496                        // NULL placeholders for non-required parameters are
497                        // allowed.
498                        continue;
499                    }
500                    if !param_def.datatype.matches(param_val) {
501                        return self.reply_bad_request(&format!(
502                            "Invalid paramter type: wanted={} got={}",
503                            param_def.datatype,
504                            param_val.clone().dump()
505                        ));
506                    }
507                } else {
508                    // More defs than actual params. Verification complete.
509                    break;
510                }
511            }
512        }
513
514        // Call the API
515        if let Err(err) = (method_def.handler())(app_worker, self.session_mut(), method_call) {
516            let msg = format!("{self} method {api_name} exited: \"{err}\"");
517            log::error!("{msg}");
518            app_worker.api_call_error(&api_name, err);
519            self.reply_server_error(&msg)?;
520            Err(msg)?;
521        }
522
523        if !self.session().responded_complete() {
524            self.session_mut().send_complete()
525        } else {
526            Ok(())
527        }
528    }
529
530    fn reply_server_error(&mut self, text: &str) -> EgResult<()> {
531        self.connected = false;
532
533        let msg = Message::new(
534            MessageType::Status,
535            self.session().last_thread_trace(),
536            Payload::Status(message::Status::new(
537                MessageStatus::InternalServerError,
538                &format!("Internal Server Error: {text}"),
539                "osrfStatus",
540            )),
541        );
542
543        let tmsg = TransportMessage::with_body(
544            self.session().sender().as_str(),
545            self.client.address().as_str(),
546            self.session().thread(),
547            msg,
548        );
549
550        self.client_internal_mut()
551            .get_domain_bus(self.session().sender().domain())?
552            .send(tmsg)
553    }
554
555    fn reply_bad_request(&mut self, text: &str) -> EgResult<()> {
556        self.connected = false;
557
558        let msg = Message::new(
559            MessageType::Status,
560            self.session().last_thread_trace(),
561            Payload::Status(message::Status::new(
562                MessageStatus::BadRequest,
563                &format!("Bad Request: {text}"),
564                "osrfStatus",
565            )),
566        );
567
568        let tmsg = TransportMessage::with_body(
569            self.session().sender().as_str(),
570            self.client.address().as_str(),
571            self.session().thread(),
572            msg,
573        );
574
575        self.client_internal_mut()
576            .get_domain_bus(self.session().sender().domain())?
577            .send(tmsg)
578    }
579
580    fn register_methods(&mut self) -> EgResult<()> {
581        let client = self.client.clone();
582        let list = self.application.register_methods(client)?;
583        let mut hash = HashMap::new();
584        for m in list {
585            hash.insert(m.name().to_string(), m);
586        }
587        self.add_system_methods(&mut hash);
588
589        if REGISTERED_METHODS.set(hash).is_err() {
590            return Err("Cannot call register_methods() more than once".into());
591        }
592
593        Ok(())
594    }
595
596    fn add_system_methods(&mut self, hash: &mut HashMap<String, method::MethodDef>) {
597        let name = "opensrf.system.echo";
598        let mut method = method::MethodDef::new(name, method::ParamCount::Any, system_method_echo);
599        method.set_desc("Echo back any values sent");
600        hash.insert(name.to_string(), method);
601
602        let name = "opensrf.system.time";
603        let mut method = method::MethodDef::new(name, method::ParamCount::Zero, system_method_time);
604        method.set_desc("Respond with system time in epoch seconds");
605        hash.insert(name.to_string(), method);
606
607        let name = "opensrf.system.method.all";
608        let mut method = method::MethodDef::new(
609            name,
610            method::ParamCount::Range(0, 1),
611            system_method_introspect,
612        );
613        method.set_desc("List published API definitions");
614
615        method.add_param(method::Param {
616            name: String::from("prefix"),
617            datatype: method::ParamDataType::String,
618            desc: Some(String::from("API name prefix filter")),
619        });
620
621        hash.insert(name.to_string(), method);
622
623        let name = "opensrf.system.method.all.summary";
624        let mut method = method::MethodDef::new(
625            name,
626            method::ParamCount::Range(0, 1),
627            system_method_introspect,
628        );
629        method.set_desc("Summary list published API definitions");
630
631        method.add_param(method::Param {
632            name: String::from("prefix"),
633            datatype: method::ParamDataType::String,
634            desc: Some(String::from("API name prefix filter")),
635        });
636
637        hash.insert(name.to_string(), method);
638    }
639
640    /// List of domains where our service is allowed to run and
641    /// therefore whose routers with whom our presence should be registered.
642    fn hosting_domains(&self) -> Vec<(String, String)> {
643        let mut domains: Vec<(String, String)> = Vec::new();
644        for router in conf::config().client().routers() {
645            match router.services() {
646                Some(services) => {
647                    if services.iter().any(|s| s.eq(self.application.name())) {
648                        domains.push((router.username().to_string(), router.domain().to_string()));
649                    }
650                }
651                None => {
652                    // A domain with no specific set of hosted services
653                    // hosts all services
654                    domains.push((router.username().to_string(), router.domain().to_string()));
655                }
656            }
657        }
658
659        domains
660    }
661
662    fn register_routers(&mut self) -> EgResult<()> {
663        for (username, domain) in self.hosting_domains().iter() {
664            log::info!("server: registering with router at {domain}");
665
666            self.client.send_router_command(
667                username,
668                domain,
669                "register",
670                Some(self.application.name()),
671            )?;
672        }
673
674        Ok(())
675    }
676
677    fn unregister_routers(&mut self) -> EgResult<()> {
678        for (username, domain) in self.hosting_domains().iter() {
679            log::info!("server: un-registering with router at {domain}");
680
681            self.client.send_router_command(
682                username,
683                domain,
684                "unregister",
685                Some(self.application.name()),
686            )?;
687        }
688        Ok(())
689    }
690
691    // Clear our local message bus and reset state maintenance values.
692    fn reset(&mut self) -> EgResult<()> {
693        self.connected = false;
694        self.session = None;
695        self.client.clear()
696    }
697}
698
699// Toss our system method handlers down here.
700fn system_method_echo(
701    _worker: &mut Box<dyn app::ApplicationWorker>,
702    session: &mut ServerSession,
703    method: message::MethodCall,
704) -> EgResult<()> {
705    let count = method.params().len();
706    for (idx, val) in method.params().iter().enumerate() {
707        if idx == count - 1 {
708            // Package the final response and the COMPLETE message
709            // into the same transport message for consistency
710            // with the Perl code for load testing, etc. comparisons.
711            session.respond_complete(val.clone())?;
712        } else {
713            session.respond(val.clone())?;
714        }
715    }
716    Ok(())
717}
718
719fn system_method_time(
720    _worker: &mut Box<dyn app::ApplicationWorker>,
721    session: &mut ServerSession,
722    _method: message::MethodCall,
723) -> EgResult<()> {
724    match SystemTime::now().duration_since(UNIX_EPOCH) {
725        Ok(t) => session.respond_complete(t.as_secs()),
726        Err(e) => Err(format!("System time error: {e}").into()),
727    }
728}
729
730fn system_method_introspect(
731    _worker: &mut Box<dyn app::ApplicationWorker>,
732    session: &mut ServerSession,
733    method: message::MethodCall,
734) -> EgResult<()> {
735    let prefix = match method.params().first() {
736        Some(p) => p.as_str(),
737        None => None,
738    };
739
740    // Collect the names first so we can sort them
741    let mut names: Vec<&str> = match prefix {
742        // If a prefix string is provided, only return methods whose
743        // name starts with the provided prefix.
744        Some(pfx) => Microservice::methods()
745            .keys()
746            .filter(|n| n.starts_with(pfx))
747            .map(|n| n.as_str())
748            .collect(),
749        None => Microservice::methods().keys().map(|n| n.as_str()).collect(),
750    };
751
752    names.sort();
753
754    for name in names {
755        if let Some(meth) = Microservice::methods().get(name) {
756            if method.method().contains("summary") {
757                session.respond(meth.to_summary_string())?;
758            } else {
759                session.respond(meth.to_eg_value())?;
760            }
761        }
762    }
763
764    Ok(())
765}