Unverified Commit 2c6d6463 authored by Xin.Zh's avatar Xin.Zh Committed by GitHub

Merge pull request #44 from aliiohs/feature/addTlsSupport

Feature/add tls support
parents 4965ba8b 169328c4
...@@ -125,7 +125,7 @@ func NewWSSClient(opts ...ClientOption) Client { ...@@ -125,7 +125,7 @@ func NewWSSClient(opts ...ClientOption) Client {
c := newClient(WSS_CLIENT, opts...) c := newClient(WSS_CLIENT, opts...)
if c.cert == "" { if c.cert == "" {
panic(fmt.Sprintf("@cert:%s", c.cert)) panic(fmt.Sprintf("@certs:%s", c.cert))
} }
if !strings.HasPrefix(c.addr, "wss://") { if !strings.HasPrefix(c.addr, "wss://") {
panic(fmt.Sprintf("the prefix @serverAddr:%s is not wss://", c.addr)) panic(fmt.Sprintf("the prefix @serverAddr:%s is not wss://", c.addr))
...@@ -152,7 +152,14 @@ func (c *client) dialTCP() Session { ...@@ -152,7 +152,14 @@ func (c *client) dialTCP() Session {
if c.IsClosed() { if c.IsClosed() {
return nil return nil
} }
if c.sslEnabled {
if sslConfig, err := c.tlsConfigBuilder.BuildTlsConfig(); err == nil && sslConfig != nil {
d := &net.Dialer{Timeout: connectTimeout}
conn, err = tls.DialWithDialer(d, "tcp", c.addr, sslConfig)
}
} else {
conn, err = net.DialTimeout("tcp", c.addr, connectTimeout) conn, err = net.DialTimeout("tcp", c.addr, connectTimeout)
}
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) { if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close() conn.Close()
err = errSelfConnect err = errSelfConnect
...@@ -277,7 +284,7 @@ func (c *client) dialWSS() Session { ...@@ -277,7 +284,7 @@ func (c *client) dialWSS() Session {
if c.cert != "" { if c.cert != "" {
certPEMBlock, err := ioutil.ReadFile(c.cert) certPEMBlock, err := ioutil.ReadFile(c.cert)
if err != nil { if err != nil {
panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error:%+v", c.cert, perrors.WithStack(err))) panic(fmt.Sprintf("ioutil.ReadFile(certs:%s) = error:%+v", c.cert, perrors.WithStack(err)))
} }
var cert tls.Certificate var cert tls.Certificate
...@@ -299,7 +306,7 @@ func (c *client) dialWSS() Session { ...@@ -299,7 +306,7 @@ func (c *client) dialWSS() Session {
for _, c := range config.Certificates { for _, c := range config.Certificates {
roots, err = x509.ParseCertificates(c.Certificate[len(c.Certificate)-1]) roots, err = x509.ParseCertificates(c.Certificate[len(c.Certificate)-1])
if err != nil { if err != nil {
panic(fmt.Sprintf("error parsing server's root cert: %+v\n", perrors.WithStack(err))) panic(fmt.Sprintf("error parsing server's root certs: %+v\n", perrors.WithStack(err)))
} }
for _, root = range roots { for _, root = range roots {
certPool.AddCert(root) certPool.AddCert(root)
......
...@@ -37,7 +37,7 @@ import ( ...@@ -37,7 +37,7 @@ import (
var ( var (
launchTime = time.Now() launchTime = time.Now()
// ErrInvalidConnection = perrors.New("connection has been closed.") // ErrInvalidConnection = perrors.New("connection has been closed.")
) )
///////////////////////////////////////// /////////////////////////////////////////
...@@ -322,8 +322,13 @@ func (t *gettyTCPConn) close(waitSec int) { ...@@ -322,8 +322,13 @@ func (t *gettyTCPConn) close(waitSec int) {
log.Errorf("snappy.Writer.Close() = error:%+v", err) log.Errorf("snappy.Writer.Close() = error:%+v", err)
} }
} }
t.conn.(*net.TCPConn).SetLinger(waitSec) if conn, ok := t.conn.(*net.TCPConn); ok {
t.conn.Close() _ = conn.SetLinger(waitSec)
_ = conn.Close()
} else {
_ = t.conn.(*tls.Conn).Close()
}
t.conn = nil t.conn = nil
} }
} }
......
-----BEGIN PRIVATE KEY-----
MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAMBA3wVeTGHZR1Ry
e/i+J8a2cu5gXwFV6TnObzGM7bLFCO5i9v4mLo4iFzPsHmWDUxKS3Y8iXbu0eYBl
LoNY0lSvxDx33O+DuwMmVN+DzSD+Eod9zfvwOWHsazYCZT2PhNxnVWIuJXViY4JA
HUGodjx+QAi6yCAurUZGvYXGgZSBAgMBAAECgYAxRi8i9BlFlufGSBVoGmydbJOm
bwLKl9dP3o33ODSP9hok5y6A0w5plWk3AJSF1hPLleK9VcSKYGYnt0clmPVHF35g
bx2rVK8dOT0mn7rz9Zr70jcSz1ETA2QonHZ+Y+niLmcic9At6hRtWiewblUmyFQm
GwggIzi7LOyEUHrEcQJBAOXxyQvnLvtKzXiqcsW/K6rExqVJVk+KF0fzzVyMzTJx
HRBxUVgvGdEJT7j+7P2kcTyafve0BBzDSPIaDyiJ+Y0CQQDWCb7jASFSbu5M3Zcd
Gkr4ZKN1XO3VLQX10b22bQYdF45hrTN2tnzRvVUR4q86VVnXmiGiTqmLkXcA2WWf
pHfFAkAhv9olUBo6MeF0i3frBEMRfm41hk0PwZHnMqZ6pgPcGnQMnMU2rzsXzkkQ
OwJnvAIOxhJKovZTjmofdqmw5odlAkBYVUdRWjsNUTjJwj3GRf6gyq/nFMYWz3EB
RWFdM1ttkDYzu45ctO2IhfHg4sPceDMO1s6AtKQmNI9/azkUjITdAkApNa9yFRzc
TBaDNPd5KVd58LVIzoPQ6i7uMHteLXJUWqSroji6S3s4gKMFJ/dO+ZXIlgQgfJJJ
ZDL4cdrdkeoM
-----END PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIICSjCCAbOgAwIBAgIJAJHGGR4dGioHMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMTBnRlc3RjYTAeFw0xNDExMTEyMjMxMjla
Fw0yNDExMDgyMjMxMjlaMFYxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0
YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMT
BnRlc3RjYTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAwEDfBV5MYdlHVHJ7
+L4nxrZy7mBfAVXpOc5vMYztssUI7mL2/iYujiIXM+weZYNTEpLdjyJdu7R5gGUu
g1jSVK/EPHfc74O7AyZU34PNIP4Sh33N+/A5YexrNgJlPY+E3GdVYi4ldWJjgkAd
Qah2PH5ACLrIIC6tRka9hcaBlIECAwEAAaMgMB4wDAYDVR0TBAUwAwEB/zAOBgNV
HQ8BAf8EBAMCAgQwDQYJKoZIhvcNAQELBQADgYEAHzC7jdYlzAVmddi/gdAeKPau
sPBG/C2HCWqHzpCUHcKuvMzDVkY/MP2o6JIW2DBbY64bO/FceExhjcykgaYtCH/m
oIU63+CFOTtR7otyQAWHqXa7q4SbCDlG7DyRFxqG0txPtGvy12lgldA2+RgcigQG
Dfcog5wrJytaQ6UA0wE=
-----END CERTIFICATE-----
-----BEGIN PRIVATE KEY-----
MIICeQIBADANBgkqhkiG9w0BAQEFAASCAmMwggJfAgEAAoGBAOxUR9uhvhbeVUIM
s5WbH0px0mehl2+6sZpNjzvE2KimZpHzMJHukVH0Ffkvhs0b8+S5Ut9VNUAqd3IM
JCCAEGtRNoQhM1t9Yr2zAckSvbRacp+FL/Cj9eDmyo00KsVGaeefA4Dh4OW+ZhkT
NKcldXqkSuj1sEf244JZYuqZp6/tAgMBAAECgYEAi2NSVqpZMafE5YYUTcMGe6QS
k2jtpsqYgggI2RnLJ/2tNZwYI5pwP8QVSbnMaiF4gokD5hGdrNDfTnb2v+yIwYEH
0w8+oG7Z81KodsiZSIDJfTGsAZhVNwOz9y0VD8BBZZ1/274Zh52AUKLjZS/ZwIbS
W2ywya855dPnH/wj+0ECQQD9X8D920kByTNHhBG18biAEZ4pxs9f0OAG8333eVcI
w2lJDLsYDZrCB2ocgA3lUdozlzPC7YDYw8reg0tkiRY5AkEA7sdNzOeQsQRn7++5
0bP9DtT/iON1gbfxRzCfCfXdoOtfQWIzTePWtURt9X/5D9NofI0Rg5W2oGy/MLe5
/sXHVQJBAIup5XrJDkQywNZyAUU2ecn2bCWBFjwtqd+LBmuMciI9fOKsZtEKZrz/
U0lkeMRoSwvXE8wmGLjjrAbdfohrXFkCQQDZEx/LtIl6JINJQiswVe0tWr6k+ASP
1WXoTm+HYpoF/XUvv9LccNF1IazFj34hwRQwhx7w/V52Ieb+p0jUMYGxAkEAjDhd
9pBO1fKXWiXzi9ZKfoyTNcUq3eBSVKwPG2nItg5ycXengjT5sgcWDnciIzW7BIVI
JiqOszq9GWESErAatg==
-----END PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIIC6TCCAlKgAwIBAgIBCjANBgkqhkiG9w0BAQsFADBWMQswCQYDVQQGEwJBVTET
MBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0cyBQ
dHkgTHRkMQ8wDQYDVQQDEwZ0ZXN0Y2EwHhcNMTUxMTEwMDEwOTU4WhcNMjUxMTA3
MDEwOTU4WjBaMQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8G
A1UECgwYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMRMwEQYDVQQDDAp0ZXN0Y2xp
ZW50MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDsVEfbob4W3lVCDLOVmx9K
cdJnoZdvurGaTY87xNiopmaR8zCR7pFR9BX5L4bNG/PkuVLfVTVAKndyDCQggBBr
UTaEITNbfWK9swHJEr20WnKfhS/wo/Xg5sqNNCrFRmnnnwOA4eDlvmYZEzSnJXV6
pEro9bBH9uOCWWLqmaev7QIDAQABo4HCMIG/MAkGA1UdEwQCMAAwCwYDVR0PBAQD
AgXgMB0GA1UdDgQWBBQAdbW5Vml/CnYwqdP3mOHDARU+8zBwBgNVHSMEaTBnoVqk
WDBWMQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMY
SW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMQ8wDQYDVQQDEwZ0ZXN0Y2GCCQCRxhke
HRoqBzAJBgNVHREEAjAAMAkGA1UdEgQCMAAwDQYJKoZIhvcNAQELBQADgYEAf4MM
k+sdzd720DfrQ0PF2gDauR3M9uBubozDuMuF6ufAuQBJSKGQEGibXbUelrwHmnql
UjTyfolVcxEBVaF4VFHmn7u6vP7S1NexIDdNUHcULqxIb7Tzl8JYq8OOHD2rQy4H
s8BXaVIzw4YcaCGAMS0iDX052Sy7e2JhP8Noxvo=
-----END CERTIFICATE-----
-----BEGIN PRIVATE KEY-----
MIICdwIBADANBgkqhkiG9w0BAQEFAASCAmEwggJdAgEAAoGBANOmffupIGC8YDau
rOF4eKnHwPszgpkkhWzKsVxhNDBxCVYx4TEjG0XWIO0iyRXupZbUC+7N/8HnEVNa
8F1jYhng14Iiq99cNQbbnuHHhIztmpocrJTxmnhGzoAnRa1Tb+GnAuRoIHRA/V2c
VUE9tbikQugFx/SPgXAw6tfWB+YvAgMBAAECgYEAoEq9qzUBgoHoVEGiSPiWWe8g
5p6yUA1qx2QTQyWTAwT4z0DjjfVKmG99bFsl8+hTnJFnoCp/gnjflEOROwkjp5kG
m0drqOPx1jeipJjpXYTBu49h+WpZ1PF+KhVtxsIm3OOCvh67iWaKyyOVb5Og8aiR
jl6dn/TdG/dlGD8AfUECQQDuNMle6p0oU8amC6O9wIMBroxx2nFstzE6O35PLEzG
/tj0kxxn9Jp2TS9mGaLCzSuXmpjlF4+NOWiBPkrLC2TfAkEA43Xg7uEUkaJAz2/W
m1lIBTLt+4rIQY/2emh33bDcA+rv8rwwrMMIv17/xPx7bs49YqGG5xufD+Rwl6TL
qFXYsQJAPrOwagax1aKvwJeBw3oAQhoTKAkLIEXcdGqipe6QSzVcIIz0xjxxyEAr
AOIwoLxnBCISqwMXq2H4K0UdZPMb2wJAdhdYLY1L6YRMk6XjzImg25oidisKZweA
FvMv8DgHMj2CUAqmVrt3SivfLH1M9C09L3zfFhOAFHcsgX58gav4MQJBANSBnrHj
tIq4l8z79CPUIuu3QyeEh+XwY8s5qE5CNTck0U59lzp9NvENHbkx3KO896TTerko
+8bXHMLkJkHPXms=
-----END PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIIC8zCCAlygAwIBAgIBCzANBgkqhkiG9w0BAQsFADBWMQswCQYDVQQGEwJBVTET
MBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0cyBQ
dHkgTHRkMQ8wDQYDVQQDEwZ0ZXN0Y2EwHhcNMTUxMTEwMDExNDU1WhcNMjUxMTA3
MDExNDU1WjBkMQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8G
A1UECgwYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMR0wGwYDVQQDDBQqLnRlc3Qu
Z29vZ2xlLmNvbS5hdTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA06Z9+6kg
YLxgNq6s4Xh4qcfA+zOCmSSFbMqxXGE0MHEJVjHhMSMbRdYg7SLJFe6lltQL7s3/
wecRU1rwXWNiGeDXgiKr31w1Btue4ceEjO2amhyslPGaeEbOgCdFrVNv4acC5Ggg
dED9XZxVQT21uKRC6AXH9I+BcDDq19YH5i8CAwEAAaOBwjCBvzAJBgNVHRMEAjAA
MAsGA1UdDwQEAwIF4DAdBgNVHQ4EFgQUbyZIbUvqmePzv40xa0mMaDxLToYwcAYD
VR0jBGkwZ6FapFgwVjELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUtU3RhdGUx
ITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEPMA0GA1UEAxMGdGVz
dGNhggkAkcYZHh0aKgcwCQYDVR0RBAIwADAJBgNVHRIEAjAAMA0GCSqGSIb3DQEB
CwUAA4GBAJ21MwMf4WwAjafPKn+8Ng7ordtdp6tlkjt+Xub4l4zMr6FCp6dc/Ceh
6Hj43zYcKpAe5I6eaVcMc9qcYfUb9i4NVX82dMQpAwpNHgqTzqYt6GYEjF3YhKA7
uOFdA0OvOFJa14SNdNRk9E1Cd/tElXnLnSE4DOguMNvXz8mRKfnD
-----END CERTIFICATE-----
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"flag"
"path/filepath"
)
import (
"github.com/dubbogo/getty"
"github.com/dubbogo/gost/sync"
)
import (
"github.com/dubbogo/getty/demo/hello"
tls "github.com/dubbogo/getty/demo/hello/tls"
"github.com/dubbogo/getty/demo/util"
)
var (
ip = flag.String("ip", "127.0.0.1", "server IP")
connections = flag.Int("conn", 1, "number of tcp connections")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
)
var (
taskPool *gxsync.TaskPool
)
func main() {
flag.Parse()
util.SetLimit()
util.Profiling(*pprofPort)
if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
}
keyPath, _ := filepath.Abs("./demo/hello/tls/certs/ca.key")
caPemPath, _ := filepath.Abs("./demo/hello/tls/certs/ca.pem")
config := &getty.ClientTlsConfigBuilder{
ClientTrustCertCollectionPath: caPemPath,
ClientPrivateKeyPath: keyPath,
}
client := getty.NewTCPClient(
getty.WithServerAddress(*ip+":8090"),
getty.WithClientSslEnabled(true),
getty.WithClientTlsConfigBuilder(config),
getty.WithConnectionNumber(*connections),
)
client.RunEventLoop(NewHelloClientSession)
go hello.ClientRequest()
util.WaitCloseSignals(client)
taskPool.Close()
}
// NewHelloClientSession use for init client session
func NewHelloClientSession(session getty.Session) (err error) {
tls.EventListener.SessionOnOpen = func(session getty.Session) {
hello.Sessions = append(hello.Sessions, session)
}
err = tls.InitialSession(session)
if err != nil {
return
}
return
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package tcp
import (
"crypto/tls"
"time"
)
import (
"github.com/dubbogo/getty"
)
import (
"github.com/dubbogo/getty/demo/hello"
)
var (
pkgHandler = &hello.PackageHandler{}
// EventListener register event callback
EventListener = &hello.MessageHandler{}
)
func InitialSession(session getty.Session) (err error) {
//session.SetCompressType(getty.CompressZip)
_, ok := session.Conn().(*tls.Conn)
if ok {
session.SetName("hello")
session.SetMaxMsgLen(128)
// session.SetRQLen(1024)
session.SetWQLen(512)
session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second)
session.SetCronPeriod(int(hello.CronPeriod / 1e6))
session.SetWaitTime(time.Second)
session.SetPkgHandler(pkgHandler)
session.SetEventListener(EventListener)
}
return nil
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"flag"
tls "github.com/dubbogo/getty/demo/hello/tls"
"path/filepath"
)
import (
"github.com/dubbogo/getty"
gxsync "github.com/dubbogo/gost/sync"
)
import (
"github.com/dubbogo/getty/demo/util"
)
var (
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65432, "pprof http port")
Sessions []getty.Session
)
var (
taskPool *gxsync.TaskPool
)
func main() {
flag.Parse()
util.SetLimit()
util.Profiling(*pprofPort)
serverPemPath, _ := filepath.Abs("./demo/hello/tls/certs/server0.pem")
serverKeyPath, _ := filepath.Abs("./demo/hello/tls/certs/server0.key")
caPemPath, _ := filepath.Abs("./demo/hello/tls/certs/ca.pem")
c := &getty.ServerTlsConfigBuilder{
ServerKeyCertChainPath: serverPemPath,
ServerPrivateKeyPath: serverKeyPath,
ServerTrustCertCollectionPath: caPemPath,
}
options := []getty.ServerOption{getty.WithLocalAddress(":8090"),
getty.WithServerSslEnabled(true),
getty.WithServerTlsConfigBuilder(c),
}
if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
}
server := getty.NewTCPServer(options...)
go server.RunEventLoop(NewHelloServerSession)
util.WaitCloseSignals(server)
}
func NewHelloServerSession(session getty.Session) (err error) {
err = tls.InitialSession(session)
Sessions = append(Sessions, session)
if err != nil {
return
}
session.SetTaskPool(taskPool)
return
}
module github.com/dubbogo/getty module github.com/dubbogo/getty
go 1.14
require ( require (
github.com/dubbogo/gost v1.9.0 github.com/dubbogo/gost v1.9.0
github.com/golang/snappy v0.0.1 github.com/golang/snappy v0.0.1
...@@ -8,5 +10,3 @@ require ( ...@@ -8,5 +10,3 @@ require (
github.com/stretchr/testify v1.5.1 github.com/stretchr/testify v1.5.1
go.uber.org/zap v1.15.0 go.uber.org/zap v1.15.0
) )
go 1.13
...@@ -25,7 +25,9 @@ type ServerOption func(*ServerOptions) ...@@ -25,7 +25,9 @@ type ServerOption func(*ServerOptions)
type ServerOptions struct { type ServerOptions struct {
addr string addr string
//tls
sslEnabled bool
tlsConfigBuilder TlsConfigBuilder
// websocket // websocket
path string path string
cert string cert string
...@@ -47,7 +49,7 @@ func WithWebsocketServerPath(path string) ServerOption { ...@@ -47,7 +49,7 @@ func WithWebsocketServerPath(path string) ServerOption {
} }
} }
// @cert: server certificate file // @certs: server certificate file
func WithWebsocketServerCert(cert string) ServerOption { func WithWebsocketServerCert(cert string) ServerOption {
return func(o *ServerOptions) { return func(o *ServerOptions) {
o.cert = cert o.cert = cert
...@@ -61,13 +63,27 @@ func WithWebsocketServerPrivateKey(key string) ServerOption { ...@@ -61,13 +63,27 @@ func WithWebsocketServerPrivateKey(key string) ServerOption {
} }
} }
// @cert is the root certificate file to verify the legitimacy of server // @certs is the root certificate file to verify the legitimacy of server
func WithWebsocketServerRootCert(cert string) ServerOption { func WithWebsocketServerRootCert(cert string) ServerOption {
return func(o *ServerOptions) { return func(o *ServerOptions) {
o.caCert = cert o.caCert = cert
} }
} }
// @WithSslEnabled enable use tls
func WithServerSslEnabled(sslEnabled bool) ServerOption {
return func(o *ServerOptions) {
o.sslEnabled = sslEnabled
}
}
// @WithServerKeyCertChainPath sslConfig is tls config
func WithServerTlsConfigBuilder(tlsConfigBuilder TlsConfigBuilder) ServerOption {
return func(o *ServerOptions) {
o.tlsConfigBuilder = tlsConfigBuilder
}
}
///////////////////////////////////////// /////////////////////////////////////////
// Client Options // Client Options
///////////////////////////////////////// /////////////////////////////////////////
...@@ -79,7 +95,11 @@ type ClientOptions struct { ...@@ -79,7 +95,11 @@ type ClientOptions struct {
number int number int
reconnectInterval int // reConnect Interval reconnectInterval int // reConnect Interval
// the cert file of wss server which may contain server domain, server ip, the starting effective date, effective //tls
sslEnabled bool
tlsConfigBuilder TlsConfigBuilder
// the certs file of wss server which may contain server domain, server ip, the starting effective date, effective
// duration, the hash alg, the len of the private key. // duration, the hash alg, the len of the private key.
// wss client will use it. // wss client will use it.
cert string cert string
...@@ -110,9 +130,23 @@ func WithConnectionNumber(num int) ClientOption { ...@@ -110,9 +130,23 @@ func WithConnectionNumber(num int) ClientOption {
} }
} }
// @cert is client certificate file. it can be empty. // @certs is client certificate file. it can be empty.
func WithRootCertificateFile(cert string) ClientOption { func WithRootCertificateFile(cert string) ClientOption {
return func(o *ClientOptions) { return func(o *ClientOptions) {
o.cert = cert o.cert = cert
} }
} }
// @WithSslEnabled enable use tls
func WithClientSslEnabled(sslEnabled bool) ClientOption {
return func(o *ClientOptions) {
o.sslEnabled = sslEnabled
}
}
// @WithClientKeyCertChainPath sslConfig is tls config
func WithClientTlsConfigBuilder(tlsConfigBuilder TlsConfigBuilder) ClientOption {
return func(o *ClientOptions) {
o.tlsConfigBuilder = tlsConfigBuilder
}
}
...@@ -56,7 +56,6 @@ type server struct { ...@@ -56,7 +56,6 @@ type server struct {
lock sync.Mutex // for server lock sync.Mutex // for server
endPointType EndPointType endPointType EndPointType
server *http.Server // for ws or wss server server *http.Server // for ws or wss server
sync.Once sync.Once
done chan struct{} done chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
...@@ -80,7 +79,7 @@ func newServer(t EndPointType, opts ...ServerOption) *server { ...@@ -80,7 +79,7 @@ func newServer(t EndPointType, opts ...ServerOption) *server {
return s return s
} }
// NewTCServer builds a tcp server. // NewTCPServer builds a tcp server.
func NewTCPServer(opts ...ServerOption) Server { func NewTCPServer(opts ...ServerOption) Server {
return newServer(TCP_SERVER, opts...) return newServer(TCP_SERVER, opts...)
} }
...@@ -100,7 +99,7 @@ func NewWSSServer(opts ...ServerOption) Server { ...@@ -100,7 +99,7 @@ func NewWSSServer(opts ...ServerOption) Server {
s := newServer(WSS_SERVER, opts...) s := newServer(WSS_SERVER, opts...)
if s.addr == "" || s.cert == "" || s.privateKey == "" { if s.addr == "" || s.cert == "" || s.privateKey == "" {
panic(fmt.Sprintf("@addr:%s, @cert:%s, @privateKey:%s, @caCert:%s", panic(fmt.Sprintf("@addr:%s, @certs:%s, @privateKey:%s, @caCert:%s",
s.addr, s.cert, s.privateKey, s.caCert)) s.addr, s.cert, s.privateKey, s.caCert))
} }
...@@ -175,7 +174,13 @@ func (s *server) listenTCP() error { ...@@ -175,7 +174,13 @@ func (s *server) listenTCP() error {
return perrors.Wrapf(err, "gxnet.ListenOnTCPRandomPort(addr:%s)", s.addr) return perrors.Wrapf(err, "gxnet.ListenOnTCPRandomPort(addr:%s)", s.addr)
} }
} else { } else {
if s.sslEnabled {
if sslConfig, err := s.tlsConfigBuilder.BuildTlsConfig(); err == nil && sslConfig != nil {
streamListener, err = tls.Listen("tcp", s.addr, sslConfig)
}
} else {
streamListener, err = net.Listen("tcp", s.addr) streamListener, err = net.Listen("tcp", s.addr)
}
if err != nil { if err != nil {
return perrors.Wrapf(err, "net.Listen(tcp, addr:%s)", s.addr) return perrors.Wrapf(err, "net.Listen(tcp, addr:%s)", s.addr)
} }
...@@ -409,12 +414,12 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) { ...@@ -409,12 +414,12 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
defer s.wg.Done() defer s.wg.Done()
if certificate, err = tls.LoadX509KeyPair(s.cert, s.privateKey); err != nil { if certificate, err = tls.LoadX509KeyPair(s.cert, s.privateKey); err != nil {
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err:%+v", panic(fmt.Sprintf("tls.LoadX509KeyPair(certs{%s}, privateKey{%s}) = err:%+v",
s.cert, s.privateKey, perrors.WithStack(err))) s.cert, s.privateKey, perrors.WithStack(err)))
return return
} }
config = &tls.Config{ config = &tls.Config{
InsecureSkipVerify: true, // do not verify peer cert InsecureSkipVerify: true, // do not verify peer certs
ClientAuth: tls.NoClientCert, ClientAuth: tls.NoClientCert,
NextProtos: []string{"http/1.1"}, NextProtos: []string{"http/1.1"},
Certificates: []tls.Certificate{certificate}, Certificates: []tls.Certificate{certificate},
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package getty package getty
import ( import (
"path/filepath"
"testing" "testing"
"time" "time"
) )
...@@ -75,6 +76,75 @@ func testTCPServer(t *testing.T, address string) { ...@@ -75,6 +76,75 @@ func testTCPServer(t *testing.T, address string) {
server.Close() server.Close()
assert.True(t, server.IsClosed()) assert.True(t, server.IsClosed())
} }
func testTCPTlsServer(t *testing.T, address string) {
var (
server *server
serverMsgHandler MessageHandler
)
serverPemPath, _ := filepath.Abs("./demo/hello/tls/certs/server0.pem")
serverKeyPath, _ := filepath.Abs("./demo/hello/tls/certs/server0.key")
caPemPath, _ := filepath.Abs("./demo/hello/tls/certs/ca.pem")
configBuilder := &ServerTlsConfigBuilder{
ServerKeyCertChainPath: serverPemPath,
ServerPrivateKeyPath: serverKeyPath,
ServerTrustCertCollectionPath: caPemPath,
}
func() {
server = newServer(
TCP_SERVER,
WithLocalAddress(address),
WithServerSslEnabled(true),
WithServerTlsConfigBuilder(configBuilder),
)
newServerSession := func(session Session) error {
return newSessionCallback(session, &serverMsgHandler)
}
server.RunEventLoop(newServerSession)
assert.True(t, server.ID() > 0)
assert.True(t, server.EndPointType() == TCP_SERVER)
assert.NotNil(t, server.streamListener)
}()
time.Sleep(500e6)
addr := server.streamListener.Addr().String()
t.Logf("@address:%s, tcp server addr: %v", address, addr)
keyPath, _ := filepath.Abs("./demo/hello/tls/certs/ca.key")
clientCaPemPath, _ := filepath.Abs("./demo/hello/tls/certs/ca.pem")
clientConfig := &ClientTlsConfigBuilder{
ClientTrustCertCollectionPath: clientCaPemPath,
ClientPrivateKeyPath: keyPath,
}
clt := newClient(TCP_CLIENT,
WithServerAddress(addr),
WithReconnectInterval(5e8),
WithConnectionNumber(1),
WithClientTlsConfigBuilder(clientConfig),
)
assert.NotNil(t, clt)
assert.True(t, clt.ID() > 0)
assert.Equal(t, clt.endPointType, TCP_CLIENT)
var (
msgHandler MessageHandler
)
cb := func(session Session) error {
return newSessionCallback(session, &msgHandler)
}
clt.RunEventLoop(cb)
time.Sleep(1e9)
assert.Equal(t, 1, msgHandler.SessionNumber())
clt.Close()
assert.True(t, clt.IsClosed())
server.Close()
assert.True(t, server.IsClosed())
}
func testUDPServer(t *testing.T, address string) { func testUDPServer(t *testing.T, address string) {
var ( var (
...@@ -113,4 +183,6 @@ func TestServer(t *testing.T) { ...@@ -113,4 +183,6 @@ func TestServer(t *testing.T) {
addr = "127.0.0.1" addr = "127.0.0.1"
testTCPServer(t, addr) testTCPServer(t, addr)
testUDPServer(t, addr) testUDPServer(t, addr)
addr = "127.0.0.9999"
testTCPTlsServer(t, addr)
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package getty
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
)
import (
perrors "github.com/pkg/errors"
)
type TlsConfigBuilder interface {
BuildTlsConfig() (*tls.Config, error)
}
type ServerTlsConfigBuilder struct {
ServerKeyCertChainPath string
ServerPrivateKeyPath string
ServerKeyPassword string
ServerTrustCertCollectionPath string
}
func (s *ServerTlsConfigBuilder) BuildTlsConfig() (*tls.Config, error) {
var (
err error
certPem []byte
certificate tls.Certificate
certPool *x509.CertPool
config *tls.Config
)
if certificate, err = tls.LoadX509KeyPair(s.ServerKeyCertChainPath, s.ServerPrivateKeyPath); err != nil {
log.Error(fmt.Sprintf("tls.LoadX509KeyPair(certs{%s}, privateKey{%s}) = err:%+v",
s.ServerKeyCertChainPath, s.ServerPrivateKeyPath, perrors.WithStack(err)))
return nil, err
}
config = &tls.Config{
InsecureSkipVerify: true, // do not verify peer certs
ClientAuth: tls.RequireAnyClientCert,
Certificates: []tls.Certificate{certificate},
}
if s.ServerTrustCertCollectionPath != "" {
certPem, err = ioutil.ReadFile(s.ServerTrustCertCollectionPath)
if err != nil {
log.Error(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err:%+v", s.ServerTrustCertCollectionPath, perrors.WithStack(err)))
return nil, err
}
certPool = x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(certPem); !ok {
log.Error("failed to parse root certificate file")
return nil, err
}
config.ClientCAs = certPool
config.ClientAuth = tls.RequireAnyClientCert
config.InsecureSkipVerify = false
}
return config, nil
}
type ClientTlsConfigBuilder struct {
ClientKeyCertChainPath string
ClientPrivateKeyPath string
ClientKeyPassword string
ClientTrustCertCollectionPath string
}
func (c *ClientTlsConfigBuilder) BuildTlsConfig() (*tls.Config, error) {
cert, err := tls.LoadX509KeyPair(c.ClientTrustCertCollectionPath, c.ClientPrivateKeyPath)
if err != nil {
log.Error(fmt.Sprintf("Unable to load X509 Key Pair %v", err))
return nil, err
}
certBytes, err := ioutil.ReadFile(c.ClientTrustCertCollectionPath)
if err != nil {
log.Error(fmt.Sprintf("Unable to read pem file: %s", c.ClientTrustCertCollectionPath))
return nil, err
}
clientCertPool := x509.NewCertPool()
ok := clientCertPool.AppendCertsFromPEM(certBytes)
if !ok {
log.Error("failed to parse root certificate")
return nil, err
}
return &tls.Config{
RootCAs: clientCertPool,
Certificates: []tls.Certificate{cert},
InsecureSkipVerify: true,
}, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment