Cancel propagation #306

Closed


fahadnaeemkhan
Contributor

@fahadnaeemkhan fahadnaeemkhan commented Dec 1, 2023

The following three issues are addressed:

1- errChan in gnmi_server.go.Subscribe() is only used in the SubscriptionList_ONCE case, so it is moved under that case.

2- The notification listener on a.c.Subscribe(sc.stream.Context(), ro) can't return on err, as this could block other senders on the same channel. The sender (subscription goroutine) closes the channel just before returning (using defer), and that is enough to stop the listener.

3- In case of stream cancellation, errors are propagated to gnmi_server.go.Subscribe() for proper clean-up.

karimra and others added 2 commits December 1, 2023 00:52
…ionList_ONCE.

notification listener on a.c.Subscribe(sc.stream.Context(), ro) can't return on err
as this could block other sender on the same channel. Sender closes the channel
after sending the error and that would do.

in case of stream cancellation, errors are propagated to gnmi_server.go.Subscribe()
for proper clean up.
@karimra
Collaborator

karimra commented Dec 10, 2023

2- notification listener on a.c.Subscribe(sc.stream.Context(), ro) can't return on err as this could block other senders on the same channel. Sender (subscription goroutine) closes the channel just before returning (using defer) and that would do.

Which other senders are you considering here?

@fahadnaeemkhan
Contributor Author

fahadnaeemkhan commented Dec 10, 2023

In the cache we create a goroutine per subscription, so with more than one subscription we are going to have more than one cache-side goroutine (sender), and any one of them can stop the listener by sending an error.

I think a better way to stop the listener is to close the channel when needed, and this happens once oc_cache.subscribe returns:

func (gc *gnmiCache) subscribe(ctx context.Context, ro *ReadOpts, ch chan *Notification) {
	defer close(ch)
	switch ro.Mode {
	case ReadMode_Once:
		gc.handleSingleQuery(ctx, ro, ch)
	case ReadMode_StreamOnChange: // default:
		ro.SuppressRedundant = false
		gc.handleOnChangeQuery(ctx, ro, ch)
	case ReadMode_StreamSample:
		gc.handleSampledQuery(ctx, ro, ch)
	}
}
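
The close-on-return pattern described above can be sketched in isolation as follows. This is an illustrative example only: `notification`, `produce` and `drain` are hypothetical names, and the "bad" sender is an assumption used to simulate a failing query. The listener never returns early on an error; it stops only when the owner of the channel closes it.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// notification is a hypothetical stand-in for the cache's Notification type.
type notification struct {
	Name string
	Err  error
}

// produce mimics the shape of subscribe: it owns ch, fans out to several
// sender goroutines, and closes ch only after all of them have returned.
func produce(names []string, ch chan<- *notification) {
	defer close(ch)
	wg := new(sync.WaitGroup)
	wg.Add(len(names))
	for _, name := range names {
		go func(name string) {
			defer wg.Done()
			if name == "bad" { // simulated failing sender
				ch <- &notification{Name: name, Err: errors.New("query failed")}
				return
			}
			ch <- &notification{Name: name}
		}(name)
	}
	wg.Wait()
}

// drain reads until ch is closed. It records the first error but keeps
// receiving, so no sender is left blocked on an unbuffered channel.
func drain(ch <-chan *notification) (received int, firstErr error) {
	for n := range ch {
		received++
		if n.Err != nil && firstErr == nil {
			firstErr = n.Err
		}
	}
	return received, firstErr
}

func main() {
	ch := make(chan *notification)
	go produce([]string{"a", "bad", "b"}, ch)
	n, err := drain(ch)
	fmt.Println(n, err)
}
```

In this sketch all three notifications are received and the error is still reported, because the listener's exit is tied to `close(ch)` rather than to the first error it sees.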

@karimra
Collaborator

karimra commented Dec 10, 2023

Right, but each subscription sends to a different channel. There is only one sender per channel; if it returns an error, the channel will be closed right after.

@fahadnaeemkhan
Contributor Author

fahadnaeemkhan commented Dec 10, 2023

I think each subscription sends to the same channel ch:

func (gc *gnmiCache) handleSingleQuery(ctx context.Context, ro *ReadOpts, ch chan *Notification) {
	if gc.debug {
		gc.logger.Printf("running single query for target %q", ro.Target)
	}

	caches := gc.getCaches(ro.Subscription)

	if gc.debug {
		gc.logger.Printf("single query got %d caches", len(caches))
	}
	wg := new(sync.WaitGroup)
	wg.Add(len(caches))

	for name, c := range caches {
		go func(name string, c *subCache) {
			defer wg.Done()
			for _, p := range ro.Paths {
				fp, err := path.CompletePath(p, nil)
				if err != nil {
					gc.logger.Printf("failed to generate CompletePath from %v", p)
					ch <- &Notification{Name: name, Err: err}
					return
				}
				err = c.c.Query(ro.Target, fp,
					func(_ []string, l *ctree.Leaf, _ interface{}) error {
						if err != nil {
							return err
						}
						switch gl := l.Value().(type) {
						case *gnmi.Notification:
							if ro.OverrideTS {
								// override timestamp
								gl = proto.Clone(gl).(*gnmi.Notification)
								gl.Timestamp = time.Now().UnixNano()
							}
							//no suppress redundant, send to channel and return
							if !ro.SuppressRedundant {
								ch <- &Notification{Name: name, Notification: gl}
								return nil
							}
							// suppress redundant part
							if ro.lastSent == nil {
								ro.lastSent = make(map[string]*gnmi.TypedValue)
								ro.m = new(sync.RWMutex)
							}

							prefix := utils.GnmiPathToXPath(gl.GetPrefix(), true)
							target := gl.GetPrefix().GetTarget()
							for _, upd := range gl.GetUpdate() {
								path := utils.GnmiPathToXPath(upd.GetPath(), true)
								valXPath := strings.Join([]string{target, prefix, path}, "/")
								ro.m.RLock()
								sv, ok := ro.lastSent[valXPath]
								ro.m.RUnlock()
								if !ok || !proto.Equal(sv, upd.Val) {
									ch <- &Notification{
										Name: name,
										Notification: &gnmi.Notification{
											Timestamp: gl.GetTimestamp(),
											Prefix:    gl.GetPrefix(),
											Update:    []*gnmi.Update{upd},
										},
									}
									ro.m.Lock()
									ro.lastSent[valXPath] = upd.Val
									ro.m.Unlock()
								}
							}

							if gl.GetDelete() != nil {
								ch <- &Notification{
									Name: name,
									Notification: &gnmi.Notification{
										Timestamp: gl.GetTimestamp(),
										Prefix:    gl.GetPrefix(),
										Delete:    gl.GetDelete(),
									},
								}
							}
							return nil
						}
						return nil
					})
				if err != nil {
					gc.logger.Printf("target %q failed internal cache query: %v", ro.Target, err)
					ch <- &Notification{Name: name, Err: err}
					return
				}
			}
		}(name, c)
	}
	wg.Wait()
}

@karimra
Collaborator

karimra commented Dec 10, 2023

The subscribe we are talking about is here.
In the case of the gnmiCache implementation it results in [this fn call](

That is a single subscription that goes to multiple subcaches. In that case you are right: multiple goroutines send to the same channel. But that should not be solved there (return->continue); I think it should be solved within each cache implementation.
In the case of the gnmiCache, maybe adding a ctx with cancel, cancelling it when a subCache query sends an error, and selecting on ctx.Done() as well as on the send to the channel would solve it?

@fahadnaeemkhan
Contributor Author

So clean up if any of the cache goroutines hits an error?

@fahadnaeemkhan
Contributor Author

fahadnaeemkhan commented Dec 10, 2023

On a separate note, what is the use of sc.errChan in gnmi_server.Subscribe? It is send-only, so no one is reading from it.

@karimra
Collaborator

karimra commented Dec 15, 2023

So clean up if any of the cache goroutines hits an error?

yes

@karimra
Collaborator

karimra commented Dec 15, 2023

On a separate note, what is the use of sc.errChan in gnmi_server.Subscribe? It is send-only, so no one is reading from it.

It can probably replace the errChan in that function. I have to find some time to look into that part of the code again.

@fahadnaeemkhan fahadnaeemkhan deleted the cancel_propagation branch December 16, 2023 00:10
@fahadnaeemkhan
Contributor Author

The PR got closed because I had to delete the branch.

New PR: #32
