Skip to content

Commit

Permalink
new blockchainless agreement protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
Dave Booz committed Jun 15, 2017
1 parent a192ffc commit fa03d7b
Show file tree
Hide file tree
Showing 27 changed files with 1,116 additions and 120 deletions.
49 changes: 49 additions & 0 deletions abstractprotocol/cancel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package abstractprotocol

import (
"fmt"
)

// =======================================================================================================
// Cancel - This is the interface that Horizon uses to interact with cancel messages in the agreement
// protocol.
//

type Cancel interface {
ProtocolMessage
Reason() uint
}

// This struct is the cancel that flows from the consumer to the producer or producer to consumer.
type BaseCancel struct {
*BaseProtocolMessage
TheReason uint `json:"reason"`
}

func (bc *BaseCancel) IsValid() bool {
return bc.BaseProtocolMessage.IsValid() && bc.MsgType == MsgTypeCancel
}

func (bc *BaseCancel) String() string {
return bc.BaseProtocolMessage.String() + fmt.Sprintf(", Reason: %v", bc.TheReason)
}

func (bc *BaseCancel) ShortString() string {
return bc.BaseProtocolMessage.ShortString() + fmt.Sprintf(", Reason: %v", bc.TheReason)
}

func (bc *BaseCancel) Reason() uint {
return bc.TheReason
}

func NewBaseCancel(name string, version int, id string, reason uint) *BaseCancel {
return &BaseCancel{
BaseProtocolMessage: &BaseProtocolMessage{
MsgType: MsgTypeCancel,
AProtocol: name,
AVersion: version,
AgreeId: id,
},
TheReason: reason,
}
}
34 changes: 33 additions & 1 deletion abstractprotocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const MsgTypeReplyAck = "replyack"
const MsgTypeDataReceived = "dataverification"
const MsgTypeDataReceivedAck = "dataverificationack"
const MsgTypeNotifyMetering = "meteringnotification"
const MsgTypeCancel = "cancel"

// All protocol message have the following header info.
type ProtocolMessage interface {
Expand Down Expand Up @@ -132,7 +133,9 @@ type ProtocolHandler interface {
TerminateAgreement(policy *policy.Policy,
counterParty string,
agreementId string,
reason uint) error
reason uint,
messageTarget interface{},
sendMessage func(mt interface{}, pay []byte) error) error

VerifyAgreement(agreementId string,
counterParty string,
Expand All @@ -148,6 +151,7 @@ type ProtocolHandler interface {
ValidateDataReceived(dr string) (DataReceived, error)
ValidateDataReceivedAck(dra string) (DataReceivedAck, error)
ValidateMeterNotification(mn string) (NotifyMetering, error)
ValidateCancel(can string) (Cancel, error)
}

type BaseProtocolHandler struct {
Expand Down Expand Up @@ -496,6 +500,34 @@ func ValidateMeterNotification(mn string) (NotifyMetering, error) {

}

func ValidateCancel(can string) (Cancel, error) {

// attempt deserialization of message from msg payload
c := new(BaseCancel)

if err := json.Unmarshal([]byte(can), c); err != nil {
return nil, errors.New(fmt.Sprintf("Error deserializing cancel: %s, error: %v", can, err))
} else if c.IsValid() {
return c, nil
} else {
return nil, errors.New(fmt.Sprintf("Message is not a Cancel."))
}

}

func DemarshalProposal(proposal string) (Proposal, error) {

// attempt deserialization of the proposal
prop := new(BaseProposal)

if err := json.Unmarshal([]byte(proposal), &prop); err != nil {
return nil, errors.New(fmt.Sprintf("Error deserializing proposal: %s, error: %v", proposal, err))
} else {
return prop, nil
}

}

var AAPlogString = func(p string, v interface{}) string {
return fmt.Sprintf("AbstractProtocol (%v): %v", p, v)
}
2 changes: 1 addition & 1 deletion agreement/agreement.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (w *AgreementWorker) start() {
} else if _, ok := w.producerPH[msgProtocol]; !ok {
glog.Infof(logString(fmt.Sprintf("unable to direct exchange message %v to a protocol handler, deleting it.", protocolMsg)))
} else if p, err := w.producerPH[msgProtocol].AgreementProtocolHandler().ValidateProposal(protocolMsg); err != nil {
glog.Warningf(logString(fmt.Sprintf("Proposal handler ignoring non-proposal message: %s due to %v", cmd.Msg.ShortProtocolMessage(), err)))
glog.V(5).Infof(logString(fmt.Sprintf("Proposal handler ignoring non-proposal message: %s due to %v", cmd.Msg.ShortProtocolMessage(), err)))
deleteMessage = false
} else {
deleteMessage = w.producerPH[msgProtocol].HandleProposalMessage(p, protocolMsg, exchangeMsg)
Expand Down
3 changes: 2 additions & 1 deletion agreementbot/agreementbot.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,13 +469,14 @@ func (w *AgreementBotWorker) alreadyMakingAgreementWith(dev *exchange.SearchResu
// Check to see if we're already doing something with this device
pendingAgreementFilter := func() AFilter {
return func(a Agreement) bool {
return a.DeviceId == dev.Id && a.PolicyName == consumerPolicy.Header.Name && a.AgreementFinalizedTime == 0 && a.AgreementTimedout == 0
return a.DeviceId == dev.Id && a.PolicyName == consumerPolicy.Header.Name && a.AgreementTimedout == 0
}
}

// Search all agreement protocol buckets
for _, agp := range policy.AllAgreementProtocols() {
// Find all agreements that are in progress. They might be waiting for a reply or not yet finalized on blockchain.
// TODO: To support more than 1 agreement (maxagreements > 1) with this device for this policy, we need to adjust this logic.
if agreements, err := FindAgreements(w.db, []AFilter{UnarchivedAFilter(), pendingAgreementFilter()}, agp); err != nil {
glog.Errorf("AgreementBotWorker received error trying to find pending agreements for protocol %v: %v", agp, err)
} else if len(agreements) != 0 {
Expand Down
26 changes: 11 additions & 15 deletions agreementbot/agreementworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (b *BaseAgreementWorker) InitiateNewAgreement(cph ConsumerProtocolHandler,
}

// Create pending agreement in database
if err := AgreementAttempt(b.db, agreementIdString, wi.Device.Id, wi.ConsumerPolicy.Header.Name, "Citizen Scientist"); err != nil {
if err := AgreementAttempt(b.db, agreementIdString, wi.Device.Id, wi.ConsumerPolicy.Header.Name, cph.Name()); err != nil {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("error persisting agreement attempt: %v", err)))

// Create message target for protocol message
Expand All @@ -174,7 +174,7 @@ func (b *BaseAgreementWorker) InitiateNewAgreement(cph ConsumerProtocolHandler,

}

func (b *BaseAgreementWorker) HandleAgreementReply(cph ConsumerProtocolHandler, wi *HandleReply, workerId string) {
func (b *BaseAgreementWorker) HandleAgreementReply(cph ConsumerProtocolHandler, wi *HandleReply, workerId string) bool {

reply := wi.Reply
protocolHandler := cph.AgreementProtocolHandler()
Expand All @@ -190,10 +190,11 @@ func (b *BaseAgreementWorker) HandleAgreementReply(cph ConsumerProtocolHandler,
// The lock is dropped at the end of this function or right before the blockchain write. Early exit from this function is NOT allowed.
droppedLock := false

if reply.ProposalAccepted() {
// Assume we will ack negatively unless we find out that everything is ok.
ackReplyAsValid := false
sendReply := true

// The producer is happy with the proposal. Assume we will ack negatively unless we find out that everything is ok.
ackReplyAsValid := false
if reply.ProposalAccepted() {

// Find the saved agreement in the database
if agreement, err := FindSingleAgreementByAgreementId(b.db, reply.AgreementId(), cph.Name(), []AFilter{UnarchivedAFilter()}); err != nil {
Expand All @@ -203,7 +204,7 @@ func (b *BaseAgreementWorker) HandleAgreementReply(cph ConsumerProtocolHandler,
} else if agreement.CounterPartyAddress != "" {
glog.V(5).Infof(BAWlogstring(workerId, fmt.Sprintf("discarding reply, agreement id %v already received a reply", agreement.CurrentAgreementId)))
// this will cause us to not send a reply ack, which is what we want in this case
ackReplyAsValid = true
sendReply = false

// Now we need to write the info to the exchange and the database
} else if proposal, err := protocolHandler.DemarshalProposal(agreement.Proposal); err != nil {
Expand Down Expand Up @@ -284,7 +285,7 @@ func (b *BaseAgreementWorker) HandleAgreementReply(cph ConsumerProtocolHandler,
}

// Always send an ack for a reply with a positive decision in it
if !ackReplyAsValid {
if !ackReplyAsValid && sendReply {
if mt, err := exchange.CreateMessageTarget(wi.SenderId, nil, wi.SenderPubKey, wi.From); err != nil {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("error creating message target: %v", err)))
} else if err := protocolHandler.Confirm(ackReplyAsValid, reply.AgreementId(), mt, cph.GetSendMessage()); err != nil {
Expand All @@ -310,6 +311,8 @@ func (b *BaseAgreementWorker) HandleAgreementReply(cph ConsumerProtocolHandler,
}
}

return ackReplyAsValid

}

func (b *BaseAgreementWorker) HandleDataReceivedAck(cph ConsumerProtocolHandler, wi *HandleDataReceivedAck, workerId string) {
Expand Down Expand Up @@ -443,14 +446,7 @@ func (b *BaseAgreementWorker) CancelAgreement(cph ConsumerProtocolHandler, agree

func (b *BaseAgreementWorker) DoBlockchainCancel(cph ConsumerProtocolHandler, ag *Agreement, reason uint, workerId string) {

protocolHandler := cph.AgreementProtocolHandler()

// Remove from the blockchain
if pol, err := policy.DemarshalPolicy(ag.Policy); err != nil {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("unable to demarshal policy while trying to cancel %v, error %v", ag.CurrentAgreementId, err)))
} else if err := protocolHandler.TerminateAgreement(pol, ag.CounterPartyAddress, ag.CurrentAgreementId, reason); err != nil {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("error terminating agreement %v on the blockchain: %v", ag.CurrentAgreementId, err)))
}
cph.TerminateAgreement(ag, reason, workerId)
}

func generateAgreementId(random *rand.Rand) []byte {
Expand Down
91 changes: 91 additions & 0 deletions agreementbot/basic_agreement_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package agreementbot

import (
"fmt"
"github.com/boltdb/bolt"
"github.com/golang/glog"
"github.com/open-horizon/anax/config"
"github.com/open-horizon/anax/policy"
"github.com/satori/go.uuid"
"math/rand"
"net/http"
"runtime"
"time"
)

type BasicAgreementWorker struct {
*BaseAgreementWorker
protocolHandler *BasicProtocolHandler
}

func NewBasicAgreementWorker(c *BasicProtocolHandler, cfg *config.HorizonConfig, db *bolt.DB, pm *policy.PolicyManager, alm *AgreementLockManager) *BasicAgreementWorker {

p := &BasicAgreementWorker{
BaseAgreementWorker: &BaseAgreementWorker{
pm: pm,
db: db,
config: cfg,
alm: alm,
workerID: uuid.NewV4().String(),
httpClient: &http.Client{Timeout: time.Duration(config.HTTPDEFAULTTIMEOUT * time.Millisecond)},
},
protocolHandler: c,
}

return p
}

// This function receives an event to "make a new agreement" from the Process function, and then synchronously calls a function
// to actually work through the agreement protocol.
func (a *BasicAgreementWorker) start(work chan AgreementWork, random *rand.Rand) {

for {
glog.V(5).Infof(bwlogstring(a.workerID, fmt.Sprintf("blocking for work")))
workItem := <-work // block waiting for work
glog.V(2).Infof(bwlogstring(a.workerID, fmt.Sprintf("received work: %v", workItem)))

if workItem.Type() == INITIATE {
wi := workItem.(InitiateAgreement)
a.InitiateNewAgreement(a.protocolHandler, &wi, random, a.workerID)

} else if workItem.Type() == REPLY {
wi := workItem.(HandleReply)
if ok := a.HandleAgreementReply(a.protocolHandler, &wi, a.workerID); ok {
// Update state in the database
if ag, err := AgreementFinalized(a.db, wi.Reply.AgreementId(), a.protocolHandler.Name()); err != nil {
glog.Errorf(bwlogstring(a.workerID, fmt.Sprintf("error persisting agreement %v finalized: %v", wi.Reply.AgreementId(), err)))

// Update state in exchange
} else if pol, err := policy.DemarshalPolicy(ag.Policy); err != nil {
glog.Errorf(bwlogstring(a.workerID, fmt.Sprintf("error demarshalling policy from agreement %v, error: %v", wi.Reply.AgreementId(), err)))
} else if err := a.protocolHandler.RecordConsumerAgreementState(wi.Reply.AgreementId(), pol.APISpecs[0].SpecRef, "Finalized Agreement", a.workerID); err != nil {
glog.Errorf(bwlogstring(a.workerID, fmt.Sprintf("error setting agreement %v finalized state in exchange: %v", wi.Reply.AgreementId(), err)))
}
}

} else if workItem.Type() == DATARECEIVEDACK {
wi := workItem.(HandleDataReceivedAck)
a.HandleDataReceivedAck(a.protocolHandler, &wi, a.workerID)

} else if workItem.Type() == CANCEL {
wi := workItem.(CancelAgreement)
a.CancelAgreementWithLock(a.protocolHandler, wi.AgreementId, wi.Reason, a.workerID)

} else if workItem.Type() == WORKLOAD_UPGRADE {
// upgrade a workload on a device
wi := workItem.(HandleWorkloadUpgrade)
a.HandleWorkloadUpgrade(a.protocolHandler, &wi, a.workerID)

} else {
glog.Errorf(bwlogstring(a.workerID, fmt.Sprintf("received unknown work request: %v", workItem)))
}

glog.V(5).Infof(bwlogstring(a.workerID, fmt.Sprintf("handled work: %v", workItem)))
runtime.Gosched()

}
}

var bwlogstring = func(workerID string, v interface{}) string {
return fmt.Sprintf("BasicAgreementWorker (%v): %v", workerID, v)
}
Loading

0 comments on commit fa03d7b

Please sign in to comment.