package com.sunvote.udptransfer.stream; import com.sunvote.udptransfer.UDPModule; import com.sunvote.util.LogUtil; import java.io.IOException; import java.io.InputStream; import java.util.LinkedList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Created by Elvis on 2017/8/8. * Email:Eluis@psunsky.com * Description: 实现对应用层调用的输入输出流。 * 对接应用层进行的输入输出流 */ public class SunVoteInputStream extends InputStream { public static final String TAG = SunVoteInputStream.class.getSimpleName(); private LinkedList linkedList = new LinkedList(); public static long MAX_CACHE_SIZE = 4 * 1024; private Lock lock = new ReentrantLock(); private Condition empty = lock.newCondition(); private boolean close = false; public SunVoteInputStream() { } public OnBytesReceiver onBytesReceiver; public void setOnBytesReceiver(OnBytesReceiver onBytesReceiver) { this.onBytesReceiver = onBytesReceiver; } public interface OnBytesReceiver{ void onBytesReceiver(byte[] datas,int length); } public void pushDatas(byte[] datas, int length) { if(onBytesReceiver != null){ onBytesReceiver.onBytesReceiver(datas,length); }else { try { /*try { lock.lock(); if (linkedList.size() < MAX_CACHE_SIZE) { for (int i = length - 1; i >= 0; i--) { linkedList.push(datas[i]); } } else { LogUtil.v(UDPModule.TAG, "SDK还没有接收完数据,缓冲区的数据大小:" + linkedList.size() + ",丢失数据:", datas, length); } } finally { empty.signal(); lock.unlock(); }*/ } catch (Exception e) { LogUtil.e(TAG, "UDP receiver message", e); close = true; } } } @Override public int read() throws IOException { if (close) { throw new IOException("the stream has closed"); } try { lock.lock(); while (linkedList.isEmpty()) { try { empty.await(); } catch (InterruptedException e) { LogUtil.e(TAG,e); } } return linkedList.pop(); } finally { lock.unlock(); } } public boolean isClose() { return close; } @Override public void close() throws IOException { close = true; super.close(); } @Override public int read(byte[] buffer) throws IOException { try { lock.lock(); if (buffer == null) { throw new IOException("buffer is empty"); } if (close) { throw new IOException("the stream has closed"); } while (linkedList.isEmpty()) { try { empty.await(); } catch (InterruptedException e) { e.printStackTrace(); } } int index = 0; while (index < buffer.length && !linkedList.isEmpty()) { buffer[index++] = linkedList.pop(); } // LogUtil.v(UDPModule.TAG, "SDK接收数据,接收的数据大小:" + index + ",数据:", buffer,index); return index; } finally { lock.unlock(); } } @Override public int read(byte[] buffer, int byteOffset, int byteCount) throws IOException { try { lock.lock(); if (buffer == null) { throw new IOException("buffer is empty"); } if (close) { throw new IOException("the stream has closed"); } while (linkedList.isEmpty()) { try { empty.await(); } catch (InterruptedException e) { e.printStackTrace(); } } int index = 0; while (index < byteCount && !linkedList.isEmpty()) { index++; buffer[byteOffset++] = linkedList.pop(); } return index; } finally { lock.unlock(); } } }