Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support client stop #24

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 44 additions & 36 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type ClientEndpoint struct {
ServerEndpointSocket string
TokenSource token.TokenSourcePlugin
TlsConfig *tls.Config
DoneCh chan struct{}
}

func (c *ClientEndpoint) Start() {
Expand All @@ -37,44 +38,51 @@ func (c *ClientEndpoint) Start() {
}
defer listener.Close()
log.Infow("Client endpoint start up successful", "listen address", listener.Addr())
for {
// Accept client application connectin request
conn, err := listener.Accept()
if err != nil {
log.Errorw("Client app connect failed", "error", err.Error())
} else {
logger := log.WithValues(constants.ClientAppAddr, conn.RemoteAddr().String())
logger.Info("Client connection accepted, prepare to entablish tunnel with server endpint for this connection.")
go func() {
defer func() {
conn.Close()
logger.Info("Tunnel closed")
}()
// Open a quic stream for each client application connection.
stream, err := session.OpenStreamSync(context.Background())
if err != nil {
logger.Errorw("Failed to open stream to server endpoint.", "error", err.Error())
return
}
defer stream.Close()
logger = logger.WithValues(constants.StreamID, stream.StreamID())
// Create a context argument for each new tunnel
ctx := context.WithValue(
logger.WithContext(parent_ctx),
constants.CtxClientAppAddr, conn.RemoteAddr().String())
hsh := tunnel.NewHandshakeHelper(constants.TokenLength, handshake)
hsh.TokenSource = &c.TokenSource
// Create a new tunnel for the new client application connection.
tun := tunnel.NewTunnel(&stream, constants.ClientEndpoint)
tun.Conn = &conn
tun.Hsh = &hsh
if !tun.HandShake(ctx) {
return
go func() {
for {
// Accept client application connectin request
conn, err := listener.Accept()
if err != nil {
log.Errorw("Client app connect failed", "error", err.Error())
if oe, ok := err.(*net.OpError); ok && oe.Op == "accept" {
break
}
tun.Establish(ctx)
}()
} else {
logger := log.WithValues(constants.ClientAppAddr, conn.RemoteAddr().String())
logger.Info("Client connection accepted, prepare to entablish tunnel with server endpint for this connection.")
go func() {
defer func() {
conn.Close()
logger.Info("Tunnel closed")
}()
// Open a quic stream for each client application connection.
stream, err := session.OpenStreamSync(context.Background())
if err != nil {
logger.Errorw("Failed to open stream to server endpoint.", "error", err.Error())
return
}
defer stream.Close()
logger = logger.WithValues(constants.StreamID, stream.StreamID())
// Create a context argument for each new tunnel
ctx := context.WithValue(
logger.WithContext(parent_ctx),
constants.CtxClientAppAddr, conn.RemoteAddr().String())
hsh := tunnel.NewHandshakeHelper(constants.TokenLength, handshake)
hsh.TokenSource = &c.TokenSource
// Create a new tunnel for the new client application connection.
tun := tunnel.NewTunnel(&stream, constants.ClientEndpoint)
tun.Conn = &conn
tun.Hsh = &hsh
if !tun.HandShake(ctx) {
return
}
tun.Establish(ctx)
}()
}
}
}
}()
<-c.DoneCh
log.Info("The client is going to close")
}

func handshake(ctx context.Context, stream *quic.Stream, hsh *tunnel.HandshakeHelper) (bool, *net.Conn) {
Expand Down
8 changes: 8 additions & 0 deletions client/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
apiOptions *options.RestfulAPIOptions
secOptions *options.SecureOptions
logOptions *log.Options
doneCh chan struct{}
)

func buildCommand(basename string) *cobra.Command {
Expand Down Expand Up @@ -93,6 +94,7 @@ func runFunc(co *options.ClientOptions, ao *options.RestfulAPIOptions, seco *opt
caFile := seco.CaFile
verifyServer := seco.VerifyRemoteEndpoint
apiListenOn := ao.HttpdListenOn
doneCh = make(chan struct{})

tlsConfig := &tls.Config{
InsecureSkipVerify: !verifyServer,
Expand Down Expand Up @@ -133,6 +135,7 @@ func runFunc(co *options.ClientOptions, ao *options.RestfulAPIOptions, seco *opt
ServerEndpointSocket: serverEndpointSocket,
TokenSource: loadTokenSourcePlugin(tokenPlugin, tokenSource),
TlsConfig: tlsConfig,
DoneCh: doneCh,
}
c.Start()
}
Expand Down Expand Up @@ -162,3 +165,8 @@ func main() {
os.Exit(1)
}
}

//export Stop
func Stop() {
close(doneCh)
}