Квитки на наступну конференцію Конференція Highload fwdays'24 вже у продажу!

Stateful app, як ефективний спосіб побудови диспетчеризації для райдерів та водіїв [ukr]

Презентація доповіді

У продуктовій лінійці сервісів Uklon ми часто використовуємо широко відомий та зрозумілий stateless-підхід. Але існує ряд бізнес-сценаріїв, де цей підхід може бути неефективним (з точки зору ресурсів, ефективності чи підтримки), та непередбачуваним (з точки зору обробки пікових навантажень).

В ході доповіді розглянемо наступні питання:

  • Workloads, які роблять stateless-підхід неоптимальним;
  • Масштабування stateful-сервісів за допомогою partitioning та replication;
  • Disaster recovery (RTO, RPO);
  • Шлях від використання actor-based до "native" stateful.
Олександр Чумак
Uklon
  • Solution Architect в Uklon
  • Більше 13 років в розробці, з яких 4 роки саме на позиції Solution Architect в Uklon, з IT командою 220+ спеціалістів. Відповідальний за дизайн та архітектуру бекенду, займається розробкою розподілених high-load систем, використовуючи стек .NET, Kubernetes та public clouds(AWS, GCP).
  • Cфери інтересів Олександра: designing distributed systems, event-driven architecture (EDA), DDD, event sourcing, CQRS, actor model, data-intensive applications.
  • Linkedin, Facebook

Транскрипція доповіді

Дякую. Всім привіт. Тема сьогоднішньої нашої доповіді - це Stateful Application, як ефективний шлях для побудови розподілу водіїв і пасажирів. Це, мабуть, одна із найскладніших задач, яка у нас є в компанії. Ви ще й не прослухали інформацію про Uklon, але я швидко, можливо, повторююся. Ми вже працюємо в трьох країнах, це 30 міст різних. За нашими користувачами у нас півтора мільйона активних райдерів, тобто ті, хто замовляє машинки. І, відповідно, 100-200 тисяч за день - це ті, хто входить водійський додаток. Прямо зараз на лінії кілька десятків тисяч водіїв, які обслуговують, по суті, пасажирів. Все це розробляє 12 продуктових команд, в яких працює 130 плюс інженерів.

Щодо нашого стеку, якщо ви відвідували наші мітапи, то ви знаєте, але все-таки основний у нас стек - це .NET, щодо транзакційних операцій. Для аналітики, DSA, Machine Learning - використовуємо Python, звісно. Мобільні застосунки у нас нативні. Ну і в якості стрімінгової системи ми використовуємо Kafka. Тому ці рішення, про які буду розповідати, вони, по суті, побудовані на ній. Все це працює на Kubernetes, AWS і так далі. З точки зору користувачів - це два додатки. Ви, скоріше за все, користуєтеся тільки одним з них - райдерським додатком, який знаходиться зліва. Ну і водії, про яких буде йти мова, також мають свій власний застосунок.

Щодо запитів - дві тисячі на секунду. Вебсокети працюють з зворотного боку. По водіям все набагато складніше. Вони їздять, потрібно відслідковувати їхнє місце розташування, тому вони надсилають свої координати - близько 5 тисяч координат щосекунди. Ми з свого боку відправляємо їм замовлення, тобто пропонуємо їм різні замовлення. Це приблизно 15 тисяч повідомлень на секунду в піки. Щодо звіту - як зменшити споживання ресурсів за допомогою Stateful Application і зробити їх ефективнішими, швидшими і надійними. Що роблять конкуренти? Можливо, вони просто працюють стейтлес? Насправді, ні. Якщо придивитися і прочитати їхні блоги, то, наприклад, у Grab є Scalable In-Memory Data Store для реального часу найближчих водіїв. У Uber це ще цікавіше, зокрема, щодо стейтлес баз даних.

Два місяці тому вони випустили статтю, де розповідали про оптимізацію Cassandra. Вони зазначають, що у них десятки мільйонів запитів на секунду, і це обслуговується 10 тисячами вузлів. Тобто 10 тисяч вузлів на сотні кластерів, і це проблема підтримки цього в робочому стані. Зміна стану бази даних, як вони кажуть, є непередбачуваною, і для галузі "райд-хейлінга" це потребує багато потужності. Ми будемо розглядати, як можна зменшити це з нашого боку. На адженті покажу наші ворклоуди, які не зовсім підходять для класичних стейтлес підходів. Далі буде про основні концепції, які ми використовуємо, і про масштабування та надійність всього цього.

Коли ви замовляєте машинку, нам потрібно знайти найближчого водія. А оскільки водії рухаються по наших дорогах 10-20 метрів за секунду, хто як вміє, скажемо, то в нас задача відслідковувати їхню локацію з точністю секунди 2-3. Тобто кожну секунду нам потрібно знати, де він знаходиться. Чому це може бути проблемою? Він проїжджає яке-небудь перехрестя і все. Йому там подача вже треба пітлять, розвертатися. Ви очікуєте занадто довго часу. Тому відслідковування з секундною затримкою. І далі, щоб знайти найближчого (не просто найближчого, а найкращого) водія для вас, нам потрібно вирішити задачу, яка є суттєво більшою та складнішою, ніж просто витягти із бази даних об'єкт за його ідентифікатором. Тут ми повинні опрацьовувати масиви даних.

І... Так. Основний наш ворог... Трошки домену, щоб ви розуміли, не тільки те, що ми нажали, натиснули, водій приїхав, а й ті процеси, що відбуваються зі сторони водія. Тобто водій у нас може працювати в двох режимах. Це так званий автоматичний розподіл. Він виходить на лінію і каже: "Я хочу, кидайте мені замовлення". Але на відміну від більшості конкурентів, наш водій може налаштовувати собі фільтри. Тобто в нього є десятки параметрів, він може встановити, наприклад, "Я хочу з такого району міста в такий район, за вартістю, по часу подачі". І ще десятки інших параметрів, які він може налаштовувати. Тобто він нам кидає фільтр і каже: "Я підписуюсь на замовлення, якщо що, назначайте на мене це замовлення". Ми відповідаємо йому.

Це перший автоматичний режим, і він для нас більш пріоритетний. Другий варіант - це так званий "ефір". Це, коли він каже: "Я просто хочу дивитися це замовлення, типу, брати чи не брати, я сам буду вибирати". І тут він так само налаштовує собі фільтри. Якщо за першим підходом ми не знайшли йому замовлення, ми викидаємо його в ефір, скажімо. І тут це також челенджова задача, бо... Просто взяти, наприклад, 10 тисяч водіїв і тисячу замовлень, перемножити, виходить якийсь космос чисел. Тому ми застосовуємо ряд тактик, щоб зменшити обчислювальну складність.

Наступний аспект - це створення замовлення і знаходження для нього найкращого водія. Тобто, якщо користувач RiderA створює замовлення, ми знаходимо для нього водія A, найближчого чи найшвидшого. Потім замовлення B створюється, і ми знаходимо іншого водія. Якщо розглядати сумарний час очікування, 2 плюс 9, то виходить 11 хвилин. Що ми робимо для вирішення цієї проблеми? Ми застосовуємо, по суті, алгоритми пакетного оброблення, накопичуємо замовлення і потім обробляємо їх вже в одному блоку. У результаті перерозподіляємо водіїв між замовленнями, і сумарний час очікування становить 8 хвилин. Таким чином, для виконання цих операцій ми повинні мати всі дані, всіх водіїв в пам'яті. Тягти їх постійно з бази даних виявляється дуже неефективним.

Також, наступне - коли вже водій для замовлення знайдений, потрібно передбачити пасажиру, скільки часу залишилося до приїзду водія (через хвилину, 2,5 чи інше). Якщо у нас активні тисячі таких замовлень, а водії постійно надсилають свої координати, а трек постійно змінюється, ми повинні оптимально це робити. Тут також використовуємо стейтфул-підходи, де тримаємо всі дані в гарячій пам'яті.

І ще список продовжується основними фічами, основним навантаженням, де потрібна велика кількість даних одразу. Знову ж таки, оці патерни DAI, сутність з бази даних за ідентифікатором, тут вже не працюють. Це такі, наприклад, у нас розподіл за секторами в аеропортах, на ж/д вокзалах. Це ланцюги замовлень, тобто водій виконує, завершує виконувати попереднє замовлення, і ми вже йому підкидаємо інше. Тобто, всі ці замовлення, які ми вже виконуємо, вони вже виконують попереднє замовлення, і ми вже йому підкидаємо інше.

Ну і останні, можливо, це також GPS-координати, виявлення аномалій і побудова на базі цих координат заторів, інформація по заторах, яку ми, як би, будуємо свою, а не користуємося сторонніми, наприклад, Google. Верхньорівнева архітектура - це те, що в розподілі водіїв бере участь драйверський рейс, рейдерські застосунки, рейдер створює замовлення, драйвер присилає свої фільтри, свій стейт і свої координати. Все це потрапляє в драйвер ордерінг, тут стейт, воно зберігається від пам'яті, як source of truth, умовно.

Ну і далі нам потрібно вирішити все-таки задачу знайти найближчого, найкращого серед усіх. Що я маю на увазі "серед усіх"? Наприклад, в радіусі одного кілометра від цієї точки, одного кілометра, може бути сотні водіїв, скажімо, в середньому 200-300. В радіусі двох кілометрів їх вже тисяча. Тобто, коли ми кажемо, що у нас подача може бути там десь кілометр, нам потрібно для одного замовлення водія обробити сотні водіїв, щоб знайти не лише найближчого, але й визначити, чи підходить це замовлення за фільтрами, які налаштував сам водій. І щоб вирішити це, нам потрібно також розраховувати по дорожнім розв'язкам, оскільки те, що водій може знаходитись в 100 метрах від точки, подача йому по факту може бути декілька кілометрів, якщо йому потрібно петлювати. І всі ці розрахунки, вони проходять через routing-engine наш, де ми вирішуємо цю задачу дорожнього графа. Оце, що в червоному квадраті, тут Stateful. І для того, щоб це все оптимально робити, ми їх тримаємо в пам'яті.

Також, я вже говорив, що це predict AT, це час подачі, тобто інформування пасажира, що там скільки залишилося до подачі. Це також історія Stateful. Все інше - Stateless. Stateful - це, в нашій компанії, скажімо, невелика частина. Основна частина ґрунтується на класичних принципах. І які основні проблеми відкриті у побудові Stateful Application? Це, по суті, load balancing. Тримати все на одній ноді, якщо, скажімо, проблема, їх потрібно якось скалізувати, відповідно, там якось балансувати, партиціонувати, реплікувати. Оце вся історія. Ну і надійність. Я не один раз чув, що нас запитували, а що ж ви робите, коли ваші сервіси падають? Вони там все в пам'яті тримають, впали і все, кінець. Це проблема, яку я розкажу, як ми її вирішуємо.

Ну і основні концепції для побудови Stateful Application у нас. Це те, що ми всі дані тримаємо, по суті, в key-value, скажімо, структурах in memory, в додатку. Про відновлення. Те, що там додаток падає чи ні. Цей State відновлюється з Durable Log. Згадував про Kafka. Тобто всі дані, це з Driver Ordering, вони потрапляють в топіки, по суті, які Durable. І вони вже реплікуються на ці Stateful Application, які займаються знаходженням водіїв. Тобто, як от Женя в попередньому доповіді розповідав, цей оркестратор, а от Pure Functions, скажімо, вони просто відповідають, приходять, типу, замовлення, дай водіїв найближчих, і вони пачку віддають. Тобто цей State не змінюється. State змінюється завдяки, по суті, реплікації даних. Ну і особливість також, якою ми користуємося, оскільки у нас цей лог в кавці, як я вже говорив, то у нас семантика однопотокової обробки на зміну State. Це також. Дуже спрощує саму розробку, бо там не потрібні конкурентні структури, блокуючі структури, структури синхронізації даних, тобто сам код стає значно простіший. Ну і за NFR-ами, ще раз, для Києва.

Отже, ми маємо два основних топіки, які змінюють стан нашого додатка. Це координати, про які я вже говорив, і стан водія, включаючи його фільтри та інші налаштування. Для розуміння розмірів даних, один запис водія в 50-му перцентилі становить приблизно 2 кілобайти. У 99-му перцентилі цей розмір зростає до 13 кілобайтів. Якщо ми припускаємо, що у нас є 100 000 водіїв в додатку, це складає 200 мегабайт. Це враховуючи, що ми розглядаємо Київ як мільйонне місто, де кількість користувачів на декілька мільйонів, і всі дані цього регіону легко поміщаються в оперативну пам'ять.

Щодо оновлень, які змінюють стан, і читань, у нас, в середньому, 5-10 тисяч RPS (запитів на секунду) координат і 100-500 RPS для зміни стану водія. Щодо читань, ми маємо від 200 до 500 QPS (запитів на секунду). Проте слід зазначити, що на кожен запит нам потрібно знаходити 100-500 водіїв, які є найближчими, і працювати з ними. Це призводить до того, що на кожен запит нам потрібно отримувати від 20 до 100 тисяч рядків зі сховища даних, що призводить до трафіку до 200 мегабайт на секунду.

Важливим моментом є те, що ці дані постійно змінюються, і якщо ми просто записуємо їх в базу даних і читаємо, це становить велике навантаження на обробку даних. Дані стають застарілими всього за кілька секунд, і їх потрібно постійно оновлювати. Одна з ключових концепцій при використанні цих стрімів - це co-partitioning. Ми розділяємо дані по координатах та стану водія на різні топіки, а потім забезпечуємо, щоб координати і стан водія потрапляли на один і той же робочий вузол для забезпечення локального доступу до даних. Це дозволяє нам забезпечити високий data locality і уникнути ділення інфраструктури між вузлами. Ключові шаблони доступу до даних включають key-lookup, використання індексів для різних сценаріїв та сканування всіх даних для врахування фільтрів, які водії можуть налаштовувати.

Наступна вимога стосується кількості партицій в топіках, яка повинна бути однаковою. Це потребує планування заздалегідь та розрахунків для забезпечення консистентності стану щодо кількості партицій. Якщо потрібно розмістити один топік, це може вимагати змін у кількості партицій в декількох топіках, що може бути проблематичним у деяких сценаріях. Важливо використовувати однаковий "партиціонер" (partitioner) для продюсера, щоб забезпечити однаковий розподіл даних і локальний доступ. Це означає, що, наприклад, дані про зелених водіїв потрапляють в один топік, а дані про синіх водіїв - в інший, і т.д.

Другою концепцією є "ріки даних". Ми маємо топіки з різними схемами ключів, і щоб їх об'єднати, створюємо "ріку", читаємо один топік і публікуємо його з новим ключем у новий топік. Це дозволяє зберігати водіїв одного регіону на одній партиції, що сприяє "data locality". Однак можуть виникнути проблеми з нерівномірним розподілом навантаження, якщо міста мають великі різниці в розмірах. Третя концепція - це "фільтрація та збагачення". Для ефективного розрахунку ATA (очікуваного часу прибуття) пасажиру ми фільтруємо координати, накопичуємо їх у пам'яті, а потім розраховуємо ATA і розсилаємо збагачені дані. Це відбувається без використання сторіджів.

Щодо масштабування, варто враховувати, що scalability відноситься до збільшення навантаження від року до року, в той час як еластичність стосується короткочасних піків, таких як годинні піки використання. Щодо стратегії "шардування" або "партіціонування", використання геопросторових індексів, таких як GeoHash, може допомогти вибрати дані водіїв, які розташовані поруч на одній ноді. Однак це може призвести до розбалансування, оскільки великі міста можуть потрапити на одну партицію, а менші - на іншу. Розбаланс можна боротися, виділяючи кілька місць на один топік для різних регіонів.

Якщо цих недостатньо, то вони додаються ще там, і тоді розкручують цей комок. Але можна просто розділити за регіонами. Іншими словами, ми беремо, скажімо, що весь Київ потрапляє на одну "ноду". Як я вже показував, за даними 200 Мб, все легко поміщується там. Таким чином, перший підхід використовується у нас в більшості — це в Києві. У Data Analytics, Machine Learning, DCA в основному використовується це партизування по місту, по регіону. Там, де є замовлення на Processing, ми в основному використовуємо цей підхід.

Саме для того, щоб зменшити cross-partition взаємодії і зробити дані більш локальними. Просто партизування по регіону. Тобто, як я вже говорив, дані з Києва потрапляють в одну партицію, інші міста розбалансовуються. Відповідно, піднімається консюмер, який обробляє тільки Київ, піднімається консюмер, який обробляє інші міста і так далі. Однак можуть виникнути проблеми, такі як downtime. Тобто, якщо відбувається збій, ніхто нічого не обробляє, і для відновлення потрібен час.

І знову ж таки, розбалансовані ворклоди. Київ — значно більше за інші міста. Тобто, навантаження на першому значно більше. Як вирішити це? Додавати репліки. Репліки додаються не тільки для застосування при збоях, але й для масштабування навантаження. Як це можна вирішувати за допомогою цих інструментів? Тобто, ми повинні використовувати не класичні консюмери з консюмер-групами, які автоматично розподіляються між партиціями, але ручний асайн. Тобто, наш консюмер каже, що буде читати всі дані з нульової партиції, інший каже, що буде читати з сьомої, шостої, п'ятої.

Тут вже немає концепції консюмер-групи. Тут немає проблеми з ребалансом при додаванні нової ноди, як у класичному асайні всіх партицій. Тут ми продовжуємо читати, і ми вже можемо сміло перезапускати. Незалежно від того, як часто піднімається у нас холодний старт, не відбувається відновлення реплік, завжди є інші репліки, які оброблять наші замовлення. Таким чином, масштабування досягається на довгому проміжку часу за допомогою додавання нових партицій, додавання нових шарт, еластичність.

Щодо надійності, ми можемо використовувати підходи CRDT для збереження консистентності даних на репліках. Використання State-based реплікації уможливлює зведення даних до консистентного стану. Також можна використовувати стратегію Last-Write-Wins, де повідомлення з більшим таймстемпом переважають. Це може бути корисним у випадку Unordering у Kafka. Звісно, для досягнення оптимальної працездатності доводиться робити деякі компроміси щодо продюсерів. Та важливо пам'ятати, що оптимізація надійності може вплинути на продуктивність.

А що ж робити з Replication Lag? Які можуть бути проблеми? Звісно, це те, що Reading Your Own Rights, тобто ти щось змінив, читаєш, а там у тебе ще попередні дані. Або Monotonic Reads, тобто у тебе кожне наступне читання може повернути дані попереднього, тобто більш старіше. Або Consistent Prefix, тобто коли спочатку там створюється замовлення, а потім івент про те, що машина знайшлась, а вони приходять в зворотному порядку. Це класичні проблеми з Replication Lag. Але давайте розглянемо, чи вони нам притаманні. Оскільки ми маємо, що State змінюється зі сторони водія, а читається зі сторони пасажира, оці всі проблеми, вони зникають. Для Consistent там взагалі три повинні бути учасники, третій спостерігач. Тобто у нас водій змінюється, пасажир читає, і цих проблем немає.

Це залежить від того, як ви задизайнуєте свій домін. Replication Lag дійсно може бути проблемою, якщо репліка дуже сильно відстане, приблизно на хвилини якісь. Але в нашому випадку ми цього ніколи не спостерігали. Ну і надійність цього рішення. Оскільки у нас тільки з інфраструктури підключення до Kafka, Kafka зарекомендувала за всі ці часи як перевірена і мега надійна. У нас за кілька років жодного з них не було інциденту, саме з Kafka. А більше у нас нічого немає. Ні Redis, ні POSRIS, нічого немає. Тобто по надійності це значно вище, ніж класично там ще тримати, якийсь додатковий Remote Storage.

Але є проблема, що нам потрібно відновлювати стейти із цих топіків, бо стейт ми кожен раз відновлюємо при старті. І тепер, як довго це відбувається? І тут також є декілька варіантів, як можна організувати дані в топіках. Це по компактним. Тобто ми кажемо, що для кожного драйвера ми залишаємо мінімум останній меседж. Або ж просто по Time Retention. Кажемо, що залишаємо в топіку дані тільки за кілька годин. Насправді більш надійний перший варіант. Але знову ж таки, оскільки ми там реплікуємо їх по регіону, а не по Driver ID. Для того, щоб дійсно залишався тільки останній меседж.

А ми використовуємо другий. Ми просто забезпечуємо, що у нас за останню годину завжди буде івент про водія. І виходить, що оскільки ми тримаємо дані за одну годину, маючи там півмільйона меседжів в топіку, у нас відновлення відбувається, от лох я витягнув, за одну секунду, за 1,8 секунд. Тобто, для підняття ноди потрібно менше двох секунд, щоб вона запопулейтила свій стейт внутрішній. Багато це чи мало? Там на задніх рядах, мабуть, не видно. Але якщо там взяти, згадати про SLA, то дві секунди це майже п'ять дев'яток. Заздалегідь надійність намагаються досягти до чотири дев'ятки. Це вже, типу, космос майже. Тут майже п'ять дев'яток.

Але знову ж таки, це від нашого форуму. Залежить від кількості даних, якими дизайнами і так далі. А можна ж все-таки реалізувати за допомогою Redis? Можна спитати. Можна. Якщо взяти дані про продуктивність з Amazon, то приблизно для цих ворклоїдів нам потрібно клієнт з 16 CPU і три ноди Redis по вісім. Там 24 CPU. 16, 24, десь 40 CPU цього потрібно. І клієнт тільки буде читати. Тобто просто отримувати ці дані з Redis, це отам от 16 CPU потрібно. Далі я покажу, скільки виходить у нас. От наш весь аплікейшн, який хендлить всі міста. Він споживає всього два CPU. По пам'яті, я казав, корисний об'єм там 200 МБ на Київ. От там додаткові менше 3 ГБ. Тобто весь розподіл може бути два CPU. Тобто це сама дешева віртуалка, все це вивозить. Якщо згадувати ретроспективно, 20-і роки я знаходив такий скріншот. Там у нас інші підходи були. Там замість двох було 50. 50 CPU використовувалось.

Ну і наступні кроки, які ми будемо робити наразі, у нас є тільки переоцінювання з тим двосекундним відновленням стейта. Далі все-таки, звісно. Зробимо реплікацію додаткову, щоб ще зменшити ці навантаження. Ну і так само ми зможемо більше, ще більше масштабуватись під еластичність саме. Уроки, які вивчили. Тобто стейтфул підходи, вони насправді, через те, що ми утримуємо всі дані в пам'яті, дуже сильно зменшують accidental complexity, про яку Женя також згадував. Тобто ми намагаємось. Сами алгоритми стають більш прямолінійними. Навіть от всіх там розробників спитав. Вони кажуть, що це поточна версія значно простіша та інтуїтивно зрозуміліша, ніж всі попередні рішення, які у нас були.

Ну і звісно, це вже щодо ресурсів. Порівнюючи два CPU та 40 там. Якщо ми кажемо, що ми ще підключаємо там десяток країн, то різниця буде значно більшою. Я вже не встигаю трошки за таймінгом розказати про наші попередні історії з серлінсом, але ладно, потім. І ще такий момент. Раніше від DevOps'ів часто можна було почути. От є там 12 факторів, раніше там 12 факторів застосунку, рекомендації щодо того, які там застосунки мають бути. Ну і там от прописано, що всі сервіси повинні бути стейтлес. Якщо розглядати більш детально, то там, вони, знову ж таки, вказують на те, що при перезапуску дані втрачаються. Ну там, нода петнулась там, весь стейт загубили.

Я ж показав, що за допомогою оцього durable лога і відновлення дуже швидкого, ця проблема вирішується. Тобто потрібно не просто вірити в якісь правила, що можна чи не можна. Варто глибоко розібратися в сутності. Ну і ще як бонус, знову вже було згадано цю книгу, "Fundamental Software Architecture". Є ще друга, "The Hard Part". Можна знайти спейсбейс та архітектуру там. Звісно, вона тут набагато складніша, але концепція залишається. Тобто у нас є репліковані дані і у нас є якісь обчислення, які це дані використовують. І те, що я хотів показати, це наступне, що автор вказує, що такі схеми найбільше ефективні з високим рівнем еластичності та масштабованості. У мене все. Дякую.

Увійти
Або поштою
Увійти
Або поштою
Реєстрація через e-mail
Реєстрація через e-mail
Забули пароль?