WvStreams
wvdbusconn.cc
1 /* -*- Mode: C++ -*-
2  * Worldvisions Weaver Software:
3  * Copyright (C) 2004-2006 Net Integration Technologies, Inc.
4  *
5  * Pathfinder Software:
6  * Copyright (C) 2007, Carillon Information Security Inc.
7  *
8  * This library is licensed under the LGPL, please read LICENSE for details.
9  *
10  */
11 #include "wvdbusconn.h"
12 #include "wvmoniker.h"
13 #include "wvstrutils.h"
14 #undef interface // windows
15 #include <dbus/dbus.h>
16 
17 
18 static WvString translate(WvStringParm dbus_moniker)
19 {
20  WvStringList l;
21  WvStringList::Iter i(l);
22 
23  if (!strncasecmp(dbus_moniker, "unix:", 5))
24  {
25  WvString path, tmpdir;
26  l.split(dbus_moniker+5, ",");
27  for (i.rewind(); i.next(); )
28  {
29  if (!strncasecmp(*i, "path=", 5))
30  path = *i + 5;
31  else if (!strncasecmp(*i, "abstract=", 9))
32  path = WvString("@%s", *i + 9);
33  else if (!strncasecmp(*i, "tmpdir=", 7))
34  tmpdir = *i + 7;
35  }
36  if (!!path)
37  return WvString("unix:%s", path);
38  else if (!!tmpdir)
39  return WvString("unix:%s/dbus.sock", tmpdir);
40  }
41  else if (!strncasecmp(dbus_moniker, "tcp:", 4))
42  {
43  WvString host, port, family;
44  l.split(dbus_moniker+4, ",");
45  for (i.rewind(); i.next(); )
46  {
47  if (!strncasecmp(*i, "family=", 7))
48  family = *i + 7;
49  else if (!strncasecmp(*i, "host=", 5))
50  host = *i + 5;
51  else if (!strncasecmp(*i, "port=", 5))
52  port = *i + 5;
53  }
54  if (!!host && !!port)
55  return WvString("tcp:%s:%s", host, port);
56  else if (!!host)
57  return WvString("tcp:%s", host);
58  else if (!!port)
59  return WvString("tcp:0.0.0.0:%s", port); // localhost
60  }
61 
62  return dbus_moniker; // unrecognized
63 }
64 
65 
66 static IWvStream *stream_creator(WvStringParm _s, IObject *)
67 {
68  WvString s(_s);
69 
70  if (!strcasecmp(s, "starter"))
71  {
72  WvString startbus(getenv("DBUS_STARTER_ADDRESS"));
73  if (!!startbus)
74  return IWvStream::create(translate(startbus));
75  else
76  {
77  WvString starttype(getenv("DBUS_STARTER_BUS_TYPE"));
78  if (!!starttype && !strcasecmp(starttype, "system"))
79  s = "system";
80  else if (!!starttype && !strcasecmp(starttype, "session"))
81  s = "session";
82  }
83  }
84 
85  if (!strcasecmp(s, "system"))
86  {
87  // NOTE: the environment variable for the address of the system
88  // bus is very often not set-- in that case, look in your dbus
89  // system bus config file (e.g. /etc/dbus-1/system.conf) for the
90  // raw address and either set this environment variable to that, or
91  // pass in the address directly
92  WvString bus(getenv("DBUS_SYSTEM_BUS_ADDRESS"));
93  if (!!bus)
94  return IWvStream::create(translate(bus));
95  }
96 
97  if (!strcasecmp(s, "session"))
98  {
99  WvString bus(getenv("DBUS_SESSION_BUS_ADDRESS"));
100  if (!!bus)
101  return IWvStream::create(translate(bus));
102  }
103 
104  return IWvStream::create(translate(s));
105 }
106 
107 static WvMoniker<IWvStream> reg("dbus", stream_creator);
108 
109 
110 static int conncount;
111 
112 WvDBusConn::WvDBusConn(IWvStream *_cloned, IWvDBusAuth *_auth, bool _client)
113  : WvStreamClone(_cloned),
114  log(WvString("DBus %s%s",
115  _client ? "" : "s",
116  ++conncount), WvLog::Debug5),
117  pending(10)
118 {
119  init(_auth, _client);
120 }
121 
122 
123 WvDBusConn::WvDBusConn(WvStringParm moniker, IWvDBusAuth *_auth, bool _client)
124  : WvStreamClone(IWvStream::create(moniker)),
125  log(WvString("DBus %s%s",
126  _client ? "" : "s",
127  ++conncount), WvLog::Debug5),
128  pending(10)
129 {
130  log("Connecting to '%s'\n", moniker);
131  init(_auth, _client);
132 }
133 
134 
135 void WvDBusConn::init(IWvDBusAuth *_auth, bool _client)
136 {
137  log("Initializing.\n");
138  client = _client;
139  auth = _auth ? _auth : new WvDBusClientAuth;
140  authorized = in_post_select = false;
141  if (!client) set_uniquename(WvString(":%s.0", conncount));
142 
143  if (!isok()) return;
144 
145  delay_output(true);
146 
147  // this will get enqueued until later, but we want to make sure it
148  // comes before anything the user tries to send - including anything
149  // goofy they enqueue in the authorization part.
150  if (client)
151  send_hello();
152 
153  try_auth();
154 }
155 
157 {
158  log("Shutting down.\n");
159  if (geterr())
160  log("Error was: %s\n", errstr());
161 
162  close();
163 
164  delete auth;
165 }
166 
167 
169 {
170  if (!closed)
171  log("Closing.\n");
173 }
174 
175 
177 {
178  return _uniquename;
179 }
180 
181 
182 void WvDBusConn::request_name(WvStringParm name, const WvDBusCallback &onreply,
183  time_t msec_timeout)
184 {
185  uint32_t flags = (DBUS_NAME_FLAG_ALLOW_REPLACEMENT |
186  DBUS_NAME_FLAG_REPLACE_EXISTING);
187  WvDBusMsg msg("org.freedesktop.DBus", "/org/freedesktop/DBus",
188  "org.freedesktop.DBus", "RequestName");
189  msg.append(name).append(flags);
190  send(msg, onreply, msec_timeout);
191 }
192 
193 
195 {
196  msg.marshal(out_queue);
197  if (authorized)
198  {
199  log(" >> %s\n", msg);
200  write(out_queue);
201  }
202  else
203  log(" .> %s\n", msg);
204  return msg.get_serial();
205 }
206 
207 
208 void WvDBusConn::send(WvDBusMsg msg, const WvDBusCallback &onreply,
209  time_t msec_timeout)
210 {
211  send(msg);
212  if (onreply)
213  add_pending(msg, onreply, msec_timeout);
214 }
215 
216 
218 {
219 public:
220  WvDBusMsg *reply;
221 
222  xxReplyWaiter()
223  { reply = NULL; }
224  ~xxReplyWaiter()
225  { delete reply; }
226  bool reply_wait(WvDBusMsg &msg)
227  { reply = new WvDBusMsg(msg); return true; }
228 };
229 
230 
232  wv::function<void(uint32_t)> serial_cb)
233 {
234  xxReplyWaiter rw;
235 
236  send(msg, wv::bind(&xxReplyWaiter::reply_wait, &rw, _1),
237  msec_timeout);
238  if (serial_cb)
239  serial_cb(msg.get_serial());
240  while (!rw.reply && isok())
241  runonce();
242  if (!rw.reply)
243  return WvDBusError(msg, DBUS_ERROR_FAILED,
244  WvString("Connection closed (%s) "
245  "while waiting for reply.",
246  errstr()));
247  else
248  return *rw.reply;
249 }
250 
251 
252 void WvDBusConn::out(WvStringParm s)
253 {
254  log(" >> %s", s);
255  print(s);
256 }
257 
258 
259 const char *WvDBusConn::in()
260 {
261  const char *s = trim_string(getline(0));
262  if (s)
263  log("<< %s\n", s);
264  return s;
265 }
266 
267 
268 void WvDBusConn::send_hello()
269 {
270  WvDBusMsg msg("org.freedesktop.DBus", "/org/freedesktop/DBus",
271  "org.freedesktop.DBus", "Hello");
272  send(msg, wv::bind(&WvDBusConn::_registered, this, _1));
273  WvDBusMsg msg2("org.freedesktop.DBus", "/org/freedesktop/DBus",
274  "org.freedesktop.DBus", "AddMatch");
275  msg2.append("type='signal'");
276  send(msg2); // don't need to monitor this for completion
277 }
278 
279 
280 void WvDBusConn::set_uniquename(WvStringParm s)
281 {
282  // we want to print the message before switching log.app, so that we
283  // can trace which log.app turned into which
284  log("Assigned name '%s'\n", s);
285  _uniquename = s;
286  log.app = WvString("DBus %s%s", client ? "" : "s", uniquename());
287 }
288 
289 
290 void WvDBusConn::try_auth()
291 {
292  bool done = auth->authorize(*this);
293  if (done)
294  {
295  // ready to send messages!
296  if (out_queue.used())
297  {
298  log(" >> (sending enqueued messages)\n");
299  write(out_queue);
300  }
301 
302  authorized = true;
303  }
304 }
305 
306 
307 void WvDBusConn::add_callback(CallbackPri pri, WvDBusCallback cb, void *cookie)
308 {
309  callbacks.append(new CallbackInfo(pri, cb, cookie), true);
310 }
311 
312 
313 void WvDBusConn::del_callback(void *cookie)
314 {
315  // remember, there might be more than one callback with the same cookie.
316  CallbackInfoList::Iter i(callbacks);
317  for (i.rewind(); i.next(); )
318  if (i->cookie == cookie)
319  i.xunlink();
320 }
321 
322 
323 int WvDBusConn::priority_order(const CallbackInfo *a, const CallbackInfo *b)
324 {
325  return a->pri - b->pri;
326 }
327 
329 {
330  log("<< %s\n", msg);
331 
332  // handle replies
333  uint32_t rserial = msg.get_replyserial();
334  if (rserial)
335  {
336  Pending *p = pending[rserial];
337  if (p)
338  {
339  p->cb(msg);
340  pending.remove(p);
341  return true; // handled it
342  }
343  }
344 
345  // handle all the generic filters
346  CallbackInfoList::Sorter i(callbacks, priority_order);
347  for (i.rewind(); i.next(); )
348  {
349  bool handled = i->cb(msg);
350  if (handled) return true;
351  }
352 
353  return false; // couldn't handle the message, sorry
354 }
355 
356 
357 WvDBusClientAuth::WvDBusClientAuth()
358 {
359  sent_request = false;
360 }
361 
362 
363 wvuid_t WvDBusClientAuth::get_uid()
364 {
365  return wvgetuid();
366 }
367 
368 
370 {
371  if (!sent_request)
372  {
373  c.write("\0", 1);
374  WvString uid = get_uid();
375  c.out("AUTH EXTERNAL %s\r\n\0", WvHexEncoder().strflushstr(uid));
376  sent_request = true;
377  }
378  else
379  {
380  const char *line = c.in();
381  if (line)
382  {
383  if (!strncasecmp(line, "OK ", 3))
384  {
385  c.out("BEGIN\r\n");
386  return true;
387  }
388  else if (!strncasecmp(line, "ERROR ", 6))
389  c.seterr("Auth failed: %s", line);
390  else
391  c.seterr("Unknown AUTH response: '%s'", line);
392  }
393  }
394 
395  return false;
396 }
397 
398 
399 time_t WvDBusConn::mintimeout_msec()
400 {
401  WvTime when = 0;
402  PendingDict::Iter i(pending);
403  for (i.rewind(); i.next(); )
404  {
405  if (!when || when > i->valid_until)
406  when = i->valid_until;
407  }
408  if (!when)
409  return -1;
410  else if (when <= wvstime())
411  return 0;
412  else
413  return msecdiff(when, wvstime());
414 }
415 
416 
417 bool WvDBusConn::post_select(SelectInfo &si)
418 {
419  bool ready = WvStreamClone::post_select(si);
420  if (si.inherit_request) return ready;
421 
422  if (in_post_select) return false;
423  in_post_select = true;
424 
425  if (!authorized && ready)
426  try_auth();
427 
428  if (!alarm_remaining())
429  {
430  WvTime now = wvstime();
431  PendingDict::Iter i(pending);
432  for (i.rewind(); i.next(); )
433  {
434  if (now > i->valid_until)
435  {
436  log("Expiring %s\n", i->msg);
437  expire_pending(i.ptr());
438  i.rewind();
439  }
440  }
441  }
442 
443  if (authorized && ready)
444  {
445  // put this in a loop so that wvdbusd can forward packets rapidly.
446  // Otherwise TCP_NODELAY kicks in, because we do a select() loop
447  // between packets, which causes delay_output() to flush.
448  bool ran;
449  do
450  {
451  ran = false;
452  size_t needed = WvDBusMsg::demarshal_bytes_needed(in_queue);
453  size_t amt = needed - in_queue.used();
454  if (amt < 4096)
455  amt = 4096;
456  read(in_queue, amt);
457  WvDBusMsg *m;
458  while ((m = WvDBusMsg::demarshal(in_queue)) != NULL)
459  {
460  ran = true;
461  filter_func(*m);
462  delete m;
463  }
464  } while (ran);
465  }
466 
467  alarm(mintimeout_msec());
468  in_post_select = false;
469  return false;
470 }
471 
472 
474 {
475  return !out_queue.used() && pending.isempty();
476 }
477 
478 
479 void WvDBusConn::expire_pending(Pending *p)
480 {
481  if (p)
482  {
483  WvDBusCallback xcb(p->cb);
484  pending.remove(p); // prevent accidental recursion
485  WvDBusError e(p->msg, DBUS_ERROR_FAILED,
486  "Timed out while waiting for reply");
487  xcb(e);
488  }
489 }
490 
491 
492 void WvDBusConn::cancel_pending(uint32_t serial)
493 {
494  Pending *p = pending[serial];
495  if (p)
496  {
497  WvDBusCallback xcb(p->cb);
498  WvDBusMsg msg(p->msg);
499  pending.remove(p); // prevent accidental recursion
500  WvDBusError e(msg, DBUS_ERROR_FAILED,
501  "Canceled while waiting for reply");
502  xcb(e);
503  }
504 }
505 
506 
507 void WvDBusConn::add_pending(WvDBusMsg &msg, WvDBusCallback cb,
508  time_t msec_timeout)
509 {
510  uint32_t serial = msg.get_serial();
511  assert(serial);
512  if (pending[serial])
513  cancel_pending(serial);
514  pending.add(new Pending(msg, cb, msec_timeout), true);
515  alarm(mintimeout_msec());
516 }
517 
518 
519 bool WvDBusConn::_registered(WvDBusMsg &msg)
520 {
521  WvDBusMsg::Iter i(msg);
522  _uniquename = i.getnext().get_str();
523  set_uniquename(_uniquename);
524  return true;
525 }
526 
A WvFastString acts exactly like a WvString, but can take (const char *) strings without needing to a...
Definition: wvstring.h:93
The basic interface which is included by all other XPLC interfaces and objects.
Definition: IObject.h:65
static size_t demarshal_bytes_needed(WvBuf &buf)
Given a buffer containing what might be the header of a DBus message, checks how many bytes need to b...
void split(WvStringParm s, const char *splitchars=" \t\r\n", int limit=0)
split s and form a list ignoring splitchars (except at beginning and end) ie.
Definition: wvstringlist.cc:19
void runonce(time_t msec_timeout=-1)
Exactly the same as: if (select(timeout)) callback();.
Definition: wvstream.h:391
WvString get_str() const
Get the current element as a string (possible for all types).
Definition: wvdbusmsg.cc:120
virtual int geterr() const
If isok() is false, return the system error number corresponding to the error, -1 for a special error...
virtual void close()
Close the underlying stream.
Definition: wvdbusconn.cc:168
virtual bool isok() const
return true if the stream is actually usable right now
bool isidle()
Returns true if there are no outstanding messages that have not received (or timed out) their reply...
Definition: wvdbusconn.cc:473
Based on (and interchangeable with) struct timeval.
Definition: wvtimeutils.h:17
WvDBusMsg send_and_wait(WvDBusMsg msg, time_t msec_timeout=WVDBUS_DEFAULT_TIMEOUT, wv::function< void(uint32_t)> serial_cb=0)
Send a message on the bus and wait for a reply to come in, returning the message when it does...
Definition: wvdbusconn.cc:231
virtual size_t write(const void *buf, size_t count)
Write data to the stream.
Definition: wvstream.cc:532
virtual ~WvDBusConn()
Release this connection.
Definition: wvdbusconn.cc:156
WvDBusConn(WvStringParm moniker, IWvDBusAuth *_auth=NULL, bool _client=true)
Creates a new dbus connection using the given WvStreams moniker.
Definition: wvdbusconn.cc:123
void marshal(WvBuf &buf)
Locks this message, encodes it in DBus binary protocol format, and adds it to the given buffer...
A type-safe version of WvMonikerBase that lets you provide create functions for object types other th...
Definition: wvmoniker.h:61
void add_callback(CallbackPri pri, WvDBusCallback cb, void *cookie=NULL)
Adds a callback to the connection: all received messages will be sent to all callbacks to look at and...
Definition: wvdbusconn.cc:307
void delay_output(bool is_delayed)
force write() to always buffer output.
Definition: wvstream.h:246
uint32_t send(WvDBusMsg msg)
Send a message on the bus, not expecting any reply.
Definition: wvdbusconn.cc:194
virtual bool filter_func(WvDBusMsg &msg)
Called by post_select() for each received message.
Definition: wvdbusconn.cc:328
WvDBusMsg & append(const char *s)
The following methods are designed to allow appending various arguments to the message.
Definition: wvdbusmsg.cc:461
Iter & getnext()
Same as next(), but returns *this instead so you can convert the new item to the right value type...
Definition: wvdbusmsg.h:222
virtual bool post_select(SelectInfo &si)
post_select() is called after ::select(), and returns true if this object is now ready.
virtual bool authorize(WvDBusConn &c)
Main action callback.
Definition: wvdbusconn.cc:369
This is a WvList of WvStrings, and is a really handy way to parse strings.
Definition: wvstringlist.h:27
char * getline(time_t wait_msec=0, char separator='\n', int readahead=1024)
Read up to one line of data from the stream and return a pointer to the internal buffer containing th...
Definition: wvstream.h:175
virtual void close()
Close this stream.
CallbackPri
The priority level of a callback registration.
Definition: wvdbusconn.h:170
WvString uniquename() const
Return this connection's unique name on the bus, assigned by the server at connect time...
Definition: wvdbusconn.cc:176
virtual void seterr(int _errnum)
Override seterr() from WvError so that it auto-closes the stream.
Definition: wvstream.cc:451
A hex encoder.
Definition: wvhex.h:21
Various little string functions.
static WvDBusMsg * demarshal(WvBuf &buf)
Demarshals a new WvDBusMsg from a buffer containing its binary DBus protocol representation.
char * trim_string(char *string)
Trims whitespace from the beginning and end of the character string, including carriage return / line...
Definition: strutils.cc:59
void del_callback(void *cookie)
Delete all callbacks that have the given cookie.
Definition: wvdbusconn.cc:313
WvString is an implementation of a simple and efficient printable-string class.
Definition: wvstring.h:329
time_t alarm_remaining()
return the number of milliseconds remaining before the alarm will go off; -1 means no alarm is set (i...
Definition: wvstream.cc:1058
void alarm(time_t msec_timeout)
set an alarm, ie.
Definition: wvstream.cc:1049
virtual size_t read(void *buf, size_t count)
read a data block on the stream.
Definition: wvstream.cc:490
A WvLog stream accepts log messages from applications and forwards them to all registered WvLogRcv's...
Definition: wvlog.h:56
WvStreamClone simply forwards all requests to the "cloned" stream.
Definition: wvstreamclone.h:23
void request_name(WvStringParm name, const WvDBusCallback &onreply=0, time_t msec_timeout=WVDBUS_DEFAULT_TIMEOUT)
Request the given service name on DBus.
Definition: wvdbusconn.cc:182