License Public Domain
Lines 92
Keywords
Buffer (2) Java (1) Socket (3) Thread (4)
Included in this Library
Permissions
Group Owner: Threading
Viewable by Everyone
Editable by Spencer Ruport
Hide
Stay up to dateembedded code automagically updates, each snippet and article has a feed Join Siafoo Now or Learn More

Thread safe byte buffer queue Atom Feed 0

In Brief Byte buffer queues are extremely useful in socket applications. I use this one in a few the projects I've worked on.
# 's
 1import java.util.concurrent.Semaphore;
2
3// Thread safe byte buffer queue
4public class BufferQueue {
5 private byte[] buff;
6 private volatile int head, tail, count, size;
7 private Semaphore appendsem, readsem, countsem;
8
9 // Constructor
10 public BufferQueue(int size) {
11 appendsem = new Semaphore(1, true);
12 readsem = new Semaphore(1, true);
13 countsem = new Semaphore(1, true);
14 buff = new byte[size];
15 this.size = size;
16 head = 0;
17 tail = 0;
18 count = 0;
19 }
20
21 // Get the number of bytes in the buffer
22 public int getCount() { return count; }
23
24 // Append bytes to the buffer
25 public void append(byte[] data) { if(data != null) append(data, 0, data.length); }
26 public void append(byte[] data, int offset, int length) {
27 if(data == null) return;
28 if(data.length < offset + length) { throw new RuntimeException("array index out of bounds. offset + length extends beyond the length of the array."); }
29
30 try { appendsem.acquire(); } catch (InterruptedException e) { return; }
31 // We need to acquire the semaphore so that this.tail doesn't change.
32 for(int i=0; i<length; i++)
33 buff[(i + this.tail) % this.size] = data[i + offset];
34 this.tail = (length + this.tail) % this.size;
35 try { countsem.acquire(); } catch (InterruptedException e) { return; }
36 // We need to acquire the semaphore so that this.count doesn't change.
37 this.count = this.count + length;
38 if(this.count > this.size)
39 throw new RuntimeException("Buffer overflow error.");
40 countsem.release();
41 appendsem.release();
42 }
43
44 // Read bytes from the buffer
45 public int read(byte[] data){ if(data != null) return read(data, 0, data.length); else return 0; }
46 public int read(byte[] data, int offset, int length) {
47 if(data == null) return 0;
48 if(data.length < offset + length) throw new RuntimeException("array index out of bounds. offset + length extends beyond the length of the array.");
49
50 int readlength = 0;
51
52 try { readsem.acquire(); } catch (InterruptedException e) { return 0; }
53 // We need to acquire the semaphore so that this.head doesn't change.
54 for(int i=0; i<length; i++) {
55 if(i == count) break;
56 data[i + offset] = buff[(i + head) % this.size];
57 readlength++;
58 }
59 this.head = (readlength + this.head) % this.size;
60 try { countsem.acquire(); } catch (InterruptedException e) { readsem.release(); return 0; }
61 // We need to acquire the semaphore so that this.count doesn't change.
62 this.count = this.count - readlength;
63 countsem.release();
64 readsem.release();
65
66 return readlength;
67 }
68
69 public int peek(byte[] data){ if(data != null) return peek(data, 0, data.length); else return 0; }
70 public int peek(byte[] data, int offset, int length) {
71 if(data == null) return 0;
72 if(data.length < offset + length) throw new RuntimeException("array index out of bounds. offset + length extends beyond the length of the array.");
73
74 int readlength = 0;
75
76 try { readsem.acquire(); } catch (InterruptedException e) { return 0; }
77 // We need to acquire the semaphore so that this.head doesn't change.
78 for(int i=0; i<length; i++) {
79 if(i == count) break;
80 data[i + offset] = buff[(i + head) % this.size];
81 readlength++;
82 }
83
84 readsem.release();
85
86 return readlength;
87 }
88 public byte[] readBytes()
89 {
90 byte[] data = new byte[count];
91 try { read(data); } catch(Exception ex) {}
92 return data;
93 }
94}

Byte buffer queues are extremely useful in socket applications. I use this one in a few the projects I've worked on.