Sqlserver
 sql >> Database >  >> RDS >> Sqlserver

Implementazione del carico incrementale utilizzando Change Data Capture in SQL Server

Questo articolo sarà interessante per coloro che hanno spesso a che fare con l'integrazione dei dati.

Introduzione

Si supponga che esista un database in cui gli utenti modificano sempre i dati (aggiornano o rimuovono). Forse, questo database è utilizzato da una grande applicazione che non consente di modificare la struttura della tabella. L'attività consiste nel caricare di tanto in tanto i dati da questo database a un altro database su un server diverso. Il modo più semplice per affrontare il problema è caricare i nuovi dati da un database di origine a un database di destinazione con una pulizia preliminare del database di destinazione. È possibile utilizzare questo metodo purché il tempo di caricamento dei dati sia accettabile e non superi le scadenze prestabilite. Cosa succede se sono necessari diversi giorni per caricare i dati? Inoltre, i canali di comunicazione instabili portano alla situazione in cui il caricamento dei dati si interrompe e si riavvia. Se affronti questi ostacoli, ti suggerisco di prendere in considerazione uno degli algoritmi di "ricaricamento dei dati". Significa che sono state apportate solo modifiche ai dati dall'ultimo caricamento.

CDC

In SQL Server 2008, Microsoft ha introdotto un meccanismo di rilevamento dei dati chiamato Change Data Capture (CDC). In generale, lo scopo di questo meccanismo è che l'abilitazione di CDC per qualsiasi tabella di database creerà una tabella di sistema nello stesso database con un nome simile a quello della tabella originale (lo schema sarà il seguente:'cdc' come prefisso più il nome del vecchio schema più "_" e la fine "_CT". Ad esempio, la tabella originale è dbo.Example, quindi la tabella di sistema sarà chiamata cdc.dbo_Example_CT). Memorizzerà tutti i dati che sono stati modificati.

In realtà, per approfondire CDC, considera l'esempio. Ma prima assicurati che SQL Agent che utilizza CDC funzioni sull'istanza di test di SQL Server.

Inoltre, prenderemo in considerazione uno script che crea un database e una tabella di test, popola questa tabella con dati e abilita CDC per questa tabella.

Per comprendere e semplificare l'attività, utilizzeremo un'istanza di SQL Server senza distribuire i database di origine e di destinazione a server diversi.

usa mastergo-- crea un database di origine se non esiste (seleziona * da sys.databases dove nome ='db_src_cdc') crea database db_src_cdcgouse db_src_cdcgo-- abilita CDC se è disabilitato se non esiste (seleziona * da sys.databases dove nome =db_name() e is_cdc_enabled=1) exec sys.sp_cdc_enable_dbgo-- crea un ruolo per le tabelle con CDCif non esiste (seleziona * da sys.sysusers dove name ='CDC_Reader' e issqlrole=1) crea un ruolo CDC_Readergo-- crea un tableif object_id('dbo.Example','U') è null create table dbo.Example ( ID int identity vincolo PK_Example chiave primaria, Title varchar(200) not null )go-- popola il tableinsert dbo.Example (Title) values( 'Uno'),('Due'),('Tre'),('Quattro'),('Cinque');go-- abilita CDC per la tabella se non esiste (seleziona * da sys.tables dove is_tracked_by_cdc =1 and name ='Example') exec sys.sp_cdc_enable_table @source_schema ='dbo', @source_name ='Example', @role_name ='CDC_Reader'go-- popola la tabella con alcuni dati. Cambieremo o elimineremo qualcosa di aggiornamento dbo.Exampleset Title =reverse(Title)where ID in (2,3,4);delete from dbo.Example where ID in (1,2);set identity_insert dbo.Example on;insert dbo. Esempio (ID, Titolo) valori(1,'Uno'),(6,'Sei');set identity_insert dbo.Example off;vai

Ora, diamo un'occhiata a cosa abbiamo dopo aver eseguito questo script nelle tabelle dbo.Example e cdc.dbo_Example_CT (va notato che CDC è asincrono. I dati vengono inseriti nelle tabelle in cui viene archiviato il rilevamento delle modifiche dopo un certo periodo di tempo ).

seleziona * da dbo.Example;
Titolo ID---- -------- 1 Uno 3 eerhT 4 ruoF 5 Cinque 6 Sei
select row_number() su ( partizione per ID order by __$start_lsn desc, __$seqval desc ) come __$rn, *da cdc.dbo_Example_CT;
__$rn __$start_lsn __$end_lsn __$seqval __$operazione __$update_mask ID Titolo------ --------------------- - ----------- ---------------------- ------------ ---- ------------ --- ----------- 1 0x0000003A000000580005 NULL 0x0000003A000000580003 2 0x03 1 One 2 0x0000003A000000560006 NULL 0x0000003A000000560002 1 0x03 1 One 1 0x0000003A000000560006 NULL 0x0000003A000000560005 1 0x03 2 owT 2 0x0000003A000000540005 NULL 0x0000003A000000540002 3 0x02 2 Two 3 0x0000003A000000540005 NULL 0x0000003A000000540002 4 0x02 2 owT 1 0x0000003A000000540005 NULL 0x0000003A000000540003 3 0x02 3 Three 2 0x0000003A000000540005 NULL 0x0000003A000000540003 4 0x02 3 eerhT 1 0x0000003A000000540005 NULL 0x0000003A000000540004 3 0x02 4 Quattro 2 0x0000003A000000540005 NULL 0x0000003A000000540004 4 0x02 4 ruoF 1 0x0000003A000000580005 NULL 0x0000003A000000580004 2 0 

Considerare in dettaglio la struttura della tabella in cui è memorizzato il rilevamento delle modifiche. I campi __ $start_lsn e __ $seqval sono rispettivamente LSN (numero di sequenza del registro nel database) e il numero di transazione all'interno della transazione. C'è una proprietà importante in questi campi, vale a dire, possiamo essere sicuri che il record con un LSN più alto verrà eseguito in seguito. Grazie a questa proprietà, possiamo facilmente ottenere lo stato più recente di ogni record nella query, filtrando la nostra selezione in base alla condizione – dove __ $ rn =1.

Il campo operazione __$ contiene il codice transazione:

  • 1:il record viene eliminato
  • 2 – il record è inserito
  • 3, 4 – il record viene aggiornato. I vecchi dati prima dell'aggiornamento sono 3, i nuovi dati sono 4.

Oltre ai campi di servizio con prefisso «__$», i campi della tabella originale sono completamente duplicati. Queste informazioni sono sufficienti per procedere al caricamento incrementale.

Configurazione di un database per il caricamento dei dati

Crea una tabella nel nostro database di destinazione del test, in cui verranno caricati i dati, nonché una tabella aggiuntiva per archiviare i dati sul registro di caricamento.

usa mastergo-- crea un database di destinazione se non esiste (seleziona * da sys.databases dove name ='db_dst_cdc') crea database db_dst_cdcgouse db_dst_cdcgo-- crea un tableif object_id('dbo.Example','U') is null create table dbo.Example ( ID int vincolo PK_Example chiave primaria, Title varchar(200) not null )go-- crea una tabella per memorizzare il carico logif object_id('dbo.log_cdc','U') è null create table dbo .log_cdc ( table_name nvarchar(512) not null, dt datetime not null default getdate(), lsn binary(10) not null default(0x0), vincolo pk_log_cdc primary key (table_name, dt desc) )vai

Vorrei attirare la vostra attenzione sui campi della tabella LOG_CDC:

  • TABLE_NAME memorizza le informazioni su quale tabella è stata caricata (è possibile caricare più tabelle in futuro, da database diversi o anche da server diversi; il formato della tabella è 'SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME'
  • DT è un campo della data e dell'ora di caricamento, facoltativo per il caricamento incrementale. Tuttavia, sarà utile per controllare il caricamento.
  • LSN:dopo il caricamento di una tabella, è necessario memorizzare le informazioni sul punto in cui iniziare il caricamento successivo, se necessario. Di conseguenza, dopo ogni caricamento, aggiungiamo l'ultimo (massimo) __ $ start_lsn in questa colonna.

Algoritmo per il caricamento dei dati

Come descritto sopra, usando la query, possiamo ottenere l'ultimo stato della tabella con l'aiuto delle funzioni della finestra. Se conosciamo l'LSN dell'ultimo caricamento, la prossima volta che carichiamo possiamo filtrare dall'origine tutti i dati, le cui modifiche sono superiori all'LSN memorizzato, se c'era almeno un caricamento precedente completo:

con incr_Example as( select row_number() over ( partizione per ID order by __$start_lsn desc, __$seqval desc ) come __$rn, * da db_src_cdc.cdc.dbo_Example_CT dove __$operazione <> 3 e __$ start_lsn> @lsn)select * da incr_Example

Quindi, possiamo ottenere tutti i record per il carico completo, se l'LSN del carico non è archiviato:

con incr_Example as( select row_number() over ( partizione per ID order by __$start_lsn desc, __$seqval desc ) come __$rn, * da db_src_cdc.cdc.dbo_Example_CT dove __$operazione <> 3 e __$ start_lsn> @lsn), full_Example as( seleziona * da db_src_cdc.dbo.Example dove @lsn è null)select ID, Title, __$ operationfrom incr_Examplewhere __$rn =1union allselect ID, Title, 2 as __$ operationfrom full_Example

Pertanto, a seconda del valore @LSN, questa query visualizzerà tutte le ultime modifiche (ignorando quelle provvisorie) con lo stato Rimosso o meno, o tutti i dati dalla tabella originale, aggiungendo lo stato 2 (nuovo record) – questo campo viene utilizzato solo per unificare due selezioni. Con questa query, possiamo facilmente implementare il caricamento completo o il ricaricamento utilizzando il comando MERGE (a partire dalla versione SQL 2008).

Per evitare colli di bottiglia che possono creare processi alternativi e per caricare dati abbinati da tabelle diverse (in futuro caricheremo più tabelle ed, eventualmente, potrebbero esserci relazioni relazionali tra di loro), suggerisco di utilizzare uno snapshot DB sul database di origine ( un'altra funzionalità di SQL 2008).

Il testo completo del caricamento è il seguente:

[expand title=”Codice”]

/* Algoritmo di caricamento dei dati*/-- crea uno snapshot del database se esiste (seleziona * da sys.databases dove name ='db_src_cdc_ss' ) rilascia il database db_src_cdc_ss;declare @query nvarchar(max);select @query =N' crea il database db_src_cdc_ss su ( name =N'''+name+ ''', filename =N'''+[filename]+'.ss'' ) come snapshot di db_src_cdc'from db_src_cdc.sys.sysfiles dove groupid =1; exec ( @query );-- leggi LSN dal precedente loaddeclare @lsn binary(10) =(seleziona max(lsn) da db_dst_cdc.dbo.log_cdc dove table_name ='localhost.db_src_cdc.dbo.Example');-- clear una tabella prima del caricamento completo se @lsn è null tronca la tabella db_dst_cdc.dbo.Example;-- carica processwith incr_Example as( select row_number() over ( partiziona per ID order by __$start_lsn desc, __$seqval desc ) come __$rn , * da db_src_cdc_ss.cdc.dbo_Example_CT dove __$operazione <> 3 e __$start_lsn> @lsn), full_Example as( seleziona * da db_src_cdc_ss.dbo.Example dove @lsn è null), cte_Example as( seleziona ID, Titolo, __$operazione da incr_Example dove __$rn =1 union all select ID, Title, 2 come __$operazione da full_Example)merge db_dst_cdc.dbo.Example come trg usando cte_Example come src su trg.ID=src.IDquando abbinato e __$operazione =1 quindi elimina quando corrisponde e __$operazione <> 1 quindi aggiorna set trg.Title =src.Titlequando non corrisponde a destinazione e __$operazione <> 1 quindi inserisci (ID, titolo) valori (src.ID, src .Title);-- segna la fine del processo di caricamento e l'ultimo LSNinsert db_dst_cdc.dbo.log_cdc (table_name, lsn)values ​​('localhost.db_src_cdc.dbo.Example', isnull((select max(__$start_lsn) from db_src_cdc_ss.cdc.dbo_Example_CT),0))-- elimina lo snapshot del database se esiste (seleziona * da sys.databases dove name ='db_src_cdc_ss' ) elimina il database db_src_cdc_ss

[/espandi]