Skip to content

Commit e9429db

Browse files
author
Frohwalt Egerer
committed
Merge branch 'master' of github.com:jcourtney1/java-apns
2 parents 87056c3 + c7a7217 commit e9429db

5 files changed

Lines changed: 75 additions & 21 deletions

File tree

src/main/java/com/notnoop/apns/ApnsServiceBuilder.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,17 +89,19 @@ public class ApnsServiceBuilder {
8989

9090
private ReconnectPolicy reconnectPolicy = ReconnectPolicy.Provided.EVERY_HALF_HOUR.newObject();
9191
private boolean isQueued = false;
92+
private ThreadFactory queueThreadFactory = null;
9293

9394
private boolean isBatched = false;
9495
private int batchWaitTimeInSec;
9596
private int batchMaxWaitTimeInSec;
96-
private ThreadFactory batchThreadFactory;
97+
private ThreadFactory batchThreadFactory = null;
9798

9899
private ApnsDelegate delegate = ApnsDelegate.EMPTY;
99100
private Proxy proxy = null;
100101
private String proxyUsername = null;
101102
private String proxyPassword = null;
102103
private boolean errorDetection = true;
104+
private ThreadFactory errorDetectionThreadFactory = null;
103105

104106
/**
105107
* Constructs a new instance of {@code ApnsServiceBuilder}
@@ -485,7 +487,20 @@ public ApnsServiceBuilder asPool(ExecutorService executor, int maxConnections) {
485487
* @return this
486488
*/
487489
public ApnsServiceBuilder asQueued() {
490+
return asQueued(Executors.defaultThreadFactory());
491+
}
492+
493+
/**
494+
* Constructs a new thread with a processing queue to process
495+
* notification requests.
496+
*
497+
* @param threadFactory
498+
* thread factory to use for queue processing
499+
* @return this
500+
*/
501+
public ApnsServiceBuilder asQueued(ThreadFactory threadFactory) {
488502
this.isQueued = true;
503+
this.queueThreadFactory = threadFactory;
489504
return this;
490505
}
491506

@@ -514,7 +529,7 @@ public ApnsServiceBuilder asBatched() {
514529
* maximum wait time for batch before executing
515530
*/
516531
public ApnsServiceBuilder asBatched(int waitTimeInSec, int maxWaitTimeInSec) {
517-
return asBatched(waitTimeInSec, maxWaitTimeInSec, Executors.defaultThreadFactory());
532+
return asBatched(waitTimeInSec, maxWaitTimeInSec, null);
518533
}
519534

520535
/**
@@ -572,6 +587,20 @@ public ApnsServiceBuilder withNoErrorDetection() {
572587
return this;
573588
}
574589

590+
/**
591+
* Provide a custom source for threads used for monitoring connections.
592+
*
593+
* This setting is desired when the application must obtain threads from a
594+
* controlled environment Google App Engine.
595+
* @param threadFactory
596+
* thread factory to use for error detection
597+
* @return this
598+
*/
599+
public ApnsServiceBuilder withErrorDetectionThreadFactory(ThreadFactory threadFactory) {
600+
this.errorDetectionThreadFactory = threadFactory;
601+
return this;
602+
}
603+
575604
/**
576605
* Returns a fully initialized instance of {@link ApnsService},
577606
* according to the requested settings.
@@ -587,15 +616,16 @@ public ApnsService build() {
587616

588617
ApnsConnection conn = new ApnsConnectionImpl(sslFactory, gatewayHost,
589618
gatewaPort, proxy, proxyUsername, proxyPassword, reconnectPolicy,
590-
delegate, errorDetection, cacheLength, autoAdjustCacheLength, readTimeout, connectTimeout);
619+
delegate, errorDetection, errorDetectionThreadFactory, cacheLength,
620+
autoAdjustCacheLength, readTimeout, connectTimeout);
591621
if (pooledMax != 1) {
592622
conn = new ApnsPooledConnection(conn, pooledMax, executor);
593623
}
594624

595625
service = new ApnsServiceImpl(conn, feedback);
596626

597627
if (isQueued) {
598-
service = new QueuedApnsService(service);
628+
service = new QueuedApnsService(service, queueThreadFactory);
599629
}
600630

601631
if (isBatched) {

src/main/java/com/notnoop/apns/internal/ApnsConnectionImpl.java

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,18 @@
3030
*/
3131
package com.notnoop.apns.internal;
3232

33-
import java.io.DataInputStream;
3433
import java.io.EOFException;
3534
import java.io.IOException;
35+
import java.io.InputStream;
3636
import java.net.InetSocketAddress;
3737
import java.net.Proxy;
3838
import java.net.Socket;
3939
import java.util.LinkedList;
4040
import java.util.Queue;
4141
import java.util.concurrent.ConcurrentLinkedQueue;
42+
import java.util.concurrent.Executors;
43+
import java.util.concurrent.ThreadFactory;
44+
4245
import javax.net.SocketFactory;
4346
import javax.net.ssl.SSLSocketFactory;
4447
import com.notnoop.apns.ApnsDelegate;
@@ -67,6 +70,7 @@ public class ApnsConnectionImpl implements ApnsConnection {
6770
private final ApnsDelegate delegate;
6871
private int cacheLength;
6972
private final boolean errorDetection;
73+
private final ThreadFactory threadFactory;
7074
private final boolean autoAdjustCacheLength;
7175
private final ConcurrentLinkedQueue<ApnsNotification> cachedNotifications, notificationsBuffer;
7276

@@ -80,12 +84,12 @@ private ApnsConnectionImpl(SocketFactory factory, String host, int port, Reconne
8084

8185
private ApnsConnectionImpl(SocketFactory factory, String host, int port, Proxy proxy, String proxyUsername, String proxyPassword,
8286
ReconnectPolicy reconnectPolicy, ApnsDelegate delegate) {
83-
this(factory, host, port, proxy, proxyUsername, proxyPassword, reconnectPolicy, delegate, false,
87+
this(factory, host, port, proxy, proxyUsername, proxyPassword, reconnectPolicy, delegate, false, null,
8488
ApnsConnection.DEFAULT_CACHE_LENGTH, true, 0, 0);
8589
}
8690

8791
public ApnsConnectionImpl(SocketFactory factory, String host, int port, Proxy proxy, String proxyUsername, String proxyPassword,
88-
ReconnectPolicy reconnectPolicy, ApnsDelegate delegate, boolean errorDetection, int cacheLength,
92+
ReconnectPolicy reconnectPolicy, ApnsDelegate delegate, boolean errorDetection, ThreadFactory tf, int cacheLength,
8993
boolean autoAdjustCacheLength, int readTimeout, int connectTimeout) {
9094
this.factory = factory;
9195
this.host = host;
@@ -94,6 +98,7 @@ public ApnsConnectionImpl(SocketFactory factory, String host, int port, Proxy pr
9498
this.delegate = delegate == null ? ApnsDelegate.EMPTY : delegate;
9599
this.proxy = proxy;
96100
this.errorDetection = errorDetection;
101+
this.threadFactory = tf == null ? defaultThreadFactory() : tf;
97102
this.cacheLength = cacheLength;
98103
this.autoAdjustCacheLength = autoAdjustCacheLength;
99104
this.readTimeout = readTimeout;
@@ -104,25 +109,38 @@ public ApnsConnectionImpl(SocketFactory factory, String host, int port, Proxy pr
104109
notificationsBuffer = new ConcurrentLinkedQueue<ApnsNotification>();
105110
}
106111

112+
private ThreadFactory defaultThreadFactory() {
113+
return new ThreadFactory() {
114+
ThreadFactory wrapped = Executors.defaultThreadFactory();
115+
@Override
116+
public Thread newThread( Runnable r )
117+
{
118+
Thread result = wrapped.newThread(r);
119+
result.setName("MonitoringThread");
120+
result.setDaemon(true);
121+
return result;
122+
}
123+
};
124+
}
125+
107126
public synchronized void close() {
108127
Utilities.close(socket);
109128
}
110129

111130
private void monitorSocket(final Socket socket) {
112131
logger.debug("Launching Monitoring Thread for socket {}", socket);
113132

114-
class MonitoringThread extends Thread {
133+
Thread t = threadFactory.newThread(new Runnable() {
115134
final static int EXPECTED_SIZE = 6;
116135

117136
@SuppressWarnings("InfiniteLoopStatement")
118137
@Override
119138
public void run() {
120139
logger.debug("Started monitoring thread");
121140
try {
122-
123-
DataInputStream in;
141+
InputStream in;
124142
try {
125-
in = new DataInputStream(socket.getInputStream());
143+
in = socket.getInputStream();
126144
} catch (IOException ioe) {
127145
in = null;
128146
}
@@ -213,7 +231,7 @@ public void run() {
213231
* @return true if a packet as been read, false if the stream was at EOF right at the beginning.
214232
* @throws IOException When a problem occurs, especially EOFException when there's an EOF in the middle of the packet.
215233
*/
216-
private boolean readPacket(final DataInputStream in, final byte[] bytes) throws IOException {
234+
private boolean readPacket(final InputStream in, final byte[] bytes) throws IOException {
217235
final int len = bytes.length;
218236
int n = 0;
219237
while (n < len) {
@@ -231,10 +249,7 @@ private boolean readPacket(final DataInputStream in, final byte[] bytes) throws
231249
}
232250
return true;
233251
}
234-
}
235-
Thread t = new MonitoringThread();
236-
t.setName("MonitoringThread");
237-
t.setDaemon(true);
252+
});
238253
t.start();
239254
}
240255

@@ -353,7 +368,7 @@ private void cacheNotification(ApnsNotification notification) {
353368

354369
public ApnsConnectionImpl copy() {
355370
return new ApnsConnectionImpl(factory, host, port, proxy, proxyUsername, proxyPassword, reconnectPolicy.copy(), delegate,
356-
errorDetection, cacheLength, autoAdjustCacheLength, readTimeout, connectTimeout);
371+
errorDetection, threadFactory, cacheLength, autoAdjustCacheLength, readTimeout, connectTimeout);
357372
}
358373

359374
public void testConnection() throws NetworkIOException {

src/main/java/com/notnoop/apns/internal/ApnsPooledConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ protected ApnsConnection initialValue() {
3838
};
3939

4040
public void sendMessage(final ApnsNotification m) throws NetworkIOException {
41-
Future future = executors.submit(new Callable<Void>() {
41+
Future<Void> future = executors.submit(new Callable<Void>() {
4242
public Void call() throws Exception {
4343
uniquePrototype.get().sendMessage(m);
4444
return null;

src/main/java/com/notnoop/apns/internal/BatchApnsService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.Queue;
44
import java.util.concurrent.ConcurrentLinkedQueue;
5+
import java.util.concurrent.Executors;
56
import java.util.concurrent.ScheduledExecutorService;
67
import java.util.concurrent.ScheduledFuture;
78
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -47,7 +48,7 @@ public BatchApnsService(ApnsConnection prototype, ApnsFeedbackConnection feedbac
4748
this.prototype = prototype;
4849
this.batchWaitTimeInSec = batchWaitTimeInSec;
4950
this.maxBatchWaitTimeInSec = maxBachWaitTimeInSec;
50-
this.scheduleService = new ScheduledThreadPoolExecutor(1, tf);
51+
this.scheduleService = new ScheduledThreadPoolExecutor(1, tf == null ? Executors.defaultThreadFactory() : tf);
5152
}
5253

5354
public void start() {

src/main/java/com/notnoop/apns/internal/QueuedApnsService.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
import java.util.Map;
3535
import java.util.concurrent.atomic.AtomicBoolean;
3636
import java.util.concurrent.BlockingQueue;
37+
import java.util.concurrent.Executors;
3738
import java.util.concurrent.LinkedBlockingQueue;
39+
import java.util.concurrent.ThreadFactory;
3840

3941
import org.slf4j.Logger;
4042
import org.slf4j.LoggerFactory;
@@ -52,9 +54,14 @@ public class QueuedApnsService extends AbstractApnsService {
5254
private AtomicBoolean started = new AtomicBoolean(false);
5355

5456
public QueuedApnsService(ApnsService service) {
57+
this(service, null);
58+
}
59+
60+
public QueuedApnsService(ApnsService service, final ThreadFactory tf) {
5561
super(null);
5662
this.service = service;
5763
this.queue = new LinkedBlockingQueue<ApnsNotification>();
64+
this.threadFactory = tf == null ? Executors.defaultThreadFactory() : tf;
5865
this.thread = null;
5966
}
6067

@@ -66,6 +73,7 @@ public void push(ApnsNotification msg) {
6673
queue.add(msg);
6774
}
6875

76+
private final ThreadFactory threadFactory;
6977
private Thread thread;
7078
private volatile boolean shouldContinue;
7179

@@ -79,7 +87,7 @@ public void start() {
7987

8088
service.start();
8189
shouldContinue = true;
82-
thread = new Thread() {
90+
thread = threadFactory.newThread(new Runnable() {
8391
public void run() {
8492
while (shouldContinue) {
8593
try {
@@ -95,7 +103,7 @@ public void run() {
95103
}
96104
}
97105
}
98-
};
106+
});
99107
thread.start();
100108
}
101109

0 commit comments

Comments
 (0)