Skip to content

Commit ea14532

Browse files
committed
Follow terracotta-api#117 changes
1 parent c11c63d commit ea14532

File tree

9 files changed

+157
-7
lines changed

9 files changed

+157
-7
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
target/
22

3+
*.iml
34
.project
45
.classpath
6+
.idea/
57
.settings/

galvan-support/src/main/java/org/terracotta/testing/client/IPCClusterControl.java

+10
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,16 @@ public void waitForActive() throws Exception {
6262
ipcManager.waitForActive();
6363
}
6464

65+
@Override
66+
public void terminateActive() throws Exception {
67+
ipcManager.terminateActive();
68+
}
69+
70+
@Override
71+
public void startLastTerminatedServer() throws Exception {
72+
ipcManager.startLastTerminatedServer();
73+
}
74+
6575
@Override
6676
public void waitForPassive() throws Exception {
6777
ipcManager.waitForPassive();

galvan-support/src/main/java/org/terracotta/testing/rules/BasicExternalCluster.java

+10
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,16 @@ public void waitForActive() throws Exception {
197197
cluster.stripeControl.waitForActive();
198198
}
199199

200+
@Override
201+
public void terminateActive() throws Exception {
202+
cluster.stripeControl.terminateActive();
203+
}
204+
205+
@Override
206+
public void startLastTerminatedServer() throws Exception {
207+
cluster.stripeControl.startLastTerminatedServer();
208+
}
209+
200210
@Override
201211
public void waitForPassive() throws Exception {
202212
cluster.stripeControl.waitForPassive();

galvan/src/main/java/org/terracotta/testing/client/ClientSideIPCManager.java

+4
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ public void restartActive() {
8484
sendAndWait(IPCMessageConstants.RESTART_ACTIVE);
8585
}
8686

87+
public void startLastTerminatedServer() { sendAndWait(IPCMessageConstants.START_LAST_TERMINATED_ACTIVE); }
88+
89+
public void terminateActive() { sendAndWait(IPCMessageConstants.TERMINATE_ACTIVE); }
90+
8791
public void shutDownStripeAndWaitForTermination() {
8892
sendAndWait(IPCMessageConstants.SHUT_DOWN_STRIPE);
8993
}

galvan/src/main/java/org/terracotta/testing/common/IPCMessageConstants.java

+8
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,14 @@ public static String ackFrom(String message) {
3535
public static final String RESTART_ACTIVE_SYN = synFrom(RESTART_ACTIVE);
3636
public static final String RESTART_ACTIVE_ACK = ackFrom(RESTART_ACTIVE);
3737

38+
public static final String TERMINATE_ACTIVE = "TERMINATE_ACTIVE";
39+
public static final String TERMINATE_ACTIVE_SYN = synFrom(TERMINATE_ACTIVE);
40+
public static final String TERMINATE_ACTIVE_ACK = ackFrom(TERMINATE_ACTIVE);
41+
42+
public static final String START_LAST_TERMINATED_ACTIVE = "START_LAST_TERMINATE_ACTIVE";
43+
public static final String START_LAST_TERMINATED_ACTIVE_SYN = synFrom(START_LAST_TERMINATED_ACTIVE);
44+
public static final String START_LAST_TERMINATED_ACTIVE_ACK = ackFrom(START_LAST_TERMINATED_ACTIVE);
45+
3846
public static final String SHUT_DOWN_STRIPE = "SHUT_DOWN_STRIPE";
3947
public static final String SHUT_DOWN_STRIPE_SYN = synFrom(SHUT_DOWN_STRIPE);
4048
public static final String SHUT_DOWN_STRIPE_ACK = ackFrom(SHUT_DOWN_STRIPE);

galvan/src/main/java/org/terracotta/testing/master/ClientEventManager.java

+22
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,28 @@ public void onEvent(Event e) throws Throwable {
5858
processStdin.println(IPCMessageConstants.RESTART_ACTIVE_ACK);
5959
processStdin.flush();
6060
}});
61+
62+
String terminateActiveEventName = "Terminate active";
63+
eventMap.put(IPCMessageConstants.TERMINATE_ACTIVE_SYN, terminateActiveEventName);
64+
subBus.on(terminateActiveEventName, new EventListener() {
65+
@Override
66+
public void onEvent(Event e) throws Throwable {
67+
control.terminateActive();
68+
// We also want to send the ACK to the client.
69+
processStdin.println(IPCMessageConstants.TERMINATE_ACTIVE_ACK);
70+
processStdin.flush();
71+
}});
72+
73+
String startLastTerminatedServerEventName = "Start Last Terminated active";
74+
eventMap.put(IPCMessageConstants.START_LAST_TERMINATED_ACTIVE_SYN, startLastTerminatedServerEventName);
75+
subBus.on(startLastTerminatedServerEventName, new EventListener() {
76+
@Override
77+
public void onEvent(Event e) throws Throwable {
78+
control.startLastTerminatedServer();
79+
// We also want to send the ACK to the client.
80+
processStdin.println(IPCMessageConstants.START_LAST_TERMINATED_ACTIVE_ACK);
81+
processStdin.flush();
82+
}});
6183

6284
String shutDownStripeEventName = "Shut down stripe";
6385
eventMap.put(IPCMessageConstants.SHUT_DOWN_STRIPE_SYN, shutDownStripeEventName);

galvan/src/main/java/org/terracotta/testing/master/IMultiProcessControl.java

+4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ public interface IMultiProcessControl {
2828

2929
public void restartActive();
3030

31+
public void terminateActive();
32+
33+
public void startLastTerminatedServer();
34+
3135
public void shutDown();
3236

3337
public void waitForActive();

galvan/src/main/java/org/terracotta/testing/master/MultiStripeProcessControl.java

+14
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,20 @@ public synchronized void restartActive() {
4141
}
4242
}
4343

44+
@Override
45+
public void terminateActive() {
46+
for (IMultiProcessControl oneControl : this.subControl) {
47+
oneControl.terminateActive();
48+
}
49+
}
50+
51+
@Override
52+
public void startLastTerminatedServer() {
53+
for (IMultiProcessControl oneControl : this.subControl) {
54+
oneControl.startLastTerminatedServer();
55+
}
56+
}
57+
4458
@Override
4559
public synchronized void shutDown() {
4660
for (IMultiProcessControl oneControl : this.subControl) {

galvan/src/main/java/org/terracotta/testing/master/SynchronousProcessControl.java

+83-7
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
package org.terracotta.testing.master;
1717

1818
import java.io.IOException;
19+
import java.util.EmptyStackException;
1920
import java.util.List;
21+
import java.util.Stack;
2022
import java.util.Vector;
2123

2224
import org.terracotta.testing.common.Assert;
@@ -34,6 +36,7 @@ public class SynchronousProcessControl implements IMultiProcessControl {
3436
private final List<ServerProcess> unknownServers = new Vector<ServerProcess>();
3537
// It is invalid to use a process control object which has already been shut down so keep that flag.
3638
private boolean isShutDown;
39+
private Stack<ServerProcess> terminatedServers = new Stack();
3740

3841
public SynchronousProcessControl(ITestStateManager stateManager, ContextualLogger logger) {
3942
this.stateManager = stateManager;
@@ -84,6 +87,60 @@ public synchronized void restartActive() {
8487
this.logger.output("<<< restartActive");
8588
}
8689

90+
@Override
91+
public void terminateActive() {
92+
this.logger.output(">>> terminateActive");
93+
verifyNotShutdown();
94+
// First, make sure that there is an active.
95+
internalWaitForActive();
96+
// We MUST now have an active.
97+
Assert.assertTrue(null != this.activeServer);
98+
99+
// Remove the active.
100+
ServerProcess victim = this.activeServer;
101+
this.activeServer = null;
102+
// Stop it.
103+
try {
104+
int ret = victim.stop();
105+
this.logger.output("terminated server, return status: " + ret);
106+
} catch (InterruptedException e) {
107+
// We can't leave a consistent state if interrupted at this point.
108+
Assert.unexpected(e);
109+
}
110+
// Return the process to its installation and get a new one.
111+
ServerInstallation underlyingInstallation = victim.getUnderlyingInstallation();
112+
underlyingInstallation.retireProcess(victim);
113+
114+
// Close its logs.
115+
try {
116+
underlyingInstallation.closeStandardLogFiles();
117+
} catch (IOException e) {
118+
// We don't expect this IOException on closing the logs.
119+
Assert.unexpected(e);
120+
}
121+
}
122+
123+
@Override
124+
public void startLastTerminatedServer() {
125+
try {
126+
ServerProcess lastTerminatedServer = terminatedServers.pop();
127+
128+
ServerInstallation underlyingInstallation = lastTerminatedServer.getUnderlyingInstallation();
129+
ServerProcess freshProcess = underlyingInstallation.createNewProcess(this.stateManager);
130+
131+
// Start it.
132+
long pid = freshProcess.start();
133+
this.logger.output("Server restarted with PID: " + pid);
134+
// Enqueue it onto the unknown list.
135+
this.unknownServers.add(freshProcess);
136+
137+
// At this point, we don't know the active server.
138+
this.logger.output("<<< restartActive");
139+
} catch (EmptyStackException e) {
140+
throw new RuntimeException("There are no terminated processes");
141+
}
142+
}
143+
87144
@Override
88145
public synchronized void shutDown() {
89146
this.logger.output(">>> shutDown");
@@ -175,14 +232,23 @@ private void internalWaitForActive() {
175232
waitForAllUnknowns();
176233
// If we still have no active, walk the passives to see if one was promoted.
177234
if (null == this.activeServer) {
178-
// Note that we don't want to walk this list while placing servers in states since looping the list, at this point, is either a bug or a serious race condition.
179-
Vector<ServerProcess> oldPassives = new Vector<ServerProcess>(this.passiveServers);
180-
this.passiveServers.clear();
181-
while (!oldPassives.isEmpty()) {
182-
ServerProcess passive = oldPassives.remove(0);
183-
waitAndPlaceServerInState(passive);
235+
checkAllPassivesState();
236+
237+
// If we still can't find an active, do multiple retries with some timeout as it might take sometime for
238+
// other passives to become active (Note that this is a hacky way to fix the issue for now, we are going to
239+
// address this properly later)
240+
if(null == this.activeServer) {
241+
int retryCount = 30;
242+
while (retryCount-- != 0) {
243+
try {
244+
Thread.sleep(1000);
245+
checkAllPassivesState();
246+
} catch (InterruptedException e) {
247+
Assert.unexpected(e);
248+
}
249+
}
184250
}
185-
251+
186252
// See if we are still without an active.
187253
if (null == this.activeServer) {
188254
// If there is no active at this point, it probably means that there is something seriously wrong with the
@@ -193,6 +259,16 @@ private void internalWaitForActive() {
193259
}
194260
}
195261

262+
private void checkAllPassivesState() {
263+
// Note that we don't want to walk this list while placing servers in states since looping the list, at this point, is either a bug or a serious race condition.
264+
Vector<ServerProcess> oldPassives = new Vector<ServerProcess>(this.passiveServers);
265+
this.passiveServers.clear();
266+
while (!oldPassives.isEmpty()) {
267+
ServerProcess passive = oldPassives.remove(0);
268+
waitAndPlaceServerInState(passive);
269+
}
270+
}
271+
196272
private void waitForAllUnknowns() {
197273
while (!this.unknownServers.isEmpty()) {
198274
ServerProcess unknown = this.unknownServers.remove(0);

0 commit comments

Comments
 (0)