diff --git a/go.mod b/go.mod index d0db24b..465df18 100644 --- a/go.mod +++ b/go.mod @@ -8,19 +8,22 @@ require ( github.com/mailru/mapstructure v0.0.0-20230117153631-a4140f9ccc45 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.4 + github.com/tarantool/go-tarantool v1.6.0 golang.org/x/mod v0.7.0 golang.org/x/net v0.7.0 - golang.org/x/sync v0.1.0 golang.org/x/sys v0.5.0 golang.org/x/text v0.7.0 golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 golang.org/x/tools v0.5.0 - gopkg.in/yaml.v3 v3.0.1 gotest.tools v2.2.0+incompatible ) require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/protobuf v1.3.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.5.0 // indirect + google.golang.org/appengine v1.6.7 // indirect + gopkg.in/vmihailenco/msgpack.v2 v2.9.2 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 4d072af..8eb6c71 100644 --- a/go.sum +++ b/go.sum @@ -3,10 +3,18 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/mailru/mapstructure v0.0.0-20230117153631-a4140f9ccc45 h1:x3Zw96Gt6HbEPUWsTbQYj/nfaNv5lWHy6CeEkl8gwqw= github.com/mailru/mapstructure v0.0.0-20230117153631-a4140f9ccc45/go.mod h1:guLmlFj8yjd0hoz+QWxRU4Gn+VOb2nOQZ4EqRmMHarw= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -19,22 +27,33 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/tarantool/go-tarantool v1.6.0 h1:D/GW7hw9r8MbvSfcHqr6tT7brO7nqwhtWKJMj6OtArw= +github.com/tarantool/go-tarantool v1.6.0/go.mod h1:SFamRDArn3Be+qwzMzdzQqGz9/D9mgSb3xCpKxbqMjQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/mod v0.7.0 h1:LapD9S96VoQRhi/GrNTqeBJFrUjs5UHCAtTlgwA5oZA= golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 h1:Hir2P/De0WpUhtrKGGjvSb2YxUgyZ7EFOSLIcSSpiwE= golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.5.0 h1:+bSpV5HIeWkuvgaMfI3UmKRThoTA5ODJTUd8T17NO+4= golang.org/x/tools v0.5.0/go.mod h1:N+Kgy78s5I24c24dU8OfWNEotWjutIs8SnJvn5IDq+k= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= +google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/vmihailenco/msgpack.v2 v2.9.2 h1:gjPqo9orRVlSAH/065qw3MsFCDpH7fa1KpiizXyllY4= +gopkg.in/vmihailenco/msgpack.v2 v2.9.2/go.mod h1:/3Dn1Npt9+MYyLpYYXjInO/5jvMLamn+AEGwNEOatn8= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/activerecord/hash.go b/pkg/activerecord/hash.go new file mode 100644 index 0000000..649119e --- /dev/null +++ b/pkg/activerecord/hash.go @@ -0,0 +1,49 @@ +package activerecord + +import ( + "encoding/binary" + "encoding/hex" + "fmt" + "hash" +) + +type GroupHash struct { + hash hash.Hash32 + calculated bool +} + +func NewGroupHash(hash hash.Hash32) *GroupHash { + return &GroupHash{hash: hash} +} + +func (o *GroupHash) UpdateHash(data ...interface{}) error { + if o.calculated { + return fmt.Errorf("can't update hash after calculate") + } + + for _, v := range data { + var err error + + switch v := v.(type) { + case string: + err = binary.Write(o.hash, binary.LittleEndian, []byte(v)) + case int: + err = binary.Write(o.hash, binary.LittleEndian, int64(v)) + default: + err = binary.Write(o.hash, binary.LittleEndian, v) + } + + if err != nil { + return fmt.Errorf("can't calculate connectionID: %w", err) + } + } + + return nil +} + +func (o *GroupHash) GetHash() string { + o.calculated = true + hashInBytes := o.hash.Sum(nil)[:] + + return hex.EncodeToString(hashInBytes) +} diff --git a/pkg/octopus/options.go b/pkg/octopus/options.go index 8580795..5e75c1c 100644 --- a/pkg/octopus/options.go +++ b/pkg/octopus/options.go @@ -1,10 +1,7 @@ package octopus import ( - "encoding/binary" - "encoding/hex" "fmt" - "hash" "hash/crc32" "time" @@ -36,11 +33,10 @@ const ( // ConnectionOptions - опции используемые для подключения type ConnectionOptions struct { - server string - Mode ServerModeType - poolCfg *iproto.PoolConfig - connectionHash hash.Hash32 - calculated bool + *activerecord.GroupHash + server string + Mode ServerModeType + poolCfg *iproto.PoolConfig } // NewOptions - cоздание структуры с опциями и дефолтными значениями. Для мидификации значений по умолчанию, @@ -65,9 +61,10 @@ func NewOptions(server string, mode ServerModeType, opts ...ConnectionOption) (* PingInterval: DefaultPingInterval, }, }, - connectionHash: crc32.New(crc32table), } + octopusOpts.GroupHash = activerecord.NewGroupHash(crc32.New(crc32table)) + for _, opt := range opts { if err := opt.apply(octopusOpts); err != nil { return nil, fmt.Errorf("error apply options: %w", err) @@ -84,25 +81,8 @@ func NewOptions(server string, mode ServerModeType, opts ...ConnectionOption) (* // UpdateHash - функция расчета ConnectionID, необходима для шаринга конектов между моделями. func (o *ConnectionOptions) UpdateHash(data ...interface{}) error { - if o.calculated { - return fmt.Errorf("can't update hash after calculate") - } - - for _, data := range data { - var err error - - switch v := data.(type) { - case string: - err = binary.Write(o.connectionHash, binary.LittleEndian, []byte(v)) - case int: - err = binary.Write(o.connectionHash, binary.LittleEndian, int64(v)) - default: - err = binary.Write(o.connectionHash, binary.LittleEndian, v) - } - - if err != nil { - return fmt.Errorf("can't calculate connectionID: %w", err) - } + if err := o.GroupHash.UpdateHash(data...); err != nil { + return fmt.Errorf("can't calculate group hash: %w", err) } return nil @@ -110,10 +90,7 @@ func (o *ConnectionOptions) UpdateHash(data ...interface{}) error { // GetConnectionID - получение ConnecitionID. После первого получения, больше нельзя его модифицировать. Можно только новый Options создать func (o *ConnectionOptions) GetConnectionID() string { - o.calculated = true - hashInBytes := o.connectionHash.Sum(nil)[:] - - return hex.EncodeToString(hashInBytes) + return o.GroupHash.GetHash() } // InstanceMode - метод для получения режима аботы инстанса RO или RW diff --git a/pkg/tarantool/box.go b/pkg/tarantool/box.go new file mode 100644 index 0000000..6f025ef --- /dev/null +++ b/pkg/tarantool/box.go @@ -0,0 +1,112 @@ +package tarantool + +import ( + "context" + "fmt" + + "github.com/mailru/activerecord/pkg/activerecord" +) + +var DefaultOptionCreator = func(sic activerecord.ShardInstanceConfig) (activerecord.OptionInterface, error) { + return NewOptions(sic.Addr, sic.Mode, WithTimeout(sic.Timeout)) +} + +func Box(ctx context.Context, shard int, instType activerecord.ShardInstanceType, configPath string, optionCreator func(activerecord.ShardInstanceConfig) (activerecord.OptionInterface, error)) (*Connection, error) { + if optionCreator == nil { + optionCreator = DefaultOptionCreator + } + + clusterInfo, err := activerecord.ConfigCacher().Get( + ctx, + configPath, + DefaultConnectionParams, + optionCreator, + ) + if err != nil { + return nil, fmt.Errorf("can't get cluster %s info: %w", configPath, err) + } + + if clusterInfo.Shards() < shard { + return nil, fmt.Errorf("invalid shard num %d, max = %d", shard, clusterInfo.Shards()) + } + + var ( + configBox activerecord.ShardInstance + ok bool + ) + + switch instType { + case activerecord.ReplicaInstanceType: + configBox, ok = clusterInfo.NextReplica(shard) + if !ok { + return nil, fmt.Errorf("replicas not set") + } + case activerecord.ReplicaOrMasterInstanceType: + configBox, ok = clusterInfo.NextReplica(shard) + if ok { + break + } + + fallthrough + case activerecord.MasterInstanceType: + configBox = clusterInfo.NextMaster(shard) + } + + conn, err := activerecord.ConnectionCacher().GetOrAdd(configBox, func(options interface{}) (activerecord.ConnectionInterface, error) { + octopusOpt, ok := options.(*ConnectionOptions) + if !ok { + return nil, fmt.Errorf("invalit type of options %T, want Options", options) + } + + return GetConnection(ctx, octopusOpt) + }) + if err != nil { + return nil, fmt.Errorf("error from connectionCacher: %w", err) + } + + box, ex := conn.(*Connection) + if !ex { + return nil, fmt.Errorf("invalid connection type %T, want *tarantool.Connection", conn) + } + + return box, nil +} + +func CheckShardInstance(ctx context.Context, instance activerecord.ShardInstance) (activerecord.OptionInterface, error) { + opts, ok := instance.Options.(*ConnectionOptions) + if !ok { + return nil, fmt.Errorf("invalit type of options %T, want Options", instance.Options) + } + + var err error + c := activerecord.ConnectionCacher().Get(instance) + if c == nil { + c, err = GetConnection(ctx, opts) + if err != nil { + return nil, fmt.Errorf("error from connectionCacher: %w", err) + } + } + + conn, ok := c.(*Connection) + if !ok { + return nil, fmt.Errorf("invalid connection type %T, want *tarantool.Connection", conn) + } + + var res []bool + + if err = conn.Call17Typed("dostring", []interface{}{"return box.info.ro"}, &res); err != nil { + return nil, fmt.Errorf("can't get instance status: %w", err) + } + + if len(res) == 1 { + ret := res[0] + switch ret { + case false: + return NewOptions(opts.server, activerecord.ModeMaster) + default: + return NewOptions(opts.server, activerecord.ModeReplica) + } + } + + return nil, fmt.Errorf("can't parse instance status: %w", err) +} diff --git a/pkg/tarantool/connection.go b/pkg/tarantool/connection.go new file mode 100644 index 0000000..0956164 --- /dev/null +++ b/pkg/tarantool/connection.go @@ -0,0 +1,49 @@ +package tarantool + +import ( + "context" + "fmt" + + "github.com/mailru/activerecord/pkg/activerecord" + "github.com/tarantool/go-tarantool" +) + +var DefaultConnectionParams = activerecord.MapGlobParam{ + Timeout: DefaultConnectionTimeout, +} + +type Connection struct { + *tarantool.Connection + opts *ConnectionOptions +} + +func GetConnection(_ context.Context, opts *ConnectionOptions) (*Connection, error) { + conn, err := tarantool.Connect(opts.server, opts.cfg) + if err != nil { + return nil, fmt.Errorf("error connect to tarantool %s with connect timeout '%d': %s", opts.server, opts.cfg.Timeout, err) + } + + return &Connection{ + Connection: conn, + opts: opts, + }, nil +} + +func (c *Connection) InstanceMode() any { + return c.opts.InstanceMode() +} + +func (c *Connection) Close() { + if err := c.Connection.Close(); err != nil { + panic(err) + } + +} + +func (c *Connection) Done() <-chan struct{} { + return nil +} + +func (c *Connection) Info() string { + return fmt.Sprintf("Server: %s, timeout; %d, user: %s", c.opts.server, c.opts.cfg.Timeout, c.opts.cfg.User) +} diff --git a/pkg/tarantool/options.go b/pkg/tarantool/options.go new file mode 100644 index 0000000..f8eefef --- /dev/null +++ b/pkg/tarantool/options.go @@ -0,0 +1,85 @@ +package tarantool + +import ( + "fmt" + "hash/crc32" + "time" + + "github.com/mailru/activerecord/pkg/activerecord" + "github.com/tarantool/go-tarantool" +) + +const DefaultConnectionTimeout = 20 * time.Millisecond + +type ConnectionOptions struct { + *activerecord.GroupHash + cfg tarantool.Opts + server string + Mode activerecord.ServerModeType +} + +type ConnectionOption interface { + apply(*ConnectionOptions) error +} + +type optionConnectionFunc func(*ConnectionOptions) error + +func (o optionConnectionFunc) apply(c *ConnectionOptions) error { + return o(c) +} + +// WithTimeout - опция для изменений таймаутов +func WithTimeout(request time.Duration) ConnectionOption { + return optionConnectionFunc(func(opts *ConnectionOptions) error { + opts.cfg.Timeout = request + + return opts.UpdateHash("T", request) + }) +} + +// WithCredential - опция авторизации +func WithCredential(user, pass string) ConnectionOption { + return optionConnectionFunc(func(opts *ConnectionOptions) error { + opts.cfg.User = user + opts.cfg.Pass = pass + + return opts.UpdateHash("L", user, pass) + }) +} + +func NewOptions(server string, mode activerecord.ServerModeType, opts ...ConnectionOption) (*ConnectionOptions, error) { + if server == "" { + return nil, fmt.Errorf("invalid param: server is empty") + } + + connectionOpts := &ConnectionOptions{ + cfg: tarantool.Opts{ + Timeout: DefaultConnectionTimeout, + }, + server: server, + Mode: mode, + } + + connectionOpts.GroupHash = activerecord.NewGroupHash(crc32.NewIEEE()) + + for _, opt := range opts { + if err := opt.apply(connectionOpts); err != nil { + return nil, fmt.Errorf("error apply options: %w", err) + } + } + + err := connectionOpts.UpdateHash("S", server) + if err != nil { + return nil, fmt.Errorf("can't get pool: %w", err) + } + + return connectionOpts, nil +} + +func (c *ConnectionOptions) GetConnectionID() string { + return c.GetHash() +} + +func (c *ConnectionOptions) InstanceMode() activerecord.ServerModeType { + return c.Mode +}