Reactor is a fully non-blocking reactive programming foundation for the JVM, with efficient demand management (in the form of managing “backpressure”).

It may look hard to learn, but the Pareto principle applies: in daily work we only use about 20% of its features.

In my current project we haven’t needed to touch backpressure at all. Most of the operations we use are:

  • (Flux|Mono).just
  • (Flux|Mono).map
  • (Flux|Mono).flatMap
  • (Flux|Mono).onErrorResume
  • (Flux|Mono).filter

So don’t be scared of it. After reading this post, you should be able to use it comfortably in your own project.

How to install?

Add the following configuration to your pom.xml. We import the BOM here, so we don’t need to specify a version for each Reactor artifact or worry about keeping the versions consistent.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2020.0.16</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

How to do a test?

Reactor provides a test library (the reactor-test artifact) to help us test Flux|Mono. We can use the StepVerifier in this library to

  • Verify elements and their order

    StepVerifier.create(Flux.fromArray(new Integer[] {1, 2, 3, 4, 5})).expectNext(1)
        .expectNext(2).expectNext(3).expectNext(4).expectNext(5).verifyComplete();
    
  • Verify error

    StepVerifier.create(Flux.error(new Exception("some error")))
            .verifyErrorMessage("some error");
    
  • Verify the end of the sequence

    StepVerifier.create(Flux.empty()).verifyComplete();
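
StepVerifier also has a few shortcuts that keep longer expectations readable. The sketch below is only an illustration (the class and method names are made up): it passes several elements to one expectNext call, checks only the number of elements, and verifies the error type rather than the message.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

// Hypothetical class name, used only for illustration.
final class StepVerifierShortcuts {

    // Several expected elements can be passed to a single expectNext call.
    static Duration verifyElementsInOneCall() {
        return StepVerifier.create(Flux.just(1, 2, 3, 4, 5))
                .expectNext(1, 2, 3, 4, 5)
                .verifyComplete();
    }

    // When only the number of elements matters, expectNextCount is enough.
    static Duration verifyElementCount() {
        return StepVerifier.create(Flux.range(1, 100))
                .expectNextCount(100)
                .verifyComplete();
    }

    // verifyError(Class) checks the error type instead of its message.
    static Duration verifyErrorType() {
        return StepVerifier.create(Flux.error(new IllegalStateException("some error")))
                .verifyError(IllegalStateException.class);
    }
}
```

Each verify method throws an AssertionError when an expectation fails, and returns the time the verification took when it succeeds.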
    

In the following sections, we will use the StepVerifier to verify examples.

What are the basic concepts we need to know?

There are two basic concepts in Reactor:

  • Flux

    A sequence of 0 or more elements

  • Mono

    A sequence of 0 or 1 element
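
To make the difference concrete, here is a minimal sketch (the class and method names are hypothetical) that reads elements out of both types and converts between them with Flux.next and Mono.flux.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical helper class, not part of Reactor.
final class FluxVsMono {

    // A Flux may emit any number of elements; collectList gathers them all.
    static List<Integer> fluxElements() {
        return Flux.just(1, 2, 3).collectList().block();
    }

    // A Mono emits at most one element; block() returns it, or null if empty.
    static Integer monoElement() {
        return Mono.just(1).block();
    }

    // Flux.next() keeps only the first element, turning a Flux into a Mono.
    static Integer firstOfFlux() {
        return Flux.just(1, 2, 3).next().block();
    }

    // Mono.flux() widens a Mono into a Flux with 0 or 1 element.
    static List<Integer> monoAsFlux() {
        return Mono.just(1).flux().collectList().block();
    }
}
```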

Which 20% of the features do we need to know?

In this section, I borrow four concepts to group the examples:

  • Boxing

    Put the data into Flux|Mono

  • Transforming

    Transform the current data into other data

  • Peeking

    Do something with each element without modifying the data

  • Unboxing

    Get the data out of Flux|Mono
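
Before going through the groups one by one, it may help to see all four in a single pipeline. This is only a sketch: the class name, the method name, and the peeked parameter are made up for illustration.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical class, used only to show the four groups side by side.
final class FourSteps {

    // Doubles 1, 2, 3 and records every element it sees in `peeked`.
    static List<Integer> run(List<Integer> peeked) {
        return Flux.just(1, 2, 3)        // boxing: put the data into a Flux
                .map(x -> x * 2)         // transforming: change each element
                .doOnNext(peeked::add)   // peeking: observe without modifying
                .collectList()           // unboxing, step 1: Flux -> Mono<List>
                .block();                // unboxing, step 2: wait for the List
    }
}
```

Calling FourSteps.run(new ArrayList<>()) returns [2, 4, 6], and the list passed in ends up holding the same three values, because doOnNext sees each element after the map step.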

Boxing

How to box String?

  • Flux

      @Test
      public void canBeCreatedFromString() {
          StepVerifier.create(Flux.just("hello world")).expectNext("hello world").verifyComplete();
      }
    
  • Mono

      @Test
      public void canBeCreatedFromString() {
          StepVerifier.create(Mono.just("hello world")).expectNext("hello world").verifyComplete();
      }
    

How to box Number?

  • Flux

      @Test
      public void canBeCreatedFromNumber() {
          StepVerifier.create(Flux.just(1)).expectNext(1).verifyComplete();
          StepVerifier.create(Flux.just(1.0)).expectNext(1.0).verifyComplete();
      }
    
  • Mono

      @Test
      public void canBeCreatedFromNumber() {
          StepVerifier.create(Mono.just(1)).expectNext(1).verifyComplete();
          StepVerifier.create(Mono.just(1.0)).expectNext(1.0).verifyComplete();
      }
    

How to box Optional or Nullable value?

This can only be done with Mono, using Mono.justOrEmpty.

@Test
public void canBeCreatedFromNullableValue() {
    String value = null;
    StepVerifier.create(Mono.justOrEmpty(value)).verifyComplete();

    value = "hello world";
    StepVerifier.create(Mono.justOrEmpty(value)).expectNext("hello world").verifyComplete();
}

@Test
public void canBeCreatedFromOptionalValue() {

    Optional<String> value = Optional.empty();
    StepVerifier.create(Mono.justOrEmpty(value)).verifyComplete();

    value = Optional.of("hello world");
    StepVerifier.create(Mono.justOrEmpty(value)).expectNext("hello world").verifyComplete();
}

How to box data generator?

A data generator here is a function that takes no parameters; its signature looks like () -> A.

This can only be done by Mono.

@Test
public void canBeCreatedFromCallable() {
    StepVerifier.create(Mono.fromCallable(() -> "hello world!")).expectNext("hello world!")
            .verifyComplete();
}
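
One property worth knowing is that the generator is lazy: it runs when something subscribes, not when the Mono is created, and it runs again for every new subscription. The sketch below illustrates this with a counter (the class and method names are hypothetical).

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

// Hypothetical class, used only to illustrate lazy evaluation.
final class LazyCallable {

    // Returns {calls before subscribing, calls after two subscriptions}.
    static int[] callCounts() {
        AtomicInteger calls = new AtomicInteger();
        Mono<String> mono = Mono.fromCallable(() -> {
            calls.incrementAndGet();
            return "hello world!";
        });

        int before = calls.get(); // nothing has subscribed yet
        mono.block();             // first subscription runs the callable
        mono.block();             // second subscription runs it again
        int after = calls.get();

        return new int[] {before, after};
    }
}
```

Here callCounts() returns {0, 2}. If the value is already computed, Mono.just is the simpler choice; fromCallable is useful when the computation should be deferred until subscription.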

How to box Array?

This can only be done by Flux.

@Test
public void canBeCreatedFromArray() {
    StepVerifier.create(Flux.fromArray(new Integer[] {1, 2, 3, 4, 5})).expectNext(1)
            .expectNext(2).expectNext(3).expectNext(4).expectNext(5).verifyComplete();
}

How to box Iterable?

This can only be done by Flux.

@Test
public void canBeCreatedFromList() {
    List<Integer> list = new LinkedList<Integer>();
    list.add(1);
    list.add(2);
    list.add(3);
    list.add(4);

    StepVerifier.create(Flux.fromIterable(list)).expectNext(1).expectNext(2).expectNext(3)
            .expectNext(4).verifyComplete();
}

@Test
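public void canBeCreatedFromSet() {
    // LinkedHashSet keeps insertion order, so the expected sequence is deterministic.
    // A plain HashSet makes no guarantee about iteration order.
    Set<Integer> set = new LinkedHashSet<Integer>();
    set.add(1);
    set.add(2);
    set.add(2); // duplicate, ignored by the Set
    set.add(3);
    set.add(4);

    StepVerifier.create(Flux.fromIterable(set)).expectNext(1).expectNext(2).expectNext(3)
            .expectNext(4).verifyComplete();
}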

How to box Stream?

This can only be done by Flux.

@Test
public void canBeCreatedFromStream() {
    Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5);
    StepVerifier.create(Flux.fromStream(stream)).expectNext(1).expectNext(2).expectNext(3)
            .expectNext(4).expectNext(5).verifyComplete();
}
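
A java.util.stream.Stream can only be consumed once, so a Flux built from a Stream instance can only be subscribed to once. If the Flux needs to be reused, Flux.fromStream also accepts a Supplier, which creates a fresh Stream for each subscription. A minimal sketch, with hypothetical class and method names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;

// Hypothetical class, used only to show the Supplier overload.
final class ReusableStreamFlux {

    // Subscribes to the same Flux twice and joins both results.
    static List<Integer> collectTwice() {
        Supplier<Stream<Integer>> source = () -> Stream.of(1, 2, 3);
        Flux<Integer> flux = Flux.fromStream(source);

        // First subscription consumes the first Stream.
        List<Integer> result = new ArrayList<>(flux.collectList().block());
        // Second subscription asks the Supplier for a new Stream.
        result.addAll(flux.collectList().block());
        return result;
    }
}
```

collectTwice() returns [1, 2, 3, 1, 2, 3], because each subscription gets its own Stream.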

How to box Throwable?

  • Flux

      @Test
      public void canBeCreatedFromThrowable() {
          StepVerifier.create(Flux.error(new Exception("some error")))
                  .verifyErrorMessage("some error");
      }
    
  • Mono

      @Test
      public void canBeCreatedFromThrowable() {
          StepVerifier.create(Mono.error(new Exception("some error")))
                  .verifyErrorMessage("some error");
      }
    

Transforming

How to filter elements by some condition?

  • Flux

      @Test
      public void canDoFilter() {
          Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6);
          StepVerifier.create(flux.filter(x -> x > 5)).expectNext(6).verifyComplete();
      }
    
  • Mono

      @Test
      public void canDoFilter() {
          StepVerifier.create(Mono.just(6).filter(x -> x > 5)).expectNext(6).verifyComplete();
          StepVerifier.create(Mono.just(2).filter(x -> x > 5)).verifyComplete();
      }
    

How to apply a function A -> B?

Given a function A -> B, where A is the element type of the Flux|Mono and B is not itself a Flux|Mono, we can use (Flux|Mono).map to apply the function to each element.

  • Flux

      @Test
      public void canMapTheTypeOfValueInSequenceToAnotherType() {
          Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6);
          StepVerifier.create(flux.map(x -> x.toString())).expectNext("1").expectNext("2")
                  .expectNext("3").expectNext("4").expectNext("5").expectNext("6").verifyComplete();
      }
    
      @Test
      public void canMapTheValueInSequenceToAnotherValue() {
          Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6);
          StepVerifier.create(flux.map(x -> x + 1)).expectNext(2).expectNext(3).expectNext(4)
                  .expectNext(5).expectNext(6).expectNext(7).verifyComplete();
      }
    
  • Mono

      @Test
      public void canMapTheTypeOfValueInSequenceToAnotherType() {
          StepVerifier.create(Mono.just(1).map(x -> x.toString())).expectNext("1").verifyComplete();
      }
    
      @Test
      public void canMapTheValueInSequenceToAnotherValue() {
          StepVerifier.create(Mono.just(1).map(x -> x + 1)).expectNext(2).verifyComplete();
      }
    

How to apply a function A -> (Flux|Mono)<B>?

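Given a function A -> (Flux|Mono)<B>, where A is the element type of the Flux|Mono, we can use flatMap to apply the function to each element. Flux.flatMap accepts a function returning either a Flux or a Mono. Mono.flatMap only accepts a function returning a Mono; when the function returns a Flux, use Mono.flatMapMany, which produces a Flux.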

  • Flux

      @Test
      public void canMapTheValueInSequenceToAnotherSequence() {
          Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6);
    
          // The inner publishers emit synchronously, so the output order is deterministic here.
          StepVerifier.create(flux.flatMap(x -> Flux.just(x, x))).expectNext(1).expectNext(1)
                  .expectNext(2).expectNext(2).expectNext(3).expectNext(3).expectNext(4).expectNext(4)
                  .expectNext(5).expectNext(5).expectNext(6).expectNext(6).verifyComplete();
    
          StepVerifier.create(flux.flatMap(x -> Mono.just(x))).expectNext(1).expectNext(2)
                  .expectNext(3).expectNext(4).expectNext(5).expectNext(6).verifyComplete();
      }
    
  • Mono

      @Test
      public void canMapTheValueInSequenceToAnotherSequence() {
    
          StepVerifier.create(Mono.just(1).flatMapMany(x -> Flux.just(x, x))).expectNext(1)
                  .expectNext(1).verifyComplete();
    
          StepVerifier.create(Mono.just(1).flatMap(x -> Mono.just(x + 1))).expectNext(2)
                  .verifyComplete();
      }
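
One caveat: Flux.flatMap merges the inner sequences as their elements arrive, so the output order is not guaranteed once the inner publishers become asynchronous. The flatMap tests above pass because Flux.just and Mono.just emit synchronously. When order matters, Flux.concatMap is an alternative that handles the inner publishers one at a time. A small sketch with hypothetical names, using artificial delays to simulate asynchronous work:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical class, used only to illustrate ordering with concatMap.
final class OrderedMapping {

    static List<Integer> concatMapped() {
        return Flux.just(3, 2, 1)
                // Larger values are delayed longer, so 3 is the slowest element.
                .concatMap(x -> Mono.just(x).delayElement(Duration.ofMillis(x * 10L)))
                .collectList()
                .block();
    }
}
```

concatMapped() returns [3, 2, 1], the source order, even though the first element takes the longest to arrive.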
    

How to give a default value if there is no data?

  • Flux

      @Test
      public void canRecoverWithSingleDefaultValueFromEmptySequence() {
          StepVerifier.<Integer>create(Flux.<Integer>empty().defaultIfEmpty(1)).expectNext(1)
                  .verifyComplete();
      }
    
  • Mono

      @Test
      public void canRecoverWithSingleDefaultValueFromEmptySequence() {
          StepVerifier.<Integer>create(Mono.<Integer>empty().defaultIfEmpty(1)).expectNext(1)
                  .verifyComplete();
      }
    

How to replace current Flux|Mono with another Flux|Mono if there is no data?

  • Flux

      @Test
      public void canRecoverWithAnotherSequenceFromEmptySequence() {
          StepVerifier.<Integer>create(Flux.<Integer>empty().switchIfEmpty(Flux.just(1)))
                  .expectNext(1).verifyComplete();
      }
    
  • Mono

      @Test
      public void canRecoverWithAnotherSequenceFromEmptySequence() {
          StepVerifier.<Integer>create(Mono.<Integer>empty().switchIfEmpty(Mono.just(1)))
                  .expectNext(1).verifyComplete();
      }
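
The operator list at the top of this post also includes (Flux|Mono).onErrorResume, which plays the same role as switchIfEmpty but for errors: it replaces a failed sequence with a fallback one. A minimal sketch, with hypothetical class and method names:

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical class, used only to illustrate onErrorResume.
final class ErrorRecovery {

    // The Mono fails, so the fallback Mono built from the error is used instead.
    static String monoFallback() {
        return Mono.<String>error(new IllegalStateException("some error"))
                .onErrorResume(e -> Mono.just("fallback: " + e.getMessage()))
                .block();
    }

    // Elements emitted before the error are kept; the fallback continues from there.
    static List<Integer> fluxFallback() {
        return Flux.just(1, 2)
                .concatWith(Flux.<Integer>error(new IllegalStateException("some error")))
                .onErrorResume(e -> Flux.just(-1))
                .collectList()
                .block();
    }
}
```

monoFallback() returns "fallback: some error" and fluxFallback() returns [1, 2, -1].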
    

Peeking

How to print log for each element?

We can use (Flux|Mono).doOnNext to peek at each element without modifying it.

  • Flux

      @Test
      public void canDoSomethingForEveryElement() {
          Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6);
          flux.doOnNext(x -> System.out.println(x)).collectList().block();
      }
    
  • Mono

      @Test
      public void canDoSomethingForEveryElement() {
          Mono.just(1).doOnNext(x -> System.out.println(x)).block();
      }
    

Unboxing

How to convert Flux to a List?

@Test
public void canBeConvertedToList() {
    List<Integer> list = Flux.just(1, 2, 3).collectList().block();
    assertThat(list.size()).isEqualTo(3);
    assertThat(list.get(0)).isEqualTo(1);
    assertThat(list.get(1)).isEqualTo(2);
    assertThat(list.get(2)).isEqualTo(3);
}

How to get data out of Mono?

@Test
public void canBeConvertedToValue() {
    assertThat(Mono.just(1).block()).isEqualTo(1);
    assertThat(Mono.empty().block()).isNull();
}
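
Since block() returns null for an empty Mono, Mono.blockOptional can be a safer way to unbox, and Flux has blockFirst and blockLast when only one element is needed. A short sketch with hypothetical names:

```java
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical class, used only to show other ways to unbox.
final class Unboxing {

    // An empty Mono becomes Optional.empty() instead of null.
    static Optional<Integer> emptyAsOptional() {
        return Mono.<Integer>empty().blockOptional();
    }

    // A Mono with a value becomes an Optional holding that value.
    static Optional<Integer> valueAsOptional() {
        return Mono.just(1).blockOptional();
    }

    // blockFirst waits for the first element of the Flux.
    static Integer first() {
        return Flux.just(1, 2, 3).blockFirst();
    }

    // blockLast waits for completion and returns the last element.
    static Integer last() {
        return Flux.just(1, 2, 3).blockLast();
    }
}
```

All of these block the calling thread until the data arrives, so they fit best in tests and at the edges of an application rather than in the middle of a reactive pipeline.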

Summary

I created a repo, reactor-examples, where you can try out the examples above.

Clone the repo and get your hands dirty!

git clone git@github.com:sjmyuan/reactor-examples.git

PRs that add more examples are welcome.
