Початок роботи з потоковою обробкою з Spring Cloud Data Flow

1. Вступ

Spring Cloud Data Flow - це власне хмарне програмування та операційна модель для компонуючих мікросервісів даних.

За допомогою Spring Cloud Data Flow розробники можуть створювати та організовувати конвеєри даних для загальних випадків використання, таких як передача даних, аналітика в режимі реального часу та імпорт / експорт даних.

Ці конвеєри даних мають два варіанти - потокові та пакетні конвеєри даних.

У першому випадку необмежений обсяг даних споживається або створюється за допомогою проміжного програмного забезпечення для обміну повідомленнями. У той час як у другому випадку короткочасне завдання обробляє кінцевий набір даних, а потім завершує.

Ця стаття буде зосереджена на потоковій обробці.

2. Архітектурний огляд

Ключовими компонентами архітектури цього типу є Програми , Сервер потоку даних та цільова середовище виконання.

Крім цих ключових компонентів, ми також зазвичай маємо оболонку потоку даних та посередника повідомлень в архітектурі.

Давайте розглянемо всі ці компоненти докладніше.

2.1. Програми

Як правило, конвеєр потокового передавання даних включає споживання подій із зовнішніх систем, обробку даних та стійкість поліглотів. Ці фази зазвичай називають термінологією Source , Processor і Sink у Spring Cloud :

  • Джерело: це програма, яка споживає події
  • Процесор: споживає дані з Джерела , виконує певну обробку над ним і передає оброблені дані до наступного додатка в конвеєрі
  • Раковина: споживає з джерела або процесора і записує дані на потрібний рівень стійкості

Ці програми можна упакувати двома способами:

  • Uber-jar Spring Boot, розміщений у сховищі maven, файлі, http або будь-якій іншій реалізації ресурсу Spring (цей метод буде використаний у цій статті)
  • Докер

Багато додатків, процесорів та додатків для поширених випадків використання (наприклад, jdbc, hdfs, http, маршрутизатор) вже надані та готові до використання командою Spring Cloud Data Flow .

2.2. Час роботи

Крім того, для виконання цих програм необхідний час виконання. Підтримувані середовища виконання:

  • Хмарна ливарня
  • Пряжа Apache
  • Кубернетес
  • Apache Mesos
  • Локальний сервер для розробки (який буде використаний у цій статті)

2.3. Сервер потоку даних

Компонентом, який відповідає за розгортання програм у середовищі виконання, є Сервер потоку даних . Для кожного цільового часу виконання передбачено виконуваний файл Data Flow Server .

Flow дані сервер відповідає за інтерпретацію:

  • Потік DSL, що описує логічний потік даних через безліч програм.
  • Маніфест розгортання, який описує відображення програм у середовищі виконання.

2.4. Оболонка потоку даних

Оболонка потоку даних є клієнтом для сервера потоку даних. Оболонка дозволяє нам виконувати команду DSL, необхідну для взаємодії з сервером.

Як приклад, DSL для опису потоку даних від джерела http до джерела jdbc буде записано як “http | jdbc ”. Ці імена в DSL реєструються на сервері Data Flow Server і відображаються на артефактах додатків, які можна розмістити у сховищах Maven або Docker.

Spring також пропонує графічний інтерфейс під назвою Flo для створення та моніторингу потокових конвеєрів даних. Однак його використання поза межами обговорення цієї статті.

2.5. Брокер повідомлень

Як ми бачили на прикладі попереднього розділу, ми використали символ труби для визначення потоку даних. Символ конвеєра представляє зв'язок між двома програмами за допомогою проміжного програмного забезпечення для обміну повідомленнями.

Це означає, що нам потрібен брокер повідомлень, який працює і працює в цільовому середовищі.

Підтримуються два посередники-посередники для обміну повідомленнями:

  • Апач Кафка
  • RabbitMQ

Отже, тепер, коли ми маємо огляд архітектурних компонентів - настав час побудувати наш перший конвеєр для обробки потоків.

3. Встановіть Брокер повідомлень

Як ми вже бачили, конвеєрні програми потребують проміжного програмного забезпечення для обміну повідомленнями для спілкування. Для цілей цієї статті ми поговоримо з RabbitMQ .

Для отримання детальної інформації про встановлення ви можете слідувати інструкціям на офіційному сайті.

4. Локальний сервер потоку даних

Щоб пришвидшити процес генерації наших додатків, ми використаємо Spring Initializr; за його допомогою ми можемо отримати наші програми Spring Boot за кілька хвилин.

Після переходу на веб-сайт просто виберіть Групу та ім'я Артефакту .

Після цього натисніть кнопку Створити проект, щоб розпочати завантаження артефакту Maven.

Після завершення завантаження розпакуйте проект та імпортуйте його як проект Maven у вибрану вами IDE.

Додамо до проекту залежність Maven. Оскільки нам будуть потрібні бібліотеки локального сервера потоку даних , додамо залежність spring-cloud-starter-dataflow-server-local:

 org.springframework.cloud spring-cloud-starter-dataflow-server-local 

Тепер нам потрібно анотувати основний клас Spring Boot за допомогою анотації @EnableDataFlowServer :

@EnableDataFlowServer @SpringBootApplication public class SpringDataFlowServerApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowServerApplication.class, args); } } 

Це все. Наш сервер локального потоку даних готовий до запуску:

mvn spring-boot:run

Програма завантажиться через порт 9393.

5. Оболонка потоку даних

Знову перейдіть до Spring Initializr і оберіть назву групи та артефакту .

Завантаживши та імпортувавши проект, додамо залежність spring-cloud-dataflow-shell:

 org.springframework.cloud spring-cloud-dataflow-shell 

Тепер нам потрібно додати анотацію @EnableDataFlowShell до основного класу Spring Boot :

@EnableDataFlowShell @SpringBootApplication public class SpringDataFlowShellApplication { public static void main(String[] args) { SpringApplication.run(SpringDataFlowShellApplication.class, args); } } 

Тепер ми можемо запустити оболонку:

mvn spring-boot:run

After the shell is running, we can type the help command in the prompt to see a complete list of command that we can perform.

6. The Source Application

Similarly, on Initializr, we'll now create a simple application and add a Stream Rabbit dependency called spring-cloud-starter-stream-rabbit:

 org.springframework.cloud spring-cloud-starter-stream-rabbit 

We'll then add the @EnableBinding(Source.class) annotation to the Spring Boot main class:

@EnableBinding(Source.class) @SpringBootApplication public class SpringDataFlowTimeSourceApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowTimeSourceApplication.class, args); } }

Now we need to define the source of the data that must be processed. This source could be any potentially endless workload (internet-of-things sensor data, 24/7 event processing, online transaction data ingest).

In our sample application, we produce one event (for simplicity a new timestamp) every 10 seconds with a Poller.

The @InboundChannelAdapter annotation sends a message to the source’s output channel, using the return value as the payload of the message:

@Bean @InboundChannelAdapter( value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1") ) public MessageSource timeMessageSource() { return () -> MessageBuilder.withPayload(new Date().getTime()).build(); } 

Our data source is ready.

7. The Processor Application

Next- we'll create an application and add a Stream Rabbit dependency.

We'll then add the @EnableBinding(Processor.class) annotation to the Spring Boot main class:

@EnableBinding(Processor.class) @SpringBootApplication public class SpringDataFlowTimeProcessorApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowTimeProcessorApplication.class, args); } }

Next, we need to define a method to process the data that coming from the source application.

To define a transformer, we need to annotate this method with @Transformer annotation:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform(Long timestamp) { DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy"); String date = dateFormat.format(timestamp); return date; }

It converts a timestamp from the ‘input' channel to a formatted date which will be sent to the ‘output' channel.

8. The Sink Application

The last application to create is the Sink application.

Again, go to the Spring Initializr and choose a Group, an Artifact name. After downloading the project let's add a Stream Rabbit dependency.

Then add the @EnableBinding(Sink.class) annotation to the Spring Boot main class:

@EnableBinding(Sink.class) @SpringBootApplication public class SpringDataFlowLoggingSinkApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowLoggingSinkApplication.class, args); } }

Now we need a method to intercept the messages coming from the processor application.

To do this, we need to add the @StreamListener(Sink.INPUT) annotation to our method:

@StreamListener(Sink.INPUT) public void loggerSink(String date) { logger.info("Received: " + date); }

The method simply prints the timestamp transformed in a formatted date to a log file.

9. Register a Stream App

The Spring Cloud Data Flow Shell allow us to Register a Stream App with the App Registry using the app register command.

We must provide a unique name, application type, and a URI that can be resolved to the app artifact. For the type, specify “source“, “processor“, or “sink“.

When providing a URI with the maven scheme, the format should conform to the following:

maven://:[:[:]]:

To register the Source, Processor and Sink applications previously created , go to the Spring Cloud Data Flow Shell and issue the following commands from the prompt:

app register --name time-source --type source --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-source:jar:0.0.1-SNAPSHOT app register --name time-processor --type processor --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-processor:jar:0.0.1-SNAPSHOT app register --name logging-sink --type sink --uri maven://com.baeldung.spring.cloud:spring-data-flow-logging-sink:jar:0.0.1-SNAPSHOT 

10. Create and Deploy the Stream

To create a new stream definition go to the Spring Cloud Data Flow Shell and execute the following shell command:

stream create --name time-to-log --definition 'time-source | time-processor | logging-sink'

This defines a stream named time-to-log based on the DSL expression ‘time-source | time-processor | logging-sink'.

Then to deploy the stream execute the following shell command:

stream deploy --name time-to-log

The Data Flow Server resolves time-source, time-processor, and logging-sink to maven coordinates and uses those to launch the time-source, time-processor and logging-sink applications of the stream.

If the stream is correctly deployed you’ll see in the Data Flow Server logs that the modules have been started and tied together:

2016-08-24 12:29:10.516 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink 2016-08-24 12:29:17.600 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-processor instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor 2016-08-24 12:29:23.280 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-source instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source

11. Reviewing the Result

In this example, the source simply sends the current timestamp as a message each second, the processor format it and the log sink outputs the formatted timestamp using the logging framework.

The log files are located within the directory displayed in the Data Flow Server’s log output, as shown above. To see the result, we can tail the log:

tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log 2016-08-24 12:40:42.029 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01 2016-08-24 12:40:52.035 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11 2016-08-24 12:41:02.030 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21

12. Conclusion

In this article, we have seen how to build a data pipeline for stream processing through the use of Spring Cloud Data Flow.

Крім того, ми побачили роль додатків Source , Processor та Sink всередині потоку, а також те, як підключити та зв’язати цей модуль всередині сервера Data Flow Server за допомогою оболонки Flow Data .

Приклад коду можна знайти в проекті GitHub.