Постановка задачи

В фирме есть 80 филиалов в 80 городах по РФ с одинаковой по структуре базой данных. В базах данных филиалов порядка 150 таблиц с одинаковым названием, но с разным, своим наполнением, за исключением общих справочников. Все филиалы имеют индивидуальные 2-значные номера. Для каждого филиала выделен свой SQL Server 2008R2 Enterprise edition (x64). Базы данных в филиалах имеют одинаковое название. Филиалы различаются разными IP-адресами и доменными именами. Все сервера БД объединены в единую сеть и единый домен (Active Directory) предприятия. Данные в таблицах филиалов могут как добавляться, так и изменяться и удаляться, изменения могут происходить и за долгий срок от текущего момента времени.

Есть отдельный сервер, где реализована еженедельная сборка данных со всех филиалов в единую корпоративную базу данных (далее будем называть эту БД — ЕКБД по следующему алгоритму:

  1. Из файла бэкапа восстанавливается пустая ЕКБД без индексов, но со всеми таблицами и процедурами.
  2. С помощью bcp.exe и job SQL Server в ЕКБД загружаются данные в 8 параллельных потоков со всех филиалов.
  3. Первичные ключи — идентификаторы в таблицах при загрузке преобразуются по принципу добавления номера филиала в начало. Например, на филиале 17 загружаем с ID = 177, этот ключ в ЕКБД будет — 1700000000177. Т.е. код филиала умножается на 10^N + ID.
  4. После загрузки последнего job стартует построение необходимых индексов для работы.
  5. Весь процесс загрузки выполняется порядка 1.5-2 суток, при этом ЕКБД во время сборки не доступна.

Необходимо переработать сборку ЕКБД так, чтобы:

  1. Загрузка успевала проходить за ночь.
  2. Филиалы имеют в РФ разные часовые пояса, поэтому при загрузке процесс сборки не мешал операционной деятельности филиалов — базы данных филиалов должны быть доступны.
  3. ЕКБД по возможности должна быть доступна всегда.

Исследование проекта

Прежде всего очевидно было то, что загрузку ЕКБД необходимо сделать инкрементальной, то есть каждую ночь загружать не полностью все таблицы, а изменения с момента последней загрузки. Так же требуется написание полной загрузки, так как хотя бы один раз это выполнить потребуется. На практике возможны варианты, когда какие-то данные утеряны и необходимо просто восстановить их, перезагрузив: как весь филиал полностью, так и отдельно взятые таблицы.

Рассматривались варианты секционирования по дате/ID чтобы можно было уже догружать новые секции. Но! Т.к. возможно изменение данных «задним числом» на источниках, хоть и редкое, рассматривать секции по дате с точки зрения вывода партиций в режим readonly - не наш вариант. Для полной загрузки было выбрано следующее решение — секционировать все таблицы на ЕКБД по «Коду Филиала», Так как первичный ключ на ЕКБД содержит код филиала, функция секционирования использовалась для первичного ключа таким образом, чтобы в каждой секции были данные только одного филиала. Этот выбор позволяет загружать полностью все данные во вновь созданные таблицы из отдельного филиала, что не мешает работать в это время пользователям с партицированной таблицей и после загрузки командой «SWITCH PARTITION» включать за миллисекунды (команда затрагивает только метаданные) загруженные данные в результирующую таблицу. Плюс этой технологии еще в том, что при необходжимости можно полностью загружать любую отдельно взятую таблицу любого филиала. Т.е., допустим,  «сломался» сервер БД одного из филиалов, восстановили БД из бэкапов. Необходимо заново загрузить данные только этого филиала, данные же других филиалов должны загружаться планово, инкрементно. С помощью описанной технологии нам ничего не мешает восстановить данные одного сервера, а не всех сразу.

Для того, чтобы не накладывать блокировки на таблицы, с которых читаем данные на серверах филиалов, воспользовались технологией DATABASE SNAPSHOT — эта команда создает новую БД в режиме только для чтения — моментальный снимок на фиксированный момент времени. Чтение со снимка базы данных дало нам следующие преимущества:

  1. Не блокирующее чтение с таблиц источников.
  2. Избежание несогласованности данных. Т.е. мы можем загружать, к примеру, 2 зависимые таблицы в разные моменты времени и возможна ситуация, когда одну таблицу мы уже загрузили, а вторую начнем загружать после добавления данных. Очевидно, что в одной таблице будут новые данные, а в другой - нет. Со снимками БД эта ситуация автоматически устраняется.

Для инкрементальной загрузки рассматривались варианты:

  1. Триггеры и логирование изменений в локальные таблицы. Минусы — все писать заново и поддерживать, триггеры и вставки дают дополнительную просадку по производительности.
  2. Триггеры и отсылка изменений с помощью Service Broker. Все правильно настроив, данные автоматически переносятся, хотя и с небольшой задержкой. Из минусов — сложность написания и в случае добавления новых полей и таблиц, внутренним сотрудникам с недостаточной квалификацией будем сложно поддерживать этот вариант.
  3. Change Tracking — интересная технология. Из минусов - синхронная, что дает дополнительную нагрузку на БД. Также читать данные в этом варианте можно только с помощью функций, а это всегда медленнее, чем считывать данные из таблиц.
  4. Change Data Capture. Из минусов - создает дополнительные JOB на сервере. Из плюсов — асинхронная технология отслеживания данных: данные можно читать как измененные с помощью функций, так и напрямую из таблиц. В итоге эта технология и была выбрана.

Рассмотрим вариант, откуда будет выполняться загрузка:

  1. Процесс загрузки выполняется с филиальского сервера БД.
  2. Процесс загрузки выполняется с сервера ЕКБД

Можно запускать загрузку как на источнике, так и на приемнике, большой разницы нет. Я выбрал на приемнике, так как приемник изначально позиционируется для записи и при очередной загрузке необходимо сохранять LSN/timestamp для каждой загружемой таблицы в отдельную лог-таблицу. Источник теоретически может быть ReadOnly, поэтому вариант с ЕКБД выглядел предпочтительней.

Краткое описание алгоритма сборки ЕКБД

Варианты загрузок

При проектировании ЕКБД было понимание того, что потребуется как минимум 2 типа загрузки данных, а именно:

1. Полная загрузка — загрузка, когда нет возможности загрузить изменения с момента предыдущей загрузки. Например:

a. Первая загрузка данных — данных просто еще нет в ЕКБД.
b. Изменение структуры загружаемой таблицы — появились дополнительные данные (например, поля), которых нет, и которые требуется загрузить полностью.
c. Изменения между последней загрузкой и хранящиеся не полностью — например, retention period for CDC по умолчанию 3 дня, предыдущая загрузка была 7 дней назад, а в изменениях мы можем извлечь последние 3 дня — потерю 4 дней не восполнить, следует перезагрузить данные полностью.
d. Потеря цепочки загрузки — здесь имеется в виду тот случай, когда данные из филиала были загружены, и филиал был восстановлен из резервной копии временем раньше последней загрузки.
e. Ошибки при изменениях и отладке — возможны ошибки при изменении структуры. Неправильно отработал алгоритм инкрементной загрузки - следует исправить этот алгоритм, и провести полную загрузку, только после этого инкрементную, чтобы в БД не содержались ошибочные данные.

2. Инкрементная загрузка. На основе предварительных тестов была выбрана технология MS-SQL 2008 — Change Data Capture. При желании вникнуть поглубже в технологию можно ознакомиться здесь http://www.sql.ru/blogs/kab/1591

Технологии, используемые для сборки ЕКБД:

1. SQL Server Integration Services (SSIS) — бесплатный сервис Microsoft, который поставляется с SQL Server и позиционируется как средство загрузки данных. SSIS выбран по нескольким причинам:

a. Визуально наглядное средство программирования — приятное дополнение, но не решающий фактор при выборе средства интеграции.
b. Бесплатный инструмент в существующем SQL Server.
c. Легко организуется параллельность и многопоточность нескольких загрузок одновременно.
d. Поддержка технологии быстрой загрузки в SQL Server — BULK INSERT.

2. DATABASE SNAPSHOT — моментальные снимки баз данных используются как при полной, так и при инкрементной загрузке, используются в БД филиалов.

Делается моментальный снимок загружаемой БД перед загрузкой, загружаются данные, в конце — удаляется моментальный снимок. Это делается по нескольким причинам:

a. Отсутствие блокировок при загрузке данных, даже при наличии транзакций с уровнем изоляций SERIALIZABLE.
b. При загрузке связанных таблиц в разных транзакциях возможно нарушение логической целостности данных в приемнике, моментальный снимок БД источника позволяет избежать этой проблемы.

3. Change Data Capture (CDC) — система отслеживания изменений SQL Server 2008, применяется в БД филиалов. Использует для работы SQL Agent, при включении в БД создает JOB на текущем экземпляре SQL Server. Работает асинхронно, т.е. изменения происходят, а данные в систему отслеживания изменений попадают позже по времени. Это свойство имеет как плюсы, так и минусы:

a. Хорошо то, что асинхронность не дает высокую нагрузку на ресурсы экземпляра SQL Server.
b. Плохо то, что не всегда получается произвести сверку загрузки. Данные могут быть изменены реально, а CDC может это отметить при высокой степени загрузки дисков, CPU и активности процессов (или выключенном агенте) значительно позже, после освобождения ресурсов (включении агента).

4. Секционирование — используется на ЕКБД, в нашем случае дает 2 положительных эффекта:

a. При полной загрузке мы загружаем данные не в таблицу-приемник, а во вновь созданную таблицу с такой же структурой, и после загрузки командами ALTER TABLE SWITCH PARTITION выводим из приемника устаревшие данные и вводим новые. Тем самым мы уходим от удалений в таблице приемнике (которые могут быть долгими по времени), и, в случае неуспешной полной загрузки этой таблицы, мы в приемнике не теряем данные (предварительно мы их не удаляем). Также мы уходим от блокировок таблицы-приемника при загрузке и удалении данных многих филиалов.
b. При инкрементной загрузке — мы загодя устанавливаем параметр таблицы LOCK_ESCALATION=AUTO, чем гарантируем возможную эскалацию до уровня секции, а не таблицы. Это гарантирует нам отсутствие блокировок при одновременной работе нескольких процессов, каждый из которых работает исключительно со своей секцией. Т.е. не технически — процесс инкрементной загрузки любого филиала не мешает другому процессу загрузки другого филиала. Дополнительную информацию увидите, например, здесь — http://www.gilev.ru/escalation/.

5. ALOW_SNAPSHOT_SOLATION — замена уровня изоляций транзакций по умолчанию на SNAPSHOT, используется на ЕКБД. Позволяет конечным пользователям, которые извлекают данные с ЕКБД продолжать работу и не блокировать загрузку ЕКБД. Механизм версионности, вместо механизма блокировок.

6. BULK INSERT — техника загрузки SSIS, которая по скорости в разы превышает стандартную вставку.

7. MERGE — команда T-SQL которая при инкрементной загрузки либо добавляет, либо удаляет, либо изменяет данные в целевой таблице из загруженных данных CDC во временную таблицу. Работает существенно быстрее при больших объемах. Также вместо 3х операций INSERT, UPDATE, DELETE пишем одну команду MERGE, которая производит 3 команды сразу.

8. Dynamic Management View (DMV) — используются для создания альтернативных структур, временных таблиц, индексов к ним, ограничений. Например, чтобы вставить данные в секцию, необходимо иметь исходную таблицу, по структуре данных и индексам и ограничениям полностью аналогичную таблице-приемнику.

Вспомогательная БД — KBDSYS

Для протоколирования загрузок и ошибок, хранения алгоритмов (хранимых процедур и различных функций) и данных (таблиц) как на ЕКБД, так и в каждом филиале была создана дополнительная база данных с именем KBDSYS. Перед рассмотрением алгоритма загрузки предлагаю пройтись по созданным объектам этой БД. Все объекты для ЕКБД в базе данных KBDSYS находятся в схеме RPL.

Структура данных:

Структура — управление способом загрузки

Структура состоит из 3х таблиц:

  • rpl.filial_list — список филиалов, номера филиалов и имена серверов, с помощью этой таблицы в динамических запросах происходит ассоциирование сервера к номеру филиала.
  • rpl.table_list — список загружаемых таблиц.
  • rpl.table_options — нам интересны 3 поля — ссылки на 2 таблицы и поле mask. Это поле — битовая маска опций загрузки. Битовая маска используется по причине того, что структура условий может быть доработана, и правильнее вести 1 поле с разработанными интерфейсами хранимых процедур, нежели использовать множество параметров и впоследствии поддерживать эти параметры. Поле MASK может использовать и интерпретировать в комплексе следующие биты:
  • 0×01 — запрет загрузки. Т.е. если этот бит для определенной таблицы на определенном филиале выставлен, загрузка производиться не будет.

0×02 — принудительная полная загрузка.
0×04 — способ включения-выключения данных с секциями при полной загрузки. При этом флаге алгоритм меняется с «сначала загружаем данные в дубль, после включаем в таблицу приемник — требуется дисковое место и для хранения старых данных и для загрузки новых данных, плюс в том, что в случае ошибки загрузки старые данные не теряются» на алгоритм экономии дискового пространства при загрузке — «сначала чистим данные перед загрузкой, после на это же место загружаем новые данные».

Для удобства работы имеем 3 хранимых процедуры

  • rpl.spu_select_load — используется в SSIS пакете, выбирает, какую загрузку производить - полную или инкрементную для таблицы, входной параметр
  • rpl.spu_get_table_option — выводит в виде матрицы данные таблицы rpl.table_options, где колонки - номера филиалов, строки таблицы, а значения матрицы - поле MASK. Процедура без параметров
  • rpl. spu_set_table_option — устанавливает в матрице маску поля заданной таблицы. Нам интересны 3 параметра:

@MASK — собственно сама маска
@filial_id — номер филиала, если NULL — то все филиалы
@table_id — номер таблицы (идентификатор в rpl.table_list), если NULL — то все таблицы

Описанной структурой администратор может воздействовать на следующую загрузку. Т.е. нужные таблицы запретить к загрузке, нужные таблицы принудительно полностью загрузить, оставшиеся таблицы пакет загрузит на свое усмотрение («свое усмотрение» регулируется процедурой — rpl.spu_select_load).

Структура — логирование загрузки

Структура состоит из 2-х представлений:

  • rpl.log_loading — в прошлом это была одна таблица, позже разбита на две, из которых создано это представление. Сделано с целью ухода от частых изменений таблиц — от многочисленных ожиданий и блокировок на этой таблице-представлении. Хранит данные о загрузках:

Имя бд, таблицы

Дата, время начала загрузки

Время окончания загрузки — это поле принципиально важно для загрузки. Если это поле не заполнено, то считается, что загрузка по таблице еще выполняется. В случае попытки запуска пакета идет проверка на наличие других процессов загрузки выбранного филиала по этому полю.

Текстовый статус ошибки

LSN загрузки — это поле принципиально важно для загрузки. После каждой загрузки сохраняется LSN последней команды. Требуется для того, чтобы при следующей инкрементной загрузке этот считанный LSN был стартом отсечения для загрузки измененных данных. Т.е. сегодня мы загрузили таблицу с LSN — 1000, при следующей загрузке мы будем выбирать записи больше этого сохраненного LSN, больше 1000.

  • rpl.v_log_loading — по сути данные предыдущего представления, только содержит не всю историю загрузок, а информацию о последней загрузке. Данное представление реально не используется пакетом, но удобно для администратора смотреть лог последней загрузки.

Представление rpl.log_loading состоит из 2х таблиц:

  • rpl.log_loading_head — таблица заполняется при старте загрузки: дата начала загрузки, имя таблицы.
  • rpl.log_loading_tail — таблица заполняется при завершении загрузки (ранее был апдейт, позже изменили на вставку в эту таблицу): дата завершения загрузки, статус загрузки.

Структура — rpl.compare_table

Таблица была создана значительно позже самого пакета. Была инициирована работа по сверке загружаемых таблиц. В этой таблице содержатся результаты каждой сверки.

Изменения загружаемой базы данных на филиале.

На филиале для инкрементной загрузки прежде всего включаем CDC на базу данных и CDC на каждую таблицу, которую планируется загружать инкрементно.

Далее для каждой загружаемой таблицы создаем табличную функцию с именем [rpl].fn_RPL_ИМЯ_ТАБЛИЦЫ (@filial, @lsn). Наличие этой функции принципиально важно для текущей архитектуры загрузки ЕКБД. Функция возвращает данные из таблицы в случае полной загрузки = параметр @lsn = NULL. Либо если @lsn - какое-то число, то функция возвращает данные, взятые из таблицы CDC. Соответственно, если изменяется структура загружаемой таблицы, необходимо также приводить в соответствие ее табличную функцию для загрузочного SSIS пакета. Пример функции:

ALTER function [rpl].[fn_RPL_OST]
(
	@FILIAL_CODE VARCHAR(6),
	@__$start_lsn binary(10)
)
returns table as return
(
	with rn_filial as
	(
		select
			convert(varchar(53),'0'+right('00' + @FILIAL_CODE,2)+OST_ID) as [OST_ID],
			PLS_ID, C_VAL_ID, ANL_ID, D_OST, OST_BAL, FILIAL_ID, DBT_OBR, DBT_OBR_R, DBT_OST, DBT_OST_R, KRT_OBR, KRT_OBR_R, KRT_OST, KRT_OST_R,
			case __$operation
				when 1	then 'D'
				when 2	then 'I'
				when 3	then 'P'
						else 'N'
			end __$operation,
			__$start_lsn,
			__$rn = row_number() over
			(
				partition by OST_ID
				order by __$start_lsn desc, __$seqval desc
			)
		from cdc.dbo_OST_ct
		where [OST_BAL] = 'V'
		and __$operation <> 3 and __$start_lsn > @__$start_lsn
	)
	, rn_tbl_filial as
	(
		select
			convert(varchar(53),'0'+right('00' + @FILIAL_CODE,2)+OST_ID) as [OST_ID],
			PLS_ID, C_VAL_ID, ANL_ID, D_OST, OST_BAL, FILIAL_ID, DBT_OBR, DBT_OBR_R, DBT_OST, DBT_OST_R, KRT_OBR, KRT_OBR_R, KRT_OST, KRT_OST_R,
			'I' __$operation,
			convert(binary(10),0) __$start_lsn,
			1 __$rn
		from dbo.OST
		where [OST_BAL] = 'V' and @__$start_lsn is null 
	)
	select *
	from rn_tbl_filial
	union all
	select *
	from rn_filial
	where __$rn=1
)

Изменения базы данных приемника на ЕКБД

В загружаемой базе данных создана файловая группа FG_FILIALS, которая обязательно должна быть выбрана как DEFAULT. Это сделано с той целью, что таблицы приемника - все в этой файловой группе и альтернативные таблицы для полной загрузки должны быть созданы в этой же самой файловой группе. Они создаются командой SELECT top(0) * INTO — создание пустой таблицы такой же структуры в файловой группе DEFAULT.

Каждая таблица, по которой будет производиться загрузка, должна быть секционирована по филиалу или по первичному ключу, в случае, если в поле первичного ключа вшит код филиала. Соответственно, сколько первичных ключей разного типа в базе данных приемнике, столько создано функций и схем секционирования и по этим схемам секционирования таблицы секционированы для последующей загрузки — см. рисунок.

Краткий алгоритм загрузки данных в ЕКБД

Прежде всего, предлагаю ознакомиться с SSIS алгоритмом визуально:

Загрузка базы данных целиком

В пакете определены 2 коннекта — CDB — приемник и FILIAL — источник. Также определены 2 переменные, на которые следует обратить внимание:

CDBsrv — имя сервера ЕКБД

FilialSrv — имя сервера источника. Эта переменная впоследствии задается входным параметром. Тем самым подставляя разные входные параметры, мы одним пакетом производим загрузку разных филиалов с одинаковой подготовленной структурой. При старте пакета коннекты меняются в зависимости от указанных переменных, а указанные переменные принимают значения в зависимости от заданных параметров.

Прежде всего, перед стартом проверяется наличие других процессов загрузки этого же филиала, с той целью, чтобы не удалить существующий моментальный снимок базы данных и с ошибкой прервать предыдущую загрузку.

Затем, в случае существования другой загрузки — прерываемся с ошибкой (ошибку можно увидеть в логе SQL Server ЕКБД).

В случае отсутствия других загрузок, создаем моментальный снимок загружаемой базы данных — DATABASE SNAPSHOT в филиале.

Далее идет загрузка всех таблиц выбранной базы данных выбранного филиала — описание алгоритма конкретной таблицы смотри ниже. Все загружаемые таблицы начинают загружаться одновременно в параллельных потоках.

По окончанию загрузки в филиале удаляется моментальный снимок базы данных. После этого возможен повторный старт следующей загрузки.

При загрузке данных во временную таблицу необходимо понимать, что за таблицу мы загружаем, из какой базы данных, и из какого филиала. Далее, при полной загрузке и командах ALTER TABLE SWITCH PARTITION временные таблицы должны располагаться обязательно в той БД, где таблицы-приемники. Но при инкрементной загрузке SWITCH PARTITION не требуется, так как будет заливка изменений командой MERGE, поэтому требований расположения временной таблицы хранить в БД, где таблица-приемник  — нет. Для инкрементной загрузки было решено временные таблицы создавать в БД tempdb, по причине того, что БД tempdb по определению должна располагаться на самых быстрых дисках. И если запущен процесс инкрементной загрузки, то по имени временной таблицы требуется определять — какая БД загружается. Архитектурно было решено использовать разные схемы для разных БД и разных филиалов и название схем формировать по следующему шаблону «f[NNN][database]» — буква f, номер филиала цифровой из 3х цифр, имя базы данных. Пример схем: f086dstah, f072buh.

Загрузка отдельно взятой таблицы

Прежде всего определяется LSN последней загрузки для выбранной таблицы, затем отрабатывает процедура rpl.spu_select_load с целью определения дальнейшего поведения — определения какую загрузку выбранный таблицы делать — полную или инкрементную. В зависимости от выбора рассмотрим 2 алгоритма:

Полная загрузка таблицы

На примере таблицы buh.dbo.G_OPR, филиал Москва

  1. В таблицу KBDSYS.rpl.log_loading_head вставляем запись с именем таблицы (buh.f086buh.G_OPR) и текущей датой — помечаем, что началась загрузка.
  2. Создаем пустую временную таблицу в файловой группе DEFAULT только с кластерным индексом (если создать сразу все индексы, загрузка будет длиться намного дольше) — buh.f086buh.G_OPR.
  3. Загружаем данные из источника в эту таблицу
  4. Строим индексы и ограничения на загруженной таблице.
  5. Создаем пустую таблицу со всеми индексами и ограничениями для вывода данных из БД — buh.f086buh.G_OPR$
  6. SWITCH из buh.dbo.G_OPR в buh.f086buh.G_OPR$ — секцию с загруженным филиалом вывели — очистили.
  7. SWITCH из buh.f086buh.G_OPR в buh.dbo.G_OPR — загруженные данные включили в таблицу приемник.
  8. Удалили таблицу — buh.f086buh.G_OPR$.
  9. Помечаем окончание загрузки в KBDSYS.rpl.log_loading_tail, так же в эту таблицу записываем считаный ранее LSN, на который будет опираться алгоритм в следующую загрузку.

Инкрементная загрузка таблицы

На примере таблицы buh.dbo.G_OPR, филиал Москва

  1. В таблицу KBDSYS.rpl.log_loading_head вставляем запись с именем таблицы (buh.f086buh.G_OPR) и текущей датой — помечаем, что началась загрузка.
  2. Создаем пустую временную таблицу в tempdb только с кластерным индексом (другие индексы нам не понадобятся) — tempdb.f086buh.G_OPR.
  3. Загружаем данные из источника в эту таблицу, используя функцию buh.rpl.fn_RPL_G_OPR(‘086’, @lsn) — здесь нам нужен как раз тот LSN, который мы получили ранее.
  4. Командой MERGE накатываем изменения из темпа в приемник
  5. Удалили таблицу — tempdb.f086buh.G_OPR.
  6. Помечаем окончание загрузки в KBDSYS.rpl.log_loading_tail, так же в эту таблицу записываем считаный ранее LSN, на который будет опираться алгоритм в следующую загрузку.