Dwh etl что это
Основные функции ETL-систем
ETL – аббревиатура от Extract, Transform, Load. Это системы корпоративного класса, которые применяются, чтобы привести к одним справочникам и загрузить в DWH и EPM данные из нескольких разных учетных систем.
Вероятно, большинству интересующихся хорошо знакомы принципы работы ETL, но как таковой статьи, описывающей концепцию ETL без привязки к конкретному продукту, на я Хабре не нашел. Это и послужило поводом написать отдельный текст.
Хочу оговориться, что описание архитектуры отражает мой личный опыт работы с ETL-инструментами и мое личное понимание «нормального» применения ETL – промежуточным слоем между OLTP системами и OLAP системой или корпоративным хранилищем.
Хотя в принципе существуют ETL, который можно поставить между любыми системами, лучше интеграцию между учетными системами решать связкой MDM и ESB. Если же вам для интеграции двух зависимых учетных систем необходим функционал ETL, то это ошибка проектирования, которую надо исправлять доработкой этих систем.
Зачем нужна ETL система
Проблема, из-за которой в принципе родилась необходимость использовать решения ETL, заключается в потребностях бизнеса в получении достоверной отчетности из того бардака, который творится в данных любой ERP-системы.
Как работает ETL система
Все основные функции ETL системы умещаются в следующий процесс:
В разрезе потока данных это несколько систем-источников (обычно OLTP) и система приемник (обычно OLAP), а так же пять стадий преобразования между ними:
Особенности архитектуры
Реализация процессов 4 и 5 с точки зрения архитектуры тривиальна, все сложности имеют технический характер, а вот реализация процессов 1, 2 и 3 требует дополнительного пояснения.
Процесс загрузки
При проектировании процесса загрузки данных необходимо помнить о том что:
Процесс валидации
Данный процесс отвечает за выявление ошибок и пробелов в данных, переданных в ETL.
Само программирование или настройка формул проверки не вызывает вопросов, главный вопрос – как вычислить возможные виды ошибок в данных, и по каким признакам их идентифицировать?
Возможные виды ошибок в данных зависят от того какого рода шкалы применимы для этих данных. (Ссылка на прекрасный пост, объясняющий, какие существуют виды шкал — http://habrahabr.ru/post/246983/).
Ближе к практике в каждом из передаваемых типов данных в 95% случаев возможны следующие ошибки:
Соответственно проверки на ошибки реализуются либо формулами, либо скриптами в редакторе конкретного ETL-инструмента.
А если вообще по большому счету, то большая часть ваших валидаций будет на соответствие справочников, а это [select * from a where a.field not in (select…) ]
При этом для сохранения аудиторского следа разумно сохранять в системе две отдельные таблицы – rawdata и cleandata с поддержкой связи 1:1 между строками.
Процесс мэппинга
Процесс мэппинга так же реализуется с помощью соответствующих формул и скриптов, есть три хороших правила при его проектировании:
Заключение
В принципе это все архитектурные приемы, которые мне понравились в тех ETL инструментах, которыми я пользовался.
Кроме этого конечно в реальных системах есть еще сервисные процессы — авторизации, разграничения доступа к данным, автоматизированного согласования изменений, и все решения конечно являются компромиссом с требованиями производительности и предельным объемом данных.
7 ошибок ETL-разработчика
Проекты хранилищ данных уже давно являются частью IT-инфраструктуры большинства крупных предприятий. Процессы ETL являются частью этих проектов, однако разработчики иногда совершают одни и те же ошибки при проектировании и сопровождении этих процессов. Некоторые из этих ошибок описаны в этом посте.
Я хотел бы сразу сузить рамки обсуждения и договориться о терминологии:
В целом, суть большинства ошибок ETL-разработчика можно объяснить игнорированием жизненного правила с этой картинки.
В дальнейшем будут использованы примеры для DWH на базе Oracle 11g. Итак приступим.
1. Использование системной даты (или аналогичной функции) в бизнес-логике
Одна из самых простых и частых ошибок, особенно у неопытных разработчиков. Допустим есть бизнес-правило: во время «ночного окна для загрузки» выгружать заказы, которые были закрыты за этот день (по полю close_date). Результатом иногда бывает примерно такой sql statement:
insert into target_table
select from orders
where close_date >= sysdate() — 1
Даже если забыть про то, что sysdate() может содержать не только дату, но и время, то у нас с этим скриптом возникают проблемы в тот момент, когда регулярная работа ETL процесса нарушается по вполне банальным причинам (исходная система апгрейдится на новую версию, пропала связь с исходной системой, из-за нового процесса ETL закончилось место во временном tablespace и т.д.). Т.е. в тот момент когда наш ETL процесс нужно по каким-то причинам перезапустить или же приостановить на время и потом снова запустить. Также может произойти нечто интересное, если по какой-то причине этот процесс запустят дважды за день.
Решение у этой ошибки обычно простое: параметризовать вызов данного процесса, и если нужно, то использовать sysdate() как дефолтное значение с возможностью переопределения. Хотя использование поля типа datetime для обработки дельты с точки зрения сопровождения ХД не очень оптимально, и вместо него лучше применить дельту по некоему дискретному полю (например целочисленный id сессии загрузки или нечто подобное)
2. Профилирование данных не было сделано перед началом разработки
Даже самая документированная и разработанная по всем правилам и методикам исходная система обычно содержит в себе некорректные или неконсистентные данные, несмотря на многочисленные уверения ее разработчиков или команды поддержки. И полагаться на уверения в правильности с той стороны баррикад, обычно чревато проблемами в конце разработки. Любой источник данных (таблица, файл, xml, json и т.д.) должен быть проверен на соответствие логической модели DWH. Существуют различные инструменты для профилирования данных, как встроенные в ETL инструменты, так и независимые от них. Перечислю наиболее востребованные проверки:
Проверка #1: Уникальность идентификаторов и натуральных ключей исходных данных
Различие между идентификатором и натуральным ключом состоит в том, что идентификатор — это обычно некое суррогатное значение, которое технически идентифицирует строку, а натуральный ключ — это значение или комбинация значений, которые имеют бизнес-смысл.
Таблица order_details:
order_details_id | document_position | order_id |
35346346 | 10 | 1224114 |
35346365 | 20 | 1224114 |
…. | …. | …. |
35345464 | 10 | 1224438 |
В данном примере order_details_id — это идентификатор, а комбинация document_position+order_id — это натуральный ключ.
Пример: я участвовал в проекте по загрузке данных в DWH из распределенной системы (instance-based), в которой велся учет объектов сетевой инфраструктуры. Разработчики этой системы на голубом глазу уверяли, что id этого объекта является уникальным и даже показывали в исходной системе уникальный индекс на таблице в подтверждение своих слов. Подвох выявился не сразу: оказывается уникальность этих id существовала только в рамках одного инстанса системы, и когда попробовали загрузить все данные со всех инстансов, то получилась проблема с уникальностью. В результате пришлось менять модель данных и расширять натуральный ключ сущности «сетевой объект» дополнительным полем «инстанс», чтобы обеспечить уникальность.
Проверка #2: Типы данных
Если поле называется Order_nr, то в нем необязательно содержатся только числовые значения — там вполне могут быть буквенно-цифровые последовательности. Также всегда стоит проверять длину полей. Эта проблема обычно характерна для файловых источников данных — таблицы БД обычно хорошо типизированы.
Проверка #3: Ссылочная целостность (проверка FK)
То, что разработчик показывает ER-диаграммы своей исходной системы, показывает у себя на DEV-окружении существующие FK между таблицами, и вообще мамой клянется, что у него все под контролем, не является поводом не проверить существование «повисших» записей. Т.к. он может быть не в курсе, что на продуктивном окружении DBA уже отключил эту проверку для улучшения производительности (конечно, согласовав это с менеджером разработчика, т.е. никто не виноват). Также проблемы со ссылочной целостностью очень часто встречается для файловых источников данных. Также не стоит забывать о применении сценария late-arriving-data (например, если данные приходят согласовано сегодня, далеко не факт, что так будет и через полгода).
Проверка #4: NULL значения
Основная проблема NULL значений состоит в том, что NULL<>NULL, поэтому любые запросы с джойнами по полю, которое может содержать NULL, будут возвращать непредсказуемые результаты. Поэтому все важные поля стоит обернуть конструкцией nvl(). Существует отдельный холивар по поводу грузить NULL в неключевые поля или заменять на некие значения по умолчанию. Мне ближе идея о всеобщей замене NULLов для более стандартизированного подхода к использованию DWH, но я не берусь настаивать, что так нужно делать всегда.
Проверка #5: Даты
Проверки полей с датами обычно являются самыми усложненными, т.к. помимо стандартных проверок приходится учитывать то, что не все даты, которые являются допустимыми с точки зрения БД, являются таковыми с точки зрения DWH: дата «21-07-1007» вряд ли является допустимой для даты заключения договора на оказание услуг сотовой связи. При моделировании DWH обычно существуют т.н. даты «начала времен» и «конца времен» (возможны другие названия), и любая дата, не попадающая в этот диапазон времени должна заменяться на некое значение по умолчанию.
Отдельного упоминания заслуживают случаи использования типов данных вроде varchar(8) для хранения дат (в формате например ‘20151201’), т.к. количество проверок здесь должно быть еще больше.
3. Удаление дубликатов через GROUP BY или DISTINCT
Несмотря на то, что в DWH обычно грузятся все данные, которые приходят из источника, существуют сценарии, когда на вход приходят заведомо дублирующиеся данные. Но уникальность натурального ключа требует только одну запись из дубликатов. Существует два неправильных способа удаления дубликатов:
Неправильный способ #1: GROUP BY
Допустим, мы грузим адреса клиентов и знаем, что теоретически для одного клиента может прийти несколько записей с адресной информацией (обычно они являются полными дубликатами из-за проблем, например, с синхронизацией). Поддавшись желанию решить задачу «в лоб», разработчик может написать такой запрос:
insert into customer_address
select customer_id, max(street_name), max(house_nr)
from source_table
group by customer_id
Проблемы начнутся, если на вход придут две реально отличающиеся записи для одного клиента (например, была ошибка ввода оператора, которую он исправил, но в источник данных попали оба варианта записи):
customer_id | street_name | house_nr |
1321 | Moskovskaya str | 127 |
1321 | Pushkinskaya str | 34 |
Запрос может вернуть такой результат (в зависимости от локали):
customer_id | street_name | house_nr |
1321 | Pushkinskaya str | 127 |
Такой записи в исходных данных не было, и у пользователей DWH может возникнуть резонный вопрос, что вообще это такое? На самом деле здесь нарушено 3-е требование к ETL процессу: в DWH была загружена запись, которая не может быть отслежена до исходной системы, проще говоря, которой там нет. И это однозначная ошибка ETL разработчика.
Неправильный способ #2: DISTINCT
Второй вариант «решения в лоб» в описанном выше сценарии — это использовать для удаления дублирующихся записей DISTINCT
insert into customer_address
select distinct customer_id, street_name, house_nr
from source_table
В данном случае пара дублирующихся записей с разными атрибутами будет идентифицирована раньше, поскольку вместо одной получится две записи, и будет нарушена уникальность натурального ключа и ETL-процесс упадет с ошибкой.
Один из правильных способов
Как же стоит решать проблему наличия двух записей с одинаковым натуральным ключом, но разными атрибутами? Очевидно, что если в модель данных данных изменений не внести, то из всех записей должна быть выбрана одна единственная правильная. Выбирать ее нужно согласно заранее определенному критерию: если информация является довольно критичной, то можно реализовывать различные сценарии Data Quality, если же нет, то в качестве корректной записи брать последнюю загруженную.
insert into customer_address
select customer_id, street_name, house_nr from (
select customer_id, street_name, house_nr,
row_number() over (partition by customer_id order by change_datetime desc) row_num
from source_table)
where row_num = 1
В общем следует не забывать, что любая запись в DWH должна иметь возможность быть отслеженной до источника(ов) данных в зависимости от бизнес-правила и не создавать «необъяснимые» записи.
4. Использование «статичных» скриптов из исходных систем
Очень часто бизнес-логика для сущностей DWH приходит от разработчиков или аналитиков исходных систем в виде SQL скриптов. И это большое подспорье для ETL-разработчика, но, как говорится, «бойтесь данайцев, дары приносящих»: как правило эти скрипты фиксируют некое условно «статичное» состояние исходное системы в некоторый момент времени, а ETL-разработчик обычно занимается отслеживанием динамики в данных и загрузкой только изменений («дельта»). Что же должно настораживать в этих «статичных» SQL скриптах? Вот некоторые из:
Вроде бы все логично: грузить в нашу таблицу order_from_calls все заказы, на которые есть ссылка в таблице звонков, и для которых дата последнего изменения больше даты последней загрузки. А теперь представим, что обновление таблицы calls в DWH не произошло (например, она грузится из другой исходной системы и связь с ней по какой-то причине нарушена), и этот запрос не загрузил некоторые id заказов. После этого таблица calls была дозагружена правильно, и там эти пропущенные id заказов появились, но мы их уже не загрузим в таблицу order_from_calls, т.к. в таблице orders ничего не поменялось и новые запуски этого запроса ничего не дадут. Поэтому в данном случае отслеживать дельту нужно не только по таблице orders, но и по таблице calls.
5. Разработка на небольшом объеме данных для разработки
Как правило, ETL-разработчику для разработки на DEV-окружении выгружается небольшая часть данных из продуктивной системы, на которой и предлагается вести разработку и отладку работы ETL-процессов. К сожалению, разработанные на таком малом объеме данных решения обычно приводят к различным проблемам на продуктивной системе, таким как недостаточная производительность, нехватка места для промежуточных таблиц (например, разработчик решил красиво разнести шаги бизнес-логики по набору промежуточных таблиц, последовательно перегружая из одной в другую — а вот в продуктивной системе данных оказалось слишком много, и tablespace для временных таблиц скоропостижно закончился).
К сожалению, эту ошибку ETL-разработчик не всегда может решить самостоятельно, из-за различных регламентов и инструкций, отсутствия бюджета на полноценное DEV-окружение с тем же объемом данных как на продуктиве и т.д. Таким образом, это стоит рассматривать как проектный риск.
Одним из выходов является дробление этапов проекта на более мелкие и делать релизы более часто, чтобы идентифицировать такие проблемы не в конце проекта, а хотя бы посередине.
6. Неправильное использование технических и бизнес дат
В DWH существует 2 типа дат: бизнес-даты и технические даты. Разница у них в происхождении: бизнес-дата — это та дата, которая пришла из источника данных или была создана по бизнес-правилам; техническая дата — это дата, которая была сгенерирована ETL процессом либо самим DWH. И очень часто их используют неправильно:
#1 Бизнес-даты используются как технические даты
Если сущность историзируется как SCD2 (Slowly Changing Dimension type 2) и в источнике данных есть поля «_from» и «_to», которые ETL разработчику предлагается использовать в качестве диапазонов валидности данных, то у него должны быть просто железобетонные гарантии того, все диапазоны валидности для каждого натурального ключа будут: 1) непересекающимися, 2) между диапазонами не будет разрывов, 3) объединение этих диапазонов дат будет совпадать с диапазоном даты «от начала времен» до «конца времен» установленных для вашего DWH (это могут быть например пары дат «01.01.1000» и «31.12.9999», или «11.11.1111» и «09.09.9999»). Как правило, разработчики исходных систем мало заморачиваются, и если правило «непересекающихся диапазонов дат» обычно соблюдается, то со 2-м и 3-м пунктом обычно возникают проблемы. В любом случае, общей рекомендацией является не использовать бизнес-даты для SCD2, а генерировать свои технические даты.
#2 Технические даты используются как бизнес-даты
Очень часто источники данных не поставляют поля для отслеживания каких-либо контрольных дат: например, документ имеет только статус закрытия, но не метку времени, когда это событие произошло, и в качестве решения предлагается использовать технические даты «_from» и «_to», которые были сгенерированы ETL процессом. Однако это решение работает до первого сбоя ETL процесса (например, остановки ETL процессов на пару дней): сбой произошел в понедельник, восстановление наступило в среду, и т.к. исходная система вполне себе работала все это время, все созданные документы будут загружены как созданные в среду. В общем случае, сценарий «историческая правда» не реализуем, если источник данных не поставляет всех нужных пользователям дат и может быть лишь сэмулирован (с помощью технических дат), но в таком случае этот сценарий должен быть проговорен и описан в документации, чтобы через год пользователи не удивлялись нулевому количеству закрытых документов в понедельник и вторник, а также тройному количеству их в среду.
7. «Механическая» реализация
Это одна из самых сложных для идентификации ошибок и, по правде говоря, не является ошибкой именно ETL разработчика, а скорее архитектора DWH. Но над проектом работает же команда, и коллег выручать тоже надо.
Иногда так случается, что целевая сущность в DWH была неправильно смоделирована исходя из-за расхождений в терминологии для разработчика исходной системы и архитектора. Разработчик исходной системы мыслит категориями своей исходной системы, архитектору DWH же необходимо продумывать различные интеграционные схемы, как связать в едином DWH множество объектов из разнородных исходных систем.
Опишу на примере сущности «клиент» как одной из типичных для такого рода проблем: в источнике данных есть таблица «customer», имеющая уникальный натуральный ключ, ссылочная целостность в порядке. На основе этой таблицы в DWH была создана сущность «customer». Исходя из названия, логично предположить, что одна запись в этой таблице должна соответствовать одному клиенту, но фактически выяснилось, что на самом деле один и тот же реальный клиент мог иметь несколько записей с одними и теми же атрибутами, но разными натуральными ключами. И это привело бы к неприятной коллизии для пользователей DWH, которые использовали эту сущность, например, для подсчета общего количества клиентов компании. В результате было принято решение разделить эту сущность на две: «customer_record» и «customer», связанные через FK отношением M:1.
А если бы ETL разработчик «механически» реализовал все по спецификации, то он бы конечно был бы не виноват, но у него была возможность заметить это, т.к. в любом случае он по сравнению с архитектором работает условно говоря «на земле», в отличие от «витающего в облаках» архитектора.
В целом можно упомянуть некоторые симптомы «механической» реализации:
Побег от скуки — процессы ETL
В конце зимы и начале весны, появилась возможность поработать с новым для меня инструментом потоковой доставки данных Apache NiFi. При изучении инструмента, все время не покидало ощущение, что помимо официальной документации, нелишним были бы материалы «for dummies», с практическими примерами.
После выполнении задачи, решил попробовать облегчить вхождение в мир NiFi.
Предыстория, почти не связанная со статьей
В феврале этого года поступило предложение разработать прототип системы для обработки данных из разных источников. Источники предоставляют информацию не одновременно и в разных форматах. Нужно было данные преобразовывать в единый формат, собирать в одну структуру и передавать на обработку. Перед обработкой необходимо было провести обогащение из дополнительных справочников. Результат обработки сохраняется в две базы данных и генерируется суточный отчет.
Задача укладывалась в концепцию ETL. Поиск реализаций ETL привел на довольно большое количество ETL-систем, которые позволяют выстроить процессы обработки. В том числе opensource системы.
Система Apache NiFi была выбрана на удачу. Прототип был построен и сдан заказчику.
Первоначально заказчик хотел монолитное приложение, а использование NiFi рассматривал только как инструмент прототипирование (где-то прочитал). Но после знакомства в вблизи — NiFi остался в продукте.
А теперь собственно история
После сдачи работы заказчику, интерес к ETL не пропал. В апреле наступила самоизоляция с нерабочими днями и свободное время нужно было потратить с пользой.
Когда я начал разбираться с Apache NiFi выяснилось, что более-менее подробная информация есть только на сайте проекта. Русские статьи во многом просто переводы вводных частей официальной документации. Основной недостаток — часто не понятно в каком формате параметры вводить. Гугл спасает, но не всегда.
И появилась идея написать статью с описанием практического примера работы системы — введение для новичков. В комментариях, дорогой читатель, можно высказаться — получилось или нет.
И так, задача — получать данные о распространении вируса, обрабатывать строить графики.
Источниками данных будет сайты “стопкороновирус.рф» и «covid19.who.int». Первый сайт содержит данные по регионам России, сайт ВОЗ — данные по странам мира.
Данные на сайте «стопкороновирус.рф» находятся прямо в html-коде страницы в виде почти готового json-массива. Нужно его только обработать. Но об этом в следующей статье.
Данные ВОЗ находятся в csv-файле, который не получилось автоматически скачивать (либо я был недостаточно настойчив). Поэтому, когда нужно было, файл сохранялся из браузера в специальную папку.
Если кратко сказать о NiFi — система при выполнении процесса (pipeline), состоящего из процессоров, получает данные, модифицирует и куда-то сохраняет. Процессоры выполняют работу — читают файлы, преобразовывают один формат в другой, обогащают данные из других баз данных, загружают в хранилища и т.д. Процессоры между собой соединены очередями. У каждой очереди есть признак, по которому данные в нее загружаются из процессора. Например, в одну очередь можно загрузить данные при удачном выполнении работы процессора, в другую при сбое. Это позволяет запустить данные по разным веткам процессов. Например, берем данные, ищем подстроку — если нашли отправляем по ветке 1, если не нашли — отправляем по ветке 2.
Каждый процессор может находиться в состоянии «Running» или «Stopped». Данные накапливаются в очереди перед остановленным процессором, после старта процессора, данные будут переданы в процессор. Очень удобно, можно изменять параметры процессора, без потери данных на работающем процессе.
Данные в системе называются FlowFile, потоковый файл. Процесс начинает работать с процессора, который умеет генерировать FlowFile. Не каждый процессор умеет генерировать потоковый файл — в этом случае в начало ставится процессор «GenerateFlowFile», задача которого создать пустой потоковый файл и тем самым запустить процесс. Файл может создаваться по расписанию или событию. Файл может быть бинарным или текстовым потоком.
Файл состоит из контента и метаданных. Метаданные называются атрибутами. Например, атрибутом является имя файл, id-файла, имя схемы и т.д. Система позволяет добавлять собственные атрибуты, записывать значения. Значения атрибутов используются в работе процессоров.
Содержимое файла может быть бинарным или текстовым. Поток данных можно представить в виде массива записей. Для записи можно определить их структуру — схему данных. Схема описывает формат записи, в котором определяются имена и типы полей, валидные значения, значения по умолчанию и т.д.
В NiFi еще одна важная сущность — контроллеры. Это объекты которые знают как сделать какую-то работу. Например, процессор хочет записать в базу данных, то контроллер будет знать, как найти и подключиться к базе данных, таблице.
Контролер, который знает как читать данные на входе в контроллер называется Reader. После обработки, на выходе процессора, данные форматируются контроллеру который называется RecordSetWriter.
Итак, практическое применение вольного изложения документации — чтение данных ВОЗ и сохранение в БД. Данные будем хранить в СУБД PostgreSQL.
В таблице будет храниться дата сбора данных, код и наименование страны, код региона, суммарное количество погибших, количество погибших за сутки, суммарное количество заболевших, количество заболевших за сутки.
Источником данных будет cvs-файл с сайта https://covid19.who.int/ по кнопке “Download Map Data”. Файл содержит информацию по заболевшим и погибшим по всем странам на каждый день примерно с конца января. Оперативная информация там задерживается на 1-2 дня.
За время эксперемента, в файле менялись наименования полей (были даже наименования с пробелами), менялся формат даты.
Файл сохраняется из браузера в определенный каталог, откуда NiFi забирает его на обработку.
Общий вид визуализации процесса в интерфейсе Apache NiFI
На рисунке большие прямоугольники — процессы, прямоугольники поменьше — очереди между процессами.
Обработка начинается с верхнего процесса, отходящие от процесса стрелочки показывают направление движения FlowFile (атрибутов и контента).
На визуализации процесса показывается его статус (работает или нет), данное имя процессу, его тип, количество и общий объем прошедших файлов на входе и на выходе за период времени, в данном случае за 5 мин.
Визуализация очереди показывает ее имя, и объем данных в очереди. По умолчанию имя очереди это тип связи — success, failed и другие.
Тип первого процесса «GetFile». Этот процессор создает flowfile и запускает процесс.
В настройках процессора на вкладке Scheduling указываем расписание запуска процесса — 20 секунд. Каждые 20 секунды процессор проверяет наличие файла. Если файл найден — процесс запускается. Контентом потокового файла будет содержимое файла
Вкладка Scheduling — запуск каждые 20 сек.
Как видно из рисунка, указываем каталог и имя файла. В имени файла можно использовать символы подстановки. Например «*.csv» приведет к обработке всех csv-файлов в каталоге. Указываем также, что после обработки файл можно удалить («Keep Source File»). Также есть возможность указать максимальные и минимальные значения возраста и размера файла. Это позволяет обрабатывать, например, только не пустые файл, созданные за последний час.
На вкладке Settings указываются базовые параметры процесса, такие как имя процесса, максимальное время работы процесса, время между запусками, типы связей.
Результатом работы первого процесса «GetFile» с именем «Read WHO datafile» будет просто поток данных из файла. Поток будет передан в следующий процесс «ReplaceText».
Процессор поиска подстроки
В этом процессе обратим внимание сразу на вкладку параметров. Данный процессор ищет regex-выражение «Search Value» в входном потоке и заменяет на новое значение «Replacement Value». Обработка ведется построчно («Evaluation Mode»). В данном случае идет замена в строке даты. Во входном файле, в какой-то момент дата формата YYYY-MM-DD стала указываться как YYYY-MM-DDThh:mm:ssZ, причем время было всегда 00:00:00, а временная зона не указывалась.
Простого способа преобразования в даты уже в записи не нашел, поэтому к проблеме подошел в “лоб” — просто через процессор «ReplaceText» убрал символы T и Z. После этого строка стала конвертироваться в timestamp в avro-схеме без ошибок.
На выходе процессора будет поток текстовых данных, в которых уже поправили подстроку даты. Но пока это просто поток байтов без какой-то структуры.
Следующий процессор «Rename fields» читает поток уже как структурированные данные.
Переименование полей
Процессор содержит ссылку на Reader – специальный объект-контроллер, который умеет читать из потока структурированные данные и в таком виде уже передает процессору на обработку. В данном случае «WHO CVS Reader” просто читает поток и преобразует каждую строку cvs-файла в запись (record) которая содержит поля со значениями из строки. Имена полей берутся из заголовка cvs-файла.
Контроллер чтения записей из cvs-файла
Параметр «Schema Access Strategy» указывают, что структура записи формируется из заголовка cvs-файла. Если заголовков нет, то можно изменить стратегию доступа к схеме и в реестре схем данных создать схему, указать ее имя в параметре «Schema Name» или еще проще — указать саму схему в параметре «Schema Text».
Но так как у нас есть заголовки в файле — читаем по ним.
Итак, в процессоре данные уже структурированные — их можно представить как таблицу базы данных — поля и строки. И к этой таблице мы выполняем SQL запрос :
Поля в запросе такие же как заголовки cvs-файла. Имя таблицы служебное — FLOWFILE – обозначает чтение структурированных данных их контента файла. Язык запроса SQL довольно гибкий, есть функции преобразований, агрегаций и т.д. В данном случае запрос выводит все данные, только имена полей результата будут другие — они соответствуют полям таблицы who_outbreak в целевой БД.
Поток записей с новыми именами полей передается в контроллер RecordSetWriter, ссылка на который также указана в параметра контроллера — «WHO AvroRecordSetWriter».
Контроллер RecordSetWriter
Контролер RecordSetWriter уже использует предопределенную схему данных. Схема находится в отдельном объекте — регистре схем («Schema Registry»). В контроллере есть только ссылка на реестр схем и имя схемы.
Регистр схем
Работать с регистром схем довольно просто. Добавляем новый параметр. Его имя — будет именем схемы. Значение параметр — определение схемы.
В регистре схем создана схема who_outbreak, определение схемы:
Имена и типы атрибутов схемы соответствуют именам и типам полей записи, сформированной sql-запросом.
После выполнения контроллером sql-запроса и передачи данных на выход контроллера в формате схемы данных, структурированный поток передается в контроллер «Delete all records». Это контроллер типа «PutSQL», который может передавать на выполнение sql-команды.
Файл источник содержит полный набор данных с начала эпидемии, поэтому удаляем все строки из таблицы, чтобы загрузить данные в следующем процессоре. Это работает быстрее, чем проверка— какие данные есть в таблице, а каких нет и их нужно добавить.
delete from who_outbreak;
В параметрах контроллера указываем SQL Statement “delete from who_outbreak;” и ссылку на пул соединений «JDBC Connection Pool». Параметры JDBC стандартные. Пул содержит настройки подключения к конкретной БД, поэтому его можно использовать во всех контроллерах, которые будут работать с этой БД.
Данные или атрибуты FlowFile не обрабатываются в процессоре, поэтому вход и выход процессора идентичен.
Последний процессор «PutDatabaseRecord».
Запись в БД
В этом процессоре указываем Reader, в котором используется определенная схема who_outbreak. Так как мы удалили все записи в предыдущем процессоре, используем простой INSERT для добавления записей в таблицу. Указываем пул соединений DBCPConnectionPool, далее указываем БД и имя таблицы. Имена полей в схеме данных и БД совпадают, то больше никакой дополнительной настройки проводить не нужно.
Все процессоры, контроллеры и регистры схем нужно перевести в состояние Running (Start).
Процесс доставки данных готов. Если положить файл WHO-COVID-19-global-data.csv в каталог D:\input, то в течении 20 секунд он будет удален, а данные пройдя через процесс доставки данных будут сохранены в БД.
Сейчас вынашиваю планы на вторую часть статьи, в которой планирую описать второй процесс ETL — чтение данных с сайта, обогащение, загрузка в БД, в файлы, расщепление потока на несколько потоков и сохранение в файлы. Если тема интересна — пишите в комментарии, это мне прибавит стимула быстрее написать.
На рисунке изображение в интерфейсе Apache NiFi описанного процесса (справа) и, для затравки, процесса для второй статьи (слева).