Skip to content

Commit 4a9276a

Browse files
AFFINITY-6 Transparent version of the child-thread affinity reset feature
1 parent 74d122c commit 4a9276a

File tree

5 files changed

+293
-0
lines changed

5 files changed

+293
-0
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package java.lang;
2+
3+
4+
/**
 * Callback interface through which an observer is told about events in the life of a
 * {@link Thread}: its start, a failed start, and its termination.
 */
public interface ThreadLifecycleListener {

    /**
     * Signals that the given thread is about to be started.
     *
     * @param t the thread which is being started
     */
    void started(Thread t);

    /**
     * Signals that the given thread could not be started.
     *
     * @param t the thread whose start failed
     */
    void startFailed(Thread t);

    /**
     * Signals that the given thread has terminated.
     *
     * @param t the thread that has been terminated
     */
    void terminated(Thread t);

}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package java.lang;
2+
3+
/**
4+
* A wrapper of {@link java.lang.ThreadGroup} that tracks the creation and termination of threads.
5+
*/
6+
public class ThreadTrackingGroup extends ThreadGroup {
7+
8+
/**
9+
* Listener to be notified of various events in thread lifecycles.
10+
*/
11+
private final ThreadLifecycleListener listener;
12+
13+
public ThreadTrackingGroup(ThreadGroup parent, ThreadLifecycleListener listener) {
14+
super(parent, ThreadTrackingGroup.class.getSimpleName().toLowerCase() + System.identityHashCode(listener));
15+
this.listener = listener;
16+
}
17+
18+
@Override
19+
void add(Thread t) {
20+
System.out.println("ThreadTrackingGroup.add: " + t); //todo: remove
21+
super.add(t);
22+
listener.started(t);
23+
}
24+
25+
@Override
26+
void threadStartFailed(Thread t) {
27+
super.threadStartFailed(t);
28+
listener.startFailed(t);
29+
}
30+
31+
@Override
32+
void threadTerminated(Thread t) {
33+
super.threadTerminated(t);
34+
listener.terminated(t);
35+
}
36+
}

affinity/src/main/java/net/openhft/affinity/AffinitySupport.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,4 +140,25 @@ public static void setThreadId() {
140140
throw new IllegalStateException(e);
141141
}
142142
}
143+
144+
public static AffinityLock acquireLock() {
145+
return isNonForkingAffinityAvailable() ? NonForkingAffinityLock.acquireLock() : AffinityLock.acquireLock();
146+
}
147+
148+
public static AffinityLock acquireCore() {
149+
return isNonForkingAffinityAvailable() ? NonForkingAffinityLock.acquireCore() : AffinityLock.acquireCore();
150+
}
151+
152+
public static AffinityLock acquireLock(boolean bind) {
153+
return isNonForkingAffinityAvailable() ? NonForkingAffinityLock.acquireLock(bind) : AffinityLock.acquireLock(bind);
154+
}
155+
156+
public static AffinityLock acquireCore(boolean bind) {
157+
return isNonForkingAffinityAvailable() ? NonForkingAffinityLock.acquireCore(bind) : AffinityLock.acquireCore(bind);
158+
}
159+
160+
private static boolean isNonForkingAffinityAvailable() {
161+
BootClassPath bootClassPath = BootClassPath.INSTANCE;
162+
return bootClassPath.has("java.lang.ThreadTrackingGroup") && bootClassPath.has("java.lang.ThreadLifecycleListener");
163+
}
143164
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package net.openhft.affinity;
2+
3+
import sun.misc.URLClassPath;
4+
5+
import java.io.File;
6+
import java.net.MalformedURLException;
7+
import java.net.URL;
8+
import java.util.logging.Logger;
9+
10+
enum BootClassPath {
11+
INSTANCE;
12+
13+
private final Logger logger = Logger.getLogger(BootClassPath.class.getName());
14+
15+
private final URLClassPath bootClassPath = new URLClassPath(getBootClassPathURLs());
16+
17+
public final boolean has(String binaryClassName) {
18+
String resourceClassName = binaryClassName.replace('.', '/').concat(".class");
19+
return bootClassPath.getResource(resourceClassName, false) != null;
20+
}
21+
22+
private URL[] getBootClassPathURLs() {
23+
try {
24+
String bootClassPath = System.getProperty("sun.boot.class.path");
25+
logger.fine("Boot class-path is: " + bootClassPath);
26+
27+
String pathSeparator = System.getProperty("path.separator");
28+
logger.fine("Path separator is: '" + pathSeparator + "'");
29+
30+
String[] pathElements = bootClassPath.split(pathSeparator);
31+
URL[] pathURLs = new URL[pathElements.length];
32+
for (int i = 0; i < pathElements.length; i++) {
33+
pathURLs[i] = new File(pathElements[i]).toURI().toURL();
34+
}
35+
36+
return pathURLs;
37+
} catch (MalformedURLException e) {
38+
logger.warning("Parsing the boot class-path failed! Reason: " + e.getMessage());
39+
return new URL[0];
40+
}
41+
}
42+
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package net.openhft.affinity;
2+
3+
import net.openhft.affinity.impl.NoCpuLayout;
4+
import org.jetbrains.annotations.NotNull;
5+
6+
import java.lang.reflect.Field;
7+
8+
public class NonForkingAffinityLock extends AffinityLock implements ThreadLifecycleListener {
9+
10+
private static final Field GROUP_FIELD = makeThreadFieldModifiable("group");
11+
12+
private static final Field TARGET_FIELD = makeThreadFieldModifiable("target");
13+
14+
private static final LockInventory LOCK_INVENTORY = new LockInventory(new NoCpuLayout(PROCESSORS)) {
15+
@Override
16+
protected AffinityLock newLock(int cpuId, boolean base, boolean reservable) {
17+
return new NonForkingAffinityLock(cpuId, base, reservable, this);
18+
}
19+
};
20+
21+
/**
22+
* Assign any free cpu to this thread.
23+
*
24+
* @return A handle for the current AffinityLock.
25+
*/
26+
public static AffinityLock acquireLock() {
27+
return acquireLock(true);
28+
}
29+
30+
/**
31+
* Assign any free core to this thread.
32+
* <p></p>
33+
* In reality, only one cpu is assigned, the rest of the threads for that core are reservable so they are not used.
34+
*
35+
* @return A handle for the current AffinityLock.
36+
*/
37+
public static AffinityLock acquireCore() {
38+
return acquireCore(true);
39+
}
40+
41+
/**
42+
* Assign a cpu which can be bound to the current thread or another thread.
43+
* <p></p>
44+
* This can be used for defining your thread layout centrally and passing the handle via dependency injection.
45+
*
46+
* @param bind if true, bind the current thread, if false, reserve a cpu which can be bound later.
47+
* @return A handle for an affinity lock.
48+
*/
49+
public static AffinityLock acquireLock(boolean bind) {
50+
return acquireLock(bind, -1, AffinityStrategies.ANY);
51+
}
52+
53+
/**
54+
* Assign a core(and all its cpus) which can be bound to the current thread or another thread.
55+
* <p></p>
56+
* This can be used for defining your thread layout centrally and passing the handle via dependency injection.
57+
*
58+
* @param bind if true, bind the current thread, if false, reserve a cpu which can be bound later.
59+
* @return A handle for an affinity lock.
60+
*/
61+
public static AffinityLock acquireCore(boolean bind) {
62+
return acquireCore(bind, -1, AffinityStrategies.ANY);
63+
}
64+
65+
private static AffinityLock acquireLock(boolean bind, int cpuId, @NotNull AffinityStrategy... strategies) {
66+
return LOCK_INVENTORY.acquireLock(bind, cpuId, strategies);
67+
}
68+
69+
private static AffinityLock acquireCore(boolean bind, int cpuId, @NotNull AffinityStrategy... strategies) {
70+
return LOCK_INVENTORY.acquireCore(bind, cpuId, strategies);
71+
}
72+
73+
/**
74+
* Set the CPU layout for this machine. CPUs which are not mentioned will be ignored.
75+
* <p></p>
76+
* Changing the layout will have no impact on thread which have already been assigned.
77+
* It only affects subsequent assignments.
78+
*
79+
* @param cpuLayout for this application to use for this machine.
80+
*/
81+
public static void cpuLayout(@NotNull CpuLayout cpuLayout) {
82+
LOCK_INVENTORY.set(cpuLayout);
83+
}
84+
85+
/**
86+
* @return The current CpuLayout for the application.
87+
*/
88+
@NotNull
89+
public static CpuLayout cpuLayout() {
90+
return LOCK_INVENTORY.getCpuLayout();
91+
}
92+
93+
/**
94+
* @return All the current locks as a String.
95+
*/
96+
@NotNull
97+
public static String dumpLocks() {
98+
return LOCK_INVENTORY.dumpLocks();
99+
}
100+
101+
NonForkingAffinityLock(int cpuId, boolean base, boolean reservable, LockInventory lockInventory) {
102+
super(cpuId, base, reservable, lockInventory);
103+
}
104+
105+
@Override
106+
public void bind(boolean wholeCore) {
107+
super.bind(wholeCore);
108+
Thread thread = Thread.currentThread();
109+
changeGroupOfThread(thread, new ThreadTrackingGroup(thread.getThreadGroup(), this));
110+
}
111+
112+
@Override
113+
public void release() {
114+
Thread thread = Thread.currentThread();
115+
changeGroupOfThread(thread, thread.getThreadGroup().getParent());
116+
super.release();
117+
}
118+
119+
@Override
120+
public void started(Thread t) {
121+
wrapRunnableOfThread(t, this);
122+
}
123+
124+
@Override
125+
public void startFailed(Thread t) {
126+
}
127+
128+
@Override
129+
public void terminated(Thread t) {
130+
}
131+
132+
private static Field makeThreadFieldModifiable(String fieldName) {
133+
try {
134+
Field field = Thread.class.getDeclaredField(fieldName);
135+
field.setAccessible(true);
136+
return field;
137+
} catch (NoSuchFieldException e) {
138+
throw new RuntimeException(Thread.class.getName() + " class doesn't have a " + fieldName + " field! Quite unexpected!");
139+
}
140+
}
141+
142+
private static void changeGroupOfThread(Thread thread, ThreadGroup group) {
143+
try {
144+
GROUP_FIELD.set(thread, group);
145+
} catch (IllegalAccessException e) {
146+
throw new RuntimeException("Failed changing " + Thread.class.getName() + "'s the '" + GROUP_FIELD.getName() + "' field! Reason: " + e.getMessage());
147+
}
148+
}
149+
150+
private static void wrapRunnableOfThread(Thread thread, final AffinityLock lock) {
151+
try {
152+
final Runnable originalRunnable = (Runnable) TARGET_FIELD.get(thread);
153+
TARGET_FIELD.set(
154+
thread,
155+
new Runnable() {
156+
@Override
157+
public void run() {
158+
lock.release();
159+
originalRunnable.run();
160+
}
161+
}
162+
);
163+
} catch (IllegalAccessException e) {
164+
throw new RuntimeException("Failed wrapping " + Thread.class.getName() + "'s '" + TARGET_FIELD.getName() + "' field! Reason: " + e.getMessage());
165+
}
166+
}
167+
}

0 commit comments

Comments
 (0)