View Javadoc

1   /**
2    *  BlueCove - Java library for Bluetooth
3    *  Copyright (C) 2008 Michael Lifshits
4    *  Copyright (C) 2008 Vlad Skarzhevskyy
5    *
6    *  Licensed to the Apache Software Foundation (ASF) under one
7    *  or more contributor license agreements.  See the NOTICE file
8    *  distributed with this work for additional information
9    *  regarding copyright ownership.  The ASF licenses this file
10   *  to you under the Apache License, Version 2.0 (the
11   *  "License"); you may not use this file except in compliance
12   *  with the License.  You may obtain a copy of the License at
13   *
14   *    http://www.apache.org/licenses/LICENSE-2.0
15   *
16   *  Unless required by applicable law or agreed to in writing,
17   *  software distributed under the License is distributed on an
18   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
19   *  KIND, either express or implied.  See the License for the
20   *  specific language governing permissions and limitations
21   *  under the License.
22   *
23   *  @author vlads
24   *  @version $Id: ConnectedInputStream.java 2599 2008-12-16 21:34:57Z skarzhevskyy $
25   */
26  package com.intel.bluetooth.emu;
27  
28  import java.io.IOException;
29  import java.io.InputStream;
30  import java.io.InterruptedIOException;
31  
32  /**
33   *
34   *
35   */
36  class ConnectedInputStream extends InputStream {
37  
38  	/**
39  	 * The circular buffer which receives data.
40  	 */
41  	private byte buffer[];
42  
43  	private boolean closed = false;
44  
45  	private boolean receiverClosed = false;
46  
47  	/**
48  	 * The index of the position in the circular buffer at which the byte of data will be stored.
49  	 */
50  	private int write = 0;
51  
52  	/**
53  	 * The index of the position in the circular buffer from which the next byte of data will be read.
54  	 */
55  	private int read = 0;
56  
57  	private int available = 0;
58  
59  	private final boolean senderFlushBlock;
60  
61  	public ConnectedInputStream(int size, boolean senderFlushBlock) {
62  		buffer = new byte[size];
63  		this.senderFlushBlock = senderFlushBlock;
64  	}
65  
66  	/*
67  	 * (non-Javadoc)
68  	 * 
69  	 * @see java.io.InputStream#read()
70  	 */
71  	@Override
72  	public synchronized int read() throws IOException {
73  		while (available == 0) {
74  			if (closed) {
75  				throw new IOException("Stream closed");
76  			}
77  			if (receiverClosed) {
78  				// EOF
79  				return -1;
80  			}
81  			// Let the receive run
82  			notifyAll();
83  			try {
84  				wait(1000);
85  			} catch (InterruptedException e) {
86  				throw new InterruptedIOException();
87  			}
88  		}
89  		int r = buffer[read++] & 0xFF;
90  		if (read >= buffer.length) {
91  			read = 0;
92  		}
93  		available--;
94  		notifyAll();
95  		return r;
96  	}
97  
98  	/**
99  	 * Reads up to <code>len</code> bytes of data from this input stream into an array of bytes. Less than
100 	 * <code>len</code> bytes will be read if the end of the data stream is reached. This method blocks until at least
101 	 * one byte of input is available.
102 	 */
103 	@Override
104 	public synchronized int read(byte b[], int off, int len) throws IOException {
105 		if (off < 0 || len < 0 || off + len > b.length) {
106 			throw new IndexOutOfBoundsException();
107 		}
108 		if (len == 0) {
109 			if ((closed) && (available == 0)) {
110 				throw new IOException("Stream closed");
111 			}
112 			return 0;
113 		}
114 		// wait only on first byte
115 		int c = read();
116 		if (c < 0) {
117 			return -1;
118 		}
119 		b[off] = (byte) (c & 0xFF);
120 		int rlen = 1;
121 		while ((available > 0) && (--len > 0)) {
122 			b[off + rlen] = buffer[read++];
123 			rlen++;
124 			if (read >= buffer.length) {
125 				read = 0;
126 			}
127 			available--;
128 			notifyAll();
129 		}
130 		return rlen;
131 	}
132 
133 	public synchronized int available() throws IOException {
134 		return available;
135 	}
136 
137 	/**
138 	 * Block sender till client reads all.
139 	 */
140 	void receiveFlush() throws IOException {
141 		if (this.senderFlushBlock) {
142 			receiveFlushBlock();
143 		}
144 	}
145 
146 	void receiveFlushBlock() throws IOException {
147 		while (available != 0) {
148 			if (closed) {
149 				throw new IOException("Stream closed");
150 			}
151 			if (receiverClosed) {
152 				throw new IOException("Connection closed");
153 			}
154 			synchronized (this) {
155 				try {
156 					wait(1000);
157 				} catch (InterruptedException e) {
158 					throw new InterruptedIOException();
159 				}
160 			}
161 		}
162 	}
163 
164 	synchronized void receive(int b) throws IOException {
165 		if (closed) {
166 			throw new IOException("Connection closed");
167 		}
168 		if (available == buffer.length) {
169 			waitFreeBuffer();
170 		}
171 		buffer[write++] = (byte) (b & 0xFF);
172 		if (write >= buffer.length) {
173 			write = 0;
174 		}
175 		available++;
176 		notifyAll();
177 	}
178 
179 	synchronized public void receive(byte b[], int off, int len) throws IOException {
180 		for (int i = 0; i < len; i++) {
181 			if (closed) {
182 				throw new IOException("Connection closed");
183 			}
184 			if (available == buffer.length) {
185 				waitFreeBuffer();
186 			}
187 			buffer[write++] = b[off + i];
188 			if (write >= buffer.length) {
189 				write = 0;
190 			}
191 			available++;
192 		}
193 		notifyAll();
194 	}
195 
196 	private void waitFreeBuffer() throws IOException {
197 		while (available == buffer.length) {
198 			if (receiverClosed || closed) {
199 				throw new IOException("Receiver closed");
200 			}
201 			// Let the read run
202 			notifyAll();
203 			try {
204 				wait(1000);
205 			} catch (InterruptedException e) {
206 				throw new InterruptedIOException();
207 			}
208 		}
209 	}
210 
211 	synchronized void receiverClose() throws IOException {
212 		receiverClosed = true;
213 		notifyAll();
214 	}
215 
216 	@Override
217 	public synchronized void close() throws IOException {
218 		closed = true;
219 		notifyAll();
220 	}
221 }