Issue
Question:
How can I get a test from the Reactive Streams TCK (Technology Compatibility Kit) to run successfully?
SimplePublisher.java (example)
package aaa.bbb.ccc.jar;

import java.util.Iterator;
import java.util.stream.IntStream;
import org.reactivestreams.FlowAdapters;
import java.util.concurrent.Flow;

public class SimplePublisher implements Flow.Publisher<Integer> {

    // Source of the values 1..count, shared by every subscriber
    private final Iterator<Integer> iterator;

    SimplePublisher(int count) {
        this.iterator = IntStream.rangeClosed(1, count).iterator();
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        // Pushes every element immediately: onSubscribe is never called
        // and no request(n) demand is awaited
        iterator.forEachRemaining(subscriber::onNext);
        subscriber.onComplete();
    }

    public static void main(String[] args) {
        new SimplePublisher(10).subscribe(new Flow.Subscriber<>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("item = [" + item + "]");
            }

            @Override
            public void onError(Throwable throwable) {
            }

            @Override
            public void onComplete() {
                System.out.println("complete");
            }
        });
    }
}
Running the main() method succeeds and produces the expected output:
item = [1]
item = [2]
item = [3]
item = [4]
item = [5]
item = [6]
item = [7]
item = [8]
item = [9]
item = [10]
complete
SimplePublisherTest.java (test; it never finishes, it slows down and then hangs)
package aaa.bbb.ccc.jar;

import java.util.concurrent.Flow;
import org.reactivestreams.tck.TestEnvironment;
import org.reactivestreams.tck.flow.FlowPublisherVerification;

//FlowPublisherVerification
public class SimplePublisherTest extends FlowPublisherVerification<Integer> {

    public SimplePublisherTest() {
        super(new TestEnvironment());
    }

    @Override
    public Flow.Publisher<Integer> createFlowPublisher(long elements) {
        return new SimplePublisher((int) elements);
    }

    @Override
    public Flow.Publisher<Integer> createFailedFlowPublisher() {
        return null;
    }
}
Output from running the test:
--- maven-clean-plugin:2.5:clean (default-clean) @ sp ---
Deleting C:\tools\sp\target
--- maven-resources-plugin:2.6:resources (default-resources) @ sp ---
Using 'UTF-8' encoding to copy filtered resources.
skip non existing resourceDirectory C:\tools\sp\src\main\resources
--- maven-compiler-plugin:3.8.0:compile (default-compile) @ sp ---
Changes detected - recompiling the module!
Compiling 1 source file to C:\tools\sp\target\classes
--- maven-resources-plugin:2.6:testResources (default-testResources) @ sp ---
Using 'UTF-8' encoding to copy filtered resources.
skip non existing resourceDirectory C:\tools\sp\src\test\resources
--- maven-compiler-plugin:3.8.0:testCompile (default-testCompile) @ sp ---
Changes detected - recompiling the module!
Compiling 1 source file to C:\tools\sp\target\test-classes
--- maven-surefire-plugin:2.12.4:test (default-test) @ sp ---
Surefire report directory: C:\tools\sp\target\surefire-reports
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running aaa.bbb.ccc.jar.SimplePublisherTest
Configuring TestNG with: org.apache.maven.surefire.testng.conf.TestNGMapConfigurator@123a439b
(test hangs at this point!)
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>aaa.bbb.ccc</groupId>
    <artifactId>sp</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>
    <name>sp</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <skip.unit.tests>false</skip.unit.tests>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.reactivestreams</groupId>
            <artifactId>reactive-streams</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.reactivestreams</groupId>
            <artifactId>reactive-streams-flow-adapters</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.reactivestreams</groupId>
            <artifactId>reactive-streams-tck-flow</artifactId>
            <version>1.0.2</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <release>11</release>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
The example is taken from:
https://blog.softwaremill.com/how-not-to-use-reactive-streams-in-java-9-7a39ea9c2cb3
Environment
Java 11
reactive-streams (v1.0.2)
reactive-streams-flow-adapters (v1.0.2)
reactive-streams-tck-flow (v1.0.2)
Solution
It seems the fix was simply to override maxElementsFromPublisher() in the SimplePublisherTest class, for example:
@Override
public long maxElementsFromPublisher() {
    return 1000;
}
I found this in the TCK documentation on GitHub:
https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck
From the README.md:
In order to inform the TCK that the Publisher is only able to signal up to 2 elements, override the maxElementsFromPublisher method like this:
@Override public long maxElementsFromPublisher() {
    return 2;
}
The TCK also supports Publishers which are not able to signal completion. Imagine a Publisher backed by a timer: such a Publisher has no natural way to "complete" after some number of ticks. It would be possible to implement a Processor which would "take n elements from the TickPublisher and then signal completion to the downstream", but this adds a layer of indirection between the TCK and the Publisher one initially wanted to test. It is suggested to test such unbounded Publishers either way: using a "TakeNElementsProcessor", or by informing the TCK that the Publisher is not able to signal completion. The TCK will then skip all tests which require onComplete signals to be emitted.
In order to inform the TCK that your Publisher is not able to signal completion, override the maxElementsFromPublisher method like this:
@Override public long maxElementsFromPublisher() {
    return publisherUnableToSignalOnComplete(); // == Long.MAX_VALUE == unbounded
}
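To make the "take n elements and then signal completion" idea from the README excerpt concrete, here is a minimal sketch using only java.util.concurrent.Flow. The names TakeNSketch, TickPublisher, TakeN and run are hypothetical and do not come from the TCK or from the original post. The sketch is synchronous, supports a single subscriber, and has not been verified against the TCK, so treat it as an illustration of the idea rather than a compliant Processor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Sketch only: synchronous, single subscriber, not verified against the TCK.
final class TakeNSketch {

    // Hypothetical stand-in for the README's "TickPublisher": emits 1, 2, 3, ...
    // on demand and never signals onComplete by itself.
    static final class TickPublisher implements Flow.Publisher<Long> {
        @Override
        public void subscribe(Flow.Subscriber<? super Long> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long next = 1;
                private boolean cancelled;

                @Override
                public void request(long n) {
                    // Emit at most n items, stopping early once cancelled.
                    for (long i = 0; i < n && !cancelled; i++) {
                        subscriber.onNext(next++);
                    }
                }

                @Override
                public void cancel() { cancelled = true; }
            });
        }
    }

    // Hypothetical "take n" stage: forwards the first `limit` items, then
    // cancels upstream and signals onComplete downstream.
    static final class TakeN<T> implements Flow.Processor<T, T> {
        private final long limit;
        private long forwarded;
        private boolean done;
        private Flow.Subscription upstream;
        private Flow.Subscriber<? super T> downstream;

        TakeN(long limit) { this.limit = limit; }

        @Override
        public void onSubscribe(Flow.Subscription subscription) { upstream = subscription; }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            downstream = subscriber;
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) {
                    long remaining = limit - forwarded;
                    // Never ask upstream for more than we still intend to forward.
                    if (!done && n > 0 && remaining > 0) {
                        upstream.request(Math.min(n, remaining));
                    }
                }

                @Override
                public void cancel() { done = true; upstream.cancel(); }
            });
        }

        @Override
        public void onNext(T item) {
            if (done) return;
            downstream.onNext(item);
            if (++forwarded == limit) {
                done = true;
                upstream.cancel();
                downstream.onComplete();
            }
        }

        @Override
        public void onError(Throwable t) { if (!done) { done = true; downstream.onError(t); } }

        @Override
        public void onComplete() { if (!done) { done = true; downstream.onComplete(); } }
    }

    // Wires TickPublisher -> TakeN -> a collecting subscriber and returns the signals seen.
    static List<String> run(long limit) {
        List<String> events = new ArrayList<>();
        TakeN<Long> take = new TakeN<>(limit);
        new TickPublisher().subscribe(take); // connect upstream before downstream requests
        take.subscribe(new Flow.Subscriber<Long>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Long item) { events.add("item=" + item); }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [item=1, item=2, item=3, complete]
    }
}
```

Wrapping an unbounded publisher this way gives the downstream a finite stream that does complete, at the cost of the extra layer of indirection the README mentions. Returning publisherUnableToSignalOnComplete() from maxElementsFromPublisher() avoids that layer but means the tests that need onComplete are skipped.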
Answered By - sairn
Answer Checked By - David Marino (JavaFixing Volunteer)