Skip to content

Commit 471d2c0

Browse files
authored
Fix CircularBuffer and add unit tests (TheAlgorithms#3411)
1 parent e96f567 commit 471d2c0

File tree

2 files changed

+165
-109
lines changed

2 files changed

+165
-109
lines changed
Lines changed: 38 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -1,132 +1,61 @@
11
package com.thealgorithms.datastructures.buffers;
22

3-
import java.util.Random;
43
import java.util.concurrent.atomic.AtomicInteger;
54

6-
public class CircularBuffer {
5+
public class CircularBuffer<Item> {
6+
private final Item[] buffer;
7+
private final CircularPointer putPointer;
8+
private final CircularPointer getPointer;
9+
private final AtomicInteger size = new AtomicInteger(0);
710

8-
private char[] _buffer;
9-
public final int _buffer_size;
10-
private int _write_index = 0;
11-
private int _read_index = 0;
12-
private AtomicInteger _readable_data = new AtomicInteger(0);
13-
14-
public CircularBuffer(int buffer_size) {
15-
if (!IsPowerOfTwo(buffer_size)) {
16-
throw new IllegalArgumentException();
17-
}
18-
this._buffer_size = buffer_size;
19-
_buffer = new char[buffer_size];
11+
public CircularBuffer(int size) {
12+
//noinspection unchecked
13+
this.buffer = (Item[]) new Object[size];
14+
this.putPointer = new CircularPointer(0, size);
15+
this.getPointer = new CircularPointer(0, size);
2016
}
2117

22-
private boolean IsPowerOfTwo(int i) {
23-
return (i & (i - 1)) == 0;
18+
public boolean isEmpty() {
19+
return size.get() == 0;
2420
}
2521

26-
private int getTrueIndex(int i) {
27-
return i % _buffer_size;
22+
public boolean isFull() {
23+
return size.get() == buffer.length;
2824
}
2925

30-
public Character readOutChar() {
31-
Character result = null;
32-
33-
// if we have data to read
34-
if (_readable_data.get() > 0) {
35-
result = Character.valueOf(_buffer[getTrueIndex(_read_index)]);
36-
_readable_data.decrementAndGet();
37-
_read_index++;
38-
}
26+
public Item get() {
27+
if (isEmpty())
28+
return null;
3929

40-
return result;
30+
Item item = buffer[getPointer.getAndIncrement()];
31+
size.decrementAndGet();
32+
return item;
4133
}
4234

43-
public boolean writeToCharBuffer(char c) {
44-
boolean result = false;
45-
46-
// if we can write to the buffer
47-
if (_readable_data.get() < _buffer_size) {
48-
// write to buffer
49-
_buffer[getTrueIndex(_write_index)] = c;
50-
_readable_data.incrementAndGet();
51-
_write_index++;
52-
result = true;
53-
}
35+
public boolean put(Item item) {
36+
if (isFull())
37+
return false;
5438

55-
return result;
39+
buffer[putPointer.getAndIncrement()] = item;
40+
size.incrementAndGet();
41+
return true;
5642
}
5743

58-
private static class TestWriteWorker implements Runnable {
44+
private static class CircularPointer {
45+
private int pointer;
46+
private final int max;
5947

60-
String _alphabet = "abcdefghijklmnopqrstuvwxyz0123456789";
61-
Random _random = new Random();
62-
CircularBuffer _buffer;
63-
64-
public TestWriteWorker(CircularBuffer cb) {
65-
this._buffer = cb;
66-
}
67-
68-
private char getRandomChar() {
69-
return _alphabet.charAt(_random.nextInt(_alphabet.length()));
48+
public CircularPointer(int pointer, int max) {
49+
this.pointer = pointer;
50+
this.max = max;
7051
}
7152

72-
public void run() {
73-
while (!Thread.interrupted()) {
74-
if (!_buffer.writeToCharBuffer(getRandomChar())) {
75-
Thread.yield();
76-
try {
77-
Thread.sleep(10);
78-
} catch (InterruptedException e) {
79-
return;
80-
}
81-
}
82-
}
53+
public int getAndIncrement() {
54+
if (pointer == max)
55+
pointer = 0;
56+
int tmp = pointer;
57+
pointer++;
58+
return tmp;
8359
}
8460
}
85-
86-
private static class TestReadWorker implements Runnable {
87-
88-
CircularBuffer _buffer;
89-
90-
public TestReadWorker(CircularBuffer cb) {
91-
this._buffer = cb;
92-
}
93-
94-
@Override
95-
public void run() {
96-
System.out.println("Printing Buffer:");
97-
while (!Thread.interrupted()) {
98-
Character c = _buffer.readOutChar();
99-
if (c != null) {
100-
System.out.print(c.charValue());
101-
} else {
102-
Thread.yield();
103-
try {
104-
Thread.sleep(10);
105-
} catch (InterruptedException e) {
106-
System.out.println();
107-
return;
108-
}
109-
}
110-
}
111-
}
112-
}
113-
114-
public static void main(String[] args) throws InterruptedException {
115-
int buffer_size = 1024;
116-
// create circular buffer
117-
CircularBuffer cb = new CircularBuffer(buffer_size);
118-
119-
// create threads that read and write the buffer.
120-
Thread write_thread = new Thread(new TestWriteWorker(cb));
121-
Thread read_thread = new Thread(new TestReadWorker(cb));
122-
read_thread.start();
123-
write_thread.start();
124-
125-
// wait some amount of time
126-
Thread.sleep(10000);
127-
128-
// interrupt threads and exit
129-
write_thread.interrupt();
130-
read_thread.interrupt();
131-
}
13261
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package com.thealgorithms.datastructures.buffers;
2+
3+
import org.junit.jupiter.api.BeforeEach;
4+
import org.junit.jupiter.api.RepeatedTest;
5+
import org.junit.jupiter.api.Test;
6+
7+
import java.util.ArrayList;
8+
import java.util.Comparator;
9+
import java.util.List;
10+
import java.util.concurrent.*;
11+
import java.util.concurrent.atomic.AtomicIntegerArray;
12+
13+
import static org.junit.jupiter.api.Assertions.*;
14+
15+
class CircularBufferTest {
16+
private static final int BUFFER_SIZE = 10;
17+
private CircularBuffer<Integer> buffer;
18+
19+
@BeforeEach
20+
void setUp() {
21+
buffer = new CircularBuffer<>(BUFFER_SIZE);
22+
}
23+
24+
@Test
25+
void isEmpty() {
26+
assertTrue(buffer.isEmpty());
27+
buffer.put(generateInt());
28+
assertFalse(buffer.isEmpty());
29+
}
30+
31+
@Test
32+
void isFull() {
33+
assertFalse(buffer.isFull());
34+
buffer.put(generateInt());
35+
assertFalse(buffer.isFull());
36+
37+
for (int i = 1; i < BUFFER_SIZE; i++)
38+
buffer.put(generateInt());
39+
assertTrue(buffer.isFull());
40+
}
41+
42+
@Test
43+
void get() {
44+
assertNull(buffer.get());
45+
for (int i = 0; i < 100; i++)
46+
buffer.put(i);
47+
for (int i = 0; i < BUFFER_SIZE; i++)
48+
assertEquals(i, buffer.get());
49+
assertNull(buffer.get());
50+
}
51+
52+
@Test
53+
void put() {
54+
for (int i = 0; i < BUFFER_SIZE; i++)
55+
assertTrue(buffer.put(generateInt()));
56+
assertFalse(buffer.put(generateInt()));
57+
}
58+
59+
@RepeatedTest(1000)
60+
void concurrentTest() throws InterruptedException {
61+
final int numberOfThreadsForProducers = 3;
62+
final int numberOfThreadsForConsumers = 2;
63+
final int numberOfItems = 300;
64+
final CountDownLatch producerCountDownLatch = new CountDownLatch(numberOfItems);
65+
final CountDownLatch consumerCountDownLatch = new CountDownLatch(numberOfItems);
66+
final AtomicIntegerArray resultAtomicArray = new AtomicIntegerArray(numberOfItems);
67+
68+
// We are running 2 ExecutorService simultaneously 1 - producer, 2 - consumer
69+
// Run producer threads to populate buffer.
70+
ExecutorService putExecutors = Executors.newFixedThreadPool(numberOfThreadsForProducers);
71+
putExecutors.execute(() -> {
72+
while (producerCountDownLatch.getCount() > 0) {
73+
int count = (int) producerCountDownLatch.getCount();
74+
boolean put = buffer.put(count);
75+
while (!put) put = buffer.put(count);
76+
producerCountDownLatch.countDown();
77+
}
78+
});
79+
80+
// Run consumer threads to retrieve the data from buffer.
81+
ExecutorService getExecutors = Executors.newFixedThreadPool(numberOfThreadsForConsumers);
82+
getExecutors.execute(() -> {
83+
while (consumerCountDownLatch.getCount() > 0) {
84+
int count = (int) consumerCountDownLatch.getCount();
85+
Integer item = buffer.get();
86+
while (item == null) item = buffer.get();
87+
resultAtomicArray.set(count - 1, item);
88+
consumerCountDownLatch.countDown();
89+
}
90+
});
91+
92+
producerCountDownLatch.await();
93+
consumerCountDownLatch.await();
94+
putExecutors.shutdown();
95+
getExecutors.shutdown();
96+
shutDownExecutorSafely(putExecutors);
97+
shutDownExecutorSafely(getExecutors);
98+
99+
List<Integer> resultArray = getSortedListFrom(resultAtomicArray);
100+
for (int i = 0; i < numberOfItems; i++) {
101+
int expectedItem = i + 1;
102+
assertEquals(expectedItem, resultArray.get(i));
103+
}
104+
}
105+
106+
private int generateInt() {
107+
return ThreadLocalRandom.current().nextInt(0, 100);
108+
}
109+
110+
private void shutDownExecutorSafely(ExecutorService executorService) {
111+
try {
112+
if (!executorService.awaitTermination(1_000, TimeUnit.MILLISECONDS))
113+
executorService.shutdownNow();
114+
} catch (InterruptedException e) {
115+
executorService.shutdownNow();
116+
}
117+
}
118+
119+
public List<Integer> getSortedListFrom(AtomicIntegerArray atomicArray) {
120+
int length = atomicArray.length();
121+
ArrayList<Integer> result = new ArrayList<>(length);
122+
for (int i = 0; i < length; i++)
123+
result.add(atomicArray.get(i));
124+
result.sort(Comparator.comparingInt(o -> o));
125+
return result;
126+
}
127+
}

0 commit comments

Comments
 (0)