Skip to content

Commit 96f7f0e

Browse files
committed
upgraded to aeron 0.1.4, created sub projects, refactored client to create to DuplexConnections via factory and no longer wrapper ReactiveSocket
1 parent c18da4e commit 96f7f0e

57 files changed

Lines changed: 2995 additions & 750 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

build.gradle

Lines changed: 74 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,36 +6,89 @@ buildscript {
66
dependencies { classpath 'io.reactivesocket:gradle-nebula-plugin-reactivesocket:1.0.1' }
77
}
88

9+
10+
subprojects {
11+
apply plugin: 'java'
12+
13+
repositories {
14+
maven { url 'https://oss.jfrog.org/libs-snapshot' }
15+
}
16+
17+
dependencies {
18+
compile 'io.reactivex:rxjava:1.0.13'
19+
compile 'io.reactivex:rxjava-reactive-streams:1.0.1'
20+
compile 'io.reactivesocket:reactivesocket:0.0.1-SNAPSHOT'
21+
compile 'uk.co.real-logic:Agrona:0.4.4'
22+
compile 'uk.co.real-logic:aeron-all:0.1.4'
23+
compile 'org.hdrhistogram:HdrHistogram:2.1.7'
24+
compile 'org.slf4j:slf4j-api:1.7.12'
25+
testCompile 'junit:junit-dep:4.10'
26+
testCompile 'org.mockito:mockito-core:1.8.5'
27+
testCompile 'org.slf4j:slf4j-simple:1.7.12'
28+
}
29+
}
30+
31+
/*
32+
buildscript {
33+
repositories {
34+
jcenter()
35+
}
36+
37+
dependencies { classpath 'io.reactivesocket:gradle-nebula-plugin-reactivesocket:1.0.1' }
38+
}
39+
940
description = 'ReactiveSocket: stream oriented messaging passing with Reactive Stream semantics.'
1041
11-
apply plugin: 'reactivesocket-project'
12-
apply plugin: 'java'
1342
1443
repositories {
1544
maven { url 'https://oss.jfrog.org/libs-snapshot' }
1645
}
1746
18-
dependencies {
19-
compile 'io.reactivex:rxjava:1.0.13'
20-
compile 'io.reactivex:rxjava-reactive-streams:1.0.1'
21-
compile 'io.reactivesocket:reactivesocket:0.0.1-SNAPSHOT'
22-
compile 'uk.co.real-logic:Agrona:0.4.3'
23-
compile 'uk.co.real-logic:aeron-all:0.1.3'
24-
compile 'org.hdrhistogram:HdrHistogram:2.1.7'
25-
compile 'org.slf4j:slf4j-api:1.7.12'
26-
testCompile 'junit:junit-dep:4.10'
27-
testCompile 'org.mockito:mockito-core:1.8.5'
28-
testCompile 'org.slf4j:slf4j-simple:1.7.12'
29-
testCompile 'io.dropwizard.metrics:metrics-core:3.1.2'
47+
subprojects {
48+
apply plugin: 'reactivesocket-project'
49+
apply plugin: 'java'
50+
51+
sourceCompatibility = JavaVersion.VERSION_1_8
52+
targetCompatibility = JavaVersion.VERSION_1_8
53+
54+
dependencies {
55+
compile 'io.reactivex:rxjava:1.0.13'
56+
compile 'io.reactivex:rxjava-reactive-streams:1.0.1'
57+
compile 'io.reactivesocket:reactivesocket:0.0.1-SNAPSHOT'
58+
compile 'uk.co.real-logic:Agrona:0.4.4'
59+
compile 'uk.co.real-logic:aeron-all:0.1.4'
60+
compile 'org.hdrhistogram:HdrHistogram:2.1.7'
61+
compile 'org.slf4j:slf4j-api:1.7.12'
62+
testCompile 'junit:junit-dep:4.10'
63+
testCompile 'org.mockito:mockito-core:1.8.5'
64+
testCompile 'org.slf4j:slf4j-simple:1.7.12'
65+
}
66+
67+
68+
// support for snapshot/final releases via versioned branch names like 1.x
69+
nebulaRelease {
70+
addReleaseBranchPattern(/\d+\.\d+\.\d+/)
71+
addReleaseBranchPattern('HEAD')
72+
}
73+
74+
if (project.hasProperty('release.useLastTag')) {
75+
tasks.prepare.enabled = false
76+
}
3077
}
3178
3279
33-
// support for snapshot/final releases via versioned branch names like 1.x
34-
nebulaRelease {
35-
addReleaseBranchPattern(/\d+\.\d+\.\d+/)
36-
addReleaseBranchPattern('HEAD')
80+
task(md, dependsOn: 'classes', type: JavaExec) {
81+
main = 'io.reactivesocket.aeron.example.MediaDriver'
82+
classpath = sourceSets.main.runtimeClasspath
3783
}
3884
39-
if (project.hasProperty('release.useLastTag')) {
40-
tasks.prepare.enabled = false
41-
}
85+
task(ping, dependsOn: 'classes', type: JavaExec) {
86+
main = 'io.reactivesocket.aeron.example.Ping'
87+
classpath = sourceSets.main.runtimeClasspath
88+
}
89+
90+
task(pong, dependsOn: 'classes', type: JavaExec) {
91+
main = 'io.reactivesocket.aeron.example.Pong'
92+
classpath = sourceSets.main.runtimeClasspath
93+
}
94+
*/
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
dependencies {
2+
compile project(':reactivesocket-aeron-core')
3+
}

src/main/java/io/reactivesocket/aeron/client/AeronClientDuplexConnection.java renamed to reactivesocket-aeron-client/src/main/java/io/reactivesocket/aeron/client/AeronClientDuplexConnection.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import io.reactivesocket.DuplexConnection;
44
import io.reactivesocket.Frame;
5-
import io.reactivesocket.aeron.internal.Constants;
65
import io.reactivesocket.aeron.internal.Loggable;
76
import io.reactivesocket.rx.Completable;
87
import io.reactivesocket.rx.Disposable;
@@ -12,7 +11,7 @@
1211
import org.reactivestreams.Subscriber;
1312
import org.reactivestreams.Subscription;
1413
import uk.co.real_logic.aeron.Publication;
15-
import uk.co.real_logic.agrona.concurrent.ManyToOneConcurrentArrayQueue;
14+
import uk.co.real_logic.agrona.concurrent.AbstractConcurrentArrayQueue;
1615

1716
import java.io.IOException;
1817
import java.util.concurrent.CopyOnWriteArrayList;
@@ -22,12 +21,12 @@ public class AeronClientDuplexConnection implements DuplexConnection, Loggable {
2221

2322
private final Publication publication;
2423
private final CopyOnWriteArrayList<Observer<Frame>> subjects;
25-
private final ManyToOneConcurrentArrayQueue<FrameHolder> frameSendQueue;
24+
private final AbstractConcurrentArrayQueue<FrameHolder> frameSendQueue;
2625
private final Consumer<Publication> onClose;
2726

2827
public AeronClientDuplexConnection(
2928
Publication publication,
30-
ManyToOneConcurrentArrayQueue<FrameHolder> frameSendQueue,
29+
AbstractConcurrentArrayQueue<FrameHolder> frameSendQueue,
3130
Consumer<Publication> onClose) {
3231
this.publication = publication;
3332
this.subjects = new CopyOnWriteArrayList<>();
@@ -64,10 +63,11 @@ public void addOutput(Publisher<Frame> o, Completable callback) {
6463
o
6564
.subscribe(new Subscriber<Frame>() {
6665
private Subscription s;
66+
6767
@Override
6868
public void onSubscribe(Subscription s) {
6969
this.s = s;
70-
s.request(Constants.QUEUE_SIZE);
70+
s.request(Long.MAX_VALUE);
7171

7272
}
7373

@@ -101,11 +101,9 @@ public void close() throws IOException {
101101
onClose.accept(publication);
102102
}
103103

104-
public ManyToOneConcurrentArrayQueue<FrameHolder> getFrameSendQueue() {
105-
return frameSendQueue;
106-
}
107-
108104
public CopyOnWriteArrayList<Observer<Frame>> getSubjects() {
109105
return subjects;
110106
}
107+
108+
111109
}

0 commit comments

Comments
 (0)