diff --git a/src/app/collection-page/edit-collection-page/collection-source/collection-source-controls/collection-source-controls.component.ts b/src/app/collection-page/edit-collection-page/collection-source/collection-source-controls/collection-source-controls.component.ts index 185a1f938e..1e6e3dd511 100644 --- a/src/app/collection-page/edit-collection-page/collection-source/collection-source-controls/collection-source-controls.component.ts +++ b/src/app/collection-page/edit-collection-page/collection-source/collection-source-controls/collection-source-controls.component.ts @@ -1,4 +1,4 @@ -import { Component, Input, OnDestroy } from '@angular/core'; +import { Component, Input, OnDestroy, OnInit } from '@angular/core'; import { ScriptDataService } from '../../../../core/data/processes/script-data.service'; import { ContentSource } from '../../../../core/shared/content-source.model'; import { ProcessDataService } from '../../../../core/data/processes/process-data.service'; @@ -29,7 +29,7 @@ import { ContentSourceSetSerializer } from '../../../../core/shared/content-sour styleUrls: ['./collection-source-controls.component.scss'], templateUrl: './collection-source-controls.component.html', }) -export class CollectionSourceControlsComponent implements OnDestroy { +export class CollectionSourceControlsComponent implements OnInit, OnDestroy { /** * Should the controls be enabled. @@ -48,6 +48,7 @@ export class CollectionSourceControlsComponent implements OnDestroy { contentSource$: Observable; private subs: Subscription[] = []; + private autoRefreshIDs: string[] = []; testConfigRunning$ = new BehaviorSubject(false); importRunning$ = new BehaviorSubject(false); @@ -94,7 +95,10 @@ export class CollectionSourceControlsComponent implements OnDestroy { }), // filter out responses that aren't successful since the pinging of the process only needs to happen when the invocation was successful. filter((rd) => rd.hasSucceeded && hasValue(rd.payload)), - switchMap((rd) => this.processDataService.autoRefreshUntilCompletion(rd.payload.processId)), + switchMap((rd) => { + this.autoRefreshIDs.push(rd.payload.processId); + return this.processDataService.autoRefreshUntilCompletion(rd.payload.processId); + }), map((rd) => rd.payload) ).subscribe((process: Process) => { if (process.processStatus.toString() === ProcessStatus[ProcessStatus.FAILED].toString()) { @@ -135,7 +139,10 @@ export class CollectionSourceControlsComponent implements OnDestroy { } }), filter((rd) => rd.hasSucceeded && hasValue(rd.payload)), - switchMap((rd) => this.processDataService.autoRefreshUntilCompletion(rd.payload.processId)), + switchMap((rd) => { + this.autoRefreshIDs.push(rd.payload.processId); + return this.processDataService.autoRefreshUntilCompletion(rd.payload.processId); + }), map((rd) => rd.payload) ).subscribe((process) => { if (process.processStatus.toString() === ProcessStatus[ProcessStatus.FAILED].toString()) { @@ -170,7 +177,10 @@ export class CollectionSourceControlsComponent implements OnDestroy { } }), filter((rd) => rd.hasSucceeded && hasValue(rd.payload)), - switchMap((rd) => this.processDataService.autoRefreshUntilCompletion(rd.payload.processId)), + switchMap((rd) => { + this.autoRefreshIDs.push(rd.payload.processId); + return this.processDataService.autoRefreshUntilCompletion(rd.payload.processId); + }), map((rd) => rd.payload) ).subscribe((process) => { if (process.processStatus.toString() === ProcessStatus[ProcessStatus.FAILED].toString()) { @@ -191,5 +201,9 @@ export class CollectionSourceControlsComponent implements OnDestroy { sub.unsubscribe(); } }); + + this.autoRefreshIDs.forEach((id) => { + this.processDataService.stopAutoRefreshing(id); + }); } } diff --git a/src/app/core/data/processes/process-data.service.spec.ts b/src/app/core/data/processes/process-data.service.spec.ts index 3b663abae9..99cd317cdb 100644 --- a/src/app/core/data/processes/process-data.service.spec.ts +++ b/src/app/core/data/processes/process-data.service.spec.ts @@ -9,7 +9,7 @@ import { testFindAllDataImplementation } from '../base/find-all-data.spec'; import { ProcessDataService, TIMER_FACTORY } from './process-data.service'; import { testDeleteDataImplementation } from '../base/delete-data.spec'; -import { waitForAsync, TestBed, fakeAsync, tick, flush } from '@angular/core/testing'; +import { waitForAsync, TestBed, fakeAsync, tick } from '@angular/core/testing'; import { RequestService } from '../request.service'; import { RemoteData } from '../remote-data'; import { RequestEntryState } from '../request-entry-state.model'; @@ -27,6 +27,7 @@ import { testSearchDataImplementation } from '../base/search-data.spec'; import { PaginatedList } from '../paginated-list.model'; import { FindListOptions } from '../find-list-options.model'; import { of } from 'rxjs'; +import { getMockRequestService } from '../../../shared/mocks/request.service.mock'; describe('ProcessDataService', () => { let testScheduler; @@ -43,7 +44,7 @@ describe('ProcessDataService', () => { testSearchDataImplementation(initService); }); - let requestService; + let requestService = getMockRequestService(); let processDataService; let remoteDataBuildService; @@ -136,7 +137,7 @@ describe('ProcessDataService', () => { imports: [], providers: [ ProcessDataService, - { provide: RequestService, useValue: null }, + { provide: RequestService, useValue: requestService }, { provide: RemoteDataBuildService, useValue: null }, { provide: ObjectCacheService, useValue: null }, { provide: ReducerManager, useValue: null }, @@ -152,41 +153,41 @@ describe('ProcessDataService', () => { })); it('should refresh after the specified interval', fakeAsync(() => { - const runningProcess = Object.assign(new Process(), { - _links: { - self: { - href: 'https://rest.api/processes/123' - } + const runningProcess = Object.assign(new Process(), { + _links: { + self: { + href: 'https://rest.api/processes/123' } - }); - runningProcess.processStatus = ProcessStatus.RUNNING; + } + }); + runningProcess.processStatus = ProcessStatus.RUNNING; - const runningProcessPagination: PaginatedList = Object.assign(new PaginatedList(), { - page: [runningProcess], - _links: { - self: { - href: 'https://rest.api/processesList/456' - } + const runningProcessPagination: PaginatedList = Object.assign(new PaginatedList(), { + page: [runningProcess], + _links: { + self: { + href: 'https://rest.api/processesList/456' } - }); + } + }); - const runningProcessRD = new RemoteData(0, 0, 0, RequestEntryState.Success, null, runningProcessPagination); + const runningProcessRD = new RemoteData(0, 0, 0, RequestEntryState.Success, null, runningProcessPagination); - spyOn(processDataService, 'invalidateByHref'); - spyOn(processDataService, 'searchBy').and.returnValue( - of(runningProcessRD) - ); + spyOn(processDataService, 'searchBy').and.returnValue( + of(runningProcessRD) + ); - let sub = processDataService.autoRefreshingSearchBy('byProperty', new FindListOptions(), 200).subscribe(); - tick(0); - expect(processDataService.searchBy).toHaveBeenCalledTimes(1); - tick(450); - expect(processDataService.searchBy).toHaveBeenCalledTimes(3); - sub.unsubscribe(); + expect(processDataService.searchBy).toHaveBeenCalledTimes(0); + expect(requestService.setStaleByHrefSubstring).toHaveBeenCalledTimes(0); - flush(); + let sub = processDataService.autoRefreshingSearchBy('id', 'byProperty', new FindListOptions(), 200).subscribe(); + expect(processDataService.searchBy).toHaveBeenCalledTimes(1); - expect(processDataService.invalidateByHref).toHaveBeenCalledTimes(3); + tick(250); + + expect(requestService.setStaleByHrefSubstring).toHaveBeenCalledTimes(1); + + sub.unsubscribe(); })); }); }); diff --git a/src/app/core/data/processes/process-data.service.ts b/src/app/core/data/processes/process-data.service.ts index bf4d72d5b3..50a77c63a4 100644 --- a/src/app/core/data/processes/process-data.service.ts +++ b/src/app/core/data/processes/process-data.service.ts @@ -5,8 +5,8 @@ import { ObjectCacheService } from '../../cache/object-cache.service'; import { HALEndpointService } from '../../shared/hal-endpoint.service'; import { Process } from '../../../process-page/processes/process.model'; import { PROCESS } from '../../../process-page/processes/process.resource-type'; -import { Observable, timer as rxjsTimer, concatMap } from 'rxjs'; -import { switchMap, filter, distinctUntilChanged, find, tap } from 'rxjs/operators'; +import { Observable, Subscription } from 'rxjs'; +import { switchMap, filter, distinctUntilChanged, find } from 'rxjs/operators'; import { PaginatedList } from '../paginated-list.model'; import { Bitstream } from '../../shared/bitstream.model'; import { RemoteData } from '../remote-data'; @@ -19,7 +19,7 @@ import { dataService } from '../base/data-service.decorator'; import { DeleteData, DeleteDataImpl } from '../base/delete-data'; import { NotificationsService } from '../../../shared/notifications/notifications.service'; import { NoContent } from '../../shared/NoContent.model'; -import { getAllCompletedRemoteData, getFirstCompletedRemoteData } from '../../shared/operators'; +import { getAllCompletedRemoteData } from '../../shared/operators'; import { ProcessStatus } from 'src/app/process-page/processes/process-status.model'; import { hasValue } from '../../../shared/empty.util'; import { SearchData, SearchDataImpl } from '../base/search-data'; @@ -41,6 +41,7 @@ export class ProcessDataService extends IdentifiableDataService impleme private deleteData: DeleteData; private searchData: SearchData; protected activelyBeingPolled: Map = new Map(); + protected subs: Map = new Map(); constructor( protected requestService: RequestService, @@ -129,6 +130,9 @@ export class ProcessDataService extends IdentifiableDataService impleme } /** + * @param id The id for this auto-refreshing search. Used to stop + * auto-refreshing afterwards, and ensure we're not + * auto-refreshing the same thing multiple times. * @param searchMethod The search method for the Process * @param options The FindListOptions object * @param pollingIntervalInMs The interval by which the search will be repeated @@ -137,22 +141,41 @@ export class ProcessDataService extends IdentifiableDataService impleme * @return {Observable>>} * Return an observable that emits a paginated list of processes every interval */ - autoRefreshingSearchBy(searchMethod: string, options?: FindListOptions, pollingIntervalInMs: number = 5000, ...linksToFollow: FollowLinkConfig[]): Observable>> { - // Create observable that emits every pollingInterval - return rxjsTimer(0, pollingIntervalInMs).pipe( - concatMap(() => { - // Every time the timer emits, request the current state of the processes - return this.searchBy(searchMethod, options, false, false, ...linksToFollow).pipe( - getFirstCompletedRemoteData(), - tap((processListRD: RemoteData>) => { - // Once the response has been received, invalidate the response right before the next request - setTimeout(() => { - this.invalidateByHref(processListRD.payload._links.self.href); - }, Math.max(pollingIntervalInMs - 100, 0)); - }), - ); - }) + autoRefreshingSearchBy(id: string, searchMethod: string, options?: FindListOptions, pollingIntervalInMs: number = 5000, ...linksToFollow: FollowLinkConfig[]): Observable>> { + + const result$ = this.searchBy(searchMethod, options, true, true, ...linksToFollow).pipe( + getAllCompletedRemoteData() ); + + const sub = result$.pipe( + filter(() => + !this.activelyBeingPolled.has(id) + ) + ).subscribe((processListRd: RemoteData>) => { + this.clearCurrentTimeout(id); + const nextTimeout = this.timer(() => { + this.activelyBeingPolled.delete(id); + this.requestService.setStaleByHrefSubstring(processListRd.payload._links.self.href); + }, pollingIntervalInMs); + + this.activelyBeingPolled.set(id, nextTimeout); + }); + + this.subs.set(id, sub); + + return result$; + } + + /** + * Stop auto-refreshing the request with the given id + * @param id the id of the request to stop automatically refreshing + */ + stopAutoRefreshing(id: string) { + this.clearCurrentTimeout(id); + if (hasValue(this.subs.get(id))) { + this.subs.get(id).unsubscribe(); + this.subs.delete(id); + } } /** @@ -181,14 +204,15 @@ export class ProcessDataService extends IdentifiableDataService impleme } /** - * Clear the timeout for the given process, if that timeout exists + * Clear the timeout for the given id, if that timeout exists * @protected */ - protected clearCurrentTimeout(processId: string): void { - const timeout = this.activelyBeingPolled.get(processId); + protected clearCurrentTimeout(id: string): void { + const timeout = this.activelyBeingPolled.get(id); if (hasValue(timeout)) { clearTimeout(timeout); } + this.activelyBeingPolled.delete(id); } /** @@ -229,15 +253,15 @@ export class ProcessDataService extends IdentifiableDataService impleme this.activelyBeingPolled.set(processId, nextTimeout); }); + this.subs.set(processId, sub); + // When the process completes create a one off subscription (the `find` completes the // observable) that unsubscribes the previous one, removes the processId from the list of // processes being polled and clears any running timeouts process$.pipe( find((processRD: RemoteData) => ProcessDataService.hasCompletedOrFailed(processRD.payload)) ).subscribe(() => { - this.clearCurrentTimeout(processId); - this.activelyBeingPolled.delete(processId); - sub.unsubscribe(); + this.stopAutoRefreshing(processId); }); return process$.pipe( diff --git a/src/app/process-page/detail/process-detail.component.ts b/src/app/process-page/detail/process-detail.component.ts index 719b4d6e2b..af6b56dcb4 100644 --- a/src/app/process-page/detail/process-detail.component.ts +++ b/src/app/process-page/detail/process-detail.component.ts @@ -1,5 +1,5 @@ import { HttpClient } from '@angular/common/http'; -import { Component, Inject, NgZone, OnInit, PLATFORM_ID } from '@angular/core'; +import { Component, Inject, NgZone, OnInit, PLATFORM_ID, OnDestroy } from '@angular/core'; import { ActivatedRoute, Router } from '@angular/router'; import { BehaviorSubject, Observable } from 'rxjs'; import { finalize, map, switchMap, take, tap, find, startWith } from 'rxjs/operators'; @@ -36,7 +36,7 @@ import { PROCESS_PAGE_FOLLOW_LINKS } from '../process-page.resolver'; /** * A component displaying detailed information about a DSpace Process */ -export class ProcessDetailComponent implements OnInit { +export class ProcessDetailComponent implements OnInit, OnDestroy { /** * The AlertType enumeration @@ -80,6 +80,8 @@ export class ProcessDetailComponent implements OnInit { isRefreshing$: Observable; + protected autoRefreshingID: string; + /** * Reference to NgbModal */ @@ -108,7 +110,8 @@ export class ProcessDetailComponent implements OnInit { this.processRD$ = this.route.data.pipe( switchMap((data) => { if (isPlatformBrowser(this.platformId)) { - return this.processService.autoRefreshUntilCompletion(this.route.snapshot.params.id, 5000, ...PROCESS_PAGE_FOLLOW_LINKS); + this.autoRefreshingID = this.route.snapshot.params.id; + return this.processService.autoRefreshUntilCompletion(this.autoRefreshingID, 5000, ...PROCESS_PAGE_FOLLOW_LINKS); } else { return [data.process as RemoteData]; } @@ -128,6 +131,15 @@ export class ProcessDetailComponent implements OnInit { ); } + /** + * Make sure the autoRefreshUntilCompletion is cleaned up properly + */ + ngOnDestroy() { + if (hasValue(this.autoRefreshingID)) { + this.processService.stopAutoRefreshing(this.autoRefreshingID); + } + } + /** * Get the name of a bitstream * @param bitstream diff --git a/src/app/process-page/overview/process-overview.service.ts b/src/app/process-page/overview/process-overview.service.ts index 73c7c1529f..78287ca182 100644 --- a/src/app/process-page/overview/process-overview.service.ts +++ b/src/app/process-page/overview/process-overview.service.ts @@ -62,12 +62,21 @@ export class ProcessOverviewService { }, findListOptions); if (hasValue(autoRefreshingIntervalInMs) && autoRefreshingIntervalInMs > 0) { - return this.processDataService.autoRefreshingSearchBy('byProperty', options, autoRefreshingIntervalInMs); + this.processDataService.stopAutoRefreshing(processStatus); + return this.processDataService.autoRefreshingSearchBy(processStatus, 'byProperty', options, autoRefreshingIntervalInMs); } else { return this.processDataService.searchBy('byProperty', options); } } + /** + * Stop auto-refreshing the process with the given status + * @param processStatus the processStatus of the request to stop automatically refreshing + */ + stopAutoRefreshing(processStatus: ProcessStatus) { + this.processDataService.stopAutoRefreshing(processStatus); + } + /** * Map the provided paginationOptions to FindListOptions * @param paginationOptions the PaginationComponentOptions to map diff --git a/src/app/process-page/overview/table/process-overview-table.component.spec.ts b/src/app/process-page/overview/table/process-overview-table.component.spec.ts index aa073f291c..39520fa923 100644 --- a/src/app/process-page/overview/table/process-overview-table.component.spec.ts +++ b/src/app/process-page/overview/table/process-overview-table.component.spec.ts @@ -15,18 +15,20 @@ import { VarDirective } from '../../../shared/utils/var.directive'; import { TranslateModule } from '@ngx-translate/core'; import { RouterTestingModule } from '@angular/router/testing'; import { PaginationService } from '../../../core/pagination/pagination.service'; -import { NO_ERRORS_SCHEMA } from '@angular/core'; import { By } from '@angular/platform-browser'; import { AuthService } from '../../../core/auth/auth.service'; import { AuthServiceMock } from '../../../shared/mocks/auth.service.mock'; import { RouteService } from '../../../core/services/route.service'; import { routeServiceStub } from '../../../shared/testing/route-service.stub'; +import { ProcessOverviewService } from '../process-overview.service'; +import { take } from 'rxjs/operators'; describe('ProcessOverviewTableComponent', () => { let component: ProcessOverviewTableComponent; let fixture: ComponentFixture; + let processOverviewService: ProcessOverviewService; let processService: ProcessDataService; let ePersonService: EPersonDataService; let paginationService; // : PaginationService; Not typed as the stub does not fully implement PaginationService @@ -78,8 +80,16 @@ describe('ProcessOverviewTableComponent', () => { ] } }); + processOverviewService = jasmine.createSpyObj('processOverviewService', { + getFindListOptions: { + currentPage: 1, + elementsPerPage: 5, + sort: 'creationTime' + }, + getProcessesByProcessStatus: createSuccessfulRemoteDataObject$(createPaginatedList(processes)).pipe(take(1)) + }); processService = jasmine.createSpyObj('processService', { - searchBy: createSuccessfulRemoteDataObject$(createPaginatedList(processes)) + searchBy: createSuccessfulRemoteDataObject$(createPaginatedList(processes)).pipe(take(1)) }); ePersonService = jasmine.createSpyObj('ePersonService', { findById: createSuccessfulRemoteDataObject$(ePerson) @@ -117,6 +127,7 @@ describe('ProcessOverviewTableComponent', () => { declarations: [ProcessOverviewTableComponent, VarDirective, NgbCollapse], imports: [TranslateModule.forRoot(), RouterTestingModule.withRoutes([])], providers: [ + { provide: ProcessOverviewService, useValue: processOverviewService }, { provide: ProcessDataService, useValue: processService }, { provide: EPersonDataService, useValue: ePersonService }, { provide: PaginationService, useValue: paginationService }, @@ -125,7 +136,6 @@ describe('ProcessOverviewTableComponent', () => { { provide: AuthService, useValue: authService }, { provide: RouteService, useValue: routeService }, ], - schemas: [NO_ERRORS_SCHEMA] }).compileComponents(); })); diff --git a/src/app/process-page/overview/table/process-overview-table.component.ts b/src/app/process-page/overview/table/process-overview-table.component.ts index 31ac041adc..7bd5c02b43 100644 --- a/src/app/process-page/overview/table/process-overview-table.component.ts +++ b/src/app/process-page/overview/table/process-overview-table.component.ts @@ -1,20 +1,21 @@ -import { Component, Input, OnInit, Inject, PLATFORM_ID } from '@angular/core'; +import { Component, Input, OnInit, Inject, PLATFORM_ID, OnDestroy } from '@angular/core'; import { ProcessStatus } from '../../processes/process-status.model'; -import { Observable, mergeMap, from as observableFrom } from 'rxjs'; +import { Observable, mergeMap, from as observableFrom, BehaviorSubject, Subscription } from 'rxjs'; import { RemoteData } from '../../../core/data/remote-data'; import { PaginatedList } from '../../../core/data/paginated-list.model'; import { Process } from '../../processes/process.model'; -import { PaginationComponentOptions } from '../../../shared/pagination/pagination-component-options.model'; +import { + PaginationComponentOptions +} from '../../../shared/pagination/pagination-component-options.model'; import { ProcessOverviewService, ProcessSortField } from '../process-overview.service'; import { ProcessBulkDeleteService } from '../process-bulk-delete.service'; import { EPersonDataService } from '../../../core/eperson/eperson-data.service'; import { DSONameService } from '../../../core/breadcrumbs/dso-name.service'; import { getFirstSucceededRemoteDataPayload, - getFirstCompletedRemoteData, getAllCompletedRemoteData } from '../../../core/shared/operators'; -import { map, switchMap, toArray, take } from 'rxjs/operators'; +import { map, switchMap, toArray, take, filter } from 'rxjs/operators'; import { EPerson } from '../../../core/eperson/models/eperson.model'; import { PaginationService } from 'src/app/core/pagination/pagination.service'; import { FindListOptions } from '../../../core/data/find-list-options.model'; @@ -23,6 +24,7 @@ import { Router } from '@angular/router'; import { AuthService } from '../../../core/auth/auth.service'; import { isPlatformBrowser } from '@angular/common'; import { RouteService } from '../../../core/services/route.service'; +import { hasValue } from '../../../shared/empty.util'; const NEW_PROCESS_PARAM = 'new_process_id'; @@ -41,7 +43,7 @@ export interface ProcessOverviewTableEntry { styleUrls: ['./process-overview-table.component.scss'], templateUrl: './process-overview-table.component.html' }) -export class ProcessOverviewTableComponent implements OnInit { +export class ProcessOverviewTableComponent implements OnInit, OnDestroy { /** * The status of the processes this sections should show @@ -74,7 +76,7 @@ export class ProcessOverviewTableComponent implements OnInit { /** * List of processes and their info to be shown in this table */ - processesRD$: Observable>>; + processesRD$: BehaviorSubject>>; /** * The pagination ID for this overview section @@ -96,6 +98,11 @@ export class ProcessOverviewTableComponent implements OnInit { */ newProcessId: string; + /** + * List of subscriptions + */ + subs: Subscription[] = []; + constructor(protected processOverviewService: ProcessOverviewService, protected processBulkDeleteService: ProcessBulkDeleteService, protected ePersonDataService: EPersonDataService, @@ -131,6 +138,8 @@ export class ProcessOverviewTableComponent implements OnInit { // Get the current pagination from the route this.paginationOptions$ = this.paginationService.getCurrentPagination(this.paginationId, defaultPaginationOptions); + this.processesRD$ = new BehaviorSubject(undefined); + // Once we have the pagination, retrieve the processes matching the process type and the pagination // // Reasoning why this monstrosity is the way it is: @@ -145,7 +154,7 @@ export class ProcessOverviewTableComponent implements OnInit { // between the update of the paginatedList and the entryArray. This results in the processOverviewPage showing // no processes for a split second every time the processes are updated which in turn causes the different // sections of the page to jump around. By combining these and causing the page to update only once this is avoided. - this.processesRD$ = this.paginationOptions$ + this.subs.push(this.paginationOptions$ .pipe( // Map the paginationOptions to findListOptions map((paginationOptions: PaginationComponentOptions) => @@ -187,16 +196,21 @@ export class ProcessOverviewTableComponent implements OnInit { ); }), - ); + ).subscribe((next: RemoteData>) => { + this.processesRD$.next(next); + })); // Collapse this section when the number of processes is zero the first time processes are retrieved - this.processesRD$.pipe(take(1)).subscribe( + this.subs.push(this.processesRD$.pipe( + filter((processListRd: RemoteData>) => hasValue(processListRd)), + take(1), + ).subscribe( (processesRD: RemoteData>) => { if (!(processesRD.payload.totalElements > 0)) { this.isCollapsed = true; } } - ); + )); } @@ -225,4 +239,11 @@ export class ProcessOverviewTableComponent implements OnInit { } } -} + ngOnDestroy(): void { + this.subs + .filter((sub) => hasValue(sub)) + .forEach((sub) => sub.unsubscribe()); + this.processOverviewService.stopAutoRefreshing(this.processStatus); + } + + }