Для чего нужны очереди сообщений
Очередь сообщений (Message Queue)
Очередь сообщений (Message Queue)
Этот пост рассказывает об очередях сообщений — почему вы должны знать о них, думать при планировании архитектуры и использовать их в вашем приложении.
Почему очереди сообщений?
Сообщения, наряду с блоками вычисления и хранения, составляют три основных блока почти в каждой блок-схеме системы. Очереди сообщений, по существу, являются связующим звеном между различными процессами в ваших приложениях и обеспечивают надежный и масштабируемый интерфейс взаимодействия с другими подключенными системами и устройствами.
О́чередь — структура данных с дисциплиной доступа к элементам «первый пришёл — первый вышел». Добавление элемента возможно лишь в конец очереди, выборка — только из начала очереди, при этом выбранный элемент из очереди удаляется.
Использование очереди сообщений
Почему SaaS?
Добавление очереди сообщений для облачных приложений имеет смысл, только если есть чистый выигрыш в плане установки и эксплуатации. Добавление дополнительного архитектурного слоя отвечающего за очереди сообщений — непростая задача, особенно если вы решили использовать собственное решение или установить на свои сервера стороннее, так как это привнесёт дополнительные затраты на мониторинг, настройку, управление и повлияет на общую надёжность и безопасность системы.
Когда очереди сообщений легки в установке, просты в использовании, высоко доступны и чрезвычайно надёжны — все становиться гораздо проще.
Тут уместна аналогия получения энергии. Прогресс шёл от ветряных мельниц и угольных печей до промышленных электростанций и линий электропередач.Этот последний шаг — индустриализация энергии — изменило лик промышленности в мире. Это снизило затраты на строительство и производство, изменило города, заводы, и дома, и позволило создать новые изобретения, услуги и виды бизнеса.
Аналогичным образом, путём подключения служб очередей сообщений, разработчики больше не должны поддерживать огромный наборов сервисов, работающих на нескольких серверах и не опасаться простоя в результате отказа систем. В современном мире поставщики услуг берут на себя ответственность за управления серверами, API и другими ресурсами, а разработчик абстрагируясь от большинства физических ограничений может сконцентрироваться на реализации своей идеи.
Преимущества перехода на облачные очереди сообщений включают в себя:
С чего начать?
PS Я надеюсь мне удалось заронить каплю сомнения в выбор «поставить свой сервер MQ или использовать сторонний сервис» и заинтересовать в существующих SaaS решениях в области очередей сообщений.
Конвертируй это — с Yandex Message Queue
Довольно прозаичный и понятный в быту термин порой все еще вызывает вопросы в IT. Зачем при разработке приложений использовать очереди или сервисы очередей, чтобы автоматизировать этот процесс? Ответим на этот вопрос практическими примером — напишем в serverless-стеке Yandex.Cloud сервис для конвертации видео в GIF, используя Yandex Message Queue — ту самую очередь.
Что такое сервис очередей и зачем он нужен
Очередь что-то накапливает и постепенно выдает. Как же это реализует для разработки сервис очередей? Сервис очередей — решение для обмена сообщениями между приложениями, реализуемое с помощью API. С одной стороны, в него можно складывать какие-то события, а с другой — выдавать их обработчику (или нескольким обработчикам) для выполнения поставленной событием задачи, после решения которой событие в очереди отмечается как решенное.
Сервисы очередей находят применение в широком спектре сценариев, особенно в ресурсоемких задачах с ограничениями в скорости обработки. Пользователи одномоментно могут сгенерировать множество новых событий, обработчики не справятся с потоком, что приведет к отказу в работе. Использование сервиса очередей амортизирует нагрузку на сервис, когда приходит слишком много событий, они встают в очередь и ждут, пока у обработчика до них «дойдут руки». Задачи никуда не теряются даже в случае аварийного завершения системы.
С технической стороны можно представить очередь как специализированную базу данных с очень узким API и строго определенным сценарием использования. Причем организовать ее можно и поверх базы данных общего назначения. Правда, это не самая лучшая история. Вот почему.
С задачей организации очереди сталкиваются многие, берут обычную базу данных и делают в ней простейшую структуру типа такой:
Когда нам необходимо что-то положить в очередь, мы делаем INSERT — запрос, который складывает события очередь. С другой стороны обработчик делает SELECT самого старого события, что-то с ним делает, и когда завершает работу, удаляет событие из нашей таблицы DELETE. Вот, казалось бы, и готова очередь на MySQL. И этот проход работает, многие разработчики идут именно по этому пути. Но в нем есть целая прорва нюансов.
При рассмотрении частного примера, приведенного выше, возникают вопросы: а что делать, если у нас несколько обработчиков, которые исполняют задачи конкурентно, и как сделать так, чтобы задачи, находящиеся в работе, блокировались? Если один обработчик взял задачу в работу, нужно чтобы второй параллельные не обрабатывал. Если мы задачи заблокируем, то надо уметь их разблокировать в случае, когда совсем обработчик умер и не реагирует на запросы. На все вопросы можно придумать решения, но только путем серьезного усложнения структуры таблиц в базе данных и написания дополнительного кода. И эти задачи также можно решить путем добавления новых полей и команд. Только зачем «изобретать велосипед»? Даже создатели некоторых баз данных против их нецелевого использования. Например, в документации к Apache Cassandra есть специальный абзац, который не рекомендует ее использование для организации очередей. Вы тратите свои ресурсы на создание собственного фреймворка, ищете баги и отлаживаете его, а в итоге получаете решение, которое уже есть на рынке и готово к работе по нажатию одной кнопки.
Сервис очередей — это готовый сервис, который решает свою узкую задачу. И, как в случае с большинством других специализированных инструментов, такой сервис решает свою узкую задачу лучше, чем инструменты общего назначения.
Преимущества сервиса очередей Yandex Message Queue
Yandex Message Queue (YMQ) — это сервис очередей, который входит в состав платформы Yandex.Cloud. Он построен по serverless-принципу, а значит надежен, отказоустойчив, способен выдерживать высокие нагрузки, не требует администрирования, а оплачивается по модели pay-as-you-go — сколько использовали, за столько и заплатили. Достаточно в несколько кликов создать очередь, и можно сразу начинать с ней работать. В YMQ реализованы очереди двух типов: стандартные неупорядоченные очереди и очереди FIFO (First In, First Out), а также добавлена поддержка API Amazon SQS — все инструменты, которые работают с очередями от Amazon, также будут работать и с YMQ.
От теории к практике — пишем в конвертер видео в GIF
В качестве примера использования очереди Yandex Message Queue мы реализуем небольшой проект, который позволит пользователям нашего сервиса конвертировать видео по ссылке в файл GIF. Такая задача хорошо демонстрирует один из сценариев использования очереди, потому что она CPU-intensive — сильно загружает процессор. Чем больше размер видео и лучше его качество, тем больше надо ресурсов для его обработки. Конвертирование может занимать от десятков секунд до нескольких часов, и без использования очередей не добиться стабильной работы сервиса. Почему?
Если бы мы решали эту задачу в лоб, то она выглядела бы так: пользователь вставляет в сервис ссылку на видео и ожидает, что тот ему синхронно вернет ссылку на готовый GIF. Но такой способ работает плохо. Синхронное ожидание на TCP-соединении — ненадежная история: соединение в любой момент может оборваться, например у пользователя «моргнет» Wi-Fi. Когда коннект рвется, пользователю приходится делать новый запрос и снова ждать конвертации, а вам на стороне сервера — выполнять задачу заново. Если связь плохая, то пользователь может никогда не дождаться выполнения задачи, а будет видеть постоянные ошибки. Это первая проблема.
Вторая проблема связана с тем, что конвертация видео — ресурсоёмкая задача. Если на сервис придет одновременно десять пользователей и каждый захочет сконвертировать свое длинное видео, может банально не хватить мощностей. И при такой примитивной реализации сервиса кто-то из пользователей сразу получит ошибку, что конвертация невозможна. Чтобы избежать этих проблем, мы и добавляем в архитектуру сервиса очередь.
Вот как будет работать наш сервис в serverless-стеке с использованием очереди:
Пользователь приходит в нашу функцию (API Function) и создает задачу на конвертацию видео: вводит URL со ссылкой на файл. Для корректной работы нашего тестового сервиса исходный файл должен лежать на Яндекс.Диске. Вместо того чтобы сразу начинать синхронно работать, мы генерируем идентификатор задачи и говорим пользователю: «Вот твоя задача, жди ее выполнения где-то там за дверью». Тем временем наша функция складывает это событие в два места: непосредственно в очередь, ставя задачу «сконвертировать видео и отписаться в такой-то идентификатор, когда итоговое видео будет готово», а также в DocAPI-таблицу, то есть дополнительную базу данных, где мы также отмечаем, что по идентификатору задачи выполняется конвертация видео. Использование дополнительной обычной базы данных для проверки пользователем состояния задачи необходимо из-за того, что YMQ — специализированная система и она не позволяет осуществлять поиск по ключу. Пользователь ждет с идентификатором на руках, периодически, например раз в 5 секунд, обращаясь по нему в функцию с запросом, не готова ли задача. Когда задача будет выполнена, он получит ссылку на скачивание файла GIF.
В это время внутри нашего сервиса происходит следующее. На очередь у нас повешен обработчик Converter Function. Это функция, которая через триггер подключена к очереди и по мере своих возможностей забирает из очереди задачи и конвертирует видео в GIF. Когда очередная задача выполнена, мы выгружаем готовую гифку в Object Storage, а в таблице DocAPI отмечаем флажком, что такая-то задача выполнена, и записываем туда ссылку на скачивание файла.
В нашей схеме сервиса помимо YMQ мы задействовали другие serverless-сервисы — Cloud Functions и Object Storage — которые легко настраиваются с помощью консоли Yandex.Cloud. А также используем DocAPI Table — это API к Yandex Database в serverless-режиме, она также совместима с Amazon DynamoDB.
Реализация проекта в Yandex.Cloud
Заходим в консоль Yandex.Cloud и в каталоге проекта создаем сервисный аккаунт, которому назначаем все требуемые для реализации проекта роли. В продакшене так делать не рекомендуется, но для упрощения примера поступим именно так.
ymq.reader и ymq.writer (чтение и запись YMQ);
storage.viewer и storage.uploader (чтение и запись из Object Storage);
ydb.admin (права администратора YDB, чтобы взаимодейсвовать с DocAPI-таблицей);
serverless.functions.invoker (роль Functions.invoker, чтобы вызывать функции);
lockbox.payloadViewer (для работы с Lockbox).
Далее создаем статический ключ доступа, который нужен для работы с Amazon-совместимыми API. Так как у нас в проекте три таких API, ключ нам обязательно потребуется.
Чтобы донести эти ключи до функции, воспользуемся новым сервисом хранения секретов платформы Yandex.Cloud — Yandex Lockbox. Он позволяет безопасно доставлять секреты куда угодно. В сервисе мы создаем секрет и сохраняем в нем под разными именами наши два ключа: ACCESS_KEY_ID и SECRET_ACCESS_KEY.
Теперь перейдем к созданию необходимых для реализации проекта ресурсов:
Yandex Message Queue;
Yandex Object Storage;
Yandex Cloud Finctions.
Начнем с создания очереди Yandex Message Queue. Задаем имя, выбираем тип «Стандартная» (дополнительная гарантия fifo для нашего тестового проекта не важна и fifo-очередь не позволяет создавать триггеры). Не включаем в тесте «Перенаправлять недоставленные сообщения», но в «проде» эту галочку лучше ставить. Когда эта функция выключена, при появлении сообщения, вызывающего падение или выход за лимит исполнения нашей функции, триггер будет бесконечно по кругу пытаться подсовывать это сообщение в функцию, что приведет к бесцельной трате денег. Активация возможности «Перенаправлять недоставленные сообщения» с настройкой Dead Letter Queue позволит после нескольких неудачных попыток такие «сломанные» сообщения отправлять в DLQ, где их можно будет проанализировать в ручном или автоматическом режиме.
Теперь создаем новую serverless-базу данных. В ней нам нужна одна табличка с типом «Документная таблица» и одним ключом. Такой тип необходим, чтобы иметь возможность работать с DynamoDB API.
Последнее — создаем бакет в Object Storage с дефолтными настройками.
Ресурсы созданы, переходим к написанию кода и взаимодействия с API.
Создаем первую функцию для работы с API. Писать код мы будем на Python, именно это язык выбираем в редакторе функции. В функции описываем зависимости. Поскольку мы будем работать с разными API, нам надо добавить зависимости SDK. Для работы с сервисом Yandex Lockbox нам потребуется библиотека yandexcloud, которая содержит SDK для работы с большинством сервисов Яндекс.Облака. А вот для работы с сервисами, реализующими AWS compatible API — мы добавляем библиотеку boto3.
Код функции целиком посмотреть тут. Остановимся на основных этапах ее выполнения.
Сначала мы извлекаем секреты из Lockbox и переносим его в переменную функции:
Инициализируем сессию boto3, а затем все остальные компоненты:
— передаем через переменные в функцию URL очереди.
Описываем нашу табличку DocAPI с передачей endpoint в функцию через переменную окружения:
Инициализируем клиент Object Storage:
Теперь посмотрим, как работает код, который описывает поведение нашей функции, реализующей API.
create_task(src_url) — создает задачу на конвертацию видео. Принимает от пользователя URL видео для конвертации, генерирует идентификатор задачи, складывает запись в табличку DocAPI со статусом «не сделана», кладет задачу также в YMQ, ставит событие, что надо обработать видео.
get_task_status(task_id) — проверка статуса задачи. Принимает от пользователя идентификатор задачи, идет в табличку и смотрит статус готовности задачи. Если готова, то возвращает пользователю ссылку на готовый GIF.
handle_api(event, context) — точка входа в функцию. Что будет производить функция по запросу пользователя: конвертировать или проверять статус готовности.
Далее вставляем этот код в функцию, заполняем дополнительные поля и переменные SECRET_ID, YMQ_QUEUE_URL, DOCAPI_ENDPOINT.
Функция готова, можно ее протестировать. Задаем входные данные в формате запроса JSON со ссылкой на исходное видео. На выходе получаем идентификатор задачи. Если его проверить, то статус будет false, так как задачу еще никто не обрабатывает: обработчик мы не подключили.
В интерфейсе YMQ мы видим, что сообщений в очереди стало «1», так как наше сообщение попало в очередь и ждет, пока будет обработано.
Теперь создаем вторую функцию, которая непосредственно будет обрабатывать видео. Также в интерфейсе создаем новую функцию и выбираем язык Python.
Код функции целиком посмотреть тут. Остановимся на основных моментах.
download_from_ya_disk(public_key, dst) — скачивание видео с Яндекс.Диска.
upload_and_presign(file_path, object_name) — выгрузка видео в Object Storage и генерация presigned url для него.
handle_process_event(event, context) — точка входа нашей обрабатывающей функции из очереди. Получаем идентификатор, само видео, вызываем конвертер ffmpeg с параметрами, получаем готовый GIF, выгружаем его вспомогательным методом и отмечаем в табличке DocAPI по идентификатору, что задача выполнена.
Чтобы использовать функцию ffmpeg, которой нет в стандартной поставке функции, загружаем статический бинарник для архитектуры AMD64 с официального сайта ffmpeg.org. Размер файла довольно большой, а в функциях есть ограничения на размер исходника, который можно передать напрямую через интерфейс. Если размер превышает 3,5 МБ, то его необходимо загружать через Object Storage.
Кладем исходники функции (requirements.txt, index.py) и бинарник ffmpeg в архив src.zip и загружаем его в бакет Object Storage.
А в редакторе функции указываем способ доставки исходников — наш бакет, объект (архив) и точку входа, другие параметры и переменные.
Теперь создаем триггер для начала обработки события. Проверяем, что выбран тип триггера Message Queue и нужная очередь сообщений, а также названия функции и сервисного аккаунта — ffmpeg-converter и ffmpeg соответственно.
Очередь по триггеру передала задачу в обработку, что можно увидеть в окне «Обзор» — поле «Сообщений в обработке».
Убедиться, что задача выполняется, можно в окне «Логи» нашей функции.
Снова переходим в окно тестирования и проверяем статус задачи через нашу функцию API, получаем результат со ссылкой на скачивание готового файла GIF. Переходим по ссылке и скачиваем результат, созданный по заданным в коде параметрам ffmpeg.
Таким образом мы решили с помощью serverless-стека Yandex.Cloud задачу по конвертированию видео в GIF с использованием сервиса Yandex Message Queue. Такой способ создания приложений удобен способом оплаты pay-as-you-go, когда вы платите только за выполненные операции, а также не требует администрирования, отказоустойчив и безопасен.
Запись вебинара вы можете найти по этой ссылке: https://www.youtube.com/watch?v=uyIMvEtr3cI
Сейчас на наши serverless-сервисы действует программа free tier, а значит эксплуатация небольших проектов будет бесплатной. Заходите в наше serverless-комьюнити, где разработчики делятся своим опытом, рассказывают истории успеха, помогают решать задачи и осваиваться в новой технологии.
Архитектура сервиса распределённых очередей сообщений в Яндекс.Облаке
Привет, меня зовут Василий Богонатов. Я один из тех, кто приложил руку и голову и вложил свою душу в сервис распределённых персистентных очередей сообщений Yandex Message Queue. Сервис вышел в общий доступ в конце мая, но внутри Яндекса он уже давно и активно используется в разных продуктах.
Сегодня я хочу рассказать читателям Хабра об очередях сообщений вообще и о Yandex Message Queue в частности. Сначала я хочу объяснить, что такое «распределённая персистентная очередь сообщений» и зачем она нужна. Показать её практическую ценность, механику работы с сообщениями, поговорить про API и удобство использования. Во второй половине материала мы посмотрим на техническую сторону: как в наших очередях используется Yandex Database (это надежный фундамент нашего сервиса), как выглядят наивный и улучшенный подход к построению архитектуры, какие проблемы вызывает распределённость и как их можно решить.
Что такое распределённая персистентная очередь сообщений?
Википедия определяет очередь сообщений как «программно-инженерный компонент, используемый для межпроцессного или межпотокового взаимодействия внутри одного процесса». На самом деле, это понятие несколько шире: процессы, взаимодействующие при помощи очереди, могут находиться на разных серверах и даже в разных дата-центрах.
Мы немного уточним термины.
Очередь сообщений – это хранилище, которое обеспечивает размещение и чтение данных в определённом порядке.
С очередью обычно взаимодействуют два типа сущностей:
Основной сценарий для очереди: надёжно и быстро передавать сообщения от писателя к читателю. В отличие от базы данных очередь не предназначена для длительного хранения сообщений. Во многих популярных реализациях существует соответствующий параметр – «Срок хранения сообщений». Он определяет, сколько времени хранится сообщение до момента безвозвратного удаления.
Мы разобрались с понятием очереди, переходим к «распределённости» и «персистентности».
Для чего нужна очередь сообщений
Очередь позволяет отделять логически независимые части сервисов друг от друга, то есть обеспечивает decoupling, который так востребован в популярных сейчас микросервисах. Это повышает масштабируемость и надёжность: всегда можно увеличить поток записи в очередь и добавить больше читателей – обработчиков сообщений, при этом отказ читателей никак не сказывается на работе писателей.
Очереди сглаживают пики нагрузок: они исполняют роль буфера для читателей. Если для мгновенной обработки всех поступающих сообщений текущих мощностей читателей недостаточно, помещённые в очередь сообщения будут обработаны позже, когда нагрузка уменьшится. Буферизация полезна для сервисов с нестабильной нагрузкой, где не нужна моментальная обработка входящих событий.
Давайте посмотрим, как это работает, на примере поискового робота (всё-таки Яндекс начинался с поиска!), который скачивает, обрабатывает и помещает веб-страницы в базу данных. Возьмём вот такую архитектуру.
Очередь сообщений решает здесь следующие проблемы:
Как Yandex Message Queue работает с сообщениями
Здесь можно выделить три основных этапа:
В момент прочтения сообщение скрывается из очереди на период времени, который называется таймаутом видимости (Visibility Timeout), и становится недоступным для других читателей. Если таймаут видимости истекает, сообщение возвращается в очередь и снова становится доступным для обработки. Порядок прочтения сообщений определяется очередью, а не читателем.
Сам читатель и сетевое соединение с ним потенциально ненадёжны. Таймаут видимости необходим, чтобы иметь возможность вернуть в очередь сообщение при аварийном завершении работы читателя или обрыве соединения. В противном случае возникает вероятность, что отдельное сообщение никогда не будет корректно обработано.
После успешного прочтения сообщение передаётся клиенту вместе с идентификатором ReceiptHandle. Идентификатор указывает на конкретные данные, которые должны быть удалены из очереди сообщений.
Типы очередей в Yandex Message Queue
Первый и наиболее часто используемый тип – стандартная очередь (Standard Queue). Она отличается высокой пропускной способностью (тысячи сообщений в секунду), отличной производительностью и малым временем исполнения основных операций. Стандартные очереди состоят из логических шардов и поддерживают практически линейное масштабирование пропускной способности.
Стандартные очереди не поддерживают дедупликацию сообщений при записи в очередь и не гарантируют порядок прочтения. Из-за использования шардинга запрос на чтение может не вернуть ни одного сообщения, даже если они есть в очереди. Чаще всего это случается в режиме short polling, когда чтение идёт из одного случайно выбранного шарда.
Yandex Message Queue API
API – крайне важная составляющая любого продукта. Хороший программный интерфейс должен быть простым и понятным, требовать минимального ознакомления с документацией для эффективного применения. Должен не позволять делать странные или ненужные действия и защищать от глупых ошибок, вовремя сообщая о нарушении «контракта».
Если у системы есть такой API, она быстро получает лояльных пользователей и обрастает удобными «обёртками» для разных платформ и языков программирования.
Amazon Simple Queue Service API (AWS SQS API) – пример такого интерфейса, проверенного временем и огромным количеством клиентов. Поэтому мы решили не изобретать уникальный интерфейс для Yandex Message Queue, а реализовали поддержку AWS SQS API, причём очень тщательно.
В большинстве случаев пользователю SQS достаточно сменить endpoint (адрес сервиса), регион (в данный момент у нас используется только «ru-central1») и получить новые реквизиты доступа (credentials) в пределах Яндекс.Облака. Всё остальное, например, скрипт с использованием командной строки AWS, код с использованием AWS SDK или готовый сервис на Celery или boto, скорее всего, трогать не придётся. Логика и функциональность сервиса очередей останутся прежними.
Подробное описание методов Yandex Message Queue API есть в документации сервиса.
Немного об удобстве
Yandex Message Queue – управляемый (managed) сервис, то есть за работоспособность серверов и программного обеспечения отвечает Яндекс.Облако. Команда сервиса следит за здоровьем очередей, оперативно заменяет вышедшие из строя диски, устраняет разрывы сети и выкатывает обновления. Обновление происходит без остановки сервиса: пока мы устанавливаем новую версию YMQ на одну группу серверов, балансировщик нагрузки старательно перенаправляет трафик на другие. Так что пользователи ничего не замечают.
Чтобы вам было удобнее контролировать работу очередей, мы добавили в YMQ большое количество наглядных графиков, здесь показана только небольшая их часть. Графики находятся в консоли Яндекс.Облака, в разделе «Статистика».
Мы расскажем про четыре самых полезных на наш взгляд графика:
Мы обсудили более-менее общие моменты, теперь перейдём к деталям.
Как в Yandex Message Queue используется Yandex Database
Сервис Yandex Message Queue построен поверх геораспределённой отказоустойчивой базы данных Yandex Database (YDB), которая обеспечивает строгую консистентность и поддержку ACID-транзакций. Мы не будем сейчас разбирать её устройство и характеристики, ограничимся общей схемой.
Очередь в YMQ состоит из логических шардов, представленных неким фиксированным набором таблиц YDB. Каждая таблица хранит свою часть информации. Например, есть таблица общего состояния под названием State, которая хранит offset’ы и реальное количество сообщений. Есть таблица с данными и метаданными сообщений. Есть таблица со связанными атрибутами.
Все основные операции с очередью — работа с сообщениями, изменение атрибутов, создание и удаление — это работа с иерархией таблиц и директорий YDB, либо транзакционные запросы к одной или нескольким таблицам очереди. Данные внутри таблиц очереди – источник абсолютной истины. Поэтому помимо корректной и стабильной работы БД нужно обеспечивать надёжное хранение и высокую доступность данных.
У нас информация хранится в нескольких репликах: по одной копии в каждом из трёх дата-центров Яндекса. В случае недоступности одного из дата-центров количество реплик в оставшихся удваивается. Таким образом восстанавливается требуемый уровень надежности. Даже если выйдет из строя целый дата-центр и одна стойка сервиса в другом, данные будут полностью доступны.
Первый вариант архитектуры Yandex Message Queue
Первый вариант архитектуры YMQ, который мы сами назвали наивным, выглядел вот так.
Схема показывает путь HTTPS-запроса от клиента YMQ до хранилища YDB. Посмотрим на основные компоненты:
Архитектура Yandex Message Queue с мастерами очередей
Нагрузочные стрельбы показали, что первый вариант архитектуры выдерживает около 450 сообщений в секунду на очередь с одним шардом. Это было очень мало.
Основной проблемой стал contention запросов. Большое количество логически конфликтующих транзакций быстро приводило кэши скрытых сообщений в несогласованное состояние. Для решения проблемы мы ввели особую сущность – мастер очередей (queue master).
Мастер очереди – актор, который в обычных условиях существует в кластере в единственном экземпляре и пропускает через себя все запросы, связанные с конкретной очередью. Если запрос к очереди приходит на сервер, где нужный мастер отсутствует, специальный прокси-актор перенаправляет запрос, а затем транслирует обратно полученный от мастера ответ.
При использовании мастера очередей правильный кэш незаблокированных сообщений снижает contention при работе с таблицами. Упрощается реализация ограничения потока запросов, например, через Leaky bucket. Доступны быстрые и точные метрики очереди: количество сообщений, общий трафик и тому подобное. Можно группировать однотипные запросы.
В теории у такой архитектуры есть определенные недостатки, связанные с централизацией:
Батчинг запросов в мастерах очередей
Распределенные транзакции с таблицами базы данных ведут к определенным дополнительным расходам, поэтому идея уменьшить количество запросов казалась нам логичной. Сто транзакций на запись сообщений по одному лучше превратить в одну транзакцию на запись ста сообщений сразу. С мастерами очередей внедрить такую пакетную обработку (батчинг, batching) намного проще.
Батчинг несколько увеличивает задержки (latency) при выполнении операций. Взамен значительно увеличивается пропускная способность. С батчингом одношардовая очередь может обработать до 30 000 запросов в секунду.
Вообще загрузка у очередей бывает очень разной: и тысячи сообщений в секунду, и несколько сообщений в день. Нам нужно было оптимизировать работу с очередями с помощью гибкого алгоритма. Лобовые варианты с накоплением сообщений в буфере до порогового количества или сбросом по таймеру нас не устраивали. Поэтому мы разработали для YMQ алгоритм адаптивного батчинга, который хорошо работает в обоих случаях. Его работа показана в формате временной диаграммы.
Здесь при поступлении нового сообщения возможен один из трёх сценариев:
Что происходит с мастерами при возникновении проблем
В Yandex Message Queue, как и в любой распределённой системе, могут возникать чрезвычайные ситуации. Отказывают серверы, тормозят диски, рвётся сеть внутри и между дата-центрами.
В подобных случаях YDB в течение нескольких секунд автоматически переносит затронутые сбоем таблетки на более подходящие серверы внутри кластера. Мастера очередей YMQ переносятся вместе со своими таблетками.
Не во всех случаях возможно достоверно определить статус сервера по сети, поэтому бывают ситуации, когда новый мастер уже запущен, а старый ещё не прекратил работу.
Для YMQ это не проблема. Запросы в базу не делают предположений о верности кэша видимых сообщений и проверяют каждое из них заново в процессе скрытия. Поэтому существование «лишних» мастеров приводит только к небольшому временному снижению производительности.
Как мы добились отказоустойчивости при создании очереди
В YDB невозможно создать несколько таблиц и модифицировать данные в рамках одной транзакции. Для нас это означало, что очередь, которая физически является набором таблиц, нельзя создать «транзакционно». При гонке в параллельных запросах или при отказах машин можно получить неконсистентное состояние, из которого невозможно выбраться без постороннего вмешательства. Мы подумали и разработали вот такую схему для решения проблемы.
Основная идея такова: для каждого запроса на создание очереди необходимые структуры данных очереди создаются параллельно и независимо. Таким образом создаются версии, которые в конце «коммитятся» в виде строки в специальную таблицу. Выбирается версия-победитель, а все «проигравшие» запросы понимают, какая версия «победила», и возвращают корректную ссылку.
Такой алгоритм в парадигме «всё или ничего» устойчив к отказам по причине независимости создаваемых структур и наличия финальной транзакции с коммитом версии. Если коммит завершился успешно, можно считать, что запрашиваемая очередь создана правильно.
Как в Yandex Message Queue организованы тестирование и мониторинг
Yandex Message Queue – сложный программно-аппаратный комплекс. У него много возможных точек отказа. Мы должны быть уверены в качестве сервиса, который предоставляем. Поэтому мы регулярно его тестируем.
В первую очередь мы отслеживаем:
В заключение
Задача инфраструктурных команд в Яндексе – создавать и поддерживать надёжные, масштабируемые и производительные решения, на основе которых можно быстро и успешно запускать новые продукты, улучшающие жизнь конечных пользователей. Внутри компании наш сервис очередей давно доказал свою полезность и стал частью архитектуры Яндекс.Видео, Яндекс.Маркета, Яндекс.Образования, Яндекс.Такси и других служб.
Теперь он доступен в экосистеме Яндекс.Облака и его можно использовать для построения сервисов внутри и вне самого Облака. Сейчас новые пользователи при регистрации получают денежный грант на ознакомление, так что попробовать Yandex Message Queue можно бесплатно.