Database
 sql >> Database >  >> RDS >> Database

Configurazione di Service Broker per l'elaborazione asincrona

Nel mio ultimo articolo ho parlato dei vantaggi dell'implementazione dell'elaborazione asincrona tramite Service Broker in SQL Server rispetto agli altri metodi esistenti per l'elaborazione disaccoppiata di attività lunghe. In questo articolo esamineremo tutti i componenti che devono essere configurati per una configurazione di base di Service Broker in un unico database e le considerazioni importanti per la gestione delle conversazioni tra i servizi del broker. Per iniziare, dovremo creare un database e abilitare il database per l'utilizzo di Service Broker:

CREATE DATABASE AsyncProcessingDemo;
GO
 
IF (SELECT is_broker_enabled FROM sys.databases WHERE name = N'AsyncProcessingDemo') = 0
BEGIN
  ALTER DATABASE AsyncProcessingDemo SET ENABLE_BROKER;
END
GO
 
USE AsyncProcessingDemo;
GO

Configurazione dei componenti del broker

Gli oggetti di base che devono essere creati nel database sono i tipi di messaggio per i messaggi, un contratto che definisce come verranno inviati i messaggi tra i servizi, una coda e il servizio iniziatore, una coda e il servizio di destinazione. Molti esempi online per Service Broker mostrano nomi di oggetti complessi per i tipi di messaggi, i contratti e i servizi per Service Broker. Tuttavia, non è necessario che i nomi siano complessi e i nomi degli oggetti semplici possono essere utilizzati per qualsiasi oggetto.

Per i messaggi, dovremo creare un tipo di messaggio per la richiesta, che si chiamerà AsyncRequest e un tipo di messaggio per il risultato, che sarà chiamato AsyncResult . Entrambi utilizzeranno XML che verrà convalidato come correttamente formato dai servizi del broker per inviare e ricevere i dati richiesti dai servizi.

-- Create the message types
CREATE MESSAGE TYPE [AsyncRequest] VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE [AsyncResult]  VALIDATION = WELL_FORMED_XML;

Il contratto specifica che il AsyncRequest verrà inviato dal servizio di avvio al servizio di destinazione e che il servizio di destinazione restituirà un AsyncResult messaggio al servizio di avvio. Il contratto può anche specificare più tipi di messaggio per l'iniziatore e la destinazione, o che un tipo di messaggio specifico può essere inviato da qualsiasi servizio, se l'elaborazione specifica lo richiede.

-- Create the contract
CREATE CONTRACT [AsyncContract] 
(
  [AsyncRequest] SENT BY INITIATOR, 
  [AsyncResult]  SENT BY TARGET
);

Per ciascuno dei servizi è necessario creare una coda per fornire l'archiviazione dei messaggi ricevuti dal servizio. Il servizio di destinazione a cui verrà inviata la richiesta deve essere creato specificando il AsyncContract per consentire l'invio di messaggi al servizio. In questo caso il servizio si chiama ProcessingService e verrà creato su ProcessingQueue all'interno della banca dati. Il servizio di avvio non richiede la specifica di un contratto, il che lo rende in grado di ricevere messaggi solo in risposta a una conversazione che è stata avviata da esso.

-- Create the processing queue and service - specify the contract to allow sending to the service
CREATE QUEUE ProcessingQueue;
CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]);
 
-- Create the request queue and service 
CREATE QUEUE RequestQueue;
CREATE SERVICE [RequestService] ON QUEUE RequestQueue;

Invio di un messaggio per l'elaborazione

Come spiegato nell'articolo precedente, preferisco implementare una stored procedure wrapper per inviare un nuovo messaggio a un servizio broker, in modo che possa essere modificato una volta per ridimensionare le prestazioni, se necessario. Questa procedura è un semplice wrapper per creare una nuova conversazione e inviare il messaggio al ProcessingService .

-- Create the wrapper procedure for sending messages
CREATE PROCEDURE dbo.SendBrokerMessage 
	@FromService SYSNAME,
	@ToService   SYSNAME,
	@Contract    SYSNAME,
	@MessageType SYSNAME,
	@MessageBody XML
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
 
  BEGIN TRANSACTION;
 
  BEGIN DIALOG CONVERSATION @conversation_handle
    FROM SERVICE @FromService
    TO SERVICE @ToService
    ON CONTRACT @Contract
    WITH ENCRYPTION = OFF;
 
  SEND ON CONVERSATION @conversation_handle
    MESSAGE TYPE @MessageType(@MessageBody);
 
  COMMIT TRANSACTION;
END
GO

Utilizzando la stored procedure wrapper possiamo ora inviare un messaggio di prova al ProcessingService per confermare che abbiamo impostato correttamente i servizi del broker.

-- Send a request
EXECUTE dbo.SendBrokerMessage
  @FromService = N'RequestService',
  @ToService   = N'ProcessingService',
  @Contract    = N'AsyncContract',
  @MessageType = N'AsyncRequest',
  @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>';
 
-- Check for message on processing queue
SELECT CAST(message_body AS XML) FROM ProcessingQueue;
GO

Elaborazione dei messaggi

Mentre potremmo elaborare manualmente i messaggi da ProcessingQueue , probabilmente vorremo che i messaggi vengano elaborati automaticamente mentre vengono inviati a ProcessingService . Per fare ciò, è necessario creare una stored procedure di attivazione che testeremo e quindi legheremo alla coda per automatizzare l'elaborazione all'attivazione della coda. Per elaborare un messaggio dobbiamo RECEIVE il messaggio dalla coda all'interno di una transazione, insieme al tipo di messaggio e all'handle di conversazione per il messaggio. Il tipo di messaggio garantisce che venga applicata la logica appropriata al messaggio in elaborazione e l'handle di conversazione consente di inviare una risposta al servizio di avvio quando il messaggio è stato elaborato.

Il RECEIVE Il comando consente di elaborare un singolo messaggio o più messaggi all'interno dello stesso handle di conversazione o gruppo in un'unica transazione. Per elaborare più messaggi, è necessario utilizzare una variabile di tabella, oppure per eseguire l'elaborazione di messaggi singoli, è possibile utilizzare una variabile locale. La procedura di attivazione seguente recupera un singolo messaggio dalla coda, controlla il tipo di messaggio per determinare se si tratta di un AsyncRequest messaggio, quindi esegue il processo di lunga durata in base alle informazioni sul messaggio ricevute. Se non riceve un messaggio all'interno del ciclo, attenderà fino a 5000 ms, o 5 secondi, affinché un altro messaggio entri nella coda prima di uscire dal ciclo e terminarne l'esecuzione. Dopo aver elaborato un messaggio, crea un AsyncResult messaggio e lo rimanda all'iniziatore sullo stesso handle di conversazione da cui è stato ricevuto il messaggio. La procedura controlla anche il tipo di messaggio per determinare se è un EndDialog o Error è stato ricevuto un messaggio per ripulire la conversazione terminandola.

-- Create processing procedure for processing queue
CREATE PROCEDURE dbo.ProcessingQueueActivation
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
  DECLARE @message_body XML;
  DECLARE @message_type_name sysname;
 
  WHILE (1=1)
  BEGIN
    BEGIN TRANSACTION;
 
    WAITFOR
    (
      RECEIVE TOP (1)
        @conversation_handle = conversation_handle,
        @message_body = CAST(message_body AS XML),
        @message_type_name = message_type_name
      FROM ProcessingQueue
    ), TIMEOUT 5000;
 
    IF (@@ROWCOUNT = 0)
    BEGIN
      ROLLBACK TRANSACTION;
      BREAK;
    END
 
    IF @message_type_name = N'AsyncRequest'
    BEGIN
      -- Handle complex long processing here
      -- For demonstration we'll pull the account number and send a reply back only
 
      DECLARE @AccountNumber INT = @message_body.value('(AsyncRequest/AccountNumber)[1]', 'INT');
 
      -- Build reply message and send back
      DECLARE @reply_message_body XML = N'
        ' + CAST(@AccountNumber AS NVARCHAR(11)) + '
      ';
 
      SEND ON CONVERSATION @conversation_handle
        MESSAGE TYPE [AsyncResult] (@reply_message_body);
    END
 
    -- If end dialog message, end the dialog
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
      END CONVERSATION @conversation_handle;
    END
 
    -- If error message, log and end conversation
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    BEGIN
      -- Log the error code and perform any required handling here
      -- End the conversation for the error
      END CONVERSATION @conversation_handle;
    END
 
    COMMIT TRANSACTION;
  END
END
GO

La RequestQueue dovrà anche elaborare i messaggi che gli vengono inviati, quindi una procedura aggiuntiva per l'elaborazione di AsyncResult i messaggi restituiti dalla procedura ProcessingQueueActivation devono essere creati. Poiché sappiamo che il messaggio AsnycResult significa che tutto il lavoro di elaborazione è stato completato, la conversazione può essere terminata una volta elaborato quel messaggio, che invierà un messaggio EndDialog al ProcessingService, che verrà quindi elaborato dalla sua procedura di attivazione per terminare il conversazione ripulire tutto ed evitare il fuoco e dimenticare i problemi che si verificano quando le conversazioni vengono terminate correttamente.

-- Create procedure for processing replies to the request queue
CREATE PROCEDURE dbo.RequestQueueActivation
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
  DECLARE @message_body XML;
  DECLARE @message_type_name sysname;
 
  WHILE (1=1)
  BEGIN
    BEGIN TRANSACTION;
 
    WAITFOR
    (
      RECEIVE TOP (1)
        @conversation_handle = conversation_handle,
        @message_body = CAST(message_body AS XML),
        @message_type_name = message_type_name
      FROM RequestQueue
    ), TIMEOUT 5000;
 
    IF (@@ROWCOUNT = 0)
    BEGIN
      ROLLBACK TRANSACTION;
      BREAK;
    END
 
    IF @message_type_name = N'AsyncResult'
    BEGIN
      -- If necessary handle the reply message here
      DECLARE @AccountNumber INT = @message_body.value('(AsyncResult/AccountNumber)[1]', 'INT');
 
      -- Since this is all the work being done, end the conversation to send the EndDialog message
      END CONVERSATION @conversation_handle;
    END
 
    -- If end dialog message, end the dialog
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
       END CONVERSATION @conversation_handle;
    END
 
    -- If error message, log and end conversation
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    BEGIN
       END CONVERSATION @conversation_handle;
    END
 
    COMMIT TRANSACTION;
  END
END
GO

Test delle procedure

Prima di automatizzare l'elaborazione delle code per i nostri servizi, è importante testare le procedure di attivazione per assicurarsi che elaborino i messaggi in modo appropriato e per evitare che una coda venga disabilitata in caso di errore che non viene gestito correttamente. Poiché c'è già un messaggio su ProcessingQueue il ProcessingQueueActivation è possibile eseguire la procedura per elaborare quel messaggio. Tieni presente che il WAITFOR la procedura richiederà 5 secondi per terminare, anche se il messaggio viene elaborato immediatamente dalla coda. Dopo aver elaborato il messaggio, possiamo verificare che la procedura abbia funzionato correttamente interrogando il RequestQueue per vedere se un AsyncResult messaggio esiste e quindi possiamo verificare che il RequestQueueActivation la procedura funziona correttamente eseguendola.

-- Process the message from the processing queue
EXECUTE dbo.ProcessingQueueActivation;
GO
 
-- Check for reply message on request queue
SELECT CAST(message_body AS XML) FROM RequestQueue;
GO
 
-- Process the message from the request queue
EXECUTE dbo.RequestQueueActivation;
GO

Automatizzazione dell'elaborazione

A questo punto, tutti i componenti sono completi per automatizzare completamente la nostra elaborazione. L'unica cosa rimasta è associare le procedure di attivazione alle loro code appropriate, quindi inviare un altro messaggio di prova per convalidare che venga elaborato e nulla rimanga nelle code in seguito.

-- Alter the processing queue to specify internal activation
ALTER QUEUE ProcessingQueue
    WITH ACTIVATION
    ( 
      STATUS = ON,
      PROCEDURE_NAME = dbo.ProcessingQueueActivation,
      MAX_QUEUE_READERS = 10,
      EXECUTE AS SELF
    );
GO
 
-- Alter the request queue to specify internal activation
ALTER QUEUE RequestQueue
    WITH ACTIVATION
    ( 
      STATUS = ON,
      PROCEDURE_NAME = dbo.RequestQueueActivation,
      MAX_QUEUE_READERS = 10,
      EXECUTE AS SELF
    );
GO
 
-- Test automated activation
-- Send a request
 
EXECUTE dbo.SendBrokerMessage
	@FromService = N'RequestService',
	@ToService   = N'ProcessingService',
	@Contract    = N'AsyncContract',
	@MessageType = N'AsyncRequest',
	@MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>';
 
-- Check for message on processing queue 
-- nothing is there because it was automatically processed
SELECT CAST(message_body AS XML) FROM ProcessingQueue;
GO
 
-- Check for reply message on request queue 
-- nothing is there because it was automatically processed
SELECT CAST(message_body AS XML) FROM RequestQueue;
GO

Riepilogo

I componenti di base per l'elaborazione asincrona automatizzata in SQL Server Service Broker possono essere configurati in un'unica configurazione di database per consentire l'elaborazione disaccoppiata di attività a esecuzione prolungata. Questo può essere un potente strumento per migliorare le prestazioni dell'applicazione, dall'esperienza di un utente finale, disaccoppiando l'elaborazione dalle interazioni dell'utente finale con l'applicazione.