Skip to content

fix: fix codis ut bug && add to pipeline #3073

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: 3.5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/codis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,27 @@ jobs:

- name: Test
run: |
cd codis && make -j
cd codis/pkg && go test -v ./...

build_codis_image:
name: Build Codis Docker image
runs-on: ubuntu-latest
steps:
- name: Check out the repo
uses: actions/checkout@v4

- name: Set up QEMU
uses: docker/setup-qemu-action@v2

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2

- name: Extract metadata (tags, labels) for Docker
id: meta
uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7
with:
images: pikadb/codis

- name: Build Docker image
uses: docker/build-push-action@3b5e8027fcad23fda98b2e3ac259d8d67585f671
with:
Expand Down
53 changes: 37 additions & 16 deletions codis/pkg/proxy/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,27 +282,48 @@ func (bc *BackendConn) loopReader(tasks <-chan *Request, c *redis.Conn, round in
log.WarnErrorf(err, "backend conn [%p] to %s, db-%d reader-[%d] exit",
bc, bc.addr, bc.database, round)
}()

var timeout_resp_cnt int
for r := range tasks {
resp, err := c.Decode()
r.ReceiveFromServerTime = time.Now().UnixNano()
if err != nil {
return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
}
if resp != nil && resp.IsError() {
switch {
case bytes.HasPrefix(resp.Value, errRespMasterDown):
if bc.state.CompareAndSwap(stateConnected, stateDataStale) {
log.Warnf("backend conn [%p] to %s, db-%d state = DataStale, caused by 'MASTERDOWN'",
bc, bc.addr, bc.database)
for {
resp, err := c.Decode()
r.ReceiveFromServerTime = time.Now().UnixNano()
if err != nil {
if ne, ok := errors.Cause(err).(net.Error); ok && ne.Timeout() {
timeout_resp_cnt++
if timeout_resp_cnt%10 == 0 {
log.Warnf(`backend conn [%p] to %s, db-%d, reader-[%d]
accumulated timeout request num: %d`,
bc, bc.addr, bc.database, round, timeout_resp_cnt)
}
bc.setResponse(r, nil, fmt.Errorf("backend request timout, %s", err))
break
}
case bytes.HasPrefix(resp.Value, errRespLoading):
if bc.state.CompareAndSwap(stateConnected, stateDataStale) {
log.Warnf("backend conn [%p] to %s, db-%d state = DataStale, caused by 'LOADING'",
bc, bc.addr, bc.database)
return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
}

if timeout_resp_cnt != 0 {
timeout_resp_cnt--
continue
}

if resp != nil && resp.IsError() {
switch {
case bytes.HasPrefix(resp.Value, errRespMasterDown):
if bc.state.CompareAndSwap(stateConnected, stateDataStale) {
log.Warnf("backend conn [%p] to %s, db-%d state = DataStale, caused by 'MASTERDOWN'",
bc, bc.addr, bc.database)
}
case bytes.HasPrefix(resp.Value, errRespLoading):
if bc.state.CompareAndSwap(stateConnected, stateDataStale) {
log.Warnf("backend conn [%p] to %s, db-%d state = DataStale, caused by 'LOADING'",
bc, bc.addr, bc.database)
}
}
}
bc.setResponse(r, resp, nil)
break
}
bc.setResponse(r, resp, nil)
}
return nil
}
Expand Down
58 changes: 58 additions & 0 deletions codis/pkg/proxy/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,61 @@ func TestBackend(t *testing.T) {
assert.Must(string(r.Resp.Value) == strconv.Itoa(i))
}
}

func TestBackendTimeOut(t *testing.T) {
config := NewDefaultConfig()
config.BackendMaxPipeline = 3
config.BackendSendTimeout.Set(10 * time.Second)
config.BackendRecvTimeout.Set(1 * time.Second)

conn, bc := newConnPair(config)
defer bc.Close()

var array = make([]*Request, 5)
for i := range array {
array[i] = &Request{Batch: &sync.WaitGroup{}}
}

// mock backend server, sleep 1.1s for 50% requests
// to simulate request timeout
go func() {
defer conn.Close()
time.Sleep(time.Millisecond * 20)
for i := range array {
_, err := conn.Decode()
if i%2 == 0 {
time.Sleep(time.Millisecond * 1100)
}
assert.MustNoError(err)
resp := redis.NewString([]byte(strconv.Itoa(i)))
assert.MustNoError(conn.Encode(resp, true))
}
}()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()
go func() {
for i := 0; i < 10; i++ {
<-ticker.C
}
log.Panicf("timeout")
}()

for _, r := range array {
bc.PushBack(r)
}

for i, r := range array {
r.Batch.Wait()
if i%2 == 0 {
// request timeout
assert.Must(r.Err != nil)
assert.Must(r.Resp == nil)
} else {
// request succeed and response value matches
assert.MustNoError(r.Err)
assert.Must(r.Resp != nil)
assert.Must(string(r.Resp.Value) == strconv.Itoa(i))
}
}
}
27 changes: 20 additions & 7 deletions codis/pkg/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package proxy

import (
"strconv"
"testing"
"time"

"pika/codis/v2/pkg/models"
"pika/codis/v2/pkg/utils/assert"
Expand All @@ -19,21 +21,32 @@ func init() {

func newProxyConfig() *Config {
config := NewDefaultConfig()
config.ProxyAddr = "0.0.0.0:0"
config.AdminAddr = "0.0.0.0:0"
config.ProxyHeapPlaceholder = 0
config.ProxyMaxOffheapBytes = 0
return config
}

func openProxy() (*Proxy, string) {
func openProxy(proxy_port, admin_port int) (*Proxy, string) {
config.ProxyAddr = "0.0.0.0:" + strconv.Itoa(1024+proxy_port)
config.AdminAddr = "0.0.0.0:" + strconv.Itoa(1024+admin_port)

models.SetMaxSlotNum(config.MaxSlotNum)
s, err := New(config)
assert.MustNoError(err)

var c = NewApiClient(s.Model().AdminAddr)
for retry := 0; retry < 10; retry++ {
_, err = c.Model()
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
return s, s.Model().AdminAddr
}

func TestModel(x *testing.T) {
s, addr := openProxy()
s, addr := openProxy(0, 1)
defer s.Close()

var c = NewApiClient(addr)
Expand All @@ -45,7 +58,7 @@ func TestModel(x *testing.T) {
}

func TestStats(x *testing.T) {
s, addr := openProxy()
s, addr := openProxy(2, 3)
defer s.Close()

var c = NewApiClient(addr)
Expand Down Expand Up @@ -76,7 +89,7 @@ func verifySlots(c *ApiClient, expect map[int]*models.Slot) {
}

func TestFillSlot(x *testing.T) {
s, addr := openProxy()
s, addr := openProxy(3, 4)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的端口和上面一个测试用例的有冲突

defer s.Close()

var c = NewApiClient(addr)
Expand Down Expand Up @@ -111,7 +124,7 @@ func TestFillSlot(x *testing.T) {
}

func TestStartAndShutdown(x *testing.T) {
s, addr := openProxy()
s, addr := openProxy(5, 6)
defer s.Close()

var c = NewApiClient(addr)
Expand Down
10 changes: 8 additions & 2 deletions codis/pkg/proxy/redis/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package redis
import (
"bytes"
"io"
"net"
"strconv"

"pika/codis/v2/pkg/utils/bufio2"
Expand Down Expand Up @@ -87,9 +88,14 @@ func (d *Decoder) Decode() (*Resp, error) {
}
r, err := d.decodeResp()
if err != nil {
d.Err = err
// if err is timeout, we reuse this conn, so don't set d.Err
if ne, ok := errors.Cause(err).(net.Error); ok && ne.Timeout() {
d.Err = nil
} else {
d.Err = err
}
}
return r, d.Err
return r, err
}

func (d *Decoder) DecodeMultiBulk() ([]*Resp, error) {
Expand Down
12 changes: 6 additions & 6 deletions codis/pkg/proxy/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
func TestRequestChan1(t *testing.T) {
var ch = NewRequestChanBuffer(0)
for i := 0; i < 8192; i++ {
n := ch.PushBack(&Request{UnixNano: int64(i)})
n := ch.PushBack(&Request{ReceiveTime: int64(i)})
assert.Must(n == i+1)
}
for i := 0; i < 8192; i++ {
r, ok := ch.PopFront()
assert.Must(ok && r.UnixNano == int64(i))
assert.Must(ok && r.ReceiveTime == int64(i))
}
assert.Must(ch.Buffered() == 0)

Expand All @@ -34,7 +34,7 @@ func TestRequestChan1(t *testing.T) {
func TestRequestChan2(t *testing.T) {
var ch = NewRequestChanBuffer(512)
for i := 0; i < 8192; i++ {
n := ch.PushBack(&Request{UnixNano: int64(i)})
n := ch.PushBack(&Request{ReceiveTime: int64(i)})
assert.Must(n == i+1)
}
ch.Close()
Expand All @@ -43,7 +43,7 @@ func TestRequestChan2(t *testing.T) {

for i := 0; i < 8192; i++ {
r, ok := ch.PopFront()
assert.Must(ok && r.UnixNano == int64(i))
assert.Must(ok && r.ReceiveTime == int64(i))
}
assert.Must(ch.Buffered() == 0)

Expand All @@ -61,7 +61,7 @@ func TestRequestChan3(t *testing.T) {
go func() {
defer wg.Done()
for i := 0; i < n; i++ {
ch.PushBack(&Request{UnixNano: int64(i)})
ch.PushBack(&Request{ReceiveTime: int64(i)})
if i%1024 == 0 {
runtime.Gosched()
}
Expand All @@ -73,7 +73,7 @@ func TestRequestChan3(t *testing.T) {
defer wg.Done()
for i := 0; i < n; i++ {
r, ok := ch.PopFront()
assert.Must(ok && r.UnixNano == int64(i))
assert.Must(ok && r.ReceiveTime == int64(i))
if i%4096 == 0 {
runtime.Gosched()
}
Expand Down
4 changes: 2 additions & 2 deletions codis/pkg/topom/topom_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestApiSlots(x *testing.T) {
assert.MustNoError(c.GroupAddServer(gid, "", s.Addr))
assert.MustNoError(c.SlotCreateAction(sid, gid))
assert.MustNoError(c.SlotRemoveAction(sid))
assert.MustNoError(c.SlotCreateActionRange(0, MaxSlotNum-1, gid))
assert.MustNoError(c.SlotCreateActionRange(0, t.config.MaxSlotNum-1, gid))
assert.MustNoError(c.SetSlotActionInterval(2000))
assert.MustNoError(c.SetSlotActionDisabled(true))

Expand All @@ -64,7 +64,7 @@ func TestApiSlots(x *testing.T) {

slots, err := c.Slots()
assert.MustNoError(err)
assert.Must(len(slots) == MaxSlotNum)
assert.Must(len(slots) == t.config.MaxSlotNum)
assert.Must(slots[sid].BackendAddr == s.Addr)
}

Expand Down
Loading
Loading