forked from sqlancer/sqlancer
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathFelderaClient.java
More file actions
87 lines (68 loc) · 3.2 KB
/
FelderaClient.java
File metadata and controls
87 lines (68 loc) · 3.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package sqlancer.feldera.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.MapType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.*;
/**
 * Thin HTTP client for the pipeline endpoints served under {@code <url>/v0}.
 *
 * <p>
 * Every state-changing call blocks the calling thread, polling the pipeline every {@value #POLL_INTERVAL_MS} ms until
 * the requested state is observed. The polling loops have no timeout: they only end when the expected status shows up
 * or (for compilation) an unexpected status is reported.
 */
public class FelderaClient {

    /** Delay between two consecutive status polls, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 500;

    /**
     * Program statuses that mean "compilation still in progress, keep polling".
     *
     * <p>
     * Deliberately backed by {@link Arrays#asList}: its {@code contains(null)} returns {@code false}, whereas
     * {@code List.of(...)} would throw, and a {@code null} status must reach the {@link AssertionError} branch.
     */
    private static final List<String> COMPILING_STATUSES = Collections
            .unmodifiableList(Arrays.asList("Pending", "CompilingSql", "SqlCompiled", "CompilingRust"));

    /** Shared JSON mapper; creating one per query is wasteful and a configured mapper can be reused. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Target type for query responses: a JSON object decoded into {@code HashMap<String, Object>}. */
    private static final MapType RESULT_TYPE = MAPPER.getTypeFactory().constructMapType(HashMap.class, String.class,
            Object.class);

    private final HttpRequests httpRequests;

    /**
     * Creates a client whose requests are all issued relative to {@code url + "/v0"}.
     *
     * @param url
     *            base URL of the server, without the {@code /v0} suffix
     */
    public FelderaClient(String url) {
        this.httpRequests = new HttpRequests(url + "/v0");
    }

    /**
     * Fetches {@code /pipelines/<name>} and parses the response body.
     *
     * @param name
     *            pipeline name; inserted into the path as-is (no escaping is applied here)
     *
     * @return the pipeline description parsed by {@link FelderaPipeline#fromJson}
     *
     * @throws Exception
     *             if the request or the parsing fails
     */
    public FelderaPipeline getPipeline(String name) throws Exception {
        String resp = this.httpRequests.get(String.format("/pipelines/%s", name));
        return FelderaPipeline.fromJson(resp);
    }

    /**
     * Polls the pipeline until its program status is {@code "Success"}.
     *
     * @param name
     *            pipeline name
     *
     * @throws AssertionError
     *             if the reported status is neither {@code "Success"} nor one of {@link #COMPILING_STATUSES}
     *             (including a {@code null} status)
     * @throws Exception
     *             if fetching the pipeline fails or the thread is interrupted while sleeping
     */
    private void waitForCompilation(String name) throws Exception {
        while (true) {
            String status = this.getPipeline(name).getProgramStatus();
            if (Objects.equals(status, "Success")) {
                return;
            }
            if (!COMPILING_STATUSES.contains(status)) {
                throw new AssertionError(String.format("err: pipeline: %s failed to compile: %s", name, status));
            }
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }

    /**
     * Runs an ad-hoc query through {@code /pipelines/<pipelineName>/query} and decodes the JSON answer.
     *
     * @param pipelineName
     *            pipeline to query
     * @param sql
     *            query text; it is URL-encoded as UTF-8 before being passed on as the {@code sql} option
     *
     * @return the decoded JSON object, or an immutable empty map when the response body is blank
     *
     * @throws Exception
     *             if the request fails or the body is not a JSON object
     */
    public Map<String, Object> exec(String pipelineName, String sql) throws Exception {
        Map<String, String> options = new HashMap<>();
        options.put("sql", URLEncoder.encode(sql, StandardCharsets.UTF_8));
        options.put("format", "json");

        String resp = this.httpRequests.get(String.format("/pipelines/%s/query", pipelineName), options);
        if (resp.isBlank()) {
            return Collections.emptyMap();
        }
        return MAPPER.readValue(resp, RESULT_TYPE);
    }

    /**
     * Polls the pipeline until its deployment status equals {@code desired}, ignoring case.
     *
     * @param pipelineName
     *            pipeline name
     * @param desired
     *            status to wait for
     *
     * @throws NullPointerException
     *             if the pipeline reports no deployment status
     * @throws Exception
     *             if fetching the pipeline fails or the thread is interrupted while sleeping
     */
    private void blockTillDesiredState(String pipelineName, String desired) throws Exception {
        while (true) {
            String deploymentStatus = this.getPipeline(pipelineName).getDeploymentStatus();
            // Same exception type as the bare dereference would raise, but with enough context to debug it.
            Objects.requireNonNull(deploymentStatus,
                    () -> String.format("pipeline %s reported no deployment status while waiting for '%s'",
                            pipelineName, desired));
            if (deploymentStatus.equalsIgnoreCase(desired)) {
                return;
            }
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }

    /**
     * Posts an empty body to {@code /pipelines/<pipelineName>/<action>} and waits for the resulting state.
     *
     * @param pipelineName
     *            pipeline name
     * @param action
     *            last path segment of the endpoint
     * @param desiredStatus
     *            deployment status to wait for afterwards
     *
     * @throws Exception
     *             if the request or the subsequent polling fails
     */
    private void requestStateChange(String pipelineName, String action, String desiredStatus) throws Exception {
        this.httpRequests.post(String.format("/pipelines/%s/%s", pipelineName, action), Collections.emptyMap());
        blockTillDesiredState(pipelineName, desiredStatus);
    }

    /**
     * PUTs {@code body} to {@code /pipelines/<pipelineName>} and blocks until compilation succeeds.
     *
     * @param pipelineName
     *            pipeline name
     * @param body
     *            request body, passed through unchanged
     *
     * @throws AssertionError
     *             if compilation ends in an unexpected program status
     * @throws Exception
     *             if a request fails
     */
    public void createPipeline(String pipelineName, String body) throws Exception {
        this.httpRequests.put(String.format("/pipelines/%s", pipelineName), body);
        waitForCompilation(pipelineName);
    }

    /**
     * Requests {@code start} and blocks until the deployment status is {@code running}.
     *
     * @param pipelineName
     *            pipeline name
     *
     * @throws Exception
     *             if the request or the polling fails
     */
    public void start(String pipelineName) throws Exception {
        requestStateChange(pipelineName, "start", "running");
    }

    /**
     * Requests {@code pause} and blocks until the deployment status is {@code paused}.
     *
     * @param pipelineName
     *            pipeline name
     *
     * @throws Exception
     *             if the request or the polling fails
     */
    public void pause(String pipelineName) throws Exception {
        requestStateChange(pipelineName, "pause", "paused");
    }

    /**
     * Requests {@code shutdown} and blocks until the deployment status is {@code shutdown}.
     *
     * @param pipelineName
     *            pipeline name
     *
     * @throws Exception
     *             if the request or the polling fails
     */
    public void shutdown(String pipelineName) throws Exception {
        requestStateChange(pipelineName, "shutdown", "shutdown");
    }
}