forked from xaxaxa/workspace
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbuffer.C
More file actions
86 lines (83 loc) · 1.62 KB
/
buffer.C
File metadata and controls
86 lines (83 loc) · 1.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#include <iostream>
#include <cpoll/cpoll.H>
#include <signal.h>
#include <list>
using namespace CP;
using namespace std;
using namespace RGC;
/*
 * Hands out a shared Buffer for reads and tracks how much of it has been
 * consumed. Callers pair beginGet() with endGet(n), where n is the number of
 * bytes they used.
 */
struct bufferManager
{
	// Buffer currently being handed out; (re)allocated on demand by beginGet().
	Buffer curBuf;
	// Not referenced by any code visible in this file.
	int curIndex=0;

	// Returns the current buffer, first allocating a new 16 KiB one when
	// curBuf compares equal to nullptr.
	Buffer beginGet() {
		if(curBuf==nullptr) {
			int blockSize=16*1024;
			curBuf=Buffer(blockSize);
		}
		return curBuf;
	}

	// Clips usedLen off curBuf (presumably dropping the first usedLen bytes --
	// confirm against cpoll's Buffer::clip) and releases the buffer once no
	// length remains, so the next beginGet() can allocate a new one.
	void endGet(int usedLen) {
		curBuf.clip(usedLen);
		const bool exhausted=curBuf.length()<=0;
		if(exhausted) {
			curBuf.release();
		}
	}
};
int main() {
StandardStream ss;
struct handler {
Ref<Stream> s1,s2;
list<Buffer> queue;
list<Buffer>::iterator last_written;
bufferManager bm;
Buffer rbuf;
vector<iovec> iovecs;
bool writing=false;
bool shouldExit=false;
void startR1() {
rbuf=bm.beginGet();
s1->read(rbuf,{&handler::cbR1,this});
}
void cbR1(int r) {
if(r<=0) {
if(writing) shouldExit=true;
else exit(0);
}
bm.endGet(r);
rbuf.clip(0,r);
queue.push_back(rbuf);
rbuf.release();
startW2();
startR1();
}
void startW2() {
if(writing) return;
if(queue.size()<=0) {
if(shouldExit)exit(0);
return;
}
writing=true;
iovecs.resize(queue.size());
auto it=queue.begin();
for(int i=0;i<iovecs.size();i++) {
iovecs[i].iov_base=(*it).data();
iovecs[i].iov_len=(*it).length();
it++;
}
last_written=it;
s2->writevAll(&iovecs[0],iovecs.size(),{&handler::cbW2,this});
}
void cbW2(int r) {
//printf("cbW2: %i\n",r);
if(r<=0) exit(0);
queue.erase(queue.begin(),last_written);
writing=false;
startW2();
}
handler(Stream& s1, Stream& s2):s1(&s1),s2(&s2) {
startR1();
}
} h(ss,ss);
Poll p;
ss.addToPoll(p);
ss.setBlocking(false);
p.loop();
}