SunVoteInputStream.java
4.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package com.sunvote.udptransfer.stream;
import com.sunvote.udptransfer.UDPModule;
import com.sunvote.util.LogUtil;
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by Elvis on 2017/8/8.
* Email:Eluis@psunsky.com
* Description: 实现对应用层调用的输入输出流。
* 对接应用层进行的输入输出流
*/
public class SunVoteInputStream extends InputStream {
public static final String TAG = SunVoteInputStream.class.getSimpleName();
private LinkedList<Byte> linkedList = new LinkedList<Byte>();
public static long MAX_CACHE_SIZE = 4 * 1024;
private Lock lock = new ReentrantLock();
private Condition empty = lock.newCondition();
private boolean close = false;
public SunVoteInputStream() {
}
public OnBytesReceiver onBytesReceiver;
public void setOnBytesReceiver(OnBytesReceiver onBytesReceiver) {
this.onBytesReceiver = onBytesReceiver;
}
public interface OnBytesReceiver{
void onBytesReceiver(byte[] datas,int length);
}
public void pushDatas(byte[] datas, int length) {
if(onBytesReceiver != null){
onBytesReceiver.onBytesReceiver(datas,length);
}else {
try {
/*try {
lock.lock();
if (linkedList.size() < MAX_CACHE_SIZE) {
for (int i = length - 1; i >= 0; i--) {
linkedList.push(datas[i]);
}
} else {
LogUtil.v(UDPModule.TAG, "SDK还没有接收完数据,缓冲区的数据大小:" + linkedList.size() + ",丢失数据:", datas, length);
}
} finally {
empty.signal();
lock.unlock();
}*/
} catch (Exception e) {
LogUtil.e(TAG, "UDP receiver message", e);
close = true;
}
}
}
@Override
public int read() throws IOException {
if (close) {
throw new IOException("the stream has closed");
}
try {
lock.lock();
while (linkedList.isEmpty()) {
try {
empty.await();
} catch (InterruptedException e) {
LogUtil.e(TAG,e);
}
}
return linkedList.pop();
} finally {
lock.unlock();
}
}
public boolean isClose() {
return close;
}
@Override
public void close() throws IOException {
close = true;
super.close();
}
@Override
public int read(byte[] buffer) throws IOException {
try {
lock.lock();
if (buffer == null) {
throw new IOException("buffer is empty");
}
if (close) {
throw new IOException("the stream has closed");
}
while (linkedList.isEmpty()) {
try {
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int index = 0;
while (index < buffer.length && !linkedList.isEmpty()) {
buffer[index++] = linkedList.pop();
}
// LogUtil.v(UDPModule.TAG, "SDK接收数据,接收的数据大小:" + index + ",数据:", buffer,index);
return index;
} finally {
lock.unlock();
}
}
@Override
public int read(byte[] buffer, int byteOffset, int byteCount) throws IOException {
try {
lock.lock();
if (buffer == null) {
throw new IOException("buffer is empty");
}
if (close) {
throw new IOException("the stream has closed");
}
while (linkedList.isEmpty()) {
try {
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int index = 0;
while (index < byteCount && !linkedList.isEmpty()) {
index++;
buffer[byteOffset++] = linkedList.pop();
}
return index;
} finally {
lock.unlock();
}
}
}