spark submit что это

Big Data Tools EAP 11: Zeppelin в DataGrip и spark-submit во всех поддерживаемых IDE

Только что вышло очередное обновление EAP 11 для плагина под названием Big Data Tools, доступного для установки в IntelliJ IDEA Ultimate, PyCharm, and DataGrip. Можно установить его через страницу плагина на сайте или внутри IDE.

Big Data Tools — это плагин, позволяющий соединяться с кластерами Hadoop и Spark. Он предоставляет мониторинг узлов, приложений и отдельных задач. Кроме того, в IDEA и DataGrip можно создавать, запускать и редактировать ноутбуки Zeppelin. Можно не переключаться на веб-интерфейс Zeppelin и спокойно работать, не выходя из любимого IDE. Плагин позволяет удобно перемещаться по коду, делать умное автодополнение, рефакторинги и квик-фиксы прямо внутри ноутбука.

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что это

Новый тип конфигурации запуска для spark-submit

Одно из важнейших улучшений в этом релизе — возможность запускать Spark-приложения из IDE, без необходимости набирать команды в консоли. Эта функциональность доступна для всех поддерживаемых IDE, включая PyCharm. Напоминаю, скрипт spark-submit лежит в директории bin дистрибутива Spark и используется для запуска приложений на кластере. Он использует все поддерживаемые типы кластеров через единый интерфейс: благодаря этому не нужно несколько раз по-разному перенастраивать своё приложение. Удобно.

Большая проблема пользователей spark-submit в том, что его использование сопряжено с рядом ритуальных рутинных действий. Нужно вручную собрать все артефакты, скопировать их на целевой сервер по SSH, запустить spark-submit с кучей параметров. Обычно всё это заканчивается написанием пачки bash-скриптов на все случаи жизни, которые ты теперь обязан поддерживать. В целом, эта рутина крадет время разработчиков, которое можно было бы потратить на что-то более полезное.

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что это

Поддержка Apache Zeppelin в DataGrip

Теперь можно использовать интеграцию с Zeppelin внутри DataGrip. Для пользователей DataGrip это означает, что наконец-то можно нормально визуализировать данные. Кроме того, писать код на SQL куда легче с новым умным автодополнением прямо внутри интерактивного блокнота.

Надо понимать, что языковая поддержка сейчас ограничена только SQL. Например, Matplotlib или анализ кода на Scala в DataGrip у вас не заработают.

Изменения

В этом обновлении мы сконцентрировались на улучшении существующей функциональности. Полный список изменений можно прочитать по ссылке, а дальше будут перечислены только самые важные.

Новая функциональность

Улучшения в интерфейсе

Список исправленных багов

Документация и социальные сети

Ну и наконец, если вам нужно разобраться функциональностью Big Data Tools, у нас есть подробная документация. Если хочется задать вопрос, можно сделать это прямо в комментариях на Хабре или перейти в наш Twitter.

Надеемся, что все эти улучшения окажутся полезными, позволят вам делать более интересыне вещи, и более приятным способом.

Источник

Запускаем Apache Spark на Kubernetes

Дорогие читатели, доброго дня. Сегодня поговорим немного про Apache Spark и его перспективы развития.

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что это

В современном мире Big Data Apache Spark является де факто стандартом при разработке задач пакетной обработки данных. Помимо этого, он также используется для создания стриминговых приложений, работающих в концепции micro batch, обрабатывающих и отгружающих данные маленькими порциями (Spark Structured Streaming). И традиционно он являлся частью общего стека Hadoop, используя в качестве менеджера ресурсов YARN (или, в некоторых случаях, Apache Mesos). К 2020 году его использование в традиционном виде для большинства компаний находится под большим вопросом в виду отсутствия приличных дистрибутивов Hadoop — развитие HDP и CDH остановлено, CDH недостаточно проработан и имеет высокую стоимость, а остальные поставщики Hadoop либо прекратили своё существование, либо имеют туманное будущее. Поэтому всё больший интерес у сообщества и крупных компаний вызывает запуск Apache Spark с помощью Kubernetes — став стандартом в оркестрации контейнеров и управлении ресурсами в приватных и публичных облаках, он решает проблему с неудобным планированием ресурсов задач Spark на YARN и предоставляет стабильно развивающуюся платформу с множеством коммерческих и открытых дистрибутивов для компаний всех размеров и мастей. К тому же на волне популярности большинство уже успело обзавестись парой-тройкой своих инсталляций и нарастить экспертизу в его использовании, что упрощает переезд.

Начиная с версии 2.3.0 Apache Spark обзавёлся официальной поддержкой запуска задач в кластере Kubernetes и сегодня, мы поговорим о текущей зрелости данного подхода, различных вариантах его использования и подводных камнях, с которыми предстоит столкнуться при внедрении.

Прежде всего, рассмотрим процесс разработки задач и приложений на базе Apache Spark и выделим типовые случаи, в которых требуется запустить задачу на кластере Kubernetes. При подготовке данного поста в качестве дистрибутива используется OpenShift и будут приведены команды, актуальные для его утилиты командной строки (oc). Для других дистрибутивов Kubernetes могут быть использованы соответствующие команды стандартной утилиты командной строки Kubernetes (kubectl) либо их аналоги (например, для oc adm policy).

Первый вариант использования — spark-submit

В процессе разработки задач и приложений разработчику требуется запускать задачи для отладки трансформации данных. Теоретически для этих целей могут быть использованы заглушки, но разработка с участием реальных (пусть и тестовых) экземпляров конечных систем, показала себя в этом классе задач быстрее и качественнее. В том случае, когда мы производим отладку на реальных экземплярах конечных систем, возможны два сценария работы:

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что это

Первый вариант имеет право на существование, но влечёт за собой ряд недостатков:

Расскажем подробнее о процессе настройки Spark для локального запуска. Чтобы начать пользоваться Spark его требуется установить:

Собираем необходимые пакеты для работы с Kubernetes:

Полная сборка занимает много времени, а для создания образов Docker и их запуска на кластере Kubernetes в реальности нужны только jar файлы из директории «assembly/», поэтому можно собрать только данный подпроект:

Для запуска задач Spark в Kubernetes требуется создать образ Docker, который будет использоваться в качестве базового. Здесь возможны 2 подхода:

С её помощью можно создавать образы Docker и осуществлять их загрузку в удалённые реестры, но по умолчанию она имеет ряд недостатков:

С её помощью собираем базовый образ Spark, содержащий в себе тестовую задачу для вычисления числа Pi с помощью Spark (здесь — URL вашего реестра образов Docker, — имя репозитория внутри реестра, совпадающее с проектом в OpenShift, — имя образа (если используется трёхуровневое разделение образов, например, как в интегрированном реестре образов Red Hat OpenShift), — тег данной версии образа):

Авторизуемся в кластере OKD с помощью консольной утилиты (здесь — URL API кластера OKD):

Получим токен текущего пользователя для авторизации в Docker Registry:

Авторизуемся во внутреннем Docker Registry кластера OKD (в качестве пароля используем токен, полученный с помощью предыдущей команды):

Загрузим собранный образ Docker в Docker Registry OKD:

Проверим, что собранный образ доступен в OKD. Для этого откроем в браузере URL со списком образов соответствующего проекта (здесь — имя проекта внутри кластера OpenShift, — URL Web консоли OpenShift) — https:///console/project//browse/images/.

Для запуска задач должен быть создан сервисный аккаунт с привилегиями запуска подов под root (в дальнейшем обсудим этот момент):

Выполним команду spark-submit для публикации задачи Spark в кластере OKD, указав созданный сервисный аккаунт и образ Docker:

—name — имя задачи, которое будет участвовать в формировании имени подов Kubernetes;

—class — класс исполняемого файла, вызываемый при запуске задачи;

—conf — конфигурационные параметры Spark;

spark.executor.instances — количество запускаемых экзекьюторов Spark;

spark.kubernetes.authenticate.driver.serviceAccountName — имя служебной учётной записи Kubernetes, используемой при запуске подов (для определения контекста безопасности и возможностей при взаимодействии с API Kubernetes);

spark.kubernetes.namespace — пространство имён Kubernetes, в котором будут запускаться поды драйвера и экзекьютеров;

spark.submit.deployMode — способ запуска Spark (для стандартного spark-submit используется «cluster», для Spark Operator и более поздних версий Spark «client»);

spark.kubernetes.container.image — образ Docker, используемый для запуска подов;

spark.master — URL API Kubernetes (указывается внешний так обращение происходит с локальной машины);

local:// — путь до исполняемого файла Spark внутри образа Docker.

Переходим в соответствующий проект OKD и изучаем созданные поды — https:///console/project//browse/pods.

Для упрощения процесса разработки может быть использован ещё один вариант, при котором создаётся общий базовый образ Spark, используемый всеми задачами для запуска, а снэпшоты исполняемых файлов публикуются во внешнее хранилище (например, Hadoop) и указываются при вызове spark-submit в виде ссылки. В этом случае можно запускать различные версии задач Spark без пересборки образов Docker, используя для публикации образов, например, WebHDFS. Отправляем запрос на создание файла (здесь — хост сервиса WebHDFS, — порт сервиса WebHDFS, — желаемый путь к файлу на HDFS):

При этом будет получен ответ вида (здесь — это URL, который нужно использовать для загрузки файла):

Загружаем исполняемый файл Spark в HDFS (здесь — путь к исполняемому файлу Spark на текущем хосте):

После этого можем делать spark-submit с использованием файла Spark, загруженного на HDFS (здесь — имя класса, который требуется запустить для выполнения задачи):

При этом надо заметить, что для доступа к HDFS и обеспечения работы задачи может потребоваться изменить Dockerfile и скрипт entrypoint.sh — добавить в Dockerfile директиву для копирования зависимых библиотек в директорию /opt/spark/jars и включить файл конфигурации HDFS в SPARK_CLASSPATH в entrypoint.sh.

Второй вариант использования — Apache Livy

Далее, когда задача разработана и требуется протестировать полученный результат, возникает вопрос её запуска в рамках процесса CI/CD и отслеживания статусов её выполнения. Конечно, можно запускать её и с помощью локального вызова spark-submit, но это усложняет инфраструктуру CI/CD поскольку требует установку и конфигурацию Spark на агентах/раннерах CI сервера и настройки доступа к API Kubernetes. Для данного случая целевой реализацией выбрано использование Apache Livy в качестве REST API для запуска задач Spark, размещённого внутри кластера Kubernetes. С его помощью можно запускать задачи Spark на кластере Kubernetes используя обычные cURL запросы, что легко реализуемо на базе любого CI решения, а его размещение внутри кластера Kubernetes решает вопрос аутентификации при взаимодействии с API Kubernetes.

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что это

Выделим его в качестве второго варианта использования — запуск задач Spark в рамках процесса CI/CD на кластере Kubernetes в тестовом контуре.

Немного про Apache Livy — он работает как HTTP сервер, предоставляющий Web интерфейс и RESTful API, позволяющий удалённо запустить spark-submit, передав необходимые параметры. Традиционно он поставлялся в составе дистрибутива HDP, но также может быть развёрнут в OKD или любой другой инсталляции Kubernetes с помощью соответствующего манифеста и набора образов Docker, например, этого — github.com/ttauveron/k8s-big-data-experiments/tree/master/livy-spark-2.3. Для нашего случая был собран аналогичный образ Docker, включающий в себя Spark версии 2.4.5 из следующего Dockerfile:

Созданный образ может быть собран и загружен в имеющийся у вас репозиторий Docker, например, внутренний репозиторий OKD. Для его развёртывания используется следующий манифест ( — URL реестра образов Docker, — имя образа Docker, — тег образа Docker, — желаемый URL, по которому будет доступен сервер Livy; манифест «Route» применяется в случае, если в качестве дистрибутива Kubernetes используется Red Hat OpenShift, в противном случае используется соответствующий манифест Ingress или Service типа NodePort):

После его применения и успешного запуска пода графический интерфейс Livy доступен по ссылке: http:///ui. С помощью Livy мы можем опубликовать нашу задачу Spark, используя REST запрос, например, из Postman. Пример коллекции с запросами представлен ниже (в массиве «args» могут быть переданы конфигурационные аргументы с переменными, необходимыми для работы запускаемой задачи):

Выполним первый запрос из коллекции, перейдём в интерфейс OKD и проверим, что задача успешно запущена — https:///console/project//browse/pods. При этом в интерфейсе Livy (http:///ui) появится сессия, в рамках которой с помощью API Livy или графического интерфейса можно отслеживать ход выполнения задачи и изучать логи сессии.

Теперь покажем механизм работы Livy. Для этого изучим журналы контейнера Livy внутри пода с сервером Livy — https:///console/project//browse/pods/?tab=logs. Из них видно, что при вызове REST API Livy в контейнере с именем «livy» выполняется spark-submit, аналогичный используемому нами выше (здесь — имя созданного пода с сервером Livy). В коллекции также представлен второй запрос, позволяющий запускать задачи с удалённым размещением исполняемого файла Spark с помощью сервера Livy.

Третий вариант использования — Spark Operator

Теперь, когда задача протестирована, встаёт вопрос её регулярного запуска. Нативным способом для регулярного запуска задач в кластере Kubernetes является сущность CronJob и можно использовать её, но в данный момент большую популярность имеет использование операторов для управления приложениями в Kubernetes и для Spark существует достаточно зрелый оператор, который, в том числе, используется в решениях Enterprise уровня (например, Lightbend FastData Platform). Мы рекомендуем использовать его — текущая стабильная версия Spark (2.4.5) имеет достаточно ограниченные возможности по конфигурации запуска задач Spark в Kubernetes, при этом в следующей мажорной версии (3.0.0) заявлена полноценная поддержка Kubernetes, но дата её выхода остаётся неизвестной. Spark Operator компенсирует этот недостаток, добавляя важные параметры конфигурации (например, монтирование ConfigMap с конфигурацией доступа к Hadoop в поды Spark) и возможность регулярного запуска задачи по расписанию.

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что это
Выделим его в качестве третьего варианта использования — регулярный запуск задач Spark на кластере Kubernetes в продуктивном контуре.

Spark Operator имеет открытый исходный код и разрабатывается в рамках Google Cloud Platform — github.com/GoogleCloudPlatform/spark-on-k8s-operator. Его установка может быть произведена 3 способами:

Если оператор установлен корректно, то в соответствующем проекте появится активный под с оператором Spark (например, cloudflow-fdp-sparkoperator в пространстве Cloudflow для установки Cloudflow) и появится соответствующий тип ресурсов Kubernetes с именем «sparkapplications». Изучить имеющиеся приложений Spark можно следующей командой:

Для запуска задач с помощью Spark Operator требуется сделать 3 вещи:

В данном манифесте указана сервисная учётная запись, для которой требуется до публикации манифеста создать необходимые привязки ролей, предоставляющие необходимые права доступа для взаимодействия приложения Spark с API Kubernetes (если нужно). В нашем случае приложению нужны права на создание Pod’ов. Создадим необходимую привязку роли:

Также стоит отметить, что в спецификации данного манифеста может быть указан параметр «hadoopConfigMap», который позволяет указать ConfigMap с конфигурацией Hadoop без необходимости предварительного помещения соответствующего файла в образ Docker. Также он подходит для регулярного запуска задач — с помощью параметра «schedule» может быть указано расписание запуска данной задачи.

После этого сохраняем наш манифест в файл spark-pi.yaml и применяем его к нашему кластеру Kubernetes:

При этом создастся объект типа «sparkapplications»:

При этом будет создан под с приложением, статус которого будет отображаться в созданном «sparkapplications». Его можно посмотреть следующей командой:

По завершении задачи POD перейдёт в статус «Completed», который также обновится в «sparkapplications». Логи приложения можно посмотреть в браузере или с помощью следующей команды (здесь — имя пода запущенной задачи):

Также управление задачами Spark может быть осуществлено с помощью специализированной утилиты sparkctl. Для её установки клонируем репозиторий с её исходным кодом, установим Go и соберём данную утилиту:

Изучим список запущенных задач Spark:

Создадим описание для задачи Spark:

Запустим описанную задачу с помощью sparkctl:

Изучим список запущенных задач Spark:

Изучим список событий запущенной задачи Spark:

Изучим статус запущенной задачи Spark:

В заключение хотелось бы рассмотреть обнаруженные минусы эксплуатации текущей стабильной версии Spark (2.4.5) в Kubernetes:

Источник

Submitting Applications

The spark-submit script in Spark’s bin directory is used to launch applications on a cluster. It can use all of Spark’s supported cluster managers through a uniform interface so you don’t have to configure your application especially for each one.

Bundling Your Application’s Dependencies

If your code depends on other projects, you will need to package them alongside your application in order to distribute the code to a Spark cluster. To do this, create an assembly jar (or “uber” jar) containing your code and its dependencies. Both sbt and Maven have assembly plugins. When creating assembly jars, list Spark and Hadoop as provided dependencies; these need not be bundled since they are provided by the cluster manager at runtime. Once you have an assembled jar you can call the bin/spark-submit script as shown here while passing your jar.

Launching Applications with spark-submit

Once a user application is bundled, it can be launched using the bin/spark-submit script. This script takes care of setting up the classpath with Spark and its dependencies, and can support different cluster managers and deploy modes that Spark supports:

Some of the commonly used options are:

A common deployment strategy is to submit your application from a gateway machine that is physically co-located with your worker machines (e.g. Master node in a standalone EC2 cluster). In this setup, client mode is appropriate. In client mode, the driver is launched directly within the spark-submit process which acts as a client to the cluster. The input and output of the application is attached to the console. Thus, this mode is especially suitable for applications that involve the REPL (e.g. Spark shell).

Alternatively, if your application is submitted from a machine far from the worker machines (e.g. locally on your laptop), it is common to use cluster mode to minimize network latency between the drivers and the executors. Currently, the standalone mode does not support cluster mode for Python applications.

Master URLs

The master URL passed to Spark can be in one of the following formats:

Loading Configuration from a File

The spark-submit script can load default Spark configuration values from a properties file and pass them on to your application. By default, it will read options from conf/spark-defaults.conf in the Spark directory. For more detail, see the section on loading default configurations.

Advanced Dependency Management

Spark uses the following URL scheme to allow different strategies for disseminating jars:

Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes. This can use up a significant amount of space over time and will need to be cleaned up. With YARN, cleanup is handled automatically, and with Spark standalone, automatic cleanup can be configured with the spark.worker.cleanup.appDataTtl property.

More Information

Once you have deployed your application, the cluster mode overview describes the components involved in distributed execution, and how to monitor and debug applications.

Источник

Подготовка приложений Spark Streaming к использованию в рабочей среде

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что это

Всех желающих приглашаем на бесплатный вебинар «Тестирование Spark приложений». На открытом уроке рассмотрим проблемы в тестировании Spark приложений: стат данные, частичную проверку и запуск/остановку тяжелых систем. Изучим библиотеки для решения и напишем тесты.

Проект Apache Spark стал одним из основных инструментов в наборе средств инженеров по обработке больших данных. Он включает широкий ряд возможностей: от высокопроизводительного ядра пакетной обработки до ядра потоковой передачи в режиме, близком к реальному времени.

Spark Streaming

Наша компания Clairvoyant работает с клиентами, бизнес-задачи которых требуют создания высокопроизводительных систем для обработки больших данных в режиме реального времени. В число таких задач входят, например, системы оповещения, обработка данных Интернета вещей и многие другие. Мы пробовали разные технологии, включая Apache Nifi, Apache Flume, Apache Flink и др. Однако одно из любимых наших решений — это Spark Streaming.

Spark Streaming — это расширение Core Apache Spark для масштабируемой, высокопроизводительной и устойчивой к сбоям обработки потоков данных в режиме реального времени. Некоторые возможные источники таких потоков данных приведены на схеме ниже.

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что это

Внутренние процессы Spark Streaming реализованы с использованием архитектуры микропакетов. Это означает, что периодически (каждые X секунд) Spark Streaming активирует задание для выполнения ядром Spark Engine. В течение этого времени Spark принимает сообщения из какого-то источника, обрабатывает данные с помощью определенного пользователем направленного ациклического графа (Directed Acyclic Graph, DAG) и сохраняет данные в расположении, указанном в качестве приемника.

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что это

При реализации решений с использованием Spark Streaming для больших данных мы обнаружили, что для эффективной работы Spark в рабочем кластере необходимы некоторые дополнительные шаги. Эти шаги описаны в данной статье.

Начальный код

Раз речь идет о том, чтобы взять задание Spark Streaming и подготовить его к использованию в рабочей среде, прежде всего нам потребуется задание Spark Streaming, которое мы будем улучшать. Ниже представлен код, который мы будем использовать как отправную точку:

Этот код выполняет следующие действия:

Создает StreamingContext и определяет интервал между пакетами, равный 2 секундам

Устанавливает соединение с Kafka и создает поток DStream

Выполняет подсчет слов на RDD в DStream

Выводит результаты на консоль

То есть это простой пример подсчета слов, использующий Apache Kafka в качестве источника.

Использование режима кластера YARN

Сначала рассмотрим, как запускается приложение Spark:

Сборка файла JAR (или файла Python)

Выполнение команды spark-submit:

В этой команде spark-submit в качестве параметра master мы указали local[4]. Это означает, что приложение Spark запускается в локальном режиме, а не в кластере, где находятся данные.

Рассмотрим архитектуру Spark:

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что это

На приведенной схеме присутствует процесс Spark Driver. Это управляющий (master) процесс, который содержит все процедуры и задания, которые надлежит выполнить (направленные ациклические графы — DAG, определенные пользователем в коде Java, Scala или Python). Управляющий процесс передает исполнительным процессам (Executor) задачи, которые надлежит выполнить, и контролирует их успешное выполнение, прежде чем будет завершен сам.

Почти во всех случаях, с которыми мы сталкивались, приложения Spark выполнялись в кластере больших данных Hadoop, на котором доступен модуль YARN (Yet Another Resource Negotiator — «еще один ресурсный посредник»). Поэтому, когда ваш код будет протестирован и готов к переносу в рабочую среду, имеет смысл использовать YARN в качестве менеджера ресурсов для выделения исполнительных ресурсов вашим процессам Spark Driver и Executor. Для этого следует указать YARN в качестве master:

Версии Spark до 1.6.3

YARN в режиме клиента: —master yarn-client

YARN в режиме кластера: —master yarn-cluster

Версии Spark до 2.0

Доступны 2 режима: клиент и кластер. Они отличаются друг от друга местом выполнения управляющего процесса Spark Driver: на клиенте или в кластере. Рассмотрим этот момент подробнее.

YARN в режиме клиента

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что это

В режиме клиента процесс Spark Driver запускается на компьютере-клиенте (или на том же компьютере, с которого выполнена команда spark-submit). Как показывает наша практика, большинство организаций запускают все свои приложения Spark в этом режиме. Это вполне подходящее решение для выполнения процессов пакетной обработки с помощью Spark. Однако если действовать так же с приложениями Spark Streaming, возникает проблема.

Приложения Spark Streaming — это процессы, которые в принципе должны выполняться бесконечно. Но что если компьютер, на котором выполняется приложение Spark Streaming, будет выключен? Это приведет к прекращению работы приложения.

YARN в режиме кластера

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что это

В режиме кластера управляющий процесс Spark Driver выполняется в контейнере в YARN. Теперь, если возникнут какие-либо сбои, YARN справится с ними. Если компьютер, на котором выполняется управляющий процесс, будет отключен, то процесс будет автоматически перезапущен на другом узле.

Полезные параметры конфигурации

Максимальное число попыток отправки приложения. Число не должно быть больше, чем общее максимальное число попыток в конфигурации YARN.

Определение периода достоверности для отслеживания сбоев процесса Application Master (AM). Если процесс AM выполняется в течение этого периода, то счетчик сбоев AM обнуляется. Данная функция активна, только если настроена.

Таким образом, если задать в настройках значения:

то каждый час программа будет выполнять две попытки запустить приложение.

Настройка параметров конфигурации

В команде spark-submit:

Корректное завершение работы приложения потоковой передачи

Мы уже научились запускать приложение в корректном режиме, а теперь обсудим, как правильно завершить работу приложения Spark Streaming для больших данных, если мы захотим развернуть новые возможности, внести изменения в конфигурацию и т. п.

В настоящее время YARN позволяет завершить работу приложения Spark Streaming следующей командой:

Но что будет, если выполнить эту команду и завершить работу приложения в момент, когда выполняется обработка микропакета Spark Streaming?

Говоря коротко, ответ таков: данные, которые вы обрабатываете, будут потеряны.

Кроме того, учитывая, как система Spark получает сообщения от кластера Kafka (она сперва посылает в Kafka подтверждение получения сообщений, а затем обрабатывает их), при перезапуске приложения Spark Streaming оно пропустит сообщения, которые обрабатывались в момент отключения, и начнет обработку с сообщения, которое поступило следующим.

Чтобы решить эту проблему, необходимо реализовать процесс корректного завершения работы, который гарантирует, что приложение Spark Streaming может быть закрыто только в промежуток между микропакетами, чтобы не потерять данные.

Первый шаг по реализации корректного завершения работы для нашего начального кода будет следующим:

Вместо этого для корректного завершения работы приложения Spark Streaming мы выполним следующие шаги:

При запуске Spark Streaming: создать пустой файл в HDFS.

В коде Spark Code: периодически проверять, существует ли еще этот пустой файл. Если пустой файл не существует, запустить процесс корректного завершения работы.

Для остановки: удалить пустой файл и дождаться, пока будет выполнено корректное завершение работы.

Совет: создайте скрипт в оболочке для выполнения этих операций запуска и остановки.

Понадобится написать примерно такой код для Spark:

Первое изменение — добавление глобальной переменной, которая указывает, что мы приступаем к завершению работы приложения. Затем следует заменить процесс awaitTermination на цикл while. В этом цикле мы будем периодически проверять, существует ли еще файл в HDFS. Если файл отсутствует, то значение глобальной переменной меняется на true и заданная в цикле while логика выполняет команду остановки в контексте StreamingContext.

Мониторинг приложения для потоковой передачи больших данных

Как и с любым важным приложением, вам понадобится возможность убедиться, что ваше приложение выполняется, причем корректно. С приложением Spark Streaming это можно сделать несколькими способами.

Мониторинг в ходе выполнения

Прослушиватель StreamingListener (Spark ≥ 2.1)

В версиях Apache Spark начиная с 2.1 поддерживается добавление прослушивателей, которые запускают события на различных этапах запуска и выполнения приложения Spark Streaming. Вот некоторые из доступных прослушивателей:

Эти прослушиватели позволяют вручную реализовать процесс для отправки различных метрик службе мониторинга, которую использует ваша организация. В прошлом мы использовали этот подход для отправки метрик по каждому микропакету (количество полученных сообщений, время обработки, различные ошибки и т. п.) в реляционную базу данных. Затем мы делали запросы к этой базе, чтобы убедиться, что процесс выполняется с приемлемой производительностью.

Пользовательский интерфейс Spark

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что этоПользовательский интерфейс Spark

Интерфейс Spark входит в комплектацию Apache Spark и содержит некоторые весьма полезные сведения. Выше приведено одно из нескольких и, пожалуй, самое важное представление интерфейса: общее представление процессов потоковой передачи. Оно содержит информацию о каждом микропакете. Отображаются сведения о том, сколько записей обработано микропакетом, сколько времени это заняло, какой была задержка при запуске микропакета и т. п. В целом это отличный способ подтвердить, что ваше приложение Spark Streaming выполняется и демонстрирует надлежащую производительность.

Использование контрольных точек

Возможно, вам уже знакомо стандартное применение контрольных точек для пакетов Apache Spark. При этой методике данные, содержащиеся в RDD или DataFrame, сохраняются на диск между выполнением задач, содержащихся в RDD. Благодаря этому в случае сбоя исполнительного процесса Spark может просто возобновить его начиная с такой контрольной точки, а не перезапускать исполнение RDD или DataFrame с начала.

Эта функция вполне применима к Spark Streaming, однако есть и другой способ использования контрольных точек, который может быть полезен для приложений Spark Streaming, — контрольные точки метаданных.

Контрольные точки метаданных

Речь идет о сохранении метаданных, которые определяют вычисления, связанные с потоковой передачей, в устойчивом к сбоям хранилище, например HDFS. Этот способ используется для восстановления после сбоя узла, на котором выполняется управляющий процесс приложения Spark Streaming. Метаданные включают, в частности, следующее:

конфигурации. Настройки конфигурации, которые использовались для создания приложения потоковой передачи;

операции DStream. Набор операций DStream, которые определяют приложение потоковой передачи;

незавершенные пакеты. Пакеты, задания которых поставлены в очередь, но еще не выполнены.

Этот способ применения контрольных точек требуется также в случае, когда нужно выполнять трансформации с сохранением состояния, например updateStateByKey или reduceByKeyAndWindow.

Включить использование контрольных точек в коде можно следующим образом:

Проблемы с контрольными точками метаданных

При использовании контрольных точек метаданных необходимо помнить о нескольких проблемных моментах.

Контрольные точки ломаются после обновления версии Spark

При обновлении версии Spark вам придется удалить контрольную точку вручную.

Контрольные точки необходимо удалять перед обновлением кода

Поскольку в контрольные точки метаданных входят контрольные точки реальных операций DStream, которые должны выполняться для входящих записей, для загрузки новых операций из обновленного кода необходимо удалить имеющиеся контрольные точки. В случае незначительной правки кода для использования чуть отличающихся операций, если вы развернете код заново, но не удалите контрольную точку, из старой контрольной точки будут загружаться старые операции, и внесенные вами изменения не будут применены.

Создание нескольких разделов в темах Kafka

Если вы используете Kafka в качестве источника данных для приложения Spark Streaming, целесообразно определить при создании темы Kafka несколько разделов. Ниже с помощью схем я объясню, как это сделать.

В команде для создания темы Kafka можно указать количество разделов (параметр выделен жирным шрифтом):

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что это

Когда данные передаются в тему Kafka, они автоматически распределяются между разделами согласно ключу, который вы определили в сообщении Kafka. Каждое сообщение добавляется в тему Kafka с определенным смещением или с идентификатором, указывающим его позицию в разделе. Если в качестве ключа задать null, то сообщение будет автоматически равномерно распределяться между разделами.

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что это

На схеме выше показано, как приложение Spark Streaming может действовать при обработке сообщений из темы Kafka с несколькими разделами. Каждого «потребителя» можно рассматривать как один из исполнительных процессов Spark. Каждый исполнительный процесс Spark может независимо загружать данные из определенной темы Kafka, вместо того чтобы использовать единый источник. При этом каждый раздел может существовать на отдельном экземпляре брокера Kafka (отдельном узле), что позволяет снизить нагрузку на узлы.

Использование прямых потоков с кластером Kafka

Если вы используете кластер Kafka, то стоит помнить, что существует 2 типа соединителей: поток через приемник и прямой поток. И если название этого раздела еще не дало ответ, вы можете спросить: «Какой из них мне следует использовать?», а также, возможно, «А в чем разница?». Разберем, чем они отличаются друг от друга.

Потоковая передача через приемник

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что это

Потоковая передача Spark Streaming через приемник — ссылка на источник

На самом деле, потоковая передача через приемник — это стандартная для Spark Streaming реализация получения данных из любого источника (она одинаковым образом используется для таких источников, как Twitter, Kinesis и др.). В каждом исполнительном процессе запущен и выполняется экземпляр приемника. В начале микропакета управляющий процесс запускает задание для исполнительного процесса. При этом активируется процесс-приемник в таком исполнительном процессе, использующий высокоуровневый API Kafka для загрузки последних данных из темы Kafka. Затем данные из приемника сохраняются в журнале упреждающего протоколирования (Write Ahead Log — WAL). Перед обновлением Kafka выполняется получение данных (это защищает их от потери). Когда данные благополучно сохранены в WAL, исполнительные процессы Spark переходят к обработке сообщений.

Такая стратегия обеспечивает защиту следующим образом: если в одном из исполнительных процессов возникает сбой, то вместо него порождается новый исполнительный процесс. Этот исполнительный процесс загружает данные, которые процесс, завершившийся сбоем, предварительно сохранил в WAL.

Примечание. Если вы хотите использовать поток, направляемый через приемник, надлежит сделать следующее:

активировать использование контрольных точек — это позволит записывать журналы упреждающего протоколирования WAL в каталог контрольных точек;

активировать WAL — упреждающее протоколирование не включено при потоковой передаче с использованием приемника по умолчанию. Чтобы включить его, задайте в конфигурации следующую настройку: spark.streaming.receiver.wrteAheadLog.enable=true;

задать необходимый уровень StorageLevel для WAL — поскольку данные уже сохранены в HDFS, можно отключить репликацию в памяти, чтобы не дублировать сохранение: StorageLevel.MEMROY_AND_DISK_SER.

Такой была первая реализация, доступная в Spark Streaming, и сейчас она по-прежнему работает. Почему же понадобилась другая реализация?

По следующим соображениям: кластер Kafka уже сохранял реплицированные копии данных в циклическом буфере, обеспечивая их высокую доступность. Зачем тогда нужен журнал WAL? Действительно: использование WAL несколько снижает производительность, так как после получения данных из кластера Kafka их требуется записать на диск. Поэтому была добавлена следующая возможность: прямой поток.

Прямой поток

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что это

Прямой поток Spark Streaming — ссылка на источник

При использовании прямого потока мы отказываемся от WAL, и роль WAL выполняет Kafka. Исполнение начинается с того, что управляющий процесс загружает задания в исполнительные процессы. Также он передает каждому из исполнительных процессов диапазон смещений, которые тот должен обрабатывать. Например, исполнительный процесс 1 может получить диапазон смещений 2000–2050, а исполнительный процесс 2 — диапазон смещений 2051–2100. Каждый исполнительный процесс выполняет загрузку в назначенный для него диапазон смещений и обрабатывает эти данные.

Такая стратегия обеспечивает защиту следующим образом: если в одном из исполнительных процессов возникает сбой, то вместо него порождается новый исполнительный процесс. Управляющий процесс назначает новому исполнительному процессу тот же диапазон смещений, что и предыдущему, и выполняется повторная попытка обработки этих данных.

Сохранение смещений Kafka

Большинство организаций и отдельных групп в составе организаций, с которыми мы сталкивались, рассчитывают получить с помощью Spark Streaming семантику доставки «только один раз». К сожалению, в распределенной системе, устойчивой к сбоям, добиться этого очень сложно. И уж во всяком случае, Spark Streaming не предоставляет такую семантику в качестве готовой возможности (доступные варианты — либо «минимум один раз», либо «максимум один раз»). Тем не менее Spark Streaming позволяет получить нужный результат с помощью определенных изменений.

Главное, что для этого требуется, — сохранить смещения Kafka, которые были успешно обработаны, после завершения обработки микропакета и загрузить при запуске приложения Spark Streaming последние завершенные смещения Kafka. Такое сохранение должно выполняться в том и только в том случае, если транзакция с входящими сообщениями была завершена успешно. Говоря конкретнее, вы должны сохранять смещения после идемпотентного вывода ИЛИ сохранять их посредством атомарной транзакции параллельно выводу.

В результате вы будете переходить от текущего набора данных к следующему только после того, как данные будут трансформированы и сохранены на ваш источник вывода. Используя прямые потоки с описанными выше изменениями, вы обеспечите устойчивость к сбоям при обработке всех данных в составе микропакета и реализуете желаемую семантику доставки «только один раз».

На схеме ниже показано, как при этом будет работать ваше приложение.

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что это

Приведенный ниже пример кода демонстрирует, как инициализировать прямой поток Kafka DStream, загружая смещения из команды loadOffsets:

Здесь предполагается, что для сохранения смещений используется Kudu (поэтому указан контекст kuduContext), но общая процедура будет работать с любой системой, в которой сохраняются смещения: Zookeeper, HBase, HDFS, Hive, Impala и др.

Стабилизация приложения для потоковой передачи больших данных

Прежде чем выпустить приложение в рабочую среду, стоит потратить немного времени на тестирование производительности. Главное, что вы должны при этом обеспечить, —

среднее время обработки пакета должно быть меньше интервала между пакетами.

Например, если вы установили интервал между пакетами, равный 30 секундам, то среднее время обработки одного микропакета должно быть менее 30 секунд.

В каждый момент времени обрабатывается только один микропакет. Поэтому, если обработка первого микропакета займет 40 секунд, то перед фактическим началом обработки второго пакета возникнет задержка. Если время обработки микропакетов постоянно будет составлять около 40 секунд, они будут прибывать быстрее, чем обрабатываться. В скором времени у вас скопятся десятки микропакетов, ожидающих обработки.

Опасность этой ситуации состоит в том, что микропакеты будут поступать, пока не заполнят всю динамическую память. В конце концов произойдет сбой приложения Spark.

В пользовательском интерфейсе Spark можно посмотреть, сколько времени занимает обработка микропакетов. На изображениях ниже показано задание, для которого задан интервал между пакетами 10 секунд, а время обработки иногда превышает 10 секунд, в результате чего возникает скачок времени задержки.

spark submit что это. Смотреть фото spark submit что это. Смотреть картинку spark submit что это. Картинка про spark submit что это. Фото spark submit что этоСтатистика потоковой передачи

Если окажется, что время обработки ваших микропакетов постоянно превышает интервал между пакетами, можно применить описанные ниже стратегии.

Оптимизация операций (трансформации, присоединения и записи)

Стоит проанализировать, какие операции выполняются вашим приложением Spark Streaming. Если вы сохраняете данные во внешнюю базу данных, которая требует индексации, то операция сохранения может оказывать критическое влияние на производительность приложения. Также можно проверить, насколько эффективно выполняются присоединения (join) и можно ли оптимизировать производительность, расположив те или иные наборы данных справа или слева при присоединении.

Реализация кэширования

Если вы несколько раз обрабатываете один и тот же источник RDD/DataFrame, значительно улучшить ситуацию может кэширование результата RDD/DataFrame в памяти.

Повышение объемов параллельной обработки

Пропускная способность приложения при работе с Kafka может страдать, если используется недостаточное количество разделов. Если в теме определен только один раздел или слишком мало брокеров, это может сократить время доставки данных исполнительным процессам Spark.

Также может оказаться, что в приложении Spark Streaming не хватает исполнительных процессов для максимально эффективной обработки всех поступающих данных. Добавление исполнительных процессов может помочь в такой ситуации.

Перераспределение данных по разделам

Если выяснится, что данные несбалансированны (то есть практически все данные распределяются на один исполнительный процесс), следует вернуться к оптимизации операций и подобрать более эффективное присоединение или другую операцию, которая обеспечит баланс. В худшем варианте этого сценария можно переопределить разделы, чтобы изменить распределение данных:

Увеличение длительности пакета

Если все остальные способы не дадут результата, попробуйте увеличить длительность пакета. Конечно, это приведет к увеличению количества обрабатываемых данных, но может оказаться так, что вы выполняете какую-нибудь операцию, которая всегда занимает около 10 секунд, независимо от объема данных. В этом случае увеличение длительности пакета может помочь.

Источник

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *