Разработка распределенных приложений в Microsoft.NET Framework

         

Использование очередей сообщений MSMQ в NET Framework


Для работы с очередями сообщений используются классы из пространства имен System.Messaging. Класс System.Messaging.MessageQueue содержит три группы методов.

  • Статические методы для администрирования очередей: Create, Delete, Exists, Purge.
  • Методы поиска общих очередей: GetPublicQueues, GetPublicQueuesByLabel и другие. При их использовании можно создать приложение, которое переключается между несколькими менеджерами очередей в пределах Active Directory, если один из них выходит из строя.
  • Методы для работы с сообщениями (Send, Receive, Peek и другие), в том числе позволяющие использовать обработчик на завершение операции (BeginPeek, BeginReceive).

При применении классов из System.Messaging возможно три варианта работы с очередями сообщений:

  • работа с очередями, не использующими транзакции;
  • работа с очередями, поддерживающими транзакции, при использовании внутренних транзакций MSMQ;
  • работа с очередями, поддерживающими транзакции, при использовании распределенных транзакций COM+.

Определенной трудностью при использовании MSMQ в .NET Framework являются различия по использованию разных видов очередей. В частности, для очередей без транзакций можно установить обработчик на завершение приема сообщения, а для очередей с транзакциями этот способ неприемлем, поскольку само чтение сообщения должно быть оформлено как часть транзакции. Для обоих видов очередей можно поставить обработчик на появление сообщения в очереди, что и является рекомендованным способом.

Для сериализации и десериализации сообщений MSMQ могут использоваться классы XMLMessageFormatter или BinaryMessageFormatter из пространства имен System.Messaging. Класс XMLMessageFormatter использует класс System.Xml.Serialization.XmlSerializer, описанный ранее в теме о сериализации, поэтому при использовании XMLMessageFormatter должны учитываться все особенности использования класса XmlSerializer. Класс BinaryMessageFormatter аналогичным способом использует для сериализации класс BinaryFormatter.

Поскольку классы BinaryFormatter и XmlSerializer имеют различные ограничения на сериализуемые классы и используют совершенно различные процедуры сериализации, в нетривиальном случае переход BinaryMessageFormatter на XMLMessageFormatter или наоборот может привести к определенным изменениям в исходном коде программных компонент.
Рекомендованным для использования с MSMQ следует считать XMLMessageFormatter. Его применение позволяет создать XSD схему для передаваемого сообщения. При использовании MSMQ ни значительно меньший объем сообщения, создаваемого классом BinaryMessageFormatter, ни его меньшее время работы не является принципиальными факторами.

Ниже рассмотрено вспомогательное пространство имен с классами общего вида, реализующими модель "запрос-ответ" при использовании внутренних транзакций MSMQ (рис. 5.2).


Рис. 5.2.  Обслуживание запросов клиентов при использовании MSMQ

Программа использует пространство имен с классами передачи сообщений System.Messaging и пространство имен с коллекциями общего вида.

using System; using System.Messaging; using System.Collections.Generic; Классы используют два делегата общего вида, которые будут связаны с событиям обработки сообщения сервером и получения ответа клиентом.

namespace Seva.Msmq { // типы очередей enum QueueType {NonTransactional, Transactional}; // типы классов форматирования enum QueueFormatter {Binary, Xml}; // делегат общего вида для обработки сервером сообщений клиента delegate AnswerType ProcessRequestEventHandler <RequestType, AnswerType>(Object sender, RequestType request, MessageQueue queueResponse); // делегат общего вида для обработки ответов сервера клиентом delegate void ProcessAnswerEventHandler<RequestType, AnswerType> (Object sender, RequestType request, AnswerType answer); Абстрактный класс MSMQUser, наследуемый классами MSMQServer и MSMQClient.

public abstract class MsmqUser { // использование восстанавливаемых сообщений private bool recoverable = false; public bool Recoverable { get { return recoverable; } set { recoverable = value; } } // объекты форматирования для посылки приема сообщений protected IMessageFormatter requestFormatter; protected IMessageFormatter answerFormatter; // public MsmqUser(QueueFormatter formatterType) { if (formatterType == QueueFormatter.Xml) { requestFormatter = new XmlMessageFormatter( new Type[]{typeof(RequestType)}); answerFormatter = new XmlMessageFormatter( new Type[]{typeof(AnswerType)}); } if (formatterType == QueueFormatter.Binary) { requestFormatter = new BinaryMessageFormatter(); answerFormatter = new BinaryMessageFormatter(); } } } Листинг 5.1. Класс общего вида, посылающий через MSMQ запросы и получающий ответы на них.



class MsmqClient<RequestType, AnswerType> : MsmqUser<RequestType, AnswerType>, IDisposable { // очереди для отсылки запросов и приема ответов private MessageQueue queueSend; private MessageQueue queueReceive; // список необслуженных запросов private Dictionary<String, RequestType> messages; public Dictionary<String, RequestType> Messages { get { return messages;} } // событие, вызываемое при приеме ответа public event ProcessAnswerEventHandler<RequestType, AnswerType> ProcessAnswer; Конструктор, получающий имена очередей для посылки и приема сообщений.

public MsmqClient(String queueSendName, String queueReceiveName, QueueFormatter formatterType): base(formatterType) { // список отправленных сообщений без ответов messages = new Dictionary<String,RequestType>(); // создание очереди для посылки запросов, если она не существует queueSend = MsmqTools.CreateQueue(queueSendName, QueueType.Transactional); // создание очереди для приема ответов, если она нужна if (queueReceiveName != null) { queueReceive = MsmqTools.CreateQueue(queueReceiveName); queueReceive.Formatter = answerFormatter; // считывать из очереди свойство CorrelationId queueReceive.MessageReadPropertyFilter.CorrelationId = true; } else { queueReceive = null; } } В методе Dispose происходит закрытие используемых очередей.

public void Dispose() { queueSend.Close(); queueSend.Dispose(); if (queueReceive != null) { queueReceive.Close(); queueReceive.Dispose(); } } Функции BeginReceive и EndReceive начинают и прекращают прием ответов сервера, изменяя обработчик события PeekComplete очереди ответов.

public void BeginReceive() { // установить обработчик на событие, возникающее при появлении // сообщения в очереди queueReceive.PeekCompleted += OnPeek; // начать отслеживание поступления сообщения в очередь queueReceive.BeginPeek(); } // прекратить прием ответов сервера public void EndReceive() { // отключить обработчик queueReceive.PeekCompleted -= OnPeek; } Функция Send посылает в исходящую очередь запрос общего типа для его обработки сервером.


Для ответа на сообщение серверу следует использовать очередь, указанную в поле ResponseQueue посылаемого сообщения.

public void Send(RequestType request) { // создание нового сообщения Message message = new Message(request, requestFormatter); message.ResponseQueue = queueReceive; // использование восстаналиваемых сообщений message.Recoverable = Recoverable; // послать сообщение; поскольку транзакция состоит из // единственной операции, вместо объекта-транзакции используется // значение MessageQueueTransactionType.Single queueSend.Send(message, MessageQueueTransactionType.Single); // поле message.Id устанавливается после посылки сообщения; // идентификатор сообщения связывается c отосланным запросом // в списке необслуженных запросов messages.Add(message.Id, request); } Обработчик события очереди PeekComplete использует внутренние транзакции MSMQ. В одну транзакцию входит операция чтения ответа из очереди и последующий вызов события ProcessAnswer. Если в ходе обработки события возникло исключение, ответ сервера останется в очереди ответов. Иначе сообщение удаляется из поддерживаемого клиентом списка невыполненных запросов.

public void OnPeek(Object source, PeekCompletedEventArgs asyncResult) { // создание внутренней транзакции MSMQ MessageQueueTransaction transaction = new MessageQueueTransaction(); // начало транзакции transaction.Begin(); try { // прекратить ожидание сообщений в очереди queueReceive.EndPeek(asyncResult.AsyncResult); // получить сообщение из очереди в рамках транзакции Message message = queueReceive.Receive(transaction); // в поле CorrelationId должен быть идентификатор сообщения // с исходным запросом String messageId = message.CorrelationId; // есть ли такое сообщение в списке невыполненных запросов? if (messages.ContainsKey(messageId)) { if (message.Body is AnswerType) { // преобразовать тело сообщения к типу ответа // и вызвать событие по его обработке AnswerType answer = (AnswerType) message.Body; ProcessAnswer(this, messages[messageId], answer); }; messages.Remove(messageId); } // продолжить ожидать сообщения BeginReceive(); // успешное завершение транзакции transaction.Commit(); } catch (Exception e) { // отмена транзакции transaction.Abort(); throw e; } } } Листинг 5.2. MSMQServer – класс общего вида, принимающий через MSMQ запросы и посылающий ответы на них.



class MsmqServer<RequestType, AnswerType>: MsmqUser<RequestType, AnswerType>, IDisposable { // очередь приема запросов private MessageQueue queueReceive; // событие, вызываемое при приеме запроса public event ProcessRequestEventHandler<RequestType, AnswerType> ProcessMessage; Конструктор класса проверяет наличие очереди.

public MsmqServer(String queueReceiveName, QueueFormatter formatterType): base(formatterType) { // создание очереди приема сообщений, если она не существует queueReceive = MsmqTools.CreateQueue(queueReceiveName, QueueType.Transactional); queueReceive.Formatter = requestFormatter; } В методе Dispose происходит закрытие используемых очередей.

public void Dispose() { queueReceive.Close(); queueReceive.Dispose(); } Функции BeginReceive и EndReceive начинают и прекращают прием ответов сервера, изменяя обработчик события PeekComplete очереди ответов.

// начать прием запросов от клиента public void BeginReceive() { queueReceive.PeekCompleted += OnPeek; queueReceive.BeginPeek(); } // прекратить прием запросов от клиента public void EndReceive() { queueReceive.PeekCompleted -= OnPeek; } Метод OnPeek – обработчик события PeekCompleted очереди с запросами. В одну транзакцию входит две операции с очередями – чтения запроса и отправка ответа на него. Для обработки принятого сообщения и создания ответа на него вызывается событие ProcessMessage. В поле ResponseQueue полученного сообщения содержится ссылка на очередь, в которую следует отправить ответ на обработанный запрос.

// обработчки события PeekCompleted очереди с запосами public void OnPeek(Object source, PeekCompletedEventArgs asyncResult) { // создание внутренней транзакции MSMQ MessageQueueTransaction transaction = new MessageQueueTransaction(); // начало транзакции transaction.Begin(); try { queueReceive.EndPeek(asyncResult.AsyncResult); // прием cообщения в рамках транзакции Message message = queueReceive.Receive(transaction); // в поле ResponseQueue содержится ссылка на очередь, // куда следует послать ответ на запрос MessageQueue queueResponse = message.ResponseQueue; try { if (message.Body is RequestType) { RequestType request = (RequestType) message.Body; // вызвать событие обработки запроса AnswerType answer = ProcessMessage(this, request, queueResponse); if ((queueResponse != null) && (answer != null)) { Message answerMessage = new Message(answer, answerFormatter); answerMessage.Label = "Answer"; answerMessage.CorrelationId = message.Id; answerMessage.Recoverable = Recoverable; // послать собщение в рамках транзакции queueResponse.Send(answerMessage, transaction); } } } finally { if (queueResponse != null) { queueResponse.Close(); queueResponse.Dispose(); } }; // продолжить прием запросов BeginReceive(); // завершить транзакцию transaction.Commit(); } catch (Exception e) { // отменить транзакцию в случае ошибки Console.WriteLine(e); transaction.Abort(); throw e; } } } Листинг 5.3. Класс MsmqTools содержит вспомогательный статический метод для создания очереди сообщений.



static class MsmqTools { static public MessageQueue CreateQueue(String queueName) { return CreateQueue(queueName, QueueType.Transactional); } // функция проверяет наличие очереди и создает ее при необходимости static public MessageQueue CreateQueue(String queueName, QueueType type) { MessageQueue messageQueue; // если это частная очередь удаленного компьютера, // то при попытке проверки ее наличие возникает исключение try { if (!MessageQueue.Exists(queueName)) { MessageQueue.Create(queueName, type == QueueType.Transactional); } } catch(Exception) { } MessageQueue messageQueue = new MessageQueue(queueName); return messageQueue; } } } Следует отметить, что при работе с общими очередями можно обращаться к очереди по ее пути, например следующим образом.

queueName = @"Server\PublicQueue"; При использовании частных очередей на удаленном компьютере в большинстве случаев требуется применять прямое имя очереди.

queueName = @"Formatname:DIRECT=OS:Computer\Private$\PrivateName"; Имена используемых очередей следует хранить в конфигурационном файле программы.


Содержание раздела