1use crate::osrf::addr::BusAddress;
2use crate::osrf::bus;
3use crate::osrf::conf;
4use crate::osrf::message;
5use crate::osrf::params::ApiParams;
6use crate::osrf::session::ClientSession;
7use crate::osrf::session::ResponseIterator;
8use crate::util;
9use crate::{EgResult, EgValue};
10use log::info;
11use std::cell::RefCell;
12use std::collections::HashMap;
13use std::fmt;
14use std::rc::Rc;
15
16pub struct ClientSingleton {
21 bus: Option<bus::Bus>,
24
25 domain: String,
27
28 remote_bus_map: HashMap<String, bus::Bus>,
30
31 backlog: Vec<message::TransportMessage>,
34}
35
36impl ClientSingleton {
37 fn new() -> EgResult<ClientSingleton> {
38 let bus = bus::Bus::new(conf::config().client())?;
39 Ok(ClientSingleton::from_bus(bus))
40 }
41
42 fn from_bus(bus: bus::Bus) -> ClientSingleton {
44 let domain = conf::config().client().domain().name();
45
46 ClientSingleton {
47 domain: domain.to_string(),
48 bus: Some(bus),
49 backlog: Vec::new(),
50 remote_bus_map: HashMap::new(),
51 }
52 }
53
54 fn clear_backlog(&mut self) {
57 self.backlog.clear();
58 }
59
60 fn address(&self) -> &str {
62 self.bus().address().as_str()
63 }
64
65 fn domain(&self) -> &str {
67 &self.domain
68 }
69
70 pub fn bus(&self) -> &bus::Bus {
74 match self.bus.as_ref() {
75 Some(b) => b,
76 None => panic!("Client has no Bus connection!"),
77 }
78 }
79
80 pub fn bus_mut(&mut self) -> &mut bus::Bus {
84 match self.bus.as_mut() {
85 Some(b) => b,
86 None => panic!("Client has no Bus connection!"),
87 }
88 }
89
90 pub fn take_bus(&mut self) -> bus::Bus {
98 match self.bus.take() {
99 Some(b) => b,
100 None => panic!("Client has to Bus connection!"),
101 }
102 }
103
104 pub fn set_bus(&mut self, bus: bus::Bus) {
106 self.bus = Some(bus);
107 }
108
109 pub fn get_domain_bus(&mut self, domain: &str) -> EgResult<&mut bus::Bus> {
110 log::trace!("Loading bus connection for domain: {domain}");
111
112 if domain.eq(self.domain()) {
113 Ok(self.bus_mut())
114 } else {
115 if self.remote_bus_map.contains_key(domain) {
116 return Ok(self.remote_bus_map.get_mut(domain).unwrap());
117 }
118
119 self.add_connection(domain)
120 }
121 }
122
123 fn add_connection(&mut self, domain: &str) -> EgResult<&mut bus::Bus> {
127 let mut conf = conf::config().client().clone();
130
131 conf.set_domain(domain);
132
133 let bus = bus::Bus::new(&conf)?;
134
135 info!("Opened connection to new domain: {}", domain);
136
137 self.remote_bus_map.insert(domain.to_string(), bus);
138 self.get_domain_bus(domain)
139 }
140
141 fn recv_session_from_backlog(&mut self, thread: &str) -> Option<message::TransportMessage> {
144 if let Some(index) = self.backlog.iter().position(|tm| tm.thread() == thread) {
145 Some(self.backlog.remove(index))
146 } else {
147 None
148 }
149 }
150
151 pub fn wait(&mut self, timeout: u64) -> EgResult<bool> {
158 if !self.backlog.is_empty() {
159 return Ok(true);
160 }
161
162 let timer = util::Timer::new(timeout);
163
164 while self.backlog.is_empty() && !timer.done() {
165 if let Some(tm) = self.bus_mut().recv(timer.remaining(), None)? {
166 self.backlog.push(tm);
167 break;
168 }
169 }
170
171 Ok(!self.backlog.is_empty())
172 }
173
174 pub fn recv_session(
176 &mut self,
177 timer: &mut util::Timer,
178 thread: &str,
179 ) -> EgResult<Option<message::TransportMessage>> {
180 loop {
181 if let Some(tm) = self.recv_session_from_backlog(thread) {
182 return Ok(Some(tm));
183 }
184
185 if timer.done() {
186 return Ok(None);
188 }
189
190 if let Some(tm) = self.bus_mut().recv(timer.remaining(), None)? {
193 self.backlog.push(tm);
194 }
195
196 }
199 }
200
201 fn send_router_command(
204 &mut self,
205 username: &str,
206 domain: &str,
207 router_command: &str,
208 router_class: Option<&str>,
209 ) -> EgResult<()> {
210 let addr = BusAddress::for_router(username, domain);
211
212 let mut tmsg = message::TransportMessage::new(
214 addr.as_str(),
215 self.bus().address().as_str(),
216 &util::random_number(16),
217 );
218
219 tmsg.set_router_command(router_command);
220 if let Some(rc) = router_class {
221 tmsg.set_router_class(rc);
222 }
223
224 let bus = self.get_domain_bus(domain)?;
225 bus.send(tmsg)?;
226
227 Ok(())
228 }
229}
230
231impl fmt::Display for ClientSingleton {
232 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233 write!(f, "ClientSingleton({})", self.address())
234 }
235}
236
237#[derive(Clone)]
246pub struct Client {
247 singleton: Rc<RefCell<ClientSingleton>>,
248 address: BusAddress,
249 domain: String,
250}
251
252impl Client {
253 pub fn connect() -> EgResult<Client> {
260 let singleton = ClientSingleton::new()?;
262
263 let address = singleton.bus().address().clone();
264 let domain = singleton.domain().to_string();
265
266 Ok(Client {
267 address,
268 domain,
269 singleton: Rc::new(RefCell::new(singleton)),
270 })
271 }
272
273 pub fn from_bus(bus: bus::Bus) -> Client {
277 let singleton = ClientSingleton::from_bus(bus);
279
280 let address = singleton.bus().address().clone();
281 let domain = singleton.domain().to_string();
282
283 Client {
284 address,
285 domain,
286 singleton: Rc::new(RefCell::new(singleton)),
287 }
288 }
289
290 pub fn take_bus(&self) -> bus::Bus {
294 self.singleton.borrow_mut().take_bus()
295 }
296
297 pub fn set_bus(&self, bus: bus::Bus) {
301 self.singleton.borrow_mut().set_bus(bus);
302 }
303
304 pub fn singleton(&self) -> &Rc<RefCell<ClientSingleton>> {
305 &self.singleton
306 }
307
308 pub fn address(&self) -> &BusAddress {
309 &self.address
310 }
311
312 pub fn domain(&self) -> &str {
313 &self.domain
314 }
315
316 pub fn session(&self, service: &str) -> ClientSession {
318 ClientSession::new(self.clone(), service)
319 }
320
321 pub fn clear(&self) -> EgResult<()> {
324 self.singleton().borrow_mut().clear_backlog();
325 self.singleton().borrow_mut().bus_mut().clear_bus()
326 }
327
328 pub fn send_router_command(
330 &self,
331 username: &str,
332 domain: &str,
333 command: &str,
334 router_class: Option<&str>,
335 ) -> EgResult<()> {
336 self.singleton()
337 .borrow_mut()
338 .send_router_command(username, domain, command, router_class)
339 }
340
341 pub fn send_recv_iter(
346 &self,
347 service: &str,
348 method: &str,
349 params: impl Into<ApiParams>,
350 ) -> EgResult<ResponseIterator> {
351 Ok(ResponseIterator::new(
352 self.session(service).request(method, params)?,
353 ))
354 }
355
356 pub fn wait(&self, timeout: u64) -> EgResult<bool> {
358 self.singleton().borrow_mut().wait(timeout)
359 }
360
361 pub fn send_recv_one(
367 &self,
368 service: &str,
369 method: &str,
370 params: impl Into<ApiParams>,
371 ) -> EgResult<Option<EgValue>> {
372 let mut ses = self.session(service);
373 let mut req = ses.request(method, params)?;
374
375 req.first()
376 }
377}