License Public Domain
Lines 92
Keywords
Buffer (2) Java (1) Socket (3) Thread (4)
Included in this Library
Permissions
Group Owner: Threading
Viewable by Everyone
Editable by Spencer Ruport
Hide
Siafoo is here to make coding less frustrating and to save you time. Join Siafoo Now or Learn More

Thread safe byte buffer queue Atom Feed 0

In Brief Byte buffer queues are extremely useful in socket applications. I use this one in a few the projects I've worked on.
# 's
 1import java.util.concurrent.Semaphore;
2
3// Thread safe byte buffer queue
4public class BufferQueue {
5 private byte[] buff;
6 private volatile int head, tail, count, size;
7 private Semaphore appendsem, readsem, countsem;
8
9 // Constructor
10 public BufferQueue(int size) {
11 appendsem = new Semaphore(1, true);
12 readsem = new Semaphore(1, true);
13 countsem = new Semaphore(1, true);
14 buff = new byte[size];
15 this.size = size;
16 head = 0;
17 tail = 0;
18 count = 0;
19 }
20
21 // Get the number of bytes in the buffer
22 public int getCount() { return count; }
23
24 // Append bytes to the buffer
25 public void append(byte[] data) { if(data != null) append(data, 0, data.length); }
26 public void append(byte[] data, int offset, int length) {
27 if(data == null) return;
28 if(data.length < offset + length) { throw new RuntimeException("array index out of bounds. offset + length extends beyond the length of the array."); }
29
30 try { appendsem.acquire(); } catch (InterruptedException e) { return; }
31 // We need to acquire the semaphore so that this.tail doesn't change.
32 for(int i=0; i<length; i++)
33 buff[(i + this.tail) % this.size] = data[i + offset];
34 this.tail = (length + this.tail) % this.size;
35 try { countsem.acquire(); } catch (InterruptedException e) { return; }
36 // We need to acquire the semaphore so that this.count doesn't change.
37 this.count = this.count + length;
38 if(this.count > this.size)
39 throw new RuntimeException("Buffer overflow error.");
40 countsem.release();
41 appendsem.release();
42 }
43
44 // Read bytes from the buffer
45 public int read(byte[] data){ if(data != null) return read(data, 0, data.length); else return 0; }
46 public int read(byte[] data, int offset, int length) {
47 if(data == null) return 0;
48 if(data.length < offset + length) throw new RuntimeException("array index out of bounds. offset + length extends beyond the length of the array.");
49
50 int readlength = 0;
51
52 try { readsem.acquire(); } catch (InterruptedException e) { return 0; }
53 // We need to acquire the semaphore so that this.head doesn't change.
54 for(int i=0; i<length; i++) {
55 if(i == count) break;
56 data[i + offset] = buff[(i + head) % this.size];
57 readlength++;
58 }
59 this.head = (readlength + this.head) % this.size;
60 try { countsem.acquire(); } catch (InterruptedException e) { readsem.release(); return 0; }
61 // We need to acquire the semaphore so that this.count doesn't change.
62 this.count = this.count - readlength;
63 countsem.release();
64 readsem.release();
65
66 return readlength;
67 }
68
69 public int peek(byte[] data){ if(data != null) return peek(data, 0, data.length); else return 0; }
70 public int peek(byte[] data, int offset, int length) {
71 if(data == null) return 0;
72 if(data.length < offset + length) throw new RuntimeException("array index out of bounds. offset + length extends beyond the length of the array.");
73
74 int readlength = 0;
75
76 try { readsem.acquire(); } catch (InterruptedException e) { return 0; }
77 // We need to acquire the semaphore so that this.head doesn't change.
78 for(int i=0; i<length; i++) {
79 if(i == count) break;
80 data[i + offset] = buff[(i + head) % this.size];
81 readlength++;
82 }
83
84 readsem.release();
85
86 return readlength;
87 }
88 public byte[] readBytes()
89 {
90 byte[] data = new byte[count];
91 try { read(data); } catch(Exception ex) {}
92 return data;
93 }
94}

Byte buffer queues are extremely useful in socket applications. I use this one in a few the projects I've worked on.