mirror of https://github.com/bjdgyc/anylink.git
145 lines
2.8 KiB
Go
145 lines
2.8 KiB
Go
// Package dpipe provides an in-memory pipe that behaves like a datagram protocol.
|
|
package dpipe
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pion/transport/deadline"
|
|
)
|
|
|
|
// Pipe creates pair of non-stream conn on memory.
|
|
// Close of the one end doesn't make effect to the other end.
|
|
func Pipe() (net.Conn, net.Conn) {
|
|
ch0 := make(chan []byte, 1000)
|
|
ch1 := make(chan []byte, 1000)
|
|
return &conn{
|
|
rCh: ch0,
|
|
wCh: ch1,
|
|
closed: make(chan struct{}),
|
|
closing: make(chan struct{}),
|
|
readDeadline: deadline.New(),
|
|
writeDeadline: deadline.New(),
|
|
}, &conn{
|
|
rCh: ch1,
|
|
wCh: ch0,
|
|
closed: make(chan struct{}),
|
|
closing: make(chan struct{}),
|
|
readDeadline: deadline.New(),
|
|
writeDeadline: deadline.New(),
|
|
}
|
|
}
|
|
|
|
// pipeAddr is an empty address type whose methods report a fixed network
// name and a fixed address string.
type pipeAddr struct{}

// Network returns the constant network name "pipe".
func (pipeAddr) Network() string {
	return "pipe"
}

// String returns the constant address string ":1".
func (pipeAddr) String() string {
	return ":1"
}
|
|
|
|
type conn struct {
|
|
rCh chan []byte
|
|
wCh chan []byte
|
|
closed chan struct{}
|
|
closing chan struct{}
|
|
closeOnce sync.Once
|
|
|
|
readDeadline *deadline.Deadline
|
|
writeDeadline *deadline.Deadline
|
|
}
|
|
|
|
// LocalAddr returns a pipeAddr value; every conn reports the same address.
func (*conn) LocalAddr() net.Addr {
	var addr pipeAddr
	return addr
}
|
|
// RemoteAddr returns a pipeAddr value; every conn reports the same address.
func (*conn) RemoteAddr() net.Addr {
	var addr pipeAddr
	return addr
}
|
|
|
|
func (c *conn) SetDeadline(t time.Time) error {
|
|
c.readDeadline.Set(t)
|
|
c.writeDeadline.Set(t)
|
|
return nil
|
|
}
|
|
|
|
func (c *conn) SetReadDeadline(t time.Time) error {
|
|
c.readDeadline.Set(t)
|
|
return nil
|
|
}
|
|
|
|
func (c *conn) SetWriteDeadline(t time.Time) error {
|
|
c.writeDeadline.Set(t)
|
|
return nil
|
|
}
|
|
|
|
func (c *conn) Read(data []byte) (n int, err error) {
|
|
select {
|
|
case <-c.closed:
|
|
return 0, io.EOF
|
|
case <-c.closing:
|
|
if len(c.rCh) == 0 {
|
|
return 0, io.EOF
|
|
}
|
|
case <-c.readDeadline.Done():
|
|
return 0, context.DeadlineExceeded
|
|
default:
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case d := <-c.rCh:
|
|
if len(d) <= len(data) {
|
|
copy(data, d)
|
|
return len(d), nil
|
|
}
|
|
copy(data, d[:len(data)])
|
|
return len(data), nil
|
|
case <-c.closed:
|
|
return 0, io.EOF
|
|
case <-c.closing:
|
|
if len(c.rCh) == 0 {
|
|
return 0, io.EOF
|
|
}
|
|
case <-c.readDeadline.Done():
|
|
return 0, context.DeadlineExceeded
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *conn) cleanWriteBuffer() {
|
|
for {
|
|
select {
|
|
case <-c.wCh:
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *conn) Write(data []byte) (n int, err error) {
|
|
select {
|
|
case <-c.closed:
|
|
return 0, io.ErrClosedPipe
|
|
case <-c.writeDeadline.Done():
|
|
c.cleanWriteBuffer()
|
|
return 0, context.DeadlineExceeded
|
|
default:
|
|
}
|
|
|
|
cData := make([]byte, len(data))
|
|
copy(cData, data)
|
|
|
|
select {
|
|
case <-c.closed:
|
|
return 0, io.ErrClosedPipe
|
|
case <-c.writeDeadline.Done():
|
|
c.cleanWriteBuffer()
|
|
return 0, context.DeadlineExceeded
|
|
case c.wCh <- cData:
|
|
return len(cData), nil
|
|
}
|
|
}
|
|
|
|
func (c *conn) Close() error {
|
|
c.closeOnce.Do(func() {
|
|
close(c.closed)
|
|
})
|
|
return nil
|
|
}
|