@@ -373,7 +373,7 @@ void *run_sort(void *v) {
373
373
return NULL ;
374
374
}
375
375
376
- void do_read_parallel (char *map, long long len, long long initial_offset, const char *reading, struct reader *reader, volatile long long *progress_seq, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, char *fname, int basezoom, int source, int nlayers, std::vector<std::map<std::string, layermap_entry> > *layermaps, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map<std::string, int > const *attribute_types, int separator, double *dist_sum, size_t *dist_count, bool want_dist) {
376
+ void do_read_parallel (char *map, long long len, long long initial_offset, const char *reading, struct reader *reader, volatile long long *progress_seq, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, char *fname, int basezoom, int source, int nlayers, std::vector<std::map<std::string, layermap_entry> > *layermaps, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map<std::string, int > const *attribute_types, int separator, double *dist_sum, size_t *dist_count, bool want_dist, bool filters ) {
377
377
long long segs[CPUS + 1 ];
378
378
segs[0 ] = 0 ;
379
379
segs[CPUS] = len;
@@ -439,6 +439,7 @@ void do_read_parallel(char *map, long long len, long long initial_offset, const
439
439
pja[i].dist_sum = &(dist_sums[i]);
440
440
pja[i].dist_count = &(dist_counts[i]);
441
441
pja[i].want_dist = want_dist;
442
+ pja[i].filters = filters;
442
443
443
444
if (pthread_create (&pthreads[i], NULL , run_parse_json, &pja[i]) != 0 ) {
444
445
perror (" pthread_create" );
@@ -490,6 +491,7 @@ struct read_parallel_arg {
490
491
double *dist_sum;
491
492
size_t *dist_count;
492
493
bool want_dist;
494
+ bool filters;
493
495
};
494
496
495
497
void *run_read_parallel (void *v) {
@@ -511,7 +513,7 @@ void *run_read_parallel(void *v) {
511
513
}
512
514
madvise (map, rpa->len , MADV_RANDOM); // sequential, but from several pointers at once
513
515
514
- do_read_parallel (map, rpa->len , rpa->offset , rpa->reading , rpa->reader , rpa->progress_seq , rpa->exclude , rpa->include , rpa->exclude_all , rpa->fname , rpa->basezoom , rpa->source , rpa->nlayers , rpa->layermaps , rpa->droprate , rpa->initialized , rpa->initial_x , rpa->initial_y , rpa->maxzoom , rpa->layername , rpa->uses_gamma , rpa->attribute_types , rpa->separator , rpa->dist_sum , rpa->dist_count , rpa->want_dist );
516
+ do_read_parallel (map, rpa->len , rpa->offset , rpa->reading , rpa->reader , rpa->progress_seq , rpa->exclude , rpa->include , rpa->exclude_all , rpa->fname , rpa->basezoom , rpa->source , rpa->nlayers , rpa->layermaps , rpa->droprate , rpa->initialized , rpa->initial_x , rpa->initial_y , rpa->maxzoom , rpa->layername , rpa->uses_gamma , rpa->attribute_types , rpa->separator , rpa->dist_sum , rpa->dist_count , rpa->want_dist , rpa-> filters );
515
517
516
518
madvise (map, rpa->len , MADV_DONTNEED);
517
519
if (munmap (map, rpa->len ) != 0 ) {
@@ -528,7 +530,7 @@ void *run_read_parallel(void *v) {
528
530
return NULL ;
529
531
}
530
532
531
- void start_parsing (int fd, FILE *fp, long long offset, long long len, volatile int *is_parsing, pthread_t *parallel_parser, bool &parser_created, const char *reading, struct reader *reader, volatile long long *progress_seq, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, char *fname, int basezoom, int source, int nlayers, std::vector<std::map<std::string, layermap_entry> > &layermaps, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map<std::string, int > const *attribute_types, int separator, double *dist_sum, size_t *dist_count, bool want_dist) {
533
+ void start_parsing (int fd, FILE *fp, long long offset, long long len, volatile int *is_parsing, pthread_t *parallel_parser, bool &parser_created, const char *reading, struct reader *reader, volatile long long *progress_seq, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, char *fname, int basezoom, int source, int nlayers, std::vector<std::map<std::string, layermap_entry> > &layermaps, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map<std::string, int > const *attribute_types, int separator, double *dist_sum, size_t *dist_count, bool want_dist, bool filters ) {
532
534
// This has to kick off an intermediate thread to start the parser threads,
533
535
// so the main thread can get back to reading the next input stage while
534
536
// the intermediate thread waits for the completion of the parser threads.
@@ -570,6 +572,7 @@ void start_parsing(int fd, FILE *fp, long long offset, long long len, volatile i
570
572
rpa->dist_sum = dist_sum;
571
573
rpa->dist_count = dist_count;
572
574
rpa->want_dist = want_dist;
575
+ rpa->filters = filters;
573
576
574
577
if (pthread_create (parallel_parser, NULL , run_read_parallel, rpa) != 0 ) {
575
578
perror (" pthread_create" );
@@ -1257,7 +1260,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
1257
1260
}
1258
1261
1259
1262
if (map != NULL && map != MAP_FAILED && read_parallel_this) {
1260
- do_read_parallel (map, st.st_size - off, overall_offset, reading.c_str (), reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, &layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer , uses_gamma, attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom);
1263
+ do_read_parallel (map, st.st_size - off, overall_offset, reading.c_str (), reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, &layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer , uses_gamma, attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL );
1261
1264
overall_offset += st.st_size - off;
1262
1265
checkdisk (reader, CPUS);
1263
1266
@@ -1333,7 +1336,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
1333
1336
}
1334
1337
1335
1338
fflush (readfp);
1336
- start_parsing (readfd, readfp, initial_offset, ahead, &is_parsing, ¶llel_parser, parser_created, reading.c_str (), reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer , gamma != 0 , attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom);
1339
+ start_parsing (readfd, readfp, initial_offset, ahead, &is_parsing, ¶llel_parser, parser_created, reading.c_str (), reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer , gamma != 0 , attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL );
1337
1340
1338
1341
initial_offset += ahead;
1339
1342
overall_offset += ahead;
@@ -1370,7 +1373,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
1370
1373
fflush (readfp);
1371
1374
1372
1375
if (ahead > 0 ) {
1373
- start_parsing (readfd, readfp, initial_offset, ahead, &is_parsing, ¶llel_parser, parser_created, reading.c_str (), reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer , gamma != 0 , attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom);
1376
+ start_parsing (readfd, readfp, initial_offset, ahead, &is_parsing, ¶llel_parser, parser_created, reading.c_str (), reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer , gamma != 0 , attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL );
1374
1377
1375
1378
if (parser_created) {
1376
1379
if (pthread_join (parallel_parser, NULL ) != 0 ) {
@@ -1387,7 +1390,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
1387
1390
1388
1391
long long layer_seq = overall_offset;
1389
1392
json_pull *jp = json_begin_file (fp);
1390
- parse_json (jp, reading.c_str (), &layer_seq, &progress_seq, &reader[0 ].metapos , &reader[0 ].geompos , &reader[0 ].indexpos , exclude, include, exclude_all, reader[0 ].metafile , reader[0 ].geomfile , reader[0 ].indexfile , reader[0 ].poolfile , reader[0 ].treefile , fname, basezoom, layer, droprate, reader[0 ].file_bbox , 0 , &initialized[0 ], &initial_x[0 ], &initial_y[0 ], reader, maxzoom, &layermaps[0 ], sources[layer].layer , uses_gamma, attribute_types, &dist_sum, &dist_count, guess_maxzoom);
1393
+ parse_json (jp, reading.c_str (), &layer_seq, &progress_seq, &reader[0 ].metapos , &reader[0 ].geompos , &reader[0 ].indexpos , exclude, include, exclude_all, reader[0 ].metafile , reader[0 ].geomfile , reader[0 ].indexfile , reader[0 ].poolfile , reader[0 ].treefile , fname, basezoom, layer, droprate, reader[0 ].file_bbox , 0 , &initialized[0 ], &initial_x[0 ], &initial_y[0 ], reader, maxzoom, &layermaps[0 ], sources[layer].layer , uses_gamma, attribute_types, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL );
1391
1394
json_end (jp);
1392
1395
overall_offset = layer_seq;
1393
1396
checkdisk (reader, CPUS);
0 commit comments