Квитки на наступну конференцію Конференція Highload fwdays'24 вже у продажу!

Mastering stream-to-stream JOINs in distributed stream processing platform [ukr]

Презентація доповіді

Можливості людства по залізу та програмному забезпеченню досягли рівня, коли прийняття рішень стало можливим у реальному часі з миттєвою реакцією на нову інформацію, що надходить у систему. Однією з ключових можливостей таких систем є їх здатність об'єднувати (join) потокові події в реальному часі та зручно використовувати цю можливість мовою запитів SQL, поєднуючи з іншими стадіями обробки інформації цією мовою. Дана доповідь занурить глядача у шлях розробки такої нетривіальної можливості.

Олександр Сиротенко
Hazelcast
  • Senior Software Engineer у Hazelcast
  • Народився та виріс на Київщині. Закінчив КНУ ім. Т. Шевченка, факультет кібернетики, його мейджор - компілятори
  • Обожнює розбиратися у системних тулах, та робить їх краще, наразі у Hazelcast
  • Володар прекрасного песика Akitu Inu
  • Мріє здобути PhD у Computer Science.
  • GitHub

Транскрипція доповіді

Так, всім привіт. Коротше кажучи, ця доповідь присвячена Stream-to-Stream Join в розподілених системах обробки даних. Якщо комусь цікаво, є доступний QR-код. Трошки зачекайте, і презентацію можна переглянути за допомогою цього QR-коду. Багато піару на роботі, сподіваюся, що всі, хто хотів, вже розповсюдили інформацію. Перед вами Сашко, я працюю в Hazelcast вже більше 2,5 років, а в сфері програмного забезпечення - понад 6 років. Я також захоплююся розробкою складних систем і прийшов у цей напрямок з компіляторів. Компілятори - моє кохання, вибачте за це. Те, що мене надихає, - це хімічне з'єднання, відоме як дофамін. Я дуже люблю програмування, тому після доповіді буду радий спілкуванню. Щодо контенту доповіді, ви можете очікувати огляд архітектури і двигуна потокової обробки на деякому рівні абстракції. Хоча я не буду глибоко заглиблюватися, але розкажу про деякі рівні абстракції. Також поділюся досвідом вирішення складних задач у такій системі, зокрема розподілений об'єднання потоків. Можливо, чекають меми, але це не точно.

Почнемо з того, що таке Hazelcast. За термінами теореми CAP, це розподілена обчислювальна платформа реального часу. Колись це було відомо як IMDG (розподілена сітка даних), але тепер це Face Data Store. Фактично це розподілений кеш або розподілене сховище ключ-значення. Другою компонентою, над якою я працюю, є Streaming Data Engine, або JET. Крім того, в якості знавця компіляторів, я також працював над SQL Engine. У Hazelcast є багато інтерфейсів до зовнішніх систем, таких як Kafka, Hadoop, Elastic, JDBC та інші.

Платформа також має механізми геореплікації, захист від Split Brain та CP Subsystem на основі Raft. Інші можливості включають доступ через SQL, Java, C-Sharp і Python API. Інші мови, такі як JavaScript, можуть також бути використані. Щодо масштабування, Hazelcast має різні етапи, включаючи процес meta-supplier та розсилання менших задач на вузли кластера для подальшого розподілу. Це лише узагальнене уявлення про те, як працює двигун потокової обробки у Hazelcast.

Tобто це стандартний ETL для партійної обробки. Тобто, по суті, ми тут підраховуємо кількість слів у якійсь книжці. І взагалі, як масштабується це тепер вже картинкою. Бо люди люблять картинки. Тобто у нас є такий процесор-sappler для даної задачі. Тобто у нас є такий початковий джерело. Ми якось цю задачу, над даними, трансформуємо. Потім агрегуємо. І ось, між вузлами transform та aggregate, ребро графа помічене як broadcast. Тому що, коли у вас є багато вузлів у вашому кластері, ви хочете порахувати точно усе. А так як ми маємо цю задачу. Ми також масштабуємо наші джерела. Тобто у нас буде їх багато різних. Припустимо, у нас в кластері 2 на D. Ми їх масштабуємо таким чином на процесорі та sappler. А потім вже створюються процесори, які викачують інформацію з джерел. Якось їх трансформують. А потім передають broadcast всю інформацію в етап агрегації.

Так як ми рахуємо, ми точно хочемо порахувати усе. І потім ми якось його збираємо. Але це доповідь не про Jet, а про реальний час. І от одного разу до нас прийшов клієнт. І буквально у нього був такий запит. Він каже, "Я хочу знати, скільки у мене коротких, виконаних замовлень." Ну і це власне є Stream-to-Stream Joint. Тобто у нас є якийсь потік подій замовлень. І потік подій доставки. Тобто замовлення зроблено, доставка зроблена. І вони хотіли знати це в реальному часі. І це, скажімо так, найпростіший приклад, який я взагалі міг витягнути з нашої внутрішньої тули. Тому що багато було пропозицій. І це був найкращий синтаксис для SQL. Ну добре, коли задача стоїть, і ми її проєктуємо, давайте подумаємо, як ми можемо власне це зробити і масштабувати. Тому що це не просто кусок коду, який ми хочемо виконати. А це треба ще масштабувати, щоб воно добре працювало.

І первісна ідея яка? Оскільки це Joint, операція злиття, або з'єднання потоків. У нас є лівий та правий буфери. Припустимо, це масиви. Далі. У нас є припущення, що для того, щоб лімітувати кількість, власне, елементів у цьому буфері, нам потрібно якось лімітувати. І ми вирішили це зробити по часу. Тому у нас потоки обов'язково маркіровані часовими мітками. І дані, власне, тут треба розрізняти. Є дані, а є часові мітки. Це дві різні сущності, але вони друг з другом зв'язані. Тобто є часові мітки, які приходять і кажуть, який це час. І є дані, у яких є помічені цими мітками поля. Часові рамки ми думали, оскільки це дійшло у формі SQL, і більшість запитів від клієнтів це був SQL, ми одразу вирішили це робити лише для SQL.

І одразу при Major Optimization, оскільки ми знаємо, що дані маркіровані, і тобто у нас є порядок по часу, ми можемо одразу замінити масиви на кучі. І кучі з компаратором по часу. Ну, власне, простий алгоритм. Коли надходять дані, і якщо подія спізнилася, то ми ігноруємо. Далі з'єднуємо і зберігаємо в буфері. Ну, наша проста ідея, і також з часовою міткою. Добре, відновлюємо останню бачену мітку для входу, видаляємо з буферів прострочені події, оскільки ми лімітуємо наші буфера, то ми маємо з лівого чи правого, в залежності від буфера, у який прийшла часова мітка, повидаляти усе старе, що нам вже не потрібно. Ну, і от, для прикладу, самий чистий, самий простий запит до Hazelcast. От у такому вигляді. Тобто, ми лімітуємо нашу мітку. Лімітуємо наш час за допомогою цього бітвіну. І, власне, з'єднуємо. Окей. Тобто, на етапі вирішення і проєктування перед нами стали виклики. А ми можемо з'єднувати два чи більше потоків? Ну, joint – це ж можна і зліва, і з права докидувати, яка різниця скільки.

А пам'ять в такому разі? Якщо декілька – 10, 20, 30 – що робити з пам'яттю? Також має значення порядок. У якому порядку з'єднувати? Наскільки це критично? Це також було питанням. А який взагалі синтаксис має виглядати? А точно отакий синтаксис має бути? Може, синтаксис якийсь інакший? А якщо ми хочемо це робити також для доступу з Java API, то який має бути вигляд для цього? І, насправді, ми вирішили… Так.

Сконцентруватися на цих двох пунктах. Тобто з пам'яттю, припустимо, ми вирішили не рухатися далі того, що вже зробили на етапі планування. Але порядок та з'єднання двох чи більше потоків ми зараз роздивимось. Тобто це також валідний SQL-запит. І отут починаються джунглі. Тому що є таке поняття як «часові мітки». І у нас вони… Тобто на що вони відповідають? На які питання вони відповідають? Подія у потоці відбулася пізніше чи раніше події? Або по-іншому представимо. Що таке зараз у потоці? Тобто ми слідкуємо за часом. Що таке зараз? І от «часові мітки» і відповідають.

Але для цього ми маємо зробити цей потік, власне, маркованим. Тому що потік може не мати. І це також окей. Але якщо ми з'єднуємо і робимо з'єднання з якимось лімітом, то ми, власне, лімітуємо це «часовою міткою». І як, власне, виявляються події, які спізнилися, припустимо. От у нас є події, і у них є такі «часові рамки». І ми, власне, поміж них приходимо… Приходять так звані «часові мітки». І от у нас прийшла подія, і у неї час 8.13. І допустимий лаг для цієї події – дві умовних одиниці. І після цього ми можемо реордерити.

Тобто переставляти у правильному порядку. Тому що, власне, «часові мітки» допомагають нам в цьому. І в нас також є допустимий лаг, але тут важливий максимальний лаг. І, власне… Ці «часові мітки» і упорядковуємо, переупорядковуємо наш потік. Але це, як би, досить простий концепт. Важливе тут як вони розподілені. І якщо ви пам'ятаєте слайд, як джет розподіляється по процесорам на усіх машинах, можна зрозуміти, що на усіх машинах буде різний час. Припустимо, в рамках однієї машини бути один час, але нам його потрібно синхронізувати. Тобто нам потрібно якось звіряти годинники. І для цього є така штука, як англійською «coalescing», але я української мови не зміг знайти замінника, тому нехай вони зливаються.

І це виглядає приблизно так. Тобто оці два різні потоки – це події на двох різних серверах. І ви кажете, отут… Тут у мене 8.13, 8.11, добре. І є часова мітка. І вона не співпадає для… Тобто вона різна на різних серверах. І коли надходить на наступний оператор ця часова мітка, вона має бути вже злита. Тому треба приймати рішення про те, що вона зливається у мінімальне значення. Тобто в нас цей режим по дефолту.

Круто. Але для того, щоб з'єднати потоки, в яких можуть бути деякі події вже можуть бути з'єднані. В них можуть бути дві різні часові мітки. І для цього ми винайшли часову мітку з ключем. Тобто все просто. Простий концепт. Ключ – це позначник потока, з якого прийшла подія. Тобто… У попередньому слайді. У нас поля маркових… Поля тайм. Усіх трьох вхідних потоках – вони з'єднані. Перепрошую. В усіх трьох потоках вони помічені, як поля, в яких міститься час. І це виглядає так. Тобто вже готовому. Івентів, який вже злився. Ми помічаємо перше. Ми йдемо по порядку. Перше поле, яке було марковане, в нього буде ключ 0. Ключ 1 і далі. І так нам доступно 255 байтів у типі байт. Ось стільки нам ключів і доступно.

Та також для цього ми повністю переробили уніфікацію міток. Тобто, перепроектували алгоритм з'єднання. І коли я готував слайди для того, щоб представити фінальний режим, у мене було таке обличчя. Виглядає воно так. Тобто щось не змінюється. У нас все ще є два буфери в одному процесорі: лівий та правий вхід. І не важливо, скільки вже було з'єднано до цього. Потоки все ще об'єднані. Парковані часовими мітками. Усі дані також містять поля, які марковані. Часові рамки задаються тим же синтаксисом.

Але у нас є спеціальна мапа для обліку часових міток за ключами. Це поточний стан глобальний у системі для цього з'єднання. Та мапа для підтримки часових міток за ключами. Це потрібно власне для цієї уніфікації. Якщо приходить часова мітка, то нам потрібно оновити ті самі ключі міток у мапі, в якій зберігається стан, які відкладені другою мапою. Потім ми обчислюємо новий максимум для кожної вхідної мітки. Тобто ми оновлюємо наше положення серед усіх міток, які прийшли у цей процесор, процесор на рівні кластеру.

Потім, оскільки ми вже знаємо наш поточний стан, ми можемо видалити усі події з лівого та правого буфера. Та зрештою, елементи буферів, щоб знайти мінімальне значення часу для кожної мітки часу. І власне ця мітка і буде тією самою часовою міткою, яка піде далі. Алгоритм знайдення для даних не потерпів. Тобто ми не вносимо таких великих змін. Тобто, якщо подія запізнилася, то ми її ігноруємо. Якщо виходить за межі, тобто якщо часова мітка у вже з'єднаній події виходить за межі стану, теж ігноруємо, тому що це вже невалідно. Тому що для вже однієї часової мітки у цьому з'єднаній великій події також це невалідно. Зберігаємо також.

Тобто вже додали, зберігаємо у буфері. І відправляємо. Можуть запитати: "Саш, а що з розподіленістю?" Тобто ти кажеш про якісь часові мітки. І ми вирішили зробити так. Оскільки ми підтримуємо різні типи з'єднань. І для джойнів, у яких поміж цієї часової умови є ще умова рівності, його професійно називають Equi-Join. Ми робимо це, оскільки це джойн, скоріш за все, по конкретному. По ключах. Ми можемо їх розпізнати, тобто ми знаємо, де вони лежать і локально їх з'єднуємо.

Якщо це правий джойн, то, швидше за все, ви хочете зліва зробити Broadcast по всьому кластеру. А дані брати локально з правого входу. І для всіх інших випадків ми ситуацію із правим з'єднанням просто інвертуємо. Тобто це ми порахували, подумали, подивилися на use-case і зрозуміли, що це буде найоптимальнішим підходом. Так, і насправді, тут мали бути ще слайди. Але я вирішив не турбувати вас тим. Якщо є питання, пишіть, говоріть.

Увійти
Або поштою
Увійти
Або поштою
Реєстрація через e-mail
Реєстрація через e-mail
Забули пароль?