Skip to content

Commit

Permalink
Add support for Aerospike.
Browse files Browse the repository at this point in the history
  • Loading branch information
uncle-betty committed Jun 25, 2015
1 parent 45631e8 commit 35df165
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 0 deletions.
41 changes: 41 additions & 0 deletions aerospike/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
## Quick Start

This section describes how to run YCSB on Aerospike.

### 1. Start Aerospike

### 2. Install Java and Maven

### 3. Set Up YCSB

Git clone YCSB and compile:

git clone http://github.com/brianfrankcooper/YCSB.git
cd YCSB
mvn -pl com.yahoo.ycsb:aerospike-binding -am clean package

### 4. Provide Aerospike Connection Parameters

The following connection parameters are available.

* `as.host` - The Aerospike cluster to connect to (default: `localhost`)
* `as.port` - The port to connect to (default: `3000`)
* `as.user` - The user to connect as (no default)
* `as.password` - The password for the user (no default)
* `as.timeout` - The transaction and connection timeout (in ms, default: `1000`)
* `as.namespace` - The namespace to be used for the benchmark (default: `ycsb`)

Add them to the workload or set them with the shell command, as in:

./bin/ycsb load aerospike -s -P workloads/workloada -p as.timeout=5000 >outputLoad.txt

### 5. Load Data and Run Tests

Load the data:

./bin/ycsb load aerospike -s -P workloads/workloada >outputLoad.txt

Run the workload test:

./bin/ycsb run aerospike -s -P workloads/workloada >outputRun.txt

28 changes: 28 additions & 0 deletions aerospike/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>binding-parent</artifactId>
<version>0.3.0-SNAPSHOT</version>
<relativePath>../binding-parent</relativePath>
</parent>

<artifactId>aerospike-binding</artifactId>
<name>Aerospike DB Binding</name>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>${aerospike.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
205 changes: 205 additions & 0 deletions aerospike/src/main/java/com/yahoo/ycsb/db/AerospikeClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package com.yahoo.ycsb.db;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.ResultCode;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;

import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DBException;

public class AerospikeClient extends com.yahoo.ycsb.DB {
private static final boolean DEBUG = false;

private static final String DEFAULT_HOST = "localhost";
private static final String DEFAULT_PORT = "3000";
private static final String DEFAULT_TIMEOUT = "10000";
private static final String DEFAULT_NAMESPACE = "ycsb";

private static final int RESULT_OK = 0;
private static final int RESULT_ERROR = 1;

private static final int WRITE_OVERLOAD_DELAY = 5;
private static final int WRITE_OVERLOAD_TRIES = 3;

private String namespace = null;

private com.aerospike.client.AerospikeClient client = null;
private int writeOverloadTries = WRITE_OVERLOAD_TRIES;

private Policy readPolicy = new Policy();
private WritePolicy insertPolicy = new WritePolicy();
private WritePolicy updatePolicy = new WritePolicy();
private WritePolicy deletePolicy = new WritePolicy();

public void init() throws DBException {
insertPolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
updatePolicy.recordExistsAction = RecordExistsAction.UPDATE_ONLY;

Properties props = getProperties();

namespace = props.getProperty("as.namespace", DEFAULT_NAMESPACE);

String host = props.getProperty("as.host", DEFAULT_HOST);
String user = props.getProperty("as.user");
String password = props.getProperty("as.password");
int port = Integer.parseInt(props.getProperty("as.port", DEFAULT_PORT));
int timeout = Integer.parseInt(props.getProperty("as.timeout",
DEFAULT_TIMEOUT));

readPolicy.timeout = timeout;
insertPolicy.timeout = timeout;
updatePolicy.timeout = timeout;
deletePolicy.timeout = timeout;

ClientPolicy clientPolicy = new ClientPolicy();

if (user != null && password != null) {
clientPolicy.user = user;
clientPolicy.password = password;
}

try {
client =
new com.aerospike.client.AerospikeClient(clientPolicy, host, port);
} catch (AerospikeException e) {
throw new DBException(String.format("Error while creating Aerospike " +
"client for %s:%d.", host, port));
}
}

public void cleanup() throws DBException {
client.close();
}

@Override
public int read(String table, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
try {
Record record;

if (fields != null) {
record = client.get(readPolicy, new Key(namespace, table, key),
fields.toArray(new String[fields.size()]));
} else {
record = client.get(readPolicy, new Key(namespace, table, key));
}

if (record == null) {
if (DEBUG) {
System.err.println("Record key " + key + " not found (read)");
}

return RESULT_ERROR;
}

for (Map.Entry<String, Object> entry: record.bins.entrySet()) {
result.put(entry.getKey(),
new ByteArrayByteIterator((byte[])entry.getValue()));
}

return RESULT_OK;
} catch (AerospikeException e) {
System.err.println("Error while reading key " + key + ": " + e);
return RESULT_ERROR;
}
}

@Override
public int scan(String table, String start, int count, Set<String> fields,
Vector<HashMap<String, ByteIterator>> result) {
System.err.println("Scan not implemented");
return RESULT_ERROR;
}

private int write(String table, String key, WritePolicy writePolicy,
HashMap<String, ByteIterator> values) {
if (writeOverloadTries == 0) {
return RESULT_ERROR;
}

Bin[] bins = new Bin[values.size()];
int index = 0;

for (Map.Entry<String, ByteIterator> entry: values.entrySet()) {
bins[index] = new Bin(entry.getKey(), entry.getValue().toArray());
++index;
}

int delay = WRITE_OVERLOAD_DELAY;
Key keyObj = new Key(namespace, table, key);

while (true) {
try {
client.put(writePolicy, keyObj, bins);
writeOverloadTries = WRITE_OVERLOAD_TRIES;
return RESULT_OK;
} catch (AerospikeException e) {
if (e.getResultCode() != ResultCode.DEVICE_OVERLOAD) {
System.err.println("Error while updating key " + key + ": " + e);
return RESULT_ERROR;
}

if (--writeOverloadTries == 0) {
if (DEBUG) {
System.err.println("Device overload: " + e);
}

return RESULT_ERROR;
}

try {
Thread.sleep(delay);
} catch (InterruptedException e2) {
if (DEBUG) {
System.err.println("Interrupted: " + e2);
}
}

delay *= 2;
}
}
}

@Override
public int update(String table, String key,
HashMap<String, ByteIterator> values) {
return write(table, key, updatePolicy, values);
}

@Override
public int insert(String table, String key,
HashMap<String, ByteIterator> values) {
return write(table, key, insertPolicy, values);
}

@Override
public int delete(String table, String key) {
try {
if (!client.delete(deletePolicy, new Key(namespace, table, key))) {
if (DEBUG) {
System.err.println("Record key " + key + " not found (delete)");
}

return RESULT_ERROR;
}

return RESULT_OK;
} catch (AerospikeException e) {
System.err.println("Error while deleting key " + key + ": " + e);
return RESULT_ERROR;
}
}
}
1 change: 1 addition & 0 deletions bin/ycsb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ COMMANDS = {

DATABASES = {
"accumulo" : "com.yahoo.ycsb.db.AccumuloClient",
"aerospike" : "com.yahoo.ycsb.db.AerospikeClient",
"basic" : "com.yahoo.ycsb.BasicDB",
"cassandra-7" : "com.yahoo.ycsb.db.CassandraClient7",
"cassandra-8" : "com.yahoo.ycsb.db.CassandraClient8",
Expand Down
5 changes: 5 additions & 0 deletions distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
<artifactId>accumulo-binding</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>aerospike-binding</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>cassandra-binding</artifactId>
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
<hypertable.version>0.9.5.6</hypertable.version>
<couchbase.version>1.1.8</couchbase.version>
<tarantool.version>1.6.1</tarantool.version>
<aerospike.version>3.1.2</aerospike.version>
</properties>

<modules>
Expand All @@ -78,6 +79,7 @@
<module>binding-parent</module>
<!-- all the datastore bindings, lex sorted please -->
<module>accumulo</module>
<module>aerospike</module>
<module>cassandra</module>
<module>couchbase</module>
<module>distribution</module>
Expand Down

0 comments on commit 35df165

Please sign in to comment.