Introduktion till Apache Flink med Java

1. Översikt

Apache Flink är ett ramverk för Big Data-bearbetning som gör det möjligt för programmerare att bearbeta den stora mängden data på ett mycket effektivt och skalbart sätt.

I den här artikeln presenterar vi några av de centrala API-koncepten och standarddatatransformationer som finns tillgängliga i Apache Flink Java API . Den flytande stilen med detta API gör det enkelt att arbeta med Flinks centrala konstruktion - den distribuerade samlingen.

Först tar vi en titt på Flinks DataSet API-omvandlingar och använder dem för att implementera ett ordräkningsprogram. Sedan tar vi en kort titt på Flinks DataStream API, som låter dig bearbeta händelser i realtid.

2. Maven-beroende

För att komma igång måste vi lägga till Maven-beroenden i flink-java och flink-test-utils bibliotek:

 org.apache.flink flink-java 1.2.0   org.apache.flink flink-test-utils_2.10 1.2.0 test 

3. Grundläggande API-koncept

När vi arbetar med Flink måste vi veta några saker relaterade till dess API:

  • Varje Flink-program utför transformationer på distribuerade datainsamlingar. En mängd olika funktioner för att omvandla data tillhandahålls, inklusive filtrering, mappning, sammanfogning, gruppering och aggregering
  • En sink- operation i Flink utlöser körningen av en ström för att ge önskat resultat av programmet , till exempel att spara resultatet i filsystemet eller skriva ut det till standardutmatningen
  • Flinktransformationer är lata, vilket innebär att de inte körs förrän en sänkoperation åberopas
  • Apache Flink API stöder två driftsätt - batch och realtid. Om du har att göra med en begränsad datakälla som kan bearbetas i batch-läge använder du DataSet API. Om du vill bearbeta obegränsade dataströmmar i realtid måste du använda DataStream API

4. DataSet API-omvandlingar

Ingångspunkten för Flink-programmet är en instans av klassen ExecutionEnvironment - detta definierar i vilket sammanhang ett program körs.

Låt oss skapa en ExecutionEnvironment för att starta vår bearbetning:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Observera att när du startar applikationen på den lokala maskinen kommer den att bearbetas på den lokala JVM. Om du vill börja bearbeta på ett kluster av maskiner måste du installera Apache Flink på dessa maskiner och konfigurera ExecutionEnvironment därefter .

4.1. Skapa ett datauppsättning

För att börja utföra datatransformationer måste vi förse vårt program med data.

Låt oss skapa en instans av DataSet- klassen med vår ExecutionEnvironement :

DataSet amounts = env.fromElements(1, 29, 40, 50);

Du kan skapa en datasats från flera källor, till exempel Apache Kafka, en CSV, fil eller praktiskt taget vilken som helst annan datakälla.

4.2. Filtrera och minska

När du har skapat en instans av klassen DataSet kan du tillämpa omvandlingar till den.

Låt oss säga att du vill filtrera nummer som ligger över ett visst tröskelvärde och sedan summera dem alla . Du kan använda filter () och minska () transformationer för att uppnå detta:

int threshold = 30; List collect = amounts .filter(a -> a > threshold) .reduce((integer, t1) -> integer + t1) .collect(); assertThat(collect.get(0)).isEqualTo(90); 

Observera att metoden collect () är en sänkoperation som utlöser de faktiska datatransformationerna.

4.3. Karta

Låt oss säga att du har en DataSet av Person objekt:

private static class Person { private int age; private String name; // standard constructors/getters/setters }

Låt oss sedan skapa en datasats av dessa objekt:

DataSet personDataSource = env.fromCollection( Arrays.asList( new Person(23, "Tom"), new Person(75, "Michael")));

Antag att du bara vill extrahera åldersfältet från varje objekt i samlingen. Du kan använda map () -transformationen för att bara få ett specifikt fält i personklassen :

List ages = personDataSource .map(p -> p.age) .collect(); assertThat(ages).hasSize(2); assertThat(ages).contains(23, 75);

4.4. Ansluta sig

När du har två datamängder kanske du vill gå med i något id- fält. För detta kan du använda transformationen join () .

Låt oss skapa samlingar av transaktioner och adresser för en användare:

Tuple3 address = new Tuple3(1, "5th Avenue", "London"); DataSet
    
      addresses = env.fromElements(address); Tuple2 firstTransaction = new Tuple2(1, "Transaction_1"); DataSet
     
       transactions = env.fromElements(firstTransaction, new Tuple2(12, "Transaction_2")); 
     
    

Det första fältet i båda tuplerna är av heltalstyp , och detta är ett id- fält som vi vill gå med i båda datauppsättningarna.

För att utföra den faktiska anslutningslogiken måste vi implementera ett KeySelector- gränssnitt för adress och transaktion:

private static class IdKeySelectorTransaction implements KeySelector
    
      { @Override public Integer getKey(Tuple2 value) { return value.f0; } } private static class IdKeySelectorAddress implements KeySelector
     
       { @Override public Integer getKey(Tuple3 value) { return value.f0; } }
     
    

Varje väljare returnerar bara det fält där kopplingen ska utföras.

Tyvärr är det inte möjligt att använda lambdauttryck här eftersom Flink behöver generisk typinformation.

Låt oss sedan implementera sammanslagningslogik med dessa väljare:

List
    
     > joined = transactions.join(addresses) .where(new IdKeySelectorTransaction()) .equalTo(new IdKeySelectorAddress()) .collect(); assertThat(joined).hasSize(1); assertThat(joined).contains(new Tuple2(firstTransaction, address)); 
    

4.5. Sortera

Låt oss säga att du har följande samling av Tuple2:

Tuple2 secondPerson = new Tuple2(4, "Tom"); Tuple2 thirdPerson = new Tuple2(5, "Scott"); Tuple2 fourthPerson = new Tuple2(200, "Michael"); Tuple2 firstPerson = new Tuple2(1, "Jack"); DataSet
    
      transactions = env.fromElements( fourthPerson, secondPerson, thirdPerson, firstPerson); 
    

Om du vill sortera den här samlingen efter det första fältet i tupeln kan du använda sortPartitions () -transformationen :

List
    
      sorted = transactions .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING) .collect(); assertThat(sorted) .containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);
    

5. Antal ord

Ordräkningsproblemet är ett som vanligtvis används för att visa upp möjligheterna för ramverk för Big Data-bearbetning. Den grundläggande lösningen innebär att man räknar ordhändelser i en textinmatning. Låt oss använda Flink för att implementera en lösning på detta problem.

Som det första steget i vår lösning skapar vi en LineSplitter- klass som delar upp vår inmatning i tokens (ord) och samlar för varje token en Tuple2 av nyckel-värdepar. I var och en av dessa tupler är nyckeln ett ord som finns i texten, och värdet är heltalet (1).

Den här klassen implementerar FlatMapFunction- gränssnittet som tar String som en ingång och producerar en Tuple2:

public class LineSplitter implements FlatMapFunction
    
      { @Override public void flatMap(String value, Collector
     
       out) { Stream.of(value.toLowerCase().split("\\W+")) .filter(t -> t.length() > 0) .forEach(token -> out.collect(new Tuple2(token, 1))); } }
     
    

Vi kallar samla () -metoden på Collector- klassen för att driva data framåt i bearbetningspipelinen.

Vårt nästa och sista steg är att gruppera tuplarna efter deras första element (ord) och sedan utföra en summan på de andra elementen för att producera ett antal ordhändelser:

public static DataSet
    
      startWordCount( ExecutionEnvironment env, List lines) throws Exception { DataSet text = env.fromCollection(lines); return text.flatMap(new LineSplitter()) .groupBy(0) .aggregate(Aggregations.SUM, 1); }
    

Vi använder tre typer av Flink-transformationer: flatMap () , groupBy () och aggregat () .

Låt oss skriva ett test för att hävda att implementeringen av ordräkningen fungerar som förväntat:

List lines = Arrays.asList( "This is a first sentence", "This is a second sentence with a one word"); DataSet
    
      result = WordCount.startWordCount(env, lines); List
     
       collect = result.collect(); assertThat(collect).containsExactlyInAnyOrder( new Tuple2("a", 3), new Tuple2("sentence", 2), new Tuple2("word", 1), new Tuple2("is", 2), new Tuple2("this", 2), new Tuple2("second", 1), new Tuple2("first", 1), new Tuple2("with", 1), new Tuple2("one", 1));
     
    

6. DataStream API

6.1. Skapa en DataStream

Apache Flink also supports the processing of streams of events through its DataStream API. If we want to start consuming events, we first need to use the StreamExecutionEnvironment class:

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

Next, we can create a stream of events using the executionEnvironment from a variety of sources. It could be some message bus like Apache Kafka, but in this example, we will simply create a source from a couple of string elements:

DataStream dataStream = executionEnvironment.fromElements( "This is a first sentence", "This is a second sentence with a one word");

We can apply transformations to every element of the DataStream like in the normal DataSet class:

SingleOutputStreamOperator upperCase = text.map(String::toUpperCase);

To trigger the execution, we need to invoke a sink operation such as print() that will just print the result of transformations to the standard output, following with the execute() method on the StreamExecutionEnvironment class:

upperCase.print(); env.execute();

It will produce the following output:

1> THIS IS A FIRST SENTENCE 2> THIS IS A SECOND SENTENCE WITH A ONE WORD

6.2. Windowing of Events

When processing a stream of events in real time, you may sometimes need to group events together and apply some computation on a window of those events.

Suppose we have a stream of events, where each event is a pair consisting of the event number and the timestamp when the event was sent to our system, and that we can tolerate events that are out-of-order but only if they are no more than twenty seconds late.

For this example, let's first create a stream simulating two events that are several minutes apart and define a timestamp extractor that specifies our lateness threshold:

SingleOutputStreamOperator
    
      windowed = env.fromElements( new Tuple2(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()), new Tuple2(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond())) .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor 
     
      (Time.seconds(20)) { @Override public long extractTimestamp(Tuple2 element) { return element.f1 * 1000; } });
     
    

Next, let's define a window operation to group our events into five-second windows and apply a transformation on those events:

SingleOutputStreamOperator
    
      reduced = windowed .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .maxBy(0, true); reduced.print();
    

It will get the last element of every five-second window, so it prints out:

1> (15,1491221519)

Note that we do not see the second event because it arrived later than the specified lateness threshold.

7. Conclusion

In this article, we introduced the Apache Flink framework and looked at some of the transformations supplied with its API.

We implemented a word count program using Flink's fluent and functional DataSet API. Then we looked at the DataStream API and implemented a simple real-time transformation on a stream of events.

Implementeringen av alla dessa exempel och kodavsnitt finns på GitHub - detta är ett Maven-projekt, så det borde vara enkelt att importera och köra som det är.