Skip to content

Commit

Permalink
pg2arrow: add -k|--parallel-keys option
Browse files Browse the repository at this point in the history
  • Loading branch information
kaigai committed Mar 31, 2024
1 parent dbbafc2 commit 24b79b6
Showing 1 changed file with 99 additions and 21 deletions.
120 changes: 99 additions & 21 deletions arrow-tools/sql2arrow.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ static char *sqldb_database = NULL;
static char *dump_arrow_filename = NULL;
static char *stat_embedded_columns = NULL;
/*
 * Number of parallel workers. Stays 0 until option parsing finishes; then it
 * is the -n value, the number of tokens given by -k, or 1 for a non-parallel
 * dump.
 */
static int num_worker_threads = 0;
/* Raw argument of -k|--parallel-keys (comma separated); NULL if not given. */
static char *parallel_dist_keys = NULL;
static int shows_progress = 0;
static userConfigOption *sqldb_session_configs = NULL;
static nestLoopOption *sqldb_nestloop_options = NULL;
Expand All @@ -43,6 +44,7 @@ static pthread_mutex_t worker_setup_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t worker_setup_cond = PTHREAD_COND_INITIALIZER;
static pthread_t *worker_threads;
static SQLtable **worker_tables;
/*
 * Trimmed tokens split out of parallel_dist_keys, indexed by worker-id.
 * Each entry is the replacement text for $(PARALLEL_KEY) in that worker's
 * SQL command. NULL unless -k was given.
 */
static const char **worker_dist_keys = NULL;
static pthread_mutex_t main_table_mutex = PTHREAD_MUTEX_INITIALIZER;

/*
Expand Down Expand Up @@ -565,9 +567,16 @@ usage(void)
" -c, --command=COMMAND SQL command to run\n"
" -t, --table=TABLENAME Equivalent to '-c SELECT * FROM TABLENAME'\n"
" (-c and -t are exclusive, either of them must be given)\n"
" -n, --num-workers=N_WORKERS Enables parallel dump mode. It requires\n"
" the SQL command contains $(WORKER_ID) and\n"
" $(N_WORKERS) to avoid data duplication.\n"
" -n, --num-workers=N_WORKERS Enables parallel dump mode.\n"
" It requires the SQL command contains $(WORKER_ID)\n"
" and $(N_WORKERS), to be replaced by the numeric\n"
" worker-id and number of workers.\n"
" -k, --parallel-keys=PARALLEL_KEYS Enables yet another parallel dump.\n"
" It requires the SQL command contains $(PARALLEL_KEY)\n"
" to be replaced by the comma separated token in the\n"
" PARALLEL_KEYS.\n"
" (-n and -k are exclusive, either of them can be give if parallel dump.\n"
" It is user's responsibility to avoid data duplication.)\n"
#ifdef __PG2ARROW__
" --inner-join=SUB_COMMAND\n"
" --outer-join=SUB_COMMAND\n"
Expand Down Expand Up @@ -633,6 +642,7 @@ parse_options(int argc, char * const argv[])
{"outer-join", required_argument, NULL, 1005},
{"stat", optional_argument, NULL, 'S'},
{"num-workers", required_argument, NULL, 'n'},
{"parallel-keys",required_argument, NULL, 'k'},
{"help", no_argument, NULL, 9999},
{NULL, 0, NULL, 0},
};
Expand All @@ -651,7 +661,7 @@ parse_options(int argc, char * const argv[])
#ifdef __MYSQL2ARROW__
"P:"
#endif
"n:S::", long_options, NULL)) >= 0)
"n:k:S::", long_options, NULL)) >= 0)
{
switch (c)
{
Expand Down Expand Up @@ -728,7 +738,9 @@ parse_options(int argc, char * const argv[])
break;

case 'n':
if (num_worker_threads != 0)
if (parallel_dist_keys != 0)
Elog("-n and -k are exclusive");
else if (num_worker_threads != 0)
Elog("-n option was supplied twice");
else
{
Expand All @@ -741,6 +753,15 @@ parse_options(int argc, char * const argv[])
}
break;

case 'k':
if (parallel_dist_keys != 0)
Elog("-k option was supplied twice");
else if (num_worker_threads != 0)
Elog("-n and -k are exclusive");
else
parallel_dist_keys = pstrdup(optarg);
break;

case 'h':
if (sqldb_hostname)
Elog("-h option was supplied twice");
Expand Down Expand Up @@ -879,13 +900,39 @@ parse_options(int argc, char * const argv[])
/*
* The 'sqldb_command' must contain $(WORKER_ID) and $(N_WORKERS).
*/
if (num_worker_threads == 0)
num_worker_threads = 1;
if (num_worker_threads == 1)
if (parallel_dist_keys)
{
char *temp = pstrdup(parallel_dist_keys);
char *tok, *pos;
int nitems = 0;
int nrooms = 25;

assert(num_worker_threads == 0);
worker_dist_keys = palloc0(sizeof(const char *) * nrooms);
for (tok = strtok_r(temp, ",", &pos);
tok != NULL;
tok = strtok_r(NULL, ",", &pos))
{
tok = __trim(tok);

if (nitems >= nrooms)
{
nrooms += nrooms;
worker_dist_keys = repalloc(worker_dist_keys,
sizeof(const char *) * nrooms);
}
puts(tok);
worker_dist_keys[nitems++] = tok;
}
num_worker_threads = nitems;
}
else if (num_worker_threads == 0 || num_worker_threads == 1)
{
if (strstr(sqldb_command, "$(WORKER_ID)") != NULL ||
strstr(sqldb_command, "$(N_WORKERS)") != NULL)
Elog("The reserved keywords: $(WORKER_ID) and $(N_WORKERS) should not be used in non-parallel SQL commands");
strstr(sqldb_command, "$(N_WORKERS)") != NULL ||
strstr(sqldb_command, "$(PARALLEL_KEY)") != NULL)
Elog("Non-parallel SQL command should not use the reserved keywords: $(WORKER_ID), $(N_WORKERS) and $(PARALLEL_KEY)");
num_worker_threads = 1;
}
else
{
Expand All @@ -898,9 +945,7 @@ parse_options(int argc, char * const argv[])
assert(!meet_command);
sprintf(temp, "%s WHERE hashtid(ctid) %% $(N_WORKERS) = $(WORKER_ID)",
sqldb_command);
sqldb_command = strdup(temp);
if (!sqldb_command)
Elog("out of memory");
sqldb_command = pstrdup(temp);
}
else
#endif /* __PG2ARROW__ */
Expand All @@ -909,6 +954,8 @@ parse_options(int argc, char * const argv[])
strstr(sqldb_command, "$(N_WORKERS)") == NULL)
Elog("The custom SQL command has to contains $(WORKER_ID) and $(N_WORKERS) to avoid data duplications.\n"
"example) SELECT * FROM my_table WHERE hashtid(ctid) %% $(N_WORKERS) = $(WORKER_ID)");
if (strstr(sqldb_command, "$(PARALLEL_KEY)") != NULL)
Elog("-n|--num-workers does not support $(PARALLEL_KEY) macro.");
}
}
if (batch_segment_sz == 0)
Expand All @@ -922,30 +969,61 @@ const char *
sqldb_command_apply_worker_id(const char *command, int worker_id)
{
const char *src = command;
char *buf = palloc(strlen(command) + 100);
char *dst = buf;
size_t len = strlen(command) + 100;
size_t off = 0;
char *buf = palloc(len);
int c;

while ((c = *src++) != '\0')
{
if (c == '$')
{
char temp[100];
const char *tok = NULL;

if (strncmp(src, "(WORKER_ID)", 11) == 0)
{
dst += sprintf(dst, "%d", worker_id);
sprintf(temp, "%d", worker_id);
src += 11;
continue;
tok = temp;
}
if (strncmp(src, "(N_WORKERS)", 11) == 0)
else if (strncmp(src, "(N_WORKERS)", 11) == 0)
{
dst += sprintf(dst, "%d", num_worker_threads);
sprintf(temp, "%d", num_worker_threads);
src += 11;
tok = temp;
}
else if (strncmp(src, "(PARALLEL_KEY)", 14) == 0)
{
if (worker_dist_keys && worker_dist_keys[worker_id])
{
src += 14;
tok = worker_dist_keys[worker_id];
}
}

if (tok)
{
size_t sz = strlen(tok);

if (off + sz + 1 >= len)
{
len = (off + sz) + len;
buf = repalloc(buf, len);
}
strcpy(buf + off, tok);
off += sz;
continue;
}
}
*dst++ = c;
if (off + 1 == len)
{
len += len;
buf = repalloc(buf, len);
}
buf[off++] = c;
}
*dst = '\0';
buf[off++] = '\0';

return buf;
}
Expand Down

0 comments on commit 24b79b6

Please sign in to comment.