diff options
author | Matthew Jordan <mjordan@digium.com> | 2014-12-01 17:57:12 +0000 |
---|---|---|
committer | Matthew Jordan <mjordan@digium.com> | 2014-12-01 17:57:12 +0000 |
commit | d79c68d3fb3b27d9632ec91b8a0fc572bf833f4b (patch) | |
tree | 340b3f68f987c94cdf27a7e353d1c226618ac9e2 /tests | |
parent | 6e30ccd242e9316d2de20086724a6f5d340115cb (diff) |
main/stasis: Allow subscriptions to use a threadpool for message delivery
Prior to this patch, all Stasis subscriptions would receive a dedicated
thread for servicing published messages. In contrast, prior to r400178
(see review https://reviewboard.asterisk.org/r/2881/), the subscriptions
shared a thread pool. It was discovered during some initial work on Stasis
that, for a low subscription count with high message throughput, the
threadpool was not as performant as simply having a dedicated thread per
subscriber.
For situations where a subscriber receives a substantial number of messages
and is always present, the model of having a dedicated thread per subscriber
makes sense. While we still have plenty of subscriptions that would follow
this model, e.g., AMI, CDRs, CEL, etc., there are plenty that also fall into
the following two categories:
* Large number of subscriptions, specifically those tied to endpoints/peers.
* Low number of messages. Some subscriptions exist specifically to coordinate
a single message - the subscription is created, a message is published, the
delivery is synchronized, and the subscription is destroyed.
In both of the latter two cases, creating a dedicated thread is wasteful (and
in the case of a large number of peers/endpoints, harmful). In those cases,
having shared delivery threads is far more performant.
This patch adds the ability of a subscriber to Stasis to choose whether or not
their messages are dispatched on a dedicated thread or on a threadpool. The
threadpool is configurable through stasis.conf.
Review: https://reviewboard.asterisk.org/r/4193
ASTERISK-24533 #close
Reported by: xrobau
Tested by: xrobau
........
Merged revisions 428681 from http://svn.asterisk.org/svn/asterisk/branches/12
git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/13@428687 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_stasis.c | 310 |
1 files changed, 310 insertions, 0 deletions
diff --git a/tests/test_stasis.c b/tests/test_stasis.c index ba82e83ad..2e83e3b70 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -361,6 +361,61 @@ AST_TEST_DEFINE(subscription_messages) return AST_TEST_PASS; } +AST_TEST_DEFINE(subscription_pool_messages) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + RAII_VAR(char *, expected_uniqueid, NULL, ast_free); + int complete; + struct stasis_subscription_change *change; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test subscribe/unsubscribe messages using a threadpool subscription"; + info->description = "Test subscribe/unsubscribe messages using a threadpool subscription"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer = consumer_create(0); + ast_test_validate(test, NULL != consumer); + + uut = stasis_subscribe_pool(topic, consumer_exec, consumer); + ast_test_validate(test, NULL != uut); + ao2_ref(consumer, +1); + expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut)); + + uut = stasis_unsubscribe(uut); + complete = consumer_wait_for_completion(consumer); + ast_test_validate(test, 1 == complete); + + ast_test_validate(test, 2 == consumer->messages_rxed_len); + ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0])); + ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1])); + + change = stasis_message_data(consumer->messages_rxed[0]); + ast_test_validate(test, topic == change->topic); + ast_test_validate(test, 0 == strcmp("Subscribe", change->description)); + ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid)); + + change = stasis_message_data(consumer->messages_rxed[1]); + ast_test_validate(test, topic == change->topic); + ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description)); + ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid)); + + return AST_TEST_PASS; +} + AST_TEST_DEFINE(publish) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); @@ -455,6 +510,55 @@ AST_TEST_DEFINE(publish_sync) return AST_TEST_PASS; } +AST_TEST_DEFINE(publish_pool) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + int actual_len; + const char *actual; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test publishing with a threadpool"; + info->description = "Test publishing to a subscriber whose\n" + "subscription dictates messages are received through a\n" + "threadpool."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer = consumer_create(1); + ast_test_validate(test, NULL != consumer); + + uut = stasis_subscribe_pool(topic, consumer_exec, consumer); + ast_test_validate(test, NULL != uut); + ao2_ref(consumer, +1); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + test_message_type = stasis_message_type_create("TestMessage", NULL); + test_message = stasis_message_create(test_message_type, test_data); + + stasis_publish(topic, test_message); + + actual_len = consumer_wait_for(consumer, 1); + ast_test_validate(test, 1 == actual_len); + actual = stasis_message_data(consumer->messages_rxed[0]); + ast_test_validate(test, test_data == actual); + + return AST_TEST_PASS; +} + AST_TEST_DEFINE(unsubscribe_stops_messages) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); @@ -650,6 +754,106 @@ AST_TEST_DEFINE(interleaving) return AST_TEST_PASS; } +AST_TEST_DEFINE(subscription_interleaving) +{ + RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel); + RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel); + RAII_VAR(struct stasis_subscription *, sub1, NULL, stasis_unsubscribe); + RAII_VAR(struct stasis_subscription *, sub2, NULL, stasis_unsubscribe); + + RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup); + + int actual_len; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test sending interleaved events to a parent topic with different subscribers"; + info->description = "Test sending events to a parent topic.\n" + "This test creates three topics (one parent, two children)\n" + "and publishes messages alternately between the children.\n" + "It verifies that the messages are received in the expected\n" + "order, for different subscription types: one with a dedicated\n" + "thread, the other on the Stasis threadpool.\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + test_message_type = stasis_message_type_create("test", NULL); + ast_test_validate(test, NULL != test_message_type); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + + test_message1 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message1); + test_message2 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message2); + test_message3 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message3); + + parent_topic = stasis_topic_create("ParentTestTopic"); + ast_test_validate(test, NULL != parent_topic); + topic1 = stasis_topic_create("Topic1"); + ast_test_validate(test, NULL != topic1); + topic2 = stasis_topic_create("Topic2"); + ast_test_validate(test, NULL != topic2); + + forward_sub1 = stasis_forward_all(topic1, parent_topic); + ast_test_validate(test, NULL != forward_sub1); + forward_sub2 = stasis_forward_all(topic2, parent_topic); + ast_test_validate(test, NULL != forward_sub2); + + consumer1 = consumer_create(1); + ast_test_validate(test, NULL != consumer1); + + consumer2 = consumer_create(1); + ast_test_validate(test, NULL != consumer2); + + sub1 = stasis_subscribe(parent_topic, consumer_exec, consumer1); + ast_test_validate(test, NULL != sub1); + ao2_ref(consumer1, +1); + + sub2 = stasis_subscribe_pool(parent_topic, consumer_exec, consumer2); + ast_test_validate(test, NULL != sub2); + ao2_ref(consumer2, +1); + + stasis_publish(topic1, test_message1); + stasis_publish(topic2, test_message2); + stasis_publish(topic1, test_message3); + + actual_len = consumer_wait_for(consumer1, 3); + ast_test_validate(test, 3 == actual_len); + + actual_len = consumer_wait_for(consumer2, 3); + ast_test_validate(test, 3 == actual_len); + + ast_test_validate(test, test_message1 == consumer1->messages_rxed[0]); + ast_test_validate(test, test_message2 == consumer1->messages_rxed[1]); + ast_test_validate(test, test_message3 == consumer1->messages_rxed[2]); + + ast_test_validate(test, test_message1 == consumer2->messages_rxed[0]); + ast_test_validate(test, test_message2 == consumer2->messages_rxed[1]); + ast_test_validate(test, test_message3 == consumer2->messages_rxed[2]); + + return AST_TEST_PASS; +} + struct cache_test_data { char *id; char *value; @@ -1389,6 +1593,104 @@ AST_TEST_DEFINE(router) return AST_TEST_PASS; } +AST_TEST_DEFINE(router_pool) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup); + int actual_len, ret; + struct stasis_message *actual; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test message routing via threadpool"; + info->description = "Test simple message routing when\n" + "the subscriptions dictate usage of the Stasis\n" + "threadpool.\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer1 = consumer_create(1); + ast_test_validate(test, NULL != consumer1); + consumer2 = consumer_create(1); + ast_test_validate(test, NULL != consumer2); + consumer3 = consumer_create(1); + ast_test_validate(test, NULL != consumer3); + + test_message_type1 = stasis_message_type_create("TestMessage1", NULL); + ast_test_validate(test, NULL != test_message_type1); + test_message_type2 = stasis_message_type_create("TestMessage2", NULL); + ast_test_validate(test, NULL != test_message_type2); + test_message_type3 = stasis_message_type_create("TestMessage3", NULL); + ast_test_validate(test, NULL != test_message_type3); + + uut = stasis_message_router_create_pool(topic); + ast_test_validate(test, NULL != uut); + + ret = stasis_message_router_add( + uut, test_message_type1, consumer_exec, consumer1); + ast_test_validate(test, 0 == ret); + ao2_ref(consumer1, +1); + ret = stasis_message_router_add( + uut, test_message_type2, consumer_exec, consumer2); + ast_test_validate(test, 0 == ret); + ao2_ref(consumer2, +1); + ret = stasis_message_router_set_default(uut, consumer_exec, consumer3); + ast_test_validate(test, 0 == ret); + ao2_ref(consumer3, +1); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + test_message1 = stasis_message_create(test_message_type1, test_data); + ast_test_validate(test, NULL != test_message1); + test_message2 = stasis_message_create(test_message_type2, test_data); + ast_test_validate(test, NULL != test_message2); + test_message3 = stasis_message_create(test_message_type3, test_data); + ast_test_validate(test, NULL != test_message3); + + stasis_publish(topic, test_message1); + stasis_publish(topic, test_message2); + stasis_publish(topic, test_message3); + + actual_len = consumer_wait_for(consumer1, 1); + ast_test_validate(test, 1 == actual_len); + actual_len = consumer_wait_for(consumer2, 1); + ast_test_validate(test, 1 == actual_len); + actual_len = consumer_wait_for(consumer3, 1); + ast_test_validate(test, 1 == actual_len); + + actual = consumer1->messages_rxed[0]; + ast_test_validate(test, test_message1 == actual); + + actual = consumer2->messages_rxed[0]; + ast_test_validate(test, test_message2 == actual); + + actual = consumer3->messages_rxed[0]; + ast_test_validate(test, test_message3 == actual); + + /* consumer1 and consumer2 do not get the final message. */ + ao2_cleanup(consumer1); + ao2_cleanup(consumer2); + + return AST_TEST_PASS; +} + static const char *cache_simple(struct stasis_message *message) { const char *type_name = @@ -1748,8 +2050,10 @@ static int unload_module(void) AST_TEST_UNREGISTER(message_type); AST_TEST_UNREGISTER(message); AST_TEST_UNREGISTER(subscription_messages); + AST_TEST_UNREGISTER(subscription_pool_messages); AST_TEST_UNREGISTER(publish); AST_TEST_UNREGISTER(publish_sync); + AST_TEST_UNREGISTER(publish_pool); AST_TEST_UNREGISTER(unsubscribe_stops_messages); AST_TEST_UNREGISTER(forward); AST_TEST_UNREGISTER(cache_filter); @@ -1757,8 +2061,10 @@ static int unload_module(void) AST_TEST_UNREGISTER(cache_dump); AST_TEST_UNREGISTER(cache_eid_aggregate); AST_TEST_UNREGISTER(router); + AST_TEST_UNREGISTER(router_pool); AST_TEST_UNREGISTER(router_cache_updates); AST_TEST_UNREGISTER(interleaving); + AST_TEST_UNREGISTER(subscription_interleaving); AST_TEST_UNREGISTER(no_to_json); AST_TEST_UNREGISTER(to_json); AST_TEST_UNREGISTER(no_to_ami); @@ -1773,8 +2079,10 @@ static int load_module(void) AST_TEST_REGISTER(message_type); AST_TEST_REGISTER(message); AST_TEST_REGISTER(subscription_messages); + AST_TEST_REGISTER(subscription_pool_messages); AST_TEST_REGISTER(publish); AST_TEST_REGISTER(publish_sync); + AST_TEST_REGISTER(publish_pool); AST_TEST_REGISTER(unsubscribe_stops_messages); AST_TEST_REGISTER(forward); AST_TEST_REGISTER(cache_filter); @@ -1782,8 +2090,10 @@ static int load_module(void) AST_TEST_REGISTER(cache_dump); AST_TEST_REGISTER(cache_eid_aggregate); AST_TEST_REGISTER(router); + AST_TEST_REGISTER(router_pool); AST_TEST_REGISTER(router_cache_updates); AST_TEST_REGISTER(interleaving); + AST_TEST_REGISTER(subscription_interleaving); AST_TEST_REGISTER(no_to_json); AST_TEST_REGISTER(to_json); AST_TEST_REGISTER(no_to_ami); |