Loading cmd/engine-watcher/engine.go +4 −1 Original line number Diff line number Diff line Loading @@ -131,7 +131,10 @@ func (e *EngineWatcher) WorkerProcess(msg amqp.Delivery) { return } } else if event.BulkAlarmsWithEntity != nil && event.BulkType == types.AlarmChangeTypeResolve { e.References.WatcherService.ProcessResolvedAlarms(ctx, *event.BulkAlarmsWithEntity) err := e.References.WatcherService.ProcessResolvedAlarms(ctx, *event.BulkAlarmsWithEntity) if e.processWorkerError(err, msg) != nil { return } } // Encode and publish the event to the next engine Loading lib/canopsis/watcher/interface.go +1 −1 Original line number Diff line number Diff line Loading @@ -67,7 +67,7 @@ type Service interface { Process(ctx context.Context, event *types.Event, alarmChange types.AlarmChange) error // ProcessResolvedAlarms updates the watchers impacted by the resolution of provided alarms ProcessResolvedAlarms(ctx context.Context, alarms []types.AlarmWithEntity) ProcessResolvedAlarms(ctx context.Context, alarms []types.AlarmWithEntity) error // ProcessUpdateWatcherEvent updates the watchers impacted by provided updatewatcher event ProcessUpdateWatcherEvent(ctx context.Context, event *types.Event) error Loading lib/canopsis/watcher/service.go +145 −155 Original line number Diff line number Diff line Loading @@ -49,28 +49,6 @@ func NewService( return service } func (s service) ComputeAllWatchers(ctx context.Context) error { defer trace.StartRegion(ctx, "watcher.ComputeAllWatchers").End() watchersID, err := s.watcherAdapter.GetAllValidWatchersID() if err != nil { return err } watchersWithD, err := s.watcherAdapter.GetWatchersWithDependencies(watchersID) if err != nil { return err } for _, entry := range watchersWithD { err := s.computeAndSendEvent(entry, "") if err != nil { log.Printf("Computing and sending event failed during global watcher recomputing : %v", err) } } return nil } // updateWatcherState computes the state of a watcher given its AlarmCounters, // and sends an event to update the corresponding alarm. func (s service) updateWatcherState( Loading Loading @@ -103,85 +81,6 @@ func (s service) updateWatcherState( }) } // Computes the provided watcher with dependencies, returning the state, output, and eventual error func (s service) Compute(watcherD WatcherWithDependencies) (int, string, error) { var alarms []types.Alarm s.alarmAdapter.GetOpenedAlarmsByIDs(watcherD.Dependencies, &alarms) var activePbhsByEntity map[string][]types.PBehavior now := time.Now() activePbhsByEntity = make(map[string][]types.PBehavior) for _, entityID := range watcherD.Dependencies { var pbhs []types.PBehavior pbhs, err := s.pbehaviorService.GetByEntityIds([]string{entityID}, true) if err != nil { return 0, "", err } for _, pbh := range pbhs { isActive, err := pbh.IsActive(now) if err != nil { fmt.Printf("Pbehavior error %+v", err) isActive = false } if isActive == true { activePbhsByEntity[entityID] = append(activePbhsByEntity[entityID], pbh) } } } data := AlarmCounters{} for _, a := range alarms { data.Alarms++ if a.Value.State.Value == types.AlarmStateOK { data.State.Info++ } else if a.Value.State.Value == 1 { data.State.Minor++ } else if a.Value.State.Value == 2 { data.State.Major++ } else if a.Value.State.Value == 3 { data.State.Critical++ } if a.Value.ACK != nil { data.Acknowledged++ } else { data.NotAcknowledged++ } } output, err := watcherD.Watcher.GetOutput(data) if err != nil { log.Println(err) return 0, "", err } state, err := watcherD.Watcher.GetState(data) if err != nil { log.Println(err) return 0, "", err } return state, output, nil } // worst returns the state of the worst alarm state func (s service) worst(watcher Watcher, alarms []types.Alarm, activePbhsByEntity map[string][]types.PBehavior) types.CpsNumber { var worstState types.CpsNumber for _, alarm := range alarms { if _, hasKey := activePbhsByEntity[alarm.EntityID]; hasKey == false { if alarm.Value.State.Value > worstState { worstState = alarm.Value.State.Value } } } return worstState } // sendEvent sends a watcher event. func (s service) sendEvent(event types.Event) error { jevt, err := s.jsonEncoder.Encode(event) Loading @@ -206,30 +105,6 @@ func (s service) sendEvent(event types.Event) error { return nil } // computeAndSendEvent computes the provided watcher, and sends the event to Axe with the provided author as event author, // returning error if any happens func (s service) computeAndSendEvent(watcherD WatcherWithDependencies, author string) error { // Recompute state and output state, output, err := s.Compute(watcherD) if err != nil { return err } // Send check event to Axe checkEvent := types.Event{ EventType: types.EventTypeCheck, SourceType: types.SourceTypeComponent, Component: watcherD.Watcher.ID, Connector: "watcher", ConnectorName: "watcher", State: types.CpsNumber(state), Output: output, Author: author, Timestamp: types.CpsTime{Time: time.Now()}, } return s.sendEvent(checkEvent) } // Process processes an event and updates the watchers impacted by the event func (s service) Process(ctx context.Context, event *types.Event, alarmChange types.AlarmChange) error { defer trace.StartRegion(ctx, "watcher.Process").End() Loading @@ -238,66 +113,181 @@ func (s service) Process(ctx context.Context, event *types.Event, alarmChange ty return errt.NewUnknownError(fmt.Errorf("Event entity is nil : %v", event)) } if alarmChange.Type == types.AlarmChangeTypeAck || alarmChange.Type == types.AlarmChangeTypeAckremove || alarmChange.Type == types.AlarmChangeTypeStateIncrease || alarmChange.Type == types.AlarmChangeTypeStateDecrease || alarmChange.Type == types.AlarmChangeTypeCreate { if alarmChange.Type != types.AlarmChangeTypeAck && alarmChange.Type != types.AlarmChangeTypeAckremove && alarmChange.Type != types.AlarmChangeTypeStateIncrease && alarmChange.Type != types.AlarmChangeTypeStateDecrease && alarmChange.Type != types.AlarmChangeTypeCreate { return nil } watchersWithD, err := s.watcherAdapter.GetWatchersWithDependencies(event.Entity.Impacts.List()) watchers := []Watcher{} err := s.watcherAdapter.GetAll(&watchers) if err != nil { return err } for _, entry := range watchersWithD { err := s.computeAndSendEvent(entry, event.Author) isWatcher := map[string]bool{} for _, watcher := range watchers { isWatcher[watcher.ID] = true } pbehaviors, err := s.pbehaviorService.GetByEntityIds([]string{event.Entity.ID}, true) if err != nil { return err } entity := AnnotatedEntity{ Entity: *event.Entity, Alarm: event.Alarm, PBehaviors: pbehaviors, } dependencyState := NewDependencyStateFromAnnotatedEntity( entity, isWatcher, time.Now()) if len(dependencyState.ImpactedWatchers) == 0 { return nil } watcherCounters, err := s.countService.ProcessState(dependencyState) if err != nil { log.Printf("Computing and sending event failed during alarm change : %v", err) log.Printf("Unable to process state : %+v", err) } for _, watcher := range watchers { counters, exists := watcherCounters[watcher.ID] if exists { err := s.updateWatcherState(watcher, counters) if err != nil { log.Printf("Unable to update watcher state %+v", err) } } } return nil } func (s service) ProcessUpdateWatcherEvent(ctx context.Context, event *types.Event) error { defer trace.StartRegion(ctx, "watcher.ProcessUpdateWatcherEvent").End() if event.EventType == types.EventTypeUpdateWatcher { watchersWithD, err := s.watcherAdapter.GetWatchersWithDependencies([]string{event.Entity.ID}) s.ComputeAllWatchers(ctx) //if event.EventType == types.EventTypeUpdateWatcher { // watchersWithD, err := s.watcherAdapter.GetWatchersWithDependencies([]string{event.Entity.ID}) // if err != nil { // return err // } // for _, entry := range watchersWithD { // err := s.computeAndSendEvent(entry, event.Author) // if err != nil { // log.Printf("Computing and sending event failed during updatewatcher : %v", err) // } // } //} return nil } func (s service) ProcessResolvedAlarms(ctx context.Context, alarms []types.AlarmWithEntity) error { defer trace.StartRegion(ctx, "watcher.ProcessResolvedAlarms").End() watchers := []Watcher{} err := s.watcherAdapter.GetAll(&watchers) if err != nil { return err } for _, entry := range watchersWithD { err := s.computeAndSendEvent(entry, event.Author) isWatcher := map[string]bool{} for _, watcher := range watchers { isWatcher[watcher.ID] = true } now := time.Now() watcherCounters := map[string]AlarmCounters{} for _, alarm := range alarms { pbehaviors, err := s.pbehaviorService.GetByEntityIds([]string{alarm.Entity.ID}, true) if err != nil { // TODO: log continue } entity := AnnotatedEntity{ Entity: alarm.Entity, Alarm: &alarm.Alarm, PBehaviors: pbehaviors, } dependencyState := NewDependencyStateFromAnnotatedEntity( entity, isWatcher, now) if len(dependencyState.ImpactedWatchers) == 0 { continue } impactCounters, err := s.countService.ProcessState(dependencyState) if err != nil { log.Printf("Computing and sending event failed during updatewatcher : %v", err) log.Printf("Unable to process state : %+v", err) } for watcherID, counters := range impactCounters { watcherCounters[watcherID] = counters } } for _, watcher := range watchers { counters, exists := watcherCounters[watcher.ID] if exists { err := s.updateWatcherState(watcher, counters) if err != nil { log.Printf("Unable to update watcher state %+v", err) } } } return nil } func (s service) ProcessResolvedAlarms(ctx context.Context, alarms []types.AlarmWithEntity) { defer trace.StartRegion(ctx, "watcher.ProcessResolvedAlarms").End() func (s service) ComputeAllWatchers(ctx context.Context) error { defer trace.StartRegion(ctx, "watcher.ComputeAllWatchers").End() watchers := []Watcher{} err := s.watcherAdapter.GetAll(&watchers) if err != nil { return err } // impacts contains all the entities IDs impacted by resolved alarms impacts := []string{} isWatcher := map[string]bool{} for _, watcher := range watchers { isWatcher[watcher.ID] = true } for _, alarm := range alarms { impacts = append(impacts, alarm.Entity.Impacts.List()...) now := time.Now() entities, err := s.watcherAdapter.GetAnnotatedEntities() if err != nil { return err } // Get the watchers from the resolved alarms watchersWithD, err := s.watcherAdapter.GetWatchersWithDependencies(impacts) watcherCounters := map[string]AlarmCounters{} for _, entity := range entities { dependencyState := NewDependencyStateFromAnnotatedEntity( entity, isWatcher, now) if len(dependencyState.ImpactedWatchers) == 0 { continue } impactCounters, err := s.countService.ProcessState(dependencyState) if err != nil { log.Printf("Getting watchers and dependencies failed : %v", err) log.Printf("Unable to process state : %+v", err) } for watcherID, counters := range impactCounters { watcherCounters[watcherID] = counters } } for _, entry := range watchersWithD { err = s.computeAndSendEvent(entry, "") for _, watcher := range watchers { counters, exists := watcherCounters[watcher.ID] if exists { err := s.updateWatcherState(watcher, counters) if err != nil { log.Printf("Computing and sending event failed : %v", err) log.Printf("Unable to update watcher state %+v", err) } } } return nil } Loading
cmd/engine-watcher/engine.go +4 −1 Original line number Diff line number Diff line Loading @@ -131,7 +131,10 @@ func (e *EngineWatcher) WorkerProcess(msg amqp.Delivery) { return } } else if event.BulkAlarmsWithEntity != nil && event.BulkType == types.AlarmChangeTypeResolve { e.References.WatcherService.ProcessResolvedAlarms(ctx, *event.BulkAlarmsWithEntity) err := e.References.WatcherService.ProcessResolvedAlarms(ctx, *event.BulkAlarmsWithEntity) if e.processWorkerError(err, msg) != nil { return } } // Encode and publish the event to the next engine Loading
lib/canopsis/watcher/interface.go +1 −1 Original line number Diff line number Diff line Loading @@ -67,7 +67,7 @@ type Service interface { Process(ctx context.Context, event *types.Event, alarmChange types.AlarmChange) error // ProcessResolvedAlarms updates the watchers impacted by the resolution of provided alarms ProcessResolvedAlarms(ctx context.Context, alarms []types.AlarmWithEntity) ProcessResolvedAlarms(ctx context.Context, alarms []types.AlarmWithEntity) error // ProcessUpdateWatcherEvent updates the watchers impacted by provided updatewatcher event ProcessUpdateWatcherEvent(ctx context.Context, event *types.Event) error Loading
lib/canopsis/watcher/service.go +145 −155 Original line number Diff line number Diff line Loading @@ -49,28 +49,6 @@ func NewService( return service } func (s service) ComputeAllWatchers(ctx context.Context) error { defer trace.StartRegion(ctx, "watcher.ComputeAllWatchers").End() watchersID, err := s.watcherAdapter.GetAllValidWatchersID() if err != nil { return err } watchersWithD, err := s.watcherAdapter.GetWatchersWithDependencies(watchersID) if err != nil { return err } for _, entry := range watchersWithD { err := s.computeAndSendEvent(entry, "") if err != nil { log.Printf("Computing and sending event failed during global watcher recomputing : %v", err) } } return nil } // updateWatcherState computes the state of a watcher given its AlarmCounters, // and sends an event to update the corresponding alarm. func (s service) updateWatcherState( Loading Loading @@ -103,85 +81,6 @@ func (s service) updateWatcherState( }) } // Computes the provided watcher with dependencies, returning the state, output, and eventual error func (s service) Compute(watcherD WatcherWithDependencies) (int, string, error) { var alarms []types.Alarm s.alarmAdapter.GetOpenedAlarmsByIDs(watcherD.Dependencies, &alarms) var activePbhsByEntity map[string][]types.PBehavior now := time.Now() activePbhsByEntity = make(map[string][]types.PBehavior) for _, entityID := range watcherD.Dependencies { var pbhs []types.PBehavior pbhs, err := s.pbehaviorService.GetByEntityIds([]string{entityID}, true) if err != nil { return 0, "", err } for _, pbh := range pbhs { isActive, err := pbh.IsActive(now) if err != nil { fmt.Printf("Pbehavior error %+v", err) isActive = false } if isActive == true { activePbhsByEntity[entityID] = append(activePbhsByEntity[entityID], pbh) } } } data := AlarmCounters{} for _, a := range alarms { data.Alarms++ if a.Value.State.Value == types.AlarmStateOK { data.State.Info++ } else if a.Value.State.Value == 1 { data.State.Minor++ } else if a.Value.State.Value == 2 { data.State.Major++ } else if a.Value.State.Value == 3 { data.State.Critical++ } if a.Value.ACK != nil { data.Acknowledged++ } else { data.NotAcknowledged++ } } output, err := watcherD.Watcher.GetOutput(data) if err != nil { log.Println(err) return 0, "", err } state, err := watcherD.Watcher.GetState(data) if err != nil { log.Println(err) return 0, "", err } return state, output, nil } // worst returns the state of the worst alarm state func (s service) worst(watcher Watcher, alarms []types.Alarm, activePbhsByEntity map[string][]types.PBehavior) types.CpsNumber { var worstState types.CpsNumber for _, alarm := range alarms { if _, hasKey := activePbhsByEntity[alarm.EntityID]; hasKey == false { if alarm.Value.State.Value > worstState { worstState = alarm.Value.State.Value } } } return worstState } // sendEvent sends a watcher event. func (s service) sendEvent(event types.Event) error { jevt, err := s.jsonEncoder.Encode(event) Loading @@ -206,30 +105,6 @@ func (s service) sendEvent(event types.Event) error { return nil } // computeAndSendEvent computes the provided watcher, and sends the event to Axe with the provided author as event author, // returning error if any happens func (s service) computeAndSendEvent(watcherD WatcherWithDependencies, author string) error { // Recompute state and output state, output, err := s.Compute(watcherD) if err != nil { return err } // Send check event to Axe checkEvent := types.Event{ EventType: types.EventTypeCheck, SourceType: types.SourceTypeComponent, Component: watcherD.Watcher.ID, Connector: "watcher", ConnectorName: "watcher", State: types.CpsNumber(state), Output: output, Author: author, Timestamp: types.CpsTime{Time: time.Now()}, } return s.sendEvent(checkEvent) } // Process processes an event and updates the watchers impacted by the event func (s service) Process(ctx context.Context, event *types.Event, alarmChange types.AlarmChange) error { defer trace.StartRegion(ctx, "watcher.Process").End() Loading @@ -238,66 +113,181 @@ func (s service) Process(ctx context.Context, event *types.Event, alarmChange ty return errt.NewUnknownError(fmt.Errorf("Event entity is nil : %v", event)) } if alarmChange.Type == types.AlarmChangeTypeAck || alarmChange.Type == types.AlarmChangeTypeAckremove || alarmChange.Type == types.AlarmChangeTypeStateIncrease || alarmChange.Type == types.AlarmChangeTypeStateDecrease || alarmChange.Type == types.AlarmChangeTypeCreate { if alarmChange.Type != types.AlarmChangeTypeAck && alarmChange.Type != types.AlarmChangeTypeAckremove && alarmChange.Type != types.AlarmChangeTypeStateIncrease && alarmChange.Type != types.AlarmChangeTypeStateDecrease && alarmChange.Type != types.AlarmChangeTypeCreate { return nil } watchersWithD, err := s.watcherAdapter.GetWatchersWithDependencies(event.Entity.Impacts.List()) watchers := []Watcher{} err := s.watcherAdapter.GetAll(&watchers) if err != nil { return err } for _, entry := range watchersWithD { err := s.computeAndSendEvent(entry, event.Author) isWatcher := map[string]bool{} for _, watcher := range watchers { isWatcher[watcher.ID] = true } pbehaviors, err := s.pbehaviorService.GetByEntityIds([]string{event.Entity.ID}, true) if err != nil { return err } entity := AnnotatedEntity{ Entity: *event.Entity, Alarm: event.Alarm, PBehaviors: pbehaviors, } dependencyState := NewDependencyStateFromAnnotatedEntity( entity, isWatcher, time.Now()) if len(dependencyState.ImpactedWatchers) == 0 { return nil } watcherCounters, err := s.countService.ProcessState(dependencyState) if err != nil { log.Printf("Computing and sending event failed during alarm change : %v", err) log.Printf("Unable to process state : %+v", err) } for _, watcher := range watchers { counters, exists := watcherCounters[watcher.ID] if exists { err := s.updateWatcherState(watcher, counters) if err != nil { log.Printf("Unable to update watcher state %+v", err) } } } return nil } func (s service) ProcessUpdateWatcherEvent(ctx context.Context, event *types.Event) error { defer trace.StartRegion(ctx, "watcher.ProcessUpdateWatcherEvent").End() if event.EventType == types.EventTypeUpdateWatcher { watchersWithD, err := s.watcherAdapter.GetWatchersWithDependencies([]string{event.Entity.ID}) s.ComputeAllWatchers(ctx) //if event.EventType == types.EventTypeUpdateWatcher { // watchersWithD, err := s.watcherAdapter.GetWatchersWithDependencies([]string{event.Entity.ID}) // if err != nil { // return err // } // for _, entry := range watchersWithD { // err := s.computeAndSendEvent(entry, event.Author) // if err != nil { // log.Printf("Computing and sending event failed during updatewatcher : %v", err) // } // } //} return nil } func (s service) ProcessResolvedAlarms(ctx context.Context, alarms []types.AlarmWithEntity) error { defer trace.StartRegion(ctx, "watcher.ProcessResolvedAlarms").End() watchers := []Watcher{} err := s.watcherAdapter.GetAll(&watchers) if err != nil { return err } for _, entry := range watchersWithD { err := s.computeAndSendEvent(entry, event.Author) isWatcher := map[string]bool{} for _, watcher := range watchers { isWatcher[watcher.ID] = true } now := time.Now() watcherCounters := map[string]AlarmCounters{} for _, alarm := range alarms { pbehaviors, err := s.pbehaviorService.GetByEntityIds([]string{alarm.Entity.ID}, true) if err != nil { // TODO: log continue } entity := AnnotatedEntity{ Entity: alarm.Entity, Alarm: &alarm.Alarm, PBehaviors: pbehaviors, } dependencyState := NewDependencyStateFromAnnotatedEntity( entity, isWatcher, now) if len(dependencyState.ImpactedWatchers) == 0 { continue } impactCounters, err := s.countService.ProcessState(dependencyState) if err != nil { log.Printf("Computing and sending event failed during updatewatcher : %v", err) log.Printf("Unable to process state : %+v", err) } for watcherID, counters := range impactCounters { watcherCounters[watcherID] = counters } } for _, watcher := range watchers { counters, exists := watcherCounters[watcher.ID] if exists { err := s.updateWatcherState(watcher, counters) if err != nil { log.Printf("Unable to update watcher state %+v", err) } } } return nil } func (s service) ProcessResolvedAlarms(ctx context.Context, alarms []types.AlarmWithEntity) { defer trace.StartRegion(ctx, "watcher.ProcessResolvedAlarms").End() func (s service) ComputeAllWatchers(ctx context.Context) error { defer trace.StartRegion(ctx, "watcher.ComputeAllWatchers").End() watchers := []Watcher{} err := s.watcherAdapter.GetAll(&watchers) if err != nil { return err } // impacts contains all the entities IDs impacted by resolved alarms impacts := []string{} isWatcher := map[string]bool{} for _, watcher := range watchers { isWatcher[watcher.ID] = true } for _, alarm := range alarms { impacts = append(impacts, alarm.Entity.Impacts.List()...) now := time.Now() entities, err := s.watcherAdapter.GetAnnotatedEntities() if err != nil { return err } // Get the watchers from the resolved alarms watchersWithD, err := s.watcherAdapter.GetWatchersWithDependencies(impacts) watcherCounters := map[string]AlarmCounters{} for _, entity := range entities { dependencyState := NewDependencyStateFromAnnotatedEntity( entity, isWatcher, now) if len(dependencyState.ImpactedWatchers) == 0 { continue } impactCounters, err := s.countService.ProcessState(dependencyState) if err != nil { log.Printf("Getting watchers and dependencies failed : %v", err) log.Printf("Unable to process state : %+v", err) } for watcherID, counters := range impactCounters { watcherCounters[watcherID] = counters } } for _, entry := range watchersWithD { err = s.computeAndSendEvent(entry, "") for _, watcher := range watchers { counters, exists := watcherCounters[watcher.ID] if exists { err := s.updateWatcherState(watcher, counters) if err != nil { log.Printf("Computing and sending event failed : %v", err) log.Printf("Unable to update watcher state %+v", err) } } } return nil }