@@ -66,6 +66,7 @@ extern "C" {
66
66
67
67
pthread_mutex_t db_lock = PTHREAD_MUTEX_INITIALIZER;
68
68
pthread_mutex_t var_lock = PTHREAD_MUTEX_INITIALIZER;
69
+ pthread_mutex_t task_lock = PTHREAD_MUTEX_INITIALIZER;
69
70
70
71
// convert serial feature geometry (drawvec) to output tile geometry (mvt_geometry)
71
72
static std::vector<mvt_geometry> to_feature (drawvec const &geom) {
@@ -833,6 +834,10 @@ static unsigned long long calculate_drop_sequence(serial_feature const &sf) {
833
834
return ~out; // lowest numbered feature gets dropped first
834
835
}
835
836
837
+ struct task {
838
+ int fileno = 0 ;
839
+ };
840
+
836
841
// This is the block of parameters that are passed to write_tile() to read a tile
837
842
// from the serialized form, do whatever needs to be done to it, and to write the
838
843
// MVT-format output to the output tileset.
@@ -841,7 +846,8 @@ static unsigned long long calculate_drop_sequence(serial_feature const &sf) {
841
846
// by the caller to determine whether the zoom level needs to be done over with
842
847
// new thresholds.
843
848
struct write_tile_args {
844
- struct task *tasks = NULL ;
849
+ int threadno;
850
+ std::vector<task *> *tasks;
845
851
char *global_stringpool = NULL ;
846
852
int min_detail = 0 ;
847
853
sqlite3 *outdb = NULL ;
@@ -2702,17 +2708,35 @@ long long write_tile(decompressor *geoms, std::atomic<long long> *geompos_in, ch
2702
2708
return -1 ;
2703
2709
}
2704
2710
2705
- struct task {
2706
- int fileno = 0 ;
2707
- struct task *next = NULL ;
2708
- };
2709
-
2710
2711
void *run_thread (void *vargs) {
2711
2712
write_tile_args *arg = (write_tile_args *) vargs;
2712
- struct task *task;
2713
2713
int *err_or_null = NULL ;
2714
2714
2715
- for (task = arg->tasks ; task != NULL ; task = task->next ) {
2715
+ while (true ) {
2716
+ bool done = false ;
2717
+
2718
+ if (pthread_mutex_lock (&task_lock) != 0 ) {
2719
+ perror (" pthread_mutex_lock" );
2720
+ exit (EXIT_PTHREAD);
2721
+ }
2722
+
2723
+ struct task *task;
2724
+ if (arg->tasks ->size () == 0 ) {
2725
+ done = true ;
2726
+ } else {
2727
+ task = arg->tasks ->back ();
2728
+ arg->tasks ->pop_back ();
2729
+ }
2730
+
2731
+ if (pthread_mutex_unlock (&task_lock) != 0 ) {
2732
+ perror (" pthread_mutex_unlock" );
2733
+ exit (EXIT_PTHREAD);
2734
+ }
2735
+
2736
+ if (done) {
2737
+ break ;
2738
+ }
2739
+
2716
2740
int j = task->fileno ;
2717
2741
2718
2742
if (arg->geomfd [j] < 0 ) {
@@ -2939,47 +2963,15 @@ int traverse_zooms(int *geomfd, off_t *geom_size, char *global_stringpool, std::
2939
2963
std::vector<task> tasks;
2940
2964
tasks.resize (TEMP_FILES);
2941
2965
2942
- struct dispatch {
2943
- struct task *tasks = NULL ;
2944
- long long todo = 0 ;
2945
- struct dispatch *next = NULL ;
2946
- };
2947
- std::vector<dispatch> dispatches;
2948
- dispatches.resize (threads);
2949
-
2950
- dispatch *dispatch_head = &dispatches[0 ];
2951
- for (size_t j = 0 ; j < threads; j++) {
2952
- dispatches[j].tasks = NULL ;
2953
- dispatches[j].todo = 0 ;
2954
- if (j + 1 < threads) {
2955
- dispatches[j].next = &dispatches[j + 1 ];
2956
- } else {
2957
- dispatches[j].next = NULL ;
2958
- }
2959
- }
2966
+ std::vector<task *> dispatch;
2960
2967
2961
2968
for (size_t j = 0 ; j < TEMP_FILES; j++) {
2962
2969
if (geom_size[j] == 0 ) {
2963
2970
continue ;
2964
2971
}
2965
2972
2966
2973
tasks[j].fileno = j;
2967
- tasks[j].next = dispatch_head->tasks ;
2968
- dispatch_head->tasks = &tasks[j];
2969
- dispatch_head->todo += geom_size[j];
2970
-
2971
- dispatch *here = dispatch_head;
2972
- dispatch_head = dispatch_head->next ;
2973
-
2974
- dispatch **d;
2975
- for (d = &dispatch_head; *d != NULL ; d = &((*d)->next )) {
2976
- if (here->todo < (*d)->todo ) {
2977
- break ;
2978
- }
2979
- }
2980
-
2981
- here->next = *d;
2982
- *d = here;
2974
+ dispatch.push_back (&tasks[j]);
2983
2975
}
2984
2976
2985
2977
int err = INT_MAX;
@@ -3001,7 +2993,11 @@ int traverse_zooms(int *geomfd, off_t *geom_size, char *global_stringpool, std::
3001
2993
atomic_strategy strategy;
3002
2994
skip_children_out.clear ();
3003
2995
2996
+ // must be recreate with each pass, since child threads consume it
2997
+ std::vector<task *> pass_dispatch = dispatch;
2998
+
3004
2999
for (size_t thread = 0 ; thread < threads; thread++) {
3000
+ args[thread].threadno = thread;
3005
3001
args[thread].global_stringpool = global_stringpool;
3006
3002
args[thread].min_detail = min_detail;
3007
3003
args[thread].outdb = outdb; // locked with db_lock
@@ -3053,7 +3049,7 @@ int traverse_zooms(int *geomfd, off_t *geom_size, char *global_stringpool, std::
3053
3049
args[thread].filter = filter;
3054
3050
args[thread].unidecode_data = &unidecode_data;
3055
3051
3056
- args[thread].tasks = dispatches[thread]. tasks ;
3052
+ args[thread].tasks = &pass_dispatch ;
3057
3053
args[thread].running = &running;
3058
3054
args[thread].pass = pass;
3059
3055
args[thread].wrote_zoom = -1 ;
0 commit comments