"Streaming solution" How to ensure data streaming from databases to cloud solutions such as GBQ using Symfony 6, RabbitMQ and Kafka? [ukr]

Talk presentation

Let's talk about how we created a stable streaming of data from a relational database to GBQ. Which technologies we chose and what influenced our choice. Our architect solutions that will allow the system to scale usage with other data warehouses. How we built our producers and consumers using the Rdkafka library and why we also needed RabbitMQ.

Oleksii Shamuratov
Brainstack_
  • Data engineer at Brainstack_
  • I have been programming in PHP for 6 years. I also program in Python, Go
  • Lecturer at Lviv Polytechnic University
  • Currently I am engaged in the construction and maintenance of data warehouse and ETL processes
  • LinkedIn, Facebook

Talk transcription

Вітаю всіх, хто зібрався. Давно я не виступав офлайн, можливо, ще в 2019 році, перед початком пандемії COVID-19. Дуже приємно знову виступати вживу і бачити людей. Отже, сьогодні ми поговоримо про те, як ми розробляли наше стрімінг-рішення, використовуючи Symfony. Спочатку у нас був лише Revit. Я потрапив до цієї компанії, коли аналітика тільки розвивалася, і ми вирішили створити свій власний сервіс. Ми розглянемо першу версію, помилки, які ми допустили, і чому ми обрали Kafka замість Revit.

Ми також розглянемо логіку стрімінгу і ті проблеми, які ми вирішили, модифікуючи наш стрімінг-флоу. Спочатку визначимо місце стрімінгу в наших бізнес-процесах. У нас є зовнішні джерела даних, провайдери, рекламні кабінети та інші, з яких дані потрапляють у наші data warehouses. Аналітичні скрипти та інструменти працюють з цими даними у наших аналітичних data warehouses. Нам потрібно забезпечити постійну актуальність даних в аналітичних data warehouses, тому ми розробили власний стрімінг-сервіс.

Ми вирішили мати повний контроль над тим, які події спричиняють стрім, такі як виконання ETL-пайплайнів та аналітичних скриптів. Ми також виконуємо різні логічні операції після завершення стрімінгу, такі як побудова звітів чи виконання аналітичних скриптів. Наш перший стек включав Symfony 6, RabbitMQ та таблицю конфігурації стріма в базі даних аналітики. Вибір Symfony був обумовлений тим, що всі розробники знали або Symfony, або Python. Крім того, Symfony та PHP працюють швидше, ніж Python. Ми також розглядали наш стек інструментів, використовуючи RabbitMQ як брокера повідомлень для передачі батчів до місця призначення на стрімінгу. Старий флоу був перевантажений через послідовні додавання різних функціональностей. Спочатку все запускалося по крону, а потім запускалася крон-команда кожні 10 хвилин, яка викликала весь стрімінг-сервіс.

Потім ми вирішили, що довго чекати запуску всіх стрімів — це неефективно. Тому ми додали можливість запуску через Webhook. Для цього ми додали додатковий обробник, який просто викликав цю команду, в якій вже містилася ідея побудови стріму. Починали ми з генерації батчів, спочатку генеруючи їх на Тріції. У нас була велика таблиця з унікальними ID, які генерувалися секвенсом в Postgres або автоінкрементом в MySQL. Щоб зменшити час виконання команди, ми спочатку вибирали діапазон ID на Patricia — наприклад, ID 1, потім 30 тисяч ID — і відправляли їх у наш pre-abstream handler.

Pre-abstream handler генерував готові батчі, які потрапляли до pre-abstream handler, а потім вже відправлялися у призначення, наприклад, Google BigQuery. Розмір батчів був обмежений до 2 тисяч, щоб уникнути помилок при HTTP-запитах. Після обробки останніх батчів визначалися як останні, порівнюючи максимальну ID з кількістю записів, і відправлялися в temp-finalist. Він порівнював кількість записів у джерелі та призначенні, і у разі невідповідності вважав, що стрім виконався неуспішно. Якщо все виконалося успішно, викликали post-finalist, який виконував логіку після стріму, таку як виконання скриптів аналітичних або побудова звітів.

Ми використовували черги з пріоритетами для відправки стрімів: 50 стрімів з першим пріоритетом, 100 з другим і 100 з третім. Але це створювало проблеми, оскільки ми не могли точно визначити, на якому стрімі відбувалася зупинка. Для вирішення цих проблем ми вирішили додати логування помилок та створити таблицю з логами, в яку записували інформацію про виконання стрімів. Також ми виявили проблеми зі збудованням запитів та жорсткою прив'язкою до конкретних джерел та призначень. У нас були проблеми з обробкою помилок 500, time-outs та затримками при вибірці даних. Нам також було не зручно, що ми були прив'язані до конкретних баз даних. Ми також вирішили проблему, додаючи таблицю з логами, в яку записували інформацію про виконання стрімів. Цей додаток дозволяє нам бути більш гнучкими та ефективними в роботі зі стрімінгом даних.

Отже, ми вирішили відмовитись від RabbitMQ з кількох причин. По-перше, важко було контролювати процеси стріму в цьому середовищі. По-друге, необхідно було переписати логіку ретраїв, щоб вони не відправлялися в кінець черги. Ми також хотіли, щоб всі бачі були доступні для перегляду, які вже відправлені в стрім, незалежно від того, чи вони оброблені чи ні. У новій архітектурі ми залишили Symfony 6 через той факт, що багато коду вже було написано, і він працював. MySQL був використаний для створення нормальної бази даних для нового сервісу. Kafka був обраний для обробки апстрімів, а RabbitMQ залишили для івентів, таких як ініціалізація стріма.

Ми винесли стейти стріма та дані, які можна було б закешувати, в окремі таблички в Redis, щоб полегшити обробку стрімінгу. Основна ідея бази даних - це поля, які користувач заповнює ручним чином. Тільки сервіс має доступ до полів, які міняються динамічно, для уникнення непередбачених змін. Ми обрали Kafka через можливість створювати топіки легко та миттєво. В Kafka Drop можна слідкувати за прогресом стріму, оскільки всі бачі зберігаються там під час обробки. RabbitMQ залишили для обробки окремих івентів. У новому стрімінг флоу ми спростили команди, патрони та побудову патрицій через зміни у вигрібанні даних з джерела. Відмовились від використання row number, який раніше був необхідний.

Отже, ми вирішили використовувати ітеративний підхід з PDO fetch all та генератором. Бачі будуються залежно від конфігурації, створюються більше 2000 бачів. Prep StreamHandler був відмінений, оскільки час виконання команди був зменшений, і тепер виглядає все простіше. Використовуєм бібліотеку RD Kafka для роботи з Kafka, збільшили її функціонал, написавши власні Kafka Consumer та Kafka Producer. Вирішили відправляти повідомлення транзакційно, щоб уникнути дублікатів бачів у випадку відмови коннекшну до бази. Для консьюмера створили можливість підписуватися на декілька топіків для оптимізації роботи з багатьма stream jobs.

Для полегшення додавання source та destination створили інтерфейси та upstream processor service, який відправляє в Kafka upstream batch та обробляє їхнє споживання. Для взаємодії з різними базами даних створили інтерфейси та класи, такі як DynamoDB, які використовуються в AppStream Service Processor. Створили source-інтерфейс для отримання полів, підключення, перевірки таблиці на порожність та функції вироблення upstream batch. Також створили destination-інтерфейс для відправлення upstream batch в destination та ряд додаткових функцій. Для побудови AppStream Message вирішили використовувати базовий клас Kafka та додали деякі додаткові поля, такі як MaxRetries і xlast, для полегшення обробки та ретраїв повідомлень. Цей опис детально розкриває наш підхід до стрімінгу даних та роботи з різними джерелами та призначеннями.

І далі, якщо ми хочемо додавати нові destination і те, як обробляються у нас отримані App StreamMessage, ми просто додаємо нові повідомлення. Тобто, як і в Google BigQuery, можна додати DynamoDB і все інше. Ось, ми також переробили логіку ретраїв. Стара логіка на Symfony Messenger передбачала, що ми просто відправляли повідомлення в чергу знову, що викликало постійні проблеми, оскільки ми вважали, що ми не застрягли у стрімі. Нову логіку ми змінили на те, що весь стрім, фактично, зупиняється, поки одне повідомлення не пройде або досягне максимальної кількості спроб, або не буде успішно виконано.

Чому? Оскільки, зазвичай, помилки, такі як 500 від Google, тобто коли їхнє API відмовило, або коли ми перевищили ліміти, немає сенсу пробувати відправляти інші повідомлення туди. Доки це повідомлення та помилка не будуть виправлені, ми не намагаємося відправляти інші повідомлення туди. Коли досягається максимальна кількість спроб ретраїту, ми відправляємо їх в так звану dead letter queue, яку у Kafka називають топіком, з якого вони виходять, і приставкою dead letter queue. Ще одна ситуація, яка траплялася, це коли ми визначили розмір партії (batch size) 2000, але таблиця складалася з 60 колонок.

Але іноді скрипти аналітики переписувалися, і тепер таблиця може мати 120 колонок. Оскільки ми не змінювали розмір партії і не були в курсі цього, можливо, виникала помилка, така як "request entity too large" від Google. Щоб уникнути цього, коли ретраїтимо dead letter queue, ми розбиваємо партію з 2000 записів на 2000 повідомлень і відправляємо їх по черзі. І надсилаємо собі повідомлення, щоб переглянути можливість зміни розміру партії, оскільки сталася помилка, що ми не вмістилися в цей розмір. Після перенесення всього на Kafka у нас тепер чудово видно, які стріми виконуються в даний момент. Тобто, топік створюється, коли стрім запускається і знищується, коли стрім успішно оброблений. Легко зайти в будь-який з цих топіків і подивитися, як вони працюють, на якому офсеті в даний момент відбувається обробка цього стріма. Взагалі, ми дуже раді, оскільки вирішили проблему того, що багато стрімів падало, хоча насправді вони не впадали. І ще один плюс тепер у нас, завдяки тому, що ми визначаємо, коли стрім закінчився, отримавши останнє повідомлення з Кавки, ми точно знаємо, що ми обробили останнє повідомлення.

Також ми зменшили час виконання стрімів, зменшивши флоу до більш лаконічного вигляду. Ми видали складні запити до бази даних з VAIN запитів, зменшили на етапі апстріма кількість запитів до бази даних, і, написавши свої консьюмери, змогли їх оптимізувати. Також вирішили проблему визначення завершення обстріма, яка раніше довго нас турбувала. І, головне, тепер нам легко додавати нові source та destination, що, в принципі, буде дуже актуально при розвитку департаменту аналітики. Ось. Чи є питання?

Sign in
Or by mail
Sign in
Or by mail
Register with email
Register with email
Forgot password?