Skip to content

Commit

Permalink
[NIFI-13318] processor stop and configure UX (apache#9548)
Browse files Browse the repository at this point in the history
* [NIFI-13318] processor stop and configure UX

* handle invalid run status

* display validation errors

* invalid icon and tooltip alternative placement

* remove validation errors tooltip, move invalid icon into status button, update edit processor entity and readonly updates

* restore MAT_DIALOG_DATA and only enable/disable form controls via api

* clean up

* only allow updates by current client and poll until stopped and no active threads

* update menu options to display when available

* display processor bulletins

* align dialog header text and run status button

* update filter for incoming updated entities and submit appropriate revision on run status changes

* disable button when stopping

* code clean up

* update method name

* update error message

* update types

* add types and cleanup

* move run status action button

* review feedback

* update run status action button to consider when user cannot operate processor

* update to also handle disabled run status when user does not have operate

* clean up pollingProcessor

* disable button when stopping

* prettier

* prettier

* readd thread count

* poll when necessary

This closes apache#9548
  • Loading branch information
scottyaslan authored Dec 10, 2024
1 parent a5086a9 commit 0271e92
Show file tree
Hide file tree
Showing 17 changed files with 780 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ public void verifyRevision(final Revision revision, final NiFiUser user) {
return;
}

throw new InvalidRevisionException(revision + " is not the most up-to-date revision. This component appears to have been modified");
throw new InvalidRevisionException(revision + " is not the most up-to-date revision. This component appears to have been modified. Retrieve the most up-to-date revision and try again.");
}

@Override
Expand Down
1 change: 1 addition & 0 deletions nifi-frontend/src/main/frontend/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ yarn-error.log
/libpeerconnection.log
testem.log
/typings
/.tool-versions

# System files
.DS_Store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ import {
StartComponentRequest,
StartComponentResponse,
StartComponentsRequest,
StartPollingProcessorUntilStoppedRequest,
StartProcessGroupRequest,
StartProcessGroupResponse,
StopComponentRequest,
Expand Down Expand Up @@ -776,6 +777,20 @@ export const pollChangeVersionSuccess = createAction(

export const stopPollingChangeVersion = createAction(`${CANVAS_PREFIX} Stop Polling Change Version`);

export const startPollingProcessorUntilStopped = createAction(
`${CANVAS_PREFIX} Start Polling Processor Until Stopped`,
props<{ request: StartPollingProcessorUntilStoppedRequest }>()
);

export const pollProcessorUntilStopped = createAction(`${CANVAS_PREFIX} Poll Processor Until Stopped`);

export const pollProcessorUntilStoppedSuccess = createAction(
`${CANVAS_PREFIX} Poll Processor Until Stopped Success`,
props<{ response: LoadProcessorSuccess }>()
);

export const stopPollingProcessor = createAction(`${CANVAS_PREFIX} Stop Polling Processor`);

export const openSaveVersionDialog = createAction(
`${CANVAS_PREFIX} Open Save Flow Version Dialog`,
props<{ request: SaveVersionDialogRequest }>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import {
CreateConnectionDialogRequest,
CreateProcessGroupDialogRequest,
DeleteComponentResponse,
DisableComponentRequest,
EnableComponentRequest,
GroupComponentsDialogRequest,
ImportFromRegistryDialogRequest,
LoadProcessGroupResponse,
Expand All @@ -56,6 +58,8 @@ import {
SaveVersionRequest,
SelectedComponent,
Snippet,
StartComponentRequest,
StopComponentRequest,
StopVersionControlRequest,
StopVersionControlResponse,
UpdateComponentFailure,
Expand All @@ -80,6 +84,7 @@ import {
selectParentProcessGroupId,
selectProcessGroup,
selectProcessor,
selectPollingProcessor,
selectRefreshRpgDetails,
selectRemoteProcessGroup,
selectSaving,
Expand Down Expand Up @@ -160,6 +165,13 @@ import { selectDocumentVisibilityState } from '../../../../state/document-visibi
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { DocumentVisibility } from '../../../../state/document-visibility';
import { ErrorContextKey } from '../../../../state/error';
import {
disableComponent,
enableComponent,
startComponent,
startPollingProcessorUntilStopped,
stopComponent
} from './flow.actions';
import { CopyPasteService } from '../../service/copy-paste.service';
import { selectCopiedContent } from '../../../../state/copy/copy.selectors';
import { CopyRequestContext, CopyResponseContext } from '../../../../state/copy';
Expand Down Expand Up @@ -1428,6 +1440,7 @@ export class FlowEffects {
}),
tap(([request, parameterContext, processGroupId]) => {
const processorId: string = request.entity.id;
let runStatusChanged: boolean = false;

const editDialogReference = this.dialog.open(EditProcessor, {
...XL_DIALOG,
Expand Down Expand Up @@ -1555,6 +1568,116 @@ export class FlowEffects {
})
);
});
const startPollingIfNecessary = (processorEntity: any): boolean => {
if (
(processorEntity.status.aggregateSnapshot.runStatus === 'Stopped' &&
processorEntity.status.aggregateSnapshot.activeThreadCount > 0) ||
processorEntity.status.aggregateSnapshot.runStatus === 'Validating'
) {
this.store.dispatch(
startPollingProcessorUntilStopped({
request: {
id: processorEntity.id
}
})
);
return true;
}

return false;
};

const pollingStarted = startPollingIfNecessary(request.entity);

this.store
.select(selectProcessor(processorId))
.pipe(
takeUntil(editDialogReference.afterClosed()),
isDefinedAndNotNull(),
filter((processorEntity) => {
return (
(runStatusChanged || pollingStarted) &&
processorEntity.revision.clientId === this.client.getClientId()
);
}),
concatLatestFrom(() => this.store.select(selectPollingProcessor))
)
.subscribe(([processorEntity, pollingProcessor]) => {
editDialogReference.componentInstance.processorUpdates = processorEntity;

// if we're already polling we do not want to start polling again
if (!pollingProcessor) {
startPollingIfNecessary(processorEntity);
}
});

editDialogReference.componentInstance.stopComponentRequest
.pipe(takeUntil(editDialogReference.afterClosed()))
.subscribe((stopComponentRequest: StopComponentRequest) => {
runStatusChanged = true;
this.store.dispatch(
stopComponent({
request: {
id: stopComponentRequest.id,
uri: stopComponentRequest.uri,
type: ComponentType.Processor,
revision: stopComponentRequest.revision,
errorStrategy: 'snackbar'
}
})
);
});

editDialogReference.componentInstance.disableComponentRequest
.pipe(takeUntil(editDialogReference.afterClosed()))
.subscribe((disableComponentsRequest: DisableComponentRequest) => {
runStatusChanged = true;
this.store.dispatch(
disableComponent({
request: {
id: disableComponentsRequest.id,
uri: disableComponentsRequest.uri,
type: ComponentType.Processor,
revision: disableComponentsRequest.revision,
errorStrategy: 'snackbar'
}
})
);
});

editDialogReference.componentInstance.enableComponentRequest
.pipe(takeUntil(editDialogReference.afterClosed()))
.subscribe((enableComponentsRequest: EnableComponentRequest) => {
runStatusChanged = true;
this.store.dispatch(
enableComponent({
request: {
id: enableComponentsRequest.id,
uri: enableComponentsRequest.uri,
type: ComponentType.Processor,
revision: enableComponentsRequest.revision,
errorStrategy: 'snackbar'
}
})
);
});

editDialogReference.componentInstance.startComponentRequest
.pipe(takeUntil(editDialogReference.afterClosed()))
.subscribe((startComponentRequest: StartComponentRequest) => {
runStatusChanged = true;
this.store.dispatch(
startComponent({
request: {
id: startComponentRequest.id,
uri: startComponentRequest.uri,
type: ComponentType.Processor,
revision: startComponentRequest.revision,
errorStrategy: 'snackbar'
}
})
);
});

editDialogReference.afterClosed().subscribe((response) => {
this.store.dispatch(resetPropertyVerificationState());
Expand All @@ -1578,6 +1701,57 @@ export class FlowEffects {
{ dispatch: false }
);

startPollingProcessorUntilStopped = createEffect(() =>
this.actions$.pipe(
ofType(FlowActions.startPollingProcessorUntilStopped),
switchMap(() =>
interval(2000, asyncScheduler).pipe(
takeUntil(this.actions$.pipe(ofType(FlowActions.stopPollingProcessor)))
)
),
switchMap(() => of(FlowActions.pollProcessorUntilStopped()))
)
);

pollProcessorUntilStopped$ = createEffect(() =>
this.actions$.pipe(
ofType(FlowActions.pollProcessorUntilStopped),
concatLatestFrom(() => [this.store.select(selectPollingProcessor).pipe(isDefinedAndNotNull())]),
switchMap(([, pollingProcessor]) => {
return from(
this.flowService.getProcessor(pollingProcessor.id).pipe(
map((response) =>
FlowActions.pollProcessorUntilStoppedSuccess({
response: {
id: pollingProcessor.id,
processor: response
}
})
),
catchError((errorResponse: HttpErrorResponse) => {
this.store.dispatch(FlowActions.stopPollingProcessor());
return of(this.snackBarOrFullScreenError(errorResponse));
})
)
);
})
)
);

pollProcessorUntilStoppedSuccess$ = createEffect(() =>
this.actions$.pipe(
ofType(FlowActions.pollProcessorUntilStoppedSuccess),
map((action) => action.response),
filter((response) => {
return (
response.processor.status.runStatus === 'Stopped' &&
response.processor.status.aggregateSnapshot.activeThreadCount === 0
);
}),
switchMap(() => of(FlowActions.stopPollingProcessor()))
)
);

openEditConnectionDialog$ = createEffect(
() =>
this.actions$.pipe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import {
navigateWithoutTransform,
pasteSuccess,
pollChangeVersionSuccess,
pollProcessorUntilStoppedSuccess,
pollRevertChangesSuccess,
requestRefreshRemoteProcessGroup,
resetFlowState,
Expand All @@ -68,10 +69,12 @@ import {
setTransitionRequired,
startComponent,
startComponentSuccess,
startPollingProcessorUntilStopped,
startProcessGroupSuccess,
startRemoteProcessGroupPolling,
stopComponent,
stopComponentSuccess,
stopPollingProcessor,
stopProcessGroupSuccess,
stopRemoteProcessGroupPolling,
stopVersionControl,
Expand All @@ -92,6 +95,7 @@ import { produce } from 'immer';
export const initialState: FlowState = {
id: 'root',
changeVersionRequest: null,
pollingProcessor: null,
flow: {
revision: {
version: 0
Expand Down Expand Up @@ -297,7 +301,7 @@ export const flowReducer = createReducer(
}
});
}),
on(loadProcessorSuccess, (state, { response }) => {
on(loadProcessorSuccess, pollProcessorUntilStoppedSuccess, (state, { response }) => {
return produce(state, (draftState) => {
const proposedProcessor = response.processor;
const componentIndex: number = draftState.flow.processGroupFlow.flow.processors.findIndex(
Expand Down Expand Up @@ -373,6 +377,14 @@ export const flowReducer = createReducer(
saving: false,
versionSaving: false
})),
on(startPollingProcessorUntilStopped, (state, { request }) => ({
...state,
pollingProcessor: request
})),
on(stopPollingProcessor, (state) => ({
...state,
pollingProcessor: null
})),
on(
createProcessor,
createProcessGroup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ export const selectChangeVersionRequest = createSelector(
(state: FlowState) => state.changeVersionRequest
);

export const selectPollingProcessor = createSelector(selectFlowState, (state: FlowState) => state.pollingProcessor);

export const selectSaving = createSelector(selectFlowState, (state: FlowState) => state.saving);

export const selectVersionSaving = createSelector(selectFlowState, (state: FlowState) => state.versionSaving);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,7 @@ export interface FlowState {
flowAnalysisOpen: boolean;
versionSaving: boolean;
changeVersionRequest: FlowUpdateRequestEntity | null;
pollingProcessor: StartPollingProcessorUntilStoppedRequest | null;
status: 'pending' | 'loading' | 'success' | 'complete';
}

Expand Down Expand Up @@ -792,6 +793,10 @@ export interface StopComponentRequest {
errorStrategy: 'snackbar' | 'banner';
}

export interface StartPollingProcessorUntilStoppedRequest {
id: string;
}

export interface StopProcessGroupRequest {
id: string;
type: ComponentType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@
neutral,
map.get(map.get($config, neutral), lighter)
);
$material-theme-neutral-palette-default: mat.get-theme-color(
$material-theme,
neutral,
map.get(map.get($config, neutral), default)
);

$material-theme-primary-palette-default: mat.get-theme-color(
$material-theme,
Expand Down
Loading

0 comments on commit 0271e92

Please sign in to comment.