Skip to content

Commit

Permalink
Merge pull request #9 from kieras/develop
Browse files Browse the repository at this point in the history
Version 0.0.4 (async import/export tasks)
  • Loading branch information
kieras authored Jan 19, 2018
2 parents a1bd2ba + 7221dbb commit 8ea9550
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 43 deletions.
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,30 @@ Simply run:

$ pipsi install gcdu

You need to authenticate in the project using gcloud sdk:

$ gcloud auth login
$ gcloud config set project PROJECT_ID

Or exporting the environment variable:

$ export GOOGLE_APPLICATION_CREDENTIALS='key.json'

More help in this link https://developers.google.com/identity/protocols/application-default-credentials

---

# Usage

To use it:

$ gcdu --help

Export command:

$ gcdu export -p [project] -n [namespace] -k [comma separated list of datastore kinds]

Import command:

$ gcdu import -p [project] -n [namespace] -k [comma separated list of datastore kinds]

2 changes: 1 addition & 1 deletion gcdu/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@
#
# Google Cloud Datastore Utils

__version__ = '0.0.3'
__version__ = '0.0.4'
1 change: 1 addition & 0 deletions gcdu/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ def main():
"""Utilities for Google Cloud Datastore."""
pass


main.add_command(export)
main.add_command(import_cmd)
45 changes: 27 additions & 18 deletions gcdu/commands/export.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# -*- coding: utf-8 -*-
"""The export command."""
import json

import click

from .utils import get_datastore_api, get_kinds_list, show_progressbar_item, partition_replace, save
from .utils import (get_datastore_api, partition_replace, save, execute_tasks)


@click.command()
Expand All @@ -19,29 +20,37 @@
default='./data',
show_default=True)
@click.option('--project-placeholder', '-pp',
help='Placeholder value to replace the project value in exported files.',
help='Placeholder value to replace the project value'
' in exported files.',
required=True,
default='___PROJECT___',
show_default=True)
@click.option('--namespace-placeholder', '-np',
help='Placeholder value to replace the namespace value in exported files.',
help='Placeholder value to replace the namespace value'
' in exported files.',
required=True,
default='___NAMESPACE___',
show_default=True)
@click.option('--kinds', '-k',
help='Comma separated list of Datastore Kinds to export.',
required=True)
def export(project, namespace, data_dir, project_placeholder, namespace_placeholder, kinds):
def export(project, namespace, data_dir, project_placeholder,
namespace_placeholder, kinds):
"""Export data from database."""
kinds_list = get_kinds_list(kinds)
click.echo("Executing export. Project={}, Namespace={}, Kinds={}.".format(project, namespace, kinds_list))
with click.progressbar(kinds_list, label='Exporting', show_eta=True,
item_show_func=show_progressbar_item) as bar:
for kind in bar:
execute_export(project, namespace, data_dir, project_placeholder, namespace_placeholder, kind)
execute_tasks({
'type_task': 'export',
'project': project,
'namespace': namespace,
'data_dir': data_dir,
'project_placeholder': project_placeholder,
'namespace_placeholder': namespace_placeholder,
'kinds': kinds,
'target': execute_export
})


def execute_export(project, namespace, data_dir, project_placeholder, namespace_placeholder, kind):
def execute_export(project, namespace, data_dir, project_placeholder,
namespace_placeholder, kind):
datastore = get_datastore_api()

request_body = {
Expand All @@ -64,16 +73,16 @@ def execute_export(project, namespace, data_dir, project_placeholder, namespace_

entities = extract_entities(response)
entities_json = json.dumps(entities)
entities_replaced_json = partition_replace(entities_json, project, project_placeholder, namespace,
namespace_placeholder)
entities_replaced_json = partition_replace(entities_json, project,
project_placeholder, namespace,
namespace_placeholder)
entities_replaced = json.loads(entities_replaced_json)

save(entities_replaced, kind, data_dir)


def extract_entities(response):
entities = []
for entityResult in response.get('batch').get('entityResults'):
entity = entityResult.get('entity')
entities.append(entity)
return entities
return [
entityResult.get('entity')
for entityResult in response.get('batch', {}).get('entityResults', [])
]
46 changes: 27 additions & 19 deletions gcdu/commands/import_cmd.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# -*- coding: utf-8 -*-
"""The import command."""
import json

import click

from .utils import get_datastore_api, get_kinds_list, show_progressbar_item, partition_replace, load
from .utils import (get_datastore_api, partition_replace, load, execute_tasks)


@click.command('import')
Expand All @@ -19,43 +20,50 @@
default='./data',
show_default=True)
@click.option('--project-placeholder', '-pp',
help='Placeholder value to replace the project value in previously exported files.',
help='Placeholder value to replace the project value in'
' previously exported files.',
required=True,
default='___PROJECT___',
show_default=True)
@click.option('--namespace-placeholder', '-np',
help='Placeholder value to replace the namespace value in previously exported files.',
help='Placeholder value to replace the namespace value in'
' previously exported files.',
required=True,
default='___NAMESPACE___',
show_default=True)
@click.option('--kinds', '-k',
help='Comma separated list of Datastore Kinds to import.',
required=True)
def import_cmd(project, namespace, data_dir, project_placeholder, namespace_placeholder, kinds):
def import_cmd(project, namespace, data_dir, project_placeholder,
namespace_placeholder, kinds):
"""Import data to database using previously exported data as input."""
kinds_list = get_kinds_list(kinds)
click.echo("Executing import. Project={}, Namespace={}, Kinds={}.".format(project, namespace, kinds_list))
with click.progressbar(kinds_list, label='Importing', show_eta=True,
item_show_func=show_progressbar_item) as bar:
for kind in bar:
execute_import(project, namespace, data_dir, project_placeholder, namespace_placeholder, kind)
execute_tasks({
'type_task': 'import',
'project': project,
'namespace': namespace,
'data_dir': data_dir,
'project_placeholder': project_placeholder,
'namespace_placeholder': namespace_placeholder,
'kinds': kinds,
'target': execute_import
})


def execute_import(project, namespace, data_dir, project_placeholder, namespace_placeholder, kind):
def execute_import(project, namespace, data_dir, project_placeholder,
namespace_placeholder, kind):
datastore = get_datastore_api()

entities = load(kind, data_dir)
entities_json = json.dumps(entities)
entities_replaced_json = partition_replace(entities_json, project_placeholder, project,
namespace_placeholder, namespace)
entities_replaced_json = partition_replace(entities_json,
project_placeholder, project,
namespace_placeholder,
namespace)
entities_replaced = json.loads(entities_replaced_json)

inserts = []
for entity in entities_replaced:
insert = {
'insert': entity
}
inserts.append(insert)
inserts = [
{'insert': entity} for entity in entities_replaced
]

request_body = {
"mutations": inserts,
Expand Down
76 changes: 71 additions & 5 deletions gcdu/commands/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,29 @@
# -*- coding: utf-8 -*-
"""Utilities."""
import copy
import io
import os
import json
import os
import threading
from collections import namedtuple

import click
import googleapiclient.discovery

cls_task = namedtuple(
'Task',
[
'type_task',
'project',
'namespace',
'target',
'data_dir',
'project_placeholder',
'namespace_placeholder',
'kinds'
]
)


def get_datastore_api():
return googleapiclient.discovery.build('datastore', 'v1')
Expand All @@ -20,7 +39,8 @@ def show_progressbar_item(item):
return "Kind: {}".format(item)


def partition_replace(entities_json, from_project, to_project, from_namespace, to_namespace):
def partition_replace(entities_json, from_project, to_project, from_namespace,
to_namespace):
result = entities_json.replace('"projectId": "{}"'.format(from_project),
'"projectId": "{}"'.format(to_project))
result = result.replace('"namespaceId": "{}"'.format(from_namespace),
Expand All @@ -29,15 +49,61 @@ def partition_replace(entities_json, from_project, to_project, from_namespace, t


def save(entities, kind, data_dir):
if not entities:
click.echo('No entities found for kind {}'.format(kind))
return
if not os.path.exists(data_dir):
os.makedirs(data_dir)

with io.open('{}/{}.json'.format(data_dir, kind), 'w', encoding='utf-8') as export_file:
with io.open('{}/{}.json'.format(data_dir, kind), 'w',
encoding='utf-8') as export_file:
export_file.write(
json.dumps(entities, ensure_ascii=False, sort_keys=True, indent=2, separators=(',', ': ')))
json.dumps(entities, ensure_ascii=False, sort_keys=True, indent=2,
separators=(u',', u': ')))


def load(kind, data_dir):
with io.open('{}/{}.json'.format(data_dir, kind), 'r', encoding='utf-8') as export_file:
with io.open('{}/{}.json'.format(data_dir, kind), 'r',
encoding='utf-8') as export_file:
entities = json.load(export_file)
return entities


def execute_tasks(kargs):
data = cls_task(**kargs)
kinds_list = get_kinds_list(data.kinds)
click.echo(
"Executing {}. Project={}, Namespace={}, Kinds={}.".format(
data.type_task,
data.project,
data.namespace,
kinds_list))

tasks = {}
for kind in kinds_list:
tasks[kind] = threading.Thread(
target=data.target,
name=kind,
args=(
data.project, data.namespace, data.data_dir,
data.project_placeholder,
data.namespace_placeholder, kind
)
)

kinds_list_progress = copy.deepcopy(kinds_list)

click.echo('Starting tasks...')
for task in tasks:
tasks[task].start()

click.echo('Done. {} tasks started.'.format(len(tasks)))
click.echo('Executing...')

while kinds_list_progress:
for idx, kind in enumerate(kinds_list_progress):
if not tasks.get(kind).is_alive():
click.echo('Done. Kind={}'.format(kind))
kinds_list_progress.pop(idx)

click.echo('Finished!')

0 comments on commit 8ea9550

Please sign in to comment.