Skip to content

Commit

Permalink
init file
Browse files Browse the repository at this point in the history
  • Loading branch information
dotwoo committed May 19, 2017
1 parent 9db6e6f commit aa54ee4
Show file tree
Hide file tree
Showing 4 changed files with 683 additions and 0 deletions.
131 changes: 131 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package transfer

import (
"context"
"fmt"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"sync"
"time"
)

var (
// LatencyInit the latency init is 3 seconds
LatencyInit = time.Second * 3
)

// client the rpc client
// +gen * slice:"MinBy,SortBy,DistinctBy,Aggregate[int],Shuffle"
type client struct {
client *rpc.Client
connTime time.Time
latency time.Duration
lock sync.RWMutex

connTimeout time.Duration
callTimeout time.Duration
addr string
}

// get get the rpc net conn
func (cn *client) get() (*rpc.Client, error) {
cn.lock.Lock()
defer cn.lock.Unlock()

var err error
//将对应latency 设置为最大避免,同时调用
cn.latency = time.Minute * 3
if cn.client == nil {
if cn.client, err = cn.newConn(); err != nil {
return nil, err
}
return cn.client, nil
}
if time.Since(cn.connTime).Hours() >= 2 {
cn.client.Close()
if cn.client, err = cn.newConn(); err != nil {
return nil, err
}
return cn.client, nil
}
return cn.client, err
}

func (cn *client) newConn() (*rpc.Client, error) {
conn, err := net.DialTimeout("tcp", cn.addr, cn.connTimeout)
if err != nil {
return nil, err
}
cn.connTime = time.Now()
return jsonrpc.NewClient(conn), nil
}

// close close the conn
func (cn *client) close() {
if cn.client != nil {
cn.client.Close()
cn.client = nil
}
}

// call rpc method
func (cn *client) call(method string, args interface{}, resp interface{}) error {
cl, err := cn.get()
if err != nil {
return err
}

ctx, cancelFn := context.WithTimeout(context.Background(), cn.callTimeout)
defer cancelFn()

done := make(chan error, 2)
start := time.Now()
go func() {
done <- cl.Call(method, args, resp)
}()

select {
case err := <-done:
if err != nil {
cn.close()
cn.latency = time.Minute
return fmt.Errorf("rpc call %s error:%s", cn.addr, err.Error())
}

case <-ctx.Done():
cn.close()
cn.latency = time.Minute * 2
return fmt.Errorf("rpc call %s timeout", cn.addr)
}
cn.latency = time.Now().Sub(start)
return nil
}

// NewClient create the rpc client
func NewClient(address string, connTW, callTW time.Duration) *client {
c := &client{
addr: address,
connTimeout: connTW,
callTimeout: callTW,
latency: 3 * time.Second,
}
return c
}

func quick(a, b *client) bool {
return a.latency < b.latency
}

func aggregate(i int, c *client) int {
c.latency = LatencyInit
return i + 1
}

func addrLess(a, b *client) bool {
return a.addr < b.addr
}

func distinctBy(a, b *client) bool {
return a.addr == b.addr
}
257 changes: 257 additions & 0 deletions client_slice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
// Generated by: gen
// TypeWriter: slice
// Directive: +gen on *Client

package transfer

import (
"errors"
"math/rand"
)

// Sort implementation is a modification of http://golang.org/pkg/sort/#Sort
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found at http://golang.org/LICENSE.

// clientSlice is a slice of type *Client. Use it where you would use []*Client.
type clientSlice []*client

// minBy returns an element of ClientSlice containing the minimum value, when compared to other elements using a passed func defining ‘less’. In the case of multiple items being equally minimal, the first such element is returned. Returns error if no elements. See: http://clipperhouse.github.io/gen/#minBy
func (rcv clientSlice) minBy(less func(*client, *client) bool) (result *client, err error) {
l := len(rcv)
if l == 0 {
err = errors.New("cannot determine the Min of an empty slice")
return
}
m := 0
for i := 1; i < l; i++ {
if less(rcv[i], rcv[m]) {
m = i
}
}
result = rcv[m]
return
}

// sortBy returns a new ordered ClientSlice, determined by a func defining ‘less’. See: http://clipperhouse.github.io/gen/#sortBy
func (rcv clientSlice) sortBy(less func(*client, *client) bool) clientSlice {
result := make(clientSlice, len(rcv))
copy(result, rcv)
// Switch to heapsort if depth of 2*ceil(lg(n+1)) is reached.
n := len(result)
maxDepth := 0
for i := n; i > 0; i >>= 1 {
maxDepth++
}
maxDepth *= 2
quickSortClientSlice(result, less, 0, n, maxDepth)
return result
}

// distinctBy returns a new ClientSlice whose elements are unique, where equality is defined by a passed func. See: http://clipperhouse.github.io/gen/#distinctBy
func (rcv clientSlice) distinctBy(equal func(*client, *client) bool) (result clientSlice) {
Outer:
for _, v := range rcv {
for _, r := range result {
if equal(v, r) {
continue Outer
}
}
result = append(result, v)
}
return result
}

// aggregateInt iterates over ClientSlice, operating on each element while maintaining ‘state’. See: http://clipperhouse.github.io/gen/#Aggregate
func (rcv clientSlice) aggregateInt(fn func(int, *client) int) (result int) {
for _, v := range rcv {
result = fn(result, v)
}
return
}

// shuffle returns a shuffled copy of ClientSlice, using a version of the Fisher-Yates shuffle. See: http://clipperhouse.github.io/gen/#shuffle
func (rcv clientSlice) shuffle() clientSlice {
numItems := len(rcv)
result := make(clientSlice, numItems)
copy(result, rcv)
for i := 0; i < numItems; i++ {
r := i + rand.Intn(numItems-i)
result[r], result[i] = result[i], result[r]
}
return result
}

// Sort implementation based on http://golang.org/pkg/sort/#Sort, see top of this file

func swapClientSlice(rcv clientSlice, a, b int) {
rcv[a], rcv[b] = rcv[b], rcv[a]
}

// Insertion sort
func insertionSortClientSlice(rcv clientSlice, less func(*client, *client) bool, a, b int) {
for i := a + 1; i < b; i++ {
for j := i; j > a && less(rcv[j], rcv[j-1]); j-- {
swapClientSlice(rcv, j, j-1)
}
}
}

// siftDown implements the heap property on rcv[lo, hi).
// first is an offset into the array where the root of the heap lies.
func siftDownClientSlice(rcv clientSlice, less func(*client, *client) bool, lo, hi, first int) {
root := lo
for {
child := 2*root + 1
if child >= hi {
break
}
if child+1 < hi && less(rcv[first+child], rcv[first+child+1]) {
child++
}
if !less(rcv[first+root], rcv[first+child]) {
return
}
swapClientSlice(rcv, first+root, first+child)
root = child
}
}

func heapSortClientSlice(rcv clientSlice, less func(*client, *client) bool, a, b int) {
first := a
lo := 0
hi := b - a

// Build heap with greatest element at top.
for i := (hi - 1) / 2; i >= 0; i-- {
siftDownClientSlice(rcv, less, i, hi, first)
}

// Pop elements, largest first, into end of rcv.
for i := hi - 1; i >= 0; i-- {
swapClientSlice(rcv, first, first+i)
siftDownClientSlice(rcv, less, lo, i, first)
}
}

// Quicksort, following Bentley and McIlroy,
// Engineering a Sort Function, SP&E November 1993.

// medianOfThree moves the median of the three values rcv[a], rcv[b], rcv[c] into rcv[a].
func medianOfThreeClientSlice(rcv clientSlice, less func(*client, *client) bool, a, b, c int) {
m0 := b
m1 := a
m2 := c
// bubble sort on 3 elements
if less(rcv[m1], rcv[m0]) {
swapClientSlice(rcv, m1, m0)
}
if less(rcv[m2], rcv[m1]) {
swapClientSlice(rcv, m2, m1)
}
if less(rcv[m1], rcv[m0]) {
swapClientSlice(rcv, m1, m0)
}
// now rcv[m0] <= rcv[m1] <= rcv[m2]
}

func swapRangeClientSlice(rcv clientSlice, a, b, n int) {
for i := 0; i < n; i++ {
swapClientSlice(rcv, a+i, b+i)
}
}

func doPivotClientSlice(rcv clientSlice, less func(*client, *client) bool, lo, hi int) (midlo, midhi int) {
m := lo + (hi-lo)/2 // Written like this to avoid integer overflow.
if hi-lo > 40 {
// Tukey's Ninther, median of three medians of three.
s := (hi - lo) / 8
medianOfThreeClientSlice(rcv, less, lo, lo+s, lo+2*s)
medianOfThreeClientSlice(rcv, less, m, m-s, m+s)
medianOfThreeClientSlice(rcv, less, hi-1, hi-1-s, hi-1-2*s)
}
medianOfThreeClientSlice(rcv, less, lo, m, hi-1)

// Invariants are:
// rcv[lo] = pivot (set up by ChoosePivot)
// rcv[lo <= i < a] = pivot
// rcv[a <= i < b] < pivot
// rcv[b <= i < c] is unexamined
// rcv[c <= i < d] > pivot
// rcv[d <= i < hi] = pivot
//
// Once b meets c, can swap the "= pivot" sections
// into the middle of the slice.
pivot := lo
a, b, c, d := lo+1, lo+1, hi, hi
for {
for b < c {
if less(rcv[b], rcv[pivot]) { // rcv[b] < pivot
b++
} else if !less(rcv[pivot], rcv[b]) { // rcv[b] = pivot
swapClientSlice(rcv, a, b)
a++
b++
} else {
break
}
}
for b < c {
if less(rcv[pivot], rcv[c-1]) { // rcv[c-1] > pivot
c--
} else if !less(rcv[c-1], rcv[pivot]) { // rcv[c-1] = pivot
swapClientSlice(rcv, c-1, d-1)
c--
d--
} else {
break
}
}
if b >= c {
break
}
// rcv[b] > pivot; rcv[c-1] < pivot
swapClientSlice(rcv, b, c-1)
b++
c--
}

min := func(a, b int) int {
if a < b {
return a
}
return b
}

n := min(b-a, a-lo)
swapRangeClientSlice(rcv, lo, b-n, n)

n = min(hi-d, d-c)
swapRangeClientSlice(rcv, c, hi-n, n)

return lo + b - a, hi - (d - c)
}

func quickSortClientSlice(rcv clientSlice, less func(*client, *client) bool, a, b, maxDepth int) {
for b-a > 7 {
if maxDepth == 0 {
heapSortClientSlice(rcv, less, a, b)
return
}
maxDepth--
mlo, mhi := doPivotClientSlice(rcv, less, a, b)
// Avoiding recursion on the larger subproblem guarantees
// a stack depth of at most lg(b-a).
if mlo-a < b-mhi {
quickSortClientSlice(rcv, less, a, mlo, maxDepth)
a = mhi // i.e., quickSortClientSlice(rcv, mhi, b)
} else {
quickSortClientSlice(rcv, less, mhi, b, maxDepth)
b = mlo // i.e., quickSortClientSlice(rcv, a, mlo)
}
}
if b-a > 1 {
insertionSortClientSlice(rcv, less, a, b)
}
}
Loading

0 comments on commit aa54ee4

Please sign in to comment.