Commit 3fc9f8e8 authored by Alexandre Moevi's avatar Alexandre Moevi
Browse files

POC to check AMQP performances with che and axe

parent 9ea51652
Loading
Loading
Loading
Loading
Loading
+9 −4
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@ type EngineAxe struct {
	canopsis.DefaultEngine
	Options    Options
	References References
	Messages   int
}

func (e *EngineAxe) ConsumerChan() (<-chan amqp.Delivery, error) {
@@ -185,12 +186,16 @@ func (e *EngineAxe) PeriodicalProcess() {
}

func (e *EngineAxe) ackMsg(msg amqp.Delivery) error {
	err := e.References.ChannelSub.Ack(msg.DeliveryTag, false)
	e.Messages++
	if e.Messages%10 == 0 {
		err := e.References.ChannelSub.Ack(msg.DeliveryTag, true)
		if err != nil {
			e.DebugPrint("ack error: %+v", err)
		}
		return err
	}
	return nil
}

func (e *EngineAxe) processWorkerError(err error, msg amqp.Delivery) error {
	if err == nil {
+9 −4
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ type EngineChe struct {
	canopsis.DefaultEngine
	Options    Options
	References References
	Messages   int
}

func (e *EngineChe) ConsumerChan() (<-chan amqp.Delivery, error) {
@@ -87,12 +88,16 @@ func (e *EngineChe) PeriodicalProcess() {
}

func (e *EngineChe) ackMsg(msg amqp.Delivery) error {
	err := e.References.ChannelSub.Ack(msg.DeliveryTag, false)
	e.Messages++
	if e.Messages%10 == 0 {
		err := e.References.ChannelSub.Ack(msg.DeliveryTag, true)
		if err != nil {
			e.DebugPrint("ack error: %+v", err)
		}
		return err
	}
	return nil
}

// processWorkerError read errors, ack amqp messages and stop the engine if needed
func (e *EngineChe) processWorkerError(err error, msg amqp.Delivery) error {