Nel mio ultimo articolo ho parlato dei vantaggi dell'implementazione dell'elaborazione asincrona tramite Service Broker in SQL Server rispetto agli altri metodi esistenti per l'elaborazione disaccoppiata di attività lunghe. In questo articolo esamineremo tutti i componenti che devono essere configurati per una configurazione di base di Service Broker in un unico database e le considerazioni importanti per la gestione delle conversazioni tra i servizi del broker. Per iniziare, dovremo creare un database e abilitare il database per l'utilizzo di Service Broker:
CREATE DATABASE AsyncProcessingDemo; GO IF (SELECT is_broker_enabled FROM sys.databases WHERE name = N'AsyncProcessingDemo') = 0 BEGIN ALTER DATABASE AsyncProcessingDemo SET ENABLE_BROKER; END GO USE AsyncProcessingDemo; GO
Configurazione dei componenti del broker
Gli oggetti di base che devono essere creati nel database sono i tipi di messaggio per i messaggi, un contratto che definisce come verranno inviati i messaggi tra i servizi, una coda e il servizio iniziatore, una coda e il servizio di destinazione. Molti esempi online per Service Broker mostrano nomi di oggetti complessi per i tipi di messaggi, i contratti e i servizi per Service Broker. Tuttavia, non è necessario che i nomi siano complessi e i nomi degli oggetti semplici possono essere utilizzati per qualsiasi oggetto.
Per i messaggi, dovremo creare un tipo di messaggio per la richiesta, che si chiamerà AsyncRequest
e un tipo di messaggio per il risultato, che sarà chiamato AsyncResult
. Entrambi utilizzeranno XML che verrà convalidato come correttamente formato dai servizi del broker per inviare e ricevere i dati richiesti dai servizi.
-- Create the message types CREATE MESSAGE TYPE [AsyncRequest] VALIDATION = WELL_FORMED_XML; CREATE MESSAGE TYPE [AsyncResult] VALIDATION = WELL_FORMED_XML;
Il contratto specifica che il AsyncRequest
verrà inviato dal servizio di avvio al servizio di destinazione e che il servizio di destinazione restituirà un AsyncResult
messaggio al servizio di avvio. Il contratto può anche specificare più tipi di messaggio per l'iniziatore e la destinazione, o che un tipo di messaggio specifico può essere inviato da qualsiasi servizio, se l'elaborazione specifica lo richiede.
-- Create the contract CREATE CONTRACT [AsyncContract] ( [AsyncRequest] SENT BY INITIATOR, [AsyncResult] SENT BY TARGET );
Per ciascuno dei servizi è necessario creare una coda per fornire l'archiviazione dei messaggi ricevuti dal servizio. Il servizio di destinazione a cui verrà inviata la richiesta deve essere creato specificando il AsyncContract
per consentire l'invio di messaggi al servizio. In questo caso il servizio si chiama ProcessingService
e verrà creato su ProcessingQueue
all'interno della banca dati. Il servizio di avvio non richiede la specifica di un contratto, il che lo rende in grado di ricevere messaggi solo in risposta a una conversazione che è stata avviata da esso.
-- Create the processing queue and service - specify the contract to allow sending to the service CREATE QUEUE ProcessingQueue; CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]); -- Create the request queue and service CREATE QUEUE RequestQueue; CREATE SERVICE [RequestService] ON QUEUE RequestQueue;
Invio di un messaggio per l'elaborazione
Come spiegato nell'articolo precedente, preferisco implementare una stored procedure wrapper per inviare un nuovo messaggio a un servizio broker, in modo che possa essere modificato una volta per ridimensionare le prestazioni, se necessario. Questa procedura è un semplice wrapper per creare una nuova conversazione e inviare il messaggio al ProcessingService
.
-- Create the wrapper procedure for sending messages CREATE PROCEDURE dbo.SendBrokerMessage @FromService SYSNAME, @ToService SYSNAME, @Contract SYSNAME, @MessageType SYSNAME, @MessageBody XML AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; BEGIN TRANSACTION; BEGIN DIALOG CONVERSATION @conversation_handle FROM SERVICE @FromService TO SERVICE @ToService ON CONTRACT @Contract WITH ENCRYPTION = OFF; SEND ON CONVERSATION @conversation_handle MESSAGE TYPE @MessageType(@MessageBody); COMMIT TRANSACTION; END GO
Utilizzando la stored procedure wrapper possiamo ora inviare un messaggio di prova al ProcessingService
per confermare che abbiamo impostato correttamente i servizi del broker.
-- Send a request EXECUTE dbo.SendBrokerMessage @FromService = N'RequestService', @ToService = N'ProcessingService', @Contract = N'AsyncContract', @MessageType = N'AsyncRequest', @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>'; -- Check for message on processing queue SELECT CAST(message_body AS XML) FROM ProcessingQueue; GO
Elaborazione dei messaggi
Mentre potremmo elaborare manualmente i messaggi da ProcessingQueue
, probabilmente vorremo che i messaggi vengano elaborati automaticamente mentre vengono inviati a ProcessingService
. Per fare ciò, è necessario creare una stored procedure di attivazione che testeremo e quindi legheremo alla coda per automatizzare l'elaborazione all'attivazione della coda. Per elaborare un messaggio dobbiamo RECEIVE
il messaggio dalla coda all'interno di una transazione, insieme al tipo di messaggio e all'handle di conversazione per il messaggio. Il tipo di messaggio garantisce che venga applicata la logica appropriata al messaggio in elaborazione e l'handle di conversazione consente di inviare una risposta al servizio di avvio quando il messaggio è stato elaborato.
Il RECEIVE
Il comando consente di elaborare un singolo messaggio o più messaggi all'interno dello stesso handle di conversazione o gruppo in un'unica transazione. Per elaborare più messaggi, è necessario utilizzare una variabile di tabella, oppure per eseguire l'elaborazione di messaggi singoli, è possibile utilizzare una variabile locale. La procedura di attivazione seguente recupera un singolo messaggio dalla coda, controlla il tipo di messaggio per determinare se si tratta di un AsyncRequest
messaggio, quindi esegue il processo di lunga durata in base alle informazioni sul messaggio ricevute. Se non riceve un messaggio all'interno del ciclo, attenderà fino a 5000 ms, o 5 secondi, affinché un altro messaggio entri nella coda prima di uscire dal ciclo e terminarne l'esecuzione. Dopo aver elaborato un messaggio, crea un AsyncResult
messaggio e lo rimanda all'iniziatore sullo stesso handle di conversazione da cui è stato ricevuto il messaggio. La procedura controlla anche il tipo di messaggio per determinare se è un EndDialog
o Error
è stato ricevuto un messaggio per ripulire la conversazione terminandola.
-- Create processing procedure for processing queue CREATE PROCEDURE dbo.ProcessingQueueActivation AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; WHILE (1=1) BEGIN BEGIN TRANSACTION; WAITFOR ( RECEIVE TOP (1) @conversation_handle = conversation_handle, @message_body = CAST(message_body AS XML), @message_type_name = message_type_name FROM ProcessingQueue ), TIMEOUT 5000; IF (@@ROWCOUNT = 0) BEGIN ROLLBACK TRANSACTION; BREAK; END IF @message_type_name = N'AsyncRequest' BEGIN -- Handle complex long processing here -- For demonstration we'll pull the account number and send a reply back only DECLARE @AccountNumber INT = @message_body.value('(AsyncRequest/AccountNumber)[1]', 'INT'); -- Build reply message and send back DECLARE @reply_message_body XML = N' ' + CAST(@AccountNumber AS NVARCHAR(11)) + ' '; SEND ON CONVERSATION @conversation_handle MESSAGE TYPE [AsyncResult] (@reply_message_body); END -- If end dialog message, end the dialog ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- If error message, log and end conversation ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN -- Log the error code and perform any required handling here -- End the conversation for the error END CONVERSATION @conversation_handle; END COMMIT TRANSACTION; END END GO
La RequestQueue
dovrà anche elaborare i messaggi che gli vengono inviati, quindi una procedura aggiuntiva per l'elaborazione di AsyncResult
i messaggi restituiti dalla procedura ProcessingQueueActivation devono essere creati. Poiché sappiamo che il messaggio AsnycResult significa che tutto il lavoro di elaborazione è stato completato, la conversazione può essere terminata una volta elaborato quel messaggio, che invierà un messaggio EndDialog al ProcessingService, che verrà quindi elaborato dalla sua procedura di attivazione per terminare il conversazione ripulire tutto ed evitare il fuoco e dimenticare i problemi che si verificano quando le conversazioni vengono terminate correttamente.
-- Create procedure for processing replies to the request queue CREATE PROCEDURE dbo.RequestQueueActivation AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; WHILE (1=1) BEGIN BEGIN TRANSACTION; WAITFOR ( RECEIVE TOP (1) @conversation_handle = conversation_handle, @message_body = CAST(message_body AS XML), @message_type_name = message_type_name FROM RequestQueue ), TIMEOUT 5000; IF (@@ROWCOUNT = 0) BEGIN ROLLBACK TRANSACTION; BREAK; END IF @message_type_name = N'AsyncResult' BEGIN -- If necessary handle the reply message here DECLARE @AccountNumber INT = @message_body.value('(AsyncResult/AccountNumber)[1]', 'INT'); -- Since this is all the work being done, end the conversation to send the EndDialog message END CONVERSATION @conversation_handle; END -- If end dialog message, end the dialog ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- If error message, log and end conversation ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN END CONVERSATION @conversation_handle; END COMMIT TRANSACTION; END END GO
Test delle procedure
Prima di automatizzare l'elaborazione delle code per i nostri servizi, è importante testare le procedure di attivazione per assicurarsi che elaborino i messaggi in modo appropriato e per evitare che una coda venga disabilitata in caso di errore che non viene gestito correttamente. Poiché c'è già un messaggio su ProcessingQueue
il ProcessingQueueActivation
è possibile eseguire la procedura per elaborare quel messaggio. Tieni presente che il WAITFOR
la procedura richiederà 5 secondi per terminare, anche se il messaggio viene elaborato immediatamente dalla coda. Dopo aver elaborato il messaggio, possiamo verificare che la procedura abbia funzionato correttamente interrogando il RequestQueue
per vedere se un AsyncResult
messaggio esiste e quindi possiamo verificare che il RequestQueueActivation
la procedura funziona correttamente eseguendola.
-- Process the message from the processing queue EXECUTE dbo.ProcessingQueueActivation; GO -- Check for reply message on request queue SELECT CAST(message_body AS XML) FROM RequestQueue; GO -- Process the message from the request queue EXECUTE dbo.RequestQueueActivation; GO
Automatizzazione dell'elaborazione
A questo punto, tutti i componenti sono completi per automatizzare completamente la nostra elaborazione. L'unica cosa rimasta è associare le procedure di attivazione alle loro code appropriate, quindi inviare un altro messaggio di prova per convalidare che venga elaborato e nulla rimanga nelle code in seguito.
-- Alter the processing queue to specify internal activation ALTER QUEUE ProcessingQueue WITH ACTIVATION ( STATUS = ON, PROCEDURE_NAME = dbo.ProcessingQueueActivation, MAX_QUEUE_READERS = 10, EXECUTE AS SELF ); GO -- Alter the request queue to specify internal activation ALTER QUEUE RequestQueue WITH ACTIVATION ( STATUS = ON, PROCEDURE_NAME = dbo.RequestQueueActivation, MAX_QUEUE_READERS = 10, EXECUTE AS SELF ); GO -- Test automated activation -- Send a request EXECUTE dbo.SendBrokerMessage @FromService = N'RequestService', @ToService = N'ProcessingService', @Contract = N'AsyncContract', @MessageType = N'AsyncRequest', @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>'; -- Check for message on processing queue -- nothing is there because it was automatically processed SELECT CAST(message_body AS XML) FROM ProcessingQueue; GO -- Check for reply message on request queue -- nothing is there because it was automatically processed SELECT CAST(message_body AS XML) FROM RequestQueue; GO
Riepilogo
I componenti di base per l'elaborazione asincrona automatizzata in SQL Server Service Broker possono essere configurati in un'unica configurazione di database per consentire l'elaborazione disaccoppiata di attività a esecuzione prolungata. Questo può essere un potente strumento per migliorare le prestazioni dell'applicazione, dall'esperienza di un utente finale, disaccoppiando l'elaborazione dalle interazioni dell'utente finale con l'applicazione.