Разработка распределенных приложений в Microsoft.NET Framework


Использование очередей сообщений MSMQ в NET Framework - часть 5


class MsmqServer<RequestType, AnswerType>: MsmqUser<RequestType, AnswerType>, IDisposable { // очередь приема запросов private MessageQueue queueReceive; // событие, вызываемое при приеме запроса public event ProcessRequestEventHandler<RequestType, AnswerType> ProcessMessage;

Конструктор класса проверяет наличие очереди.

public MsmqServer(String queueReceiveName, QueueFormatter formatterType): base(formatterType) { // создание очереди приема сообщений, если она не существует queueReceive = MsmqTools.CreateQueue(queueReceiveName, QueueType.Transactional); queueReceive.Formatter = requestFormatter; }

В методе Dispose происходит закрытие используемых очередей.

public void Dispose() { queueReceive.Close(); queueReceive.Dispose(); }

Функции BeginReceive и EndReceive начинают и прекращают прием ответов сервера, изменяя обработчик события PeekComplete очереди ответов.

// начать прием запросов от клиента public void BeginReceive() { queueReceive.PeekCompleted += OnPeek; queueReceive.BeginPeek(); } // прекратить прием запросов от клиента public void EndReceive() { queueReceive.PeekCompleted -= OnPeek; }

Метод OnPeek – обработчик события PeekCompleted очереди с запросами. В одну транзакцию входит две операции с очередями – чтения запроса и отправка ответа на него. Для обработки принятого сообщения и создания ответа на него вызывается событие ProcessMessage. В поле ResponseQueue полученного сообщения содержится ссылка на очередь, в которую следует отправить ответ на обработанный запрос.

// обработчки события PeekCompleted очереди с запосами public void OnPeek(Object source, PeekCompletedEventArgs asyncResult) { // создание внутренней транзакции MSMQ MessageQueueTransaction transaction = new MessageQueueTransaction(); // начало транзакции transaction.Begin(); try { queueReceive.EndPeek(asyncResult.AsyncResult); // прием cообщения в рамках транзакции Message message = queueReceive.Receive(transaction); // в поле ResponseQueue содержится ссылка на очередь, // куда следует послать ответ на запрос MessageQueue queueResponse = message.ResponseQueue; try { if (message.Body is RequestType) { RequestType request = (RequestType) message.Body; // вызвать событие обработки запроса AnswerType answer = ProcessMessage(this, request, queueResponse); if ((queueResponse != null) && (answer != null)) { Message answerMessage = new Message(answer, answerFormatter); answerMessage.Label = "Answer"; answerMessage.CorrelationId = message.Id; answerMessage.Recoverable = Recoverable; // послать собщение в рамках транзакции queueResponse.Send(answerMessage, transaction); } } } finally { if (queueResponse != null) { queueResponse.Close(); queueResponse.Dispose(); } }; // продолжить прием запросов BeginReceive(); // завершить транзакцию transaction.Commit(); } catch (Exception e) { // отменить транзакцию в случае ошибки Console.WriteLine(e); transaction.Abort(); throw e; } } } Листинг 5.3.

Класс MsmqTools содержит вспомогательный статический метод для создания очереди сообщений.




Начало  Назад  Вперед