Skip to content

Commit

Permalink
shared-cache implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
yukinying committed Oct 9, 2015
1 parent 86b4066 commit aee6cf1
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 26 deletions.
48 changes: 46 additions & 2 deletions cmd/gryffin-distributed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"flag"
"fmt"
"io"
"math/rand"
"net"
"net/http"
"os"
Expand All @@ -33,6 +34,7 @@ var wq chan bool
var t *gryffin.Scan

var logWriter io.Writer
var store *gryffin.GryffinStore

// var method = flag.String("method", "GET", "the HTTP method for the request.")
// var url string
Expand Down Expand Up @@ -72,9 +74,15 @@ func newProducer() *nsq.Producer {
}

func newConsumer(topic, channel string, handler nsq.HandlerFunc) *nsq.Consumer {
consumer, _ := nsq.NewConsumer(topic, channel, nsq.NewConfig())
var err error
consumer, err := nsq.NewConsumer(topic, channel, nsq.NewConfig())
if err != nil {
fmt.Println("Cannot create consumer", err)
return nil
}

consumer.AddHandler(handler)
err := consumer.ConnectToNSQLookupd("127.0.0.1:4161")
err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
if err != nil {
fmt.Println("Cannot connect to NSQ for consuming message", err)
return nil
Expand All @@ -100,6 +108,38 @@ func seed(url string) {

}

func shareCache() {

var producer *nsq.Producer
var consumer *nsq.Consumer

handler := nsq.HandlerFunc(func(m *nsq.Message) error {
store.GetRcvChan() <- m.Body
return nil
})

producer = newProducer()

go func() {
for {
// fmt.Println("SndChan: ", store.GetSndChan(), string(json))
err := producer.Publish("share-cache", <-store.GetSndChan())
if err != nil {
fmt.Println("Could not publish", "share-cache", err)
}
}
}()

rand.Seed(time.Now().UnixNano())

consumer = newConsumer("share-cache", fmt.Sprintf("%06d#ephemeral", rand.Int()%999999), handler)
_ = consumer

// defer producer.Stop()
// defer consumer.Stop()

}

func crawl() {

var producer *nsq.Producer
Expand Down Expand Up @@ -237,11 +277,15 @@ func main() {
return
}

store = gryffin.NewSharedGryffinStore()
gryffin.SetMemoryStore(store)

captureCtrlC()

switch service {

case "crawl":
shareCache()
crawl()

case "fuzz-sqlmap":
Expand Down
2 changes: 1 addition & 1 deletion global.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
// "io/ioutil"
)

var memoryStore = NewGryffinStore(false)
var memoryStore *GryffinStore
var logWriter io.Writer

func SetMemoryStore(m *GryffinStore) {
Expand Down
11 changes: 9 additions & 2 deletions gryffin.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,20 @@ type Renderer interface {
type LogMessage struct {
Service string
Msg string
Method string
Url string
JobID string
// Fingerprint Fingerprint
Method string
Url string
}

// NewScan creates a scan.
func NewScan(method, url, post string) *Scan {

// ensure we got a memory store..
if memoryStore == nil {
memoryStore = NewGryffinStore()
}

id := GenRandomID()

job := &Job{ID: GenRandomID()}
Expand Down Expand Up @@ -385,6 +391,7 @@ func (s *Scan) Logm(service, msg string) {
// Fingerprint: s.Fingerprint,
Method: s.Request.Method,
Url: s.Request.URL.String(),
JobID: s.Job.ID,
}
s.Log(m)
}
Expand Down
5 changes: 5 additions & 0 deletions serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import (
)

func NewScanFromJson(b []byte) *Scan {
// ensure we got a memory store..
if memoryStore == nil {
memoryStore = NewGryffinStore()
}

var scan Scan
json.Unmarshal(b, &scan)
return &scan
Expand Down
41 changes: 26 additions & 15 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,21 @@ type GryffinStore struct {
}

type PublishMessage struct {
F string // function, i.e. See or Seen
T string // type (kind), i.e. oracle or hash
K string // key
V interface{} // value
F string // function, i.e. See or Seen
T string // type (kind), i.e. oracle or hash
K string // key
V string // value
}

func NewGryffinStore(shared bool) *GryffinStore {
func NewSharedGryffinStore() *GryffinStore {
return newGryffinStore(true)
}

func NewGryffinStore() *GryffinStore {
return newGryffinStore(false)
}

func newGryffinStore(shared bool) *GryffinStore {

store := GryffinStore{
Oracles: make(map[string]*distance.Oracle),
Expand Down Expand Up @@ -65,38 +73,40 @@ func (s *GryffinStore) processRcvMsg() {
fmt.Println("Error in processRcvMsg")
continue
}
fmt.Println("Got a RcvMsg: ", m) // DEBUG
if m.F == "See" {
v, _ := strconv.ParseUint(m.V, 16, 64)
switch m.T {
case "hash":
s.hashesSee(m.K, uint64(m.V.(float64)), true)
s.hashesSee(m.K, v, true)
case "oracle":
s.oracleSee(m.K, uint64(m.V.(float64)), true)
s.oracleSee(m.K, v, true)
}
}
}
}

func (s *GryffinStore) See(prefix string, kind string, v interface{}) {
func (s *GryffinStore) See(prefix string, kind string, v uint64) {

if kind == "oracle" {
s.oracleSee(prefix, v.(uint64), false)
s.oracleSee(prefix, v, false)
return
}
if kind == "hash" {
s.hashesSee(prefix, v.(uint64), false)
s.hashesSee(prefix, v, false)
return
}
}

func (s *GryffinStore) Seen(prefix string, kind string, v interface{}, r uint8) bool {
func (s *GryffinStore) Seen(prefix string, kind string, v uint64, r uint8) bool {

switch kind {
case "oracle":
if oracle, ok := s.Oracles[prefix]; ok {
return oracle.Seen(v.(uint64), r)
return oracle.Seen(v, r)
}
case "hash":
k := prefix + "/" + strconv.FormatUint(v.(uint64), 10)
k := prefix + "/" + strconv.FormatUint(v, 10)
_, ok := s.Hashes[k]
return ok
}
Expand All @@ -116,7 +126,8 @@ func (s *GryffinStore) oracleSee(prefix string, f uint64, localOnly bool) {
// Remote update
if !localOnly && s.snd != nil {
go func() {
jsonPayload, _ := json.Marshal(&PublishMessage{F: "See", T: "oracle", K: k, V: f})
jsonPayload, _ := json.Marshal(&PublishMessage{F: "See", T: "oracle", K: prefix, V: fmt.Sprintf("%x", f)})
// fmt.Println("Sending... ", s.snd, string(jsonPayload))
s.snd <- jsonPayload
}()
}
Expand All @@ -128,7 +139,7 @@ func (s *GryffinStore) hashesSee(prefix string, h uint64, localOnly bool) {
// Remote update
if !localOnly && s.snd != nil {
go func() {
jsonPayload, _ := json.Marshal(&PublishMessage{F: "See", T: "hash", K: k, V: h})
jsonPayload, _ := json.Marshal(&PublishMessage{F: "See", T: "hash", K: prefix, V: fmt.Sprintf("%x", h)})
s.snd <- jsonPayload
}()
}
Expand Down
27 changes: 22 additions & 5 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,21 @@ func TestNewGryffinStore(t *testing.T) {

t.Parallel()

store1 := NewGryffinStore(true)
store2 := NewGryffinStore(true)
store1 := NewSharedGryffinStore()
store2 := NewSharedGryffinStore()

var wg sync.WaitGroup
wg.Add(1)

go func() {
store1.See("foo", "oracle", uint64(0x1234))
b := <-store1.GetSndChan()
var b []byte
b = <-store1.GetSndChan()
t.Log("Store1 got ", string(b))
store2.GetRcvChan() <- b

store1.See("foo", "hash", uint64(0x5678))
b = <-store1.GetSndChan()
t.Log("Store1 got ", string(b))
store2.GetRcvChan() <- b
wg.Done()
Expand All @@ -27,14 +33,25 @@ func TestNewGryffinStore(t *testing.T) {
wg.Wait()
for i := 0; i < 100000; i++ {
if store2.Seen("foo", "oracle", uint64(0x1234), 2) {
t.Logf("Store2 see the new value in %d microseconds.", i)
t.Logf("Store2 see the new oracle value in %d microseconds.", i)
break
}
time.Sleep(1 * time.Microsecond)
}

if !store2.Seen("foo", "oracle", uint64(0x1234), 2) {
t.Error("2nd store should see the value in oracle.", store2.Oracles)
t.Error("2nd store should see the oracle value in oracle.", store2.Oracles)
}

for i := 0; i < 100000; i++ {
if store2.Seen("foo", "hash", uint64(0x5678), 2) {
t.Logf("Store2 see the new hash value in %d microseconds.", i)
break
}
time.Sleep(1 * time.Microsecond)
}

if !store2.Seen("foo", "hash", uint64(0x5678), 2) {
t.Error("2nd store should see the hash value in hashes.", store2.Hashes)
}
}
2 changes: 1 addition & 1 deletion util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ func GenRandomID() string {
// UUID generation is trivial per RSC in https://groups.google.com/d/msg/golang-dev/zwB0k2mpshc/l3zS3oxXuNwJ
buf := make([]byte, 16)
io.ReadFull(rand.Reader, buf)
return fmt.Sprintf("%x", buf)
return fmt.Sprintf("%X", buf)
}

0 comments on commit aee6cf1

Please sign in to comment.