summaryrefslogtreecommitdiff
path: root/include/asterisk/stasis.h
blob: e6d2409cf91611c55781bf85388fbfd821d035f3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
/*
 * Asterisk -- An open source telephony toolkit.
 *
 * Copyright (C) 2013, Digium, Inc.
 *
 * David M. Lee, II <dlee@digium.com>
 *
 * See http://www.asterisk.org for more information about
 * the Asterisk project. Please do not directly contact
 * any of the maintainers of this project for assistance;
 * the project provides a web site, mailing lists and IRC
 * channels for your use.
 *
 * This program is free software, distributed under the terms of
 * the GNU General Public License Version 2. See the LICENSE file
 * at the top of the source tree.
 */

#ifndef _ASTERISK_STASIS_H
#define _ASTERISK_STASIS_H

/*! \file
 *
 * \brief Stasis Message Bus API. See \ref stasis "Stasis Message Bus API" for
 * detailed documentation.
 *
 * \author David M. Lee, II <dlee@digium.com>
 * \since 12
 *
 * \page stasis Stasis Message Bus API
 *
 * \par Intro
 *
 * The Stasis Message Bus is a loosely typed mechanism for distributing messages
 * within Asterisk. It is designed to be:
 *  - Loosely coupled; new message types can be added in separate modules.
 *  - Easy to use; publishing and subscribing are straightforward operations.
 *  - Consistent memory management; all message bus objects are AO2 managed
 *    objects, using ao2_ref() and ao2_cleanup() to manage the reference
 *    counting.
 *
 * There are three main concepts for using the Stasis Message Bus:
 *  - \ref stasis_message
 *  - \ref stasis_topic
 *  - \ref stasis_subscription
 *
 * \par stasis_message
 *
 * Central to the Stasis Message Bus is the \ref stasis_message, the messages
 * that are sent on the bus. These messages have:
 *  - a type (as defined by a \ref stasis_message_type)
 *  - a value - a \c void pointer to an AO2 object
 *  - a timestamp when it was created
 *
 * Once a \ref stasis_message has been created, it is immutable and cannot
 * change. The same goes for the value of the message (although this cannot be
 * enforced in code). Messages themselves are reference-counted, AO2 objects,
 * along with their values. By being both reference counted and immutable,
 * messages can be shared throughout the system without any concerns for
 * threading. (Well, the objects must be allocated with \ref
 * AO2_ALLOC_OPT_LOCK_MUTEX so that the reference counting operations are thread
 * safe. But other than that, no worries).
 *
 * The type of a message is defined by an instance of \ref stasis_message_type,
 * which can be created by calling stasis_message_type_create(). Message types
 * are named, which is useful in debugging. It is recommended that the string
 * name for a message type match the name of the struct that's stored in the
 * message. For example, the name for \ref stasis_cache_update's message type is
 * \c "stasis_cache_update".
 *
 * \par stasis_topic
 *
 * A \ref stasis_topic is an object to which \ref stasis_subscription's may be
 * subscribed, and \ref stasis_message's may be published. Any message published
 * to the topic is dispatched to all of its subscribers. The topic itself may be
 * named, which is useful in debugging.
 *
 * Topics themselves are reference counted objects, and automagically
 * unsubscribe all of their subscribers when they are destroyed. Topics are also
 * thread safe, so no worries about publishing/subscribing/unsubscribing to a
 * topic concurrently from multiple threads. It's also designed to handle the
 * case of unsubscribing from a topic from within the subscription handler.
 *
 * \par Forwarding
 *
 * There is one special case of topics that's worth noting: forwarding
 * messages. It's a fairly common use case to want to forward all the messages
 * published on one topic to another one (for example, an aggregator topic that
 * publishes all the events from a set of other topics). This can be
 * accomplished easily using stasis_forward_all(). This sets up the forwarding
 * between the two topics, and returns a \ref stasis_subscription, which can be
 * unsubscribed to stop the forwarding.
 *
 * \par Caching
 *
 * Another common use case is to want to cache certain messages that are
 * published on the bus. Usually these events are snapshots of the current state
 * in the system, and it's desirable to query that state from the cache without
 * locking the original object. It's also desirable for subscribers of the
 * caching topic to receive messages that have both the old cache value and the
 * new value being put into the cache. For this, we have
 * stasis_caching_topic_create(), providing it with the topic which publishes
 * the messages that you wish to cache, and a function that can identify
 * cacheable messages.
 *
 * The returned \ref stasis_caching_topic provides a topic that forwards
 * non-cacheable messages unchanged. A cacheable message is wrapped in a \ref
 * stasis_cache_update message which provides the old snapshot (or \c NULL if
 * this is a new cache entry), and the new snapshot (or \c NULL if the entry was
 * removed from the cache). A stasis_cache_clear_create() message must be sent
 * to the topic in order to remove entries from the cache.
 *
 * As with all things Stasis, the \ref stasis_caching_topic is a reference
 * counted AO2 object.
 *
 * \par stasis_subscription
 *
 * Any topic may be subscribed to by simply providing stasis_subscribe() the
 * \ref stasis_topic to subscribe to, a handler function and \c void pointer to
 * data that is passed back to the handler. Invocations on the subscription's
 * handler are serialized, but different invocations may occur on different
 * threads (this usually isn't important unless you use thread locals or
 * something similar).
 *
 * Since the topic (by necessity) holds a reference to the subscription,
 * reference counting alone is insufficient to terminate a subscription. In
 * order to stop receiving messages, call stasis_unsubscribe() with your \ref
 * stasis_subscription. This will remove the topic's reference to the
 * subscription, and allow it to be destroyed when all of the other references
 * are cleaned up.
 */

#include "asterisk/utils.h"

/*! @{ */

/*!
 * \brief Metadata about a \ref stasis_message.
 * \since 12
 */
struct stasis_message_type;

/*!
 * \brief Register a new message type.
 *
 * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done
 * with it.
 *
 * \param name Name of the new type.
 * \return Pointer to the new type.
 * \return \c NULL on error.
 * \since 12
 */
struct stasis_message_type *stasis_message_type_create(const char *name);

/*!
 * \brief Gets the name of a given message type
 * \param type The message type whose name to get.
 * \return Name of the type.
 * \return \c NULL if \a type is \c NULL.
 * \since 12
 */
const char *stasis_message_type_name(const struct stasis_message_type *type);

/*!
 * \brief Opaque type for a Stasis message.
 * \since 12
 */
struct stasis_message;

/*!
 * \brief Create a new message.
 *
 * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
 * with it. Messages are also immutable, and must not be modified after they
 * are initialized. Especially the \a data in the message.
 *
 * \param type Type of the message
 * \param data Immutable data that is the actual contents of the message
 * \return New message
 * \return \c NULL on error
 * \since 12
 */
struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data);

/*!
 * \brief Get the message type for a \ref stasis_message.
 * \param msg Message whose type to get.
 * \return Type of \a msg
 * \return \c NULL if \a msg is \c NULL.
 * \since 12
 */
struct stasis_message_type *stasis_message_type(const struct stasis_message *msg);

/*!
 * \brief Get the data contained in a message.
 * \param msg Message.
 * \return Immutable data pointer
 * \return \c NULL if msg is \c NULL.
 * \since 12
 */
void *stasis_message_data(const struct stasis_message *msg);

/*!
 * \brief Get the time when a message was created.
 * \param msg Message.
 * \return Pointer to the \a timeval when the message was created.
 * \return \c NULL if msg is \c NULL.
 * \since 12
 */
const struct timeval *stasis_message_timestamp(const struct stasis_message *msg);

/*! @} */

/*! @{ */

/*!
 * \brief A topic to which messages may be posted, and subscribers, well, subscribe
 * \since 12
 */
struct stasis_topic;

/*!
 * \brief Create a new topic.
 * \param name Name of the new topic.
 * \return New topic instance.
 * \return \c NULL on error.
 * \since 12
 */
struct stasis_topic *stasis_topic_create(const char *name);

/*!
 * \brief Return the name of a topic.
 * \param topic Topic.
 * \return Name of the topic.
 * \return \c NULL if topic is \c NULL.
 * \since 12
 */
const char *stasis_topic_name(const struct stasis_topic *topic);

/*!
 * \brief Publish a message to a topic's subscribers.
 * \param topic Topic.
 * \param message Message to publish.
 * \since 12
 */
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);

/*!
 * \brief Publish a message from a specified topic to all the subscribers of a
 * possibly different topic.
 * \param topic Topic to publish message to.
 * \param publisher_topic Original topic message was from.
 * \param message Message to publish.
 * \since 12
 */
void stasis_forward_message(struct stasis_topic *topic,
			    struct stasis_topic *publisher_topic,
			    struct stasis_message *message);

/*! @} */

/*! @{ */

/*!
 * \brief Opaque type for a Stasis subscription.
 * \since 12
 */
struct stasis_subscription;

/*!
 * \brief Callback function type for Stasis subscriptions.
 * \param data Data field provided with subscription.
 * \param sub Subscription for which the callback is being invoked.
 * \param topic Topic to which the message was published.
 * \param message Published message.
 * \since 12
 */
typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);

/*!
 * \brief Create a subscription.
 *
 * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
 * up this reference), the subscription must be explicitly unsubscribed from its
 * topic using stasis_unsubscribe().
 *
 * The invocations of the callback are serialized, but may not always occur on
 * the same thread. The invocation order of different subscriptions is
 * unspecified.
 *
 * \param topic Topic to subscribe to.
 * \param callback Callback function for subscription messages.
 * \param data Data to be passed to the callback, in addition to the message.
 * \return New \ref stasis_subscription object.
 * \return \c NULL on error.
 * \since 12
 */
struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
					     stasis_subscription_cb callback,
					     void *data);

/*!
 * \brief Cancel a subscription.
 *
 * Note that in an asynchronous system, there may still be messages queued or
 * in transit to the subscription's callback. These will still be delivered.
 * There will be a final 'SubscriptionCancelled' message, indicating the
 * delivery of the final message.
 *
 * \param subscription Subscription to cancel.
 * \since 12
 */
void stasis_unsubscribe(struct stasis_subscription *subscription);

/*!
 * \brief Create a subscription which forwards all messages from one topic to
 * another.
 *
 * Note that the \a topic parameter of the invoked callback will be the topic
 * the message was sent to, not the topic the subscriber subscribed to.
 *
 * \param from_topic Topic to forward.
 * \param to_topic Destination topic of forwarded messages.
 * \return New forwarding subscription.
 * \return \c NULL on error.
 * \since 12
 */
struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic);

/*!
 * \brief Get the unique ID for the subscription.
 *
 * \param sub Subscription for which to get the unique ID.
 * \return Unique ID for the subscription.
 * \since 12
 */
const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub);

/*!
 * \brief Returns whether a subscription is currently subscribed.
 *
 * Note that there may still be messages queued up to be dispatched to this
 * subscription, but the stasis_subscription_final_message() has been enqueued.
 *
 * \param sub Subscription to check
 * \return False (zero) if subscription is not subscribed.
 * \return True (non-zero) if still subscribed.
 */
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub);

/*!
 * \brief Determine whether a message is the final message to be received on a subscription.
 *
 * \param sub Subscription on which the message was received.
 * \param msg Message to check.
 * \return zero if the provided message is not the final message.
 * \return non-zero if the provided message is the final message.
 * \since 12
 */
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg);

/*!
 * \brief Holds details about changes to subscriptions for the specified topic
 *
 * The struct tag shares its name with the stasis_subscription_change()
 * accessor, which returns the message type for subscription change notices.
 * Struct tags and functions live in separate C namespaces, so the two do not
 * collide.
 *
 * \since 12
 */
struct stasis_subscription_change {
	AST_DECLARE_STRING_FIELDS(
		AST_STRING_FIELD(uniqueid);	/*!< The unique ID associated with this subscription */
		AST_STRING_FIELD(description);	/*!< The description of the change to the subscription associated with the uniqueid */
	);
	struct stasis_topic *topic;		/*!< The topic the subscription is/was subscribing to */
};

/*!
 * \brief Gets the message type for subscription change notices
 * \return The stasis_message_type for subscription change notices
 * \since 12
 */
struct stasis_message_type *stasis_subscription_change(void);

/*! @} */

/*! @{ */

/*!
 * \brief A topic wrapper, which caches certain messages.
 * \since 12
 */
struct stasis_caching_topic;

/*!
 * \brief Message type for cache update messages.
 * \return Message type for cache update messages.
 * \since 12
 */
struct stasis_message_type *stasis_cache_update(void);

/*!
 * \brief Cache update message
 *
 * A cacheable message is wrapped in one of these, giving subscribers of the
 * caching topic both the previous and the new cached value.
 *
 * \since 12
 */
struct stasis_cache_update {
	/*! \brief Topic that published \c new_snapshot */
	struct stasis_topic *topic;
	/*! \brief Convenience reference to snapshot type */
	struct stasis_message_type *type;
	/*! \brief Old value from the cache (\c NULL if this is a new cache entry) */
	struct stasis_message *old_snapshot;
	/*! \brief New value (\c NULL if the entry was removed from the cache) */
	struct stasis_message *new_snapshot;
};

/*!
 * \brief A message which instructs the caching topic to remove an entry from its cache.
 * \param type Message type.
 * \param id Unique id of the snapshot to clear.
 * \return Message which, when sent to the topic, will clear the item from the cache.
 * \return \c NULL on error.
 * \since 12
 */
struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id);

/*!
 * \brief Callback to extract a unique identity from a snapshot message.
 *
 * This identity is unique to the underlying object of the snapshot, such as the
 * UniqueId field of a channel.
 *
 * \param message Message to extract id from.
 * \return String representing the snapshot's id.
 * \return \c NULL if the message_type of the message isn't a handled snapshot.
 * \since 12
 */
typedef const char *(*snapshot_get_id)(struct stasis_message *message);

/*!
 * \brief Create a topic which monitors and caches messages from another topic.
 *
 * The idea is that some topics publish 'snapshots' of some other object's state
 * that should be cached. When these snapshot messages are received, the cache
 * is updated, and a stasis_cache_update() message is forwarded, which has both
 * the original snapshot message and the new message.
 *
 * \param original_topic Topic publishing snapshot messages.
 * \param id_fn Callback to extract the id from a snapshot message.
 * \return New topic which changes snapshot messages to stasis_cache_update()
 *         messages, and forwards all other messages from the original topic.
 * \since 12
 */
struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);

/*!
 * \brief Unsubscribes a caching topic from its upstream topic.
 * \param caching_topic Caching topic to unsubscribe
 * \since 12
 */
void stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic);

/*!
 * \brief Returns the topic of cached events from a caching topic.
 * \param caching_topic The caching topic.
 * \return The topic that publishes cache update events, along with passthrough events
 *         from the underlying topic.
 * \return \c NULL if \a caching_topic is \c NULL.
 * \since 12
 */
struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic);

/*!
 * \brief Retrieve an item from the cache.
 * \param caching_topic The topic returned from stasis_caching_topic_create().
 * \param type Type of message to retrieve.
 * \param id Identity of the snapshot to retrieve.
 * \return Message from the cache. The cache still owns the message, so
 *         ao2_ref() if you want to keep it.
 * \return \c NULL if message is not found.
 * \since 12
 */
struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic,
					struct stasis_message_type *type,
					const char *id);

/*!
 * \brief Dump cached items into a container
 * \param caching_topic The topic returned from stasis_caching_topic_create().
 * \param type Type of message to dump (any type if \c NULL).
 * \return ao2_container containing all matches (must be unreffed by caller)
 * \return \c NULL on allocation error
 * \since 12
 */
struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic,
					struct stasis_message_type *type);

/*! @} */

/*! @{ */

/*!
 * \brief Initialize the Stasis subsystem
 * \return 0 on success.
 * \return Non-zero on error.
 * \since 12
 */
int stasis_init(void);

/*!
 * \private
 * \brief called by stasis_init() for cache initialization.
 * \return 0 on success.
 * \return Non-zero on error.
 * \since 12
 */
int stasis_cache_init(void);

/*! @} */

#endif /* _ASTERISK_STASIS_H */