diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SimpleWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SimpleWriter.java new file mode 100644 index 0000000000..807fec10d2 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SimpleWriter.java @@ -0,0 +1,126 @@ +package com.google.cloud.bigquery.storage.v1; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController.LimitExceededBehavior; +import com.google.common.base.Preconditions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.json.JSONArray; + +/** + * A SimpleWriter provides a way to write to BigQuery in a unary fashion. Underneath, it still uses + * the streaming API through JsonStreamWriter. There is a cache that manages different table writes. + * Default max table destination cached is 100, you can adjust it through the Builder. + * + *

Currently, it only supports AT_LEAST_ONCE writes. + * + *

This class is still in development, DO NO USE IT. + * + *

TODOS: 1. Make the class thread safe 2. Handle failed writer case + */ +public class SimpleWriter { + private BigQueryWriteClient client; + private LoadingCache writerCache; + private String traceId; + private static String tableNamePatternString = "projects/[^/]+/datasets/[^/]+/tables/[^/]+"; + private static Pattern tableNamePattern = Pattern.compile(tableNamePatternString); + private boolean ignoreUnkownFields; + + /** Constructs a new {@link SimpleWriter.Builder} using the given bigquery write client. */ + public static SimpleWriter.Builder newBuilder(BigQueryWriteClient client) { + return new SimpleWriter.Builder(client); + } + + private SimpleWriter(SimpleWriter.Builder builder) { + this.client = builder.client; + this.traceId = builder.traceId; + this.ignoreUnkownFields = builder.ignoreUnknownField; + this.writerCache = + CacheBuilder.from(builder.cacheSpec) + .removalListener( + new RemovalListener() { + public void onRemoval( + RemovalNotification notification) { + notification.getValue().close(); + } + }) + .build( + new CacheLoader() { + public JsonStreamWriter load(String key) + throws DescriptorValidationException, IOException, InterruptedException { + return JsonStreamWriter.newBuilder(key, client) + .setEnableConnectionPool(true) + .setTraceId(traceId) + .setIgnoreUnknownFields(ignoreUnkownFields) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) + .build()) + .build(); + } + }); + } + + public static final class Builder { + BigQueryWriteClient client; + // Cache expriration time is set to the same as connection timeout time. After a connection + // is cut, we might be missing schema updates to the object, so we will just let the cache + // expire so that a fresh table schema will be retrieved. + String cacheSpec = "maximumSize=100,expireAfterWrite=10m"; + String traceId = "SimpleWriter:null"; + boolean ignoreUnknownField = false; + + private Builder(BigQueryWriteClient client) { + this.client = Preconditions.checkNotNull(client); + } + + /** CacheSpec for the JsonStreamWriter cache. */ + public SimpleWriter.Builder setCacheSpec(String cacheSpec) { + this.cacheSpec = cacheSpec; + return this; + } + + /** One time trace id to apply to all writes */ + public SimpleWriter.Builder setTraceId(String traceId) { + this.traceId = "SimpleWriter_" + traceId; + return this; + } + + /** + * One time set ignoreUnknown field. If true, then if the input has unknown fields to bigquery + * table, the append will not fail. By default, the setting is false. + */ + public SimpleWriter.Builder setIgnoreUnknownField(boolean ignoreUnknownField) { + this.ignoreUnknownField = ignoreUnknownField; + return this; + } + + public SimpleWriter build() { + return new SimpleWriter(this); + } + } + + /** + * Appends data to a BigQuery Table. Rows will appear AT_LEAST_ONCE in BigQuery. + * + * @param rows the rows in serialized format to write to BigQuery. + * @return the append response wrapped in a future. + */ + public ApiFuture append(String tableName, JSONArray data) + throws ExecutionException, DescriptorValidationException, IOException { + Matcher tableNameMatcher = tableNamePattern.matcher(tableName); + if (!tableNameMatcher.matches()) { + throw new IllegalArgumentException("Invalid table name: " + tableName); + } + return writerCache.get(tableName).append(data); + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/SimpleWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/SimpleWriterTest.java new file mode 100644 index 0000000000..ba8d5fb0bf --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/SimpleWriterTest.java @@ -0,0 +1,77 @@ +package com.google.cloud.bigquery.storage.v1; + +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.testing.MockGrpcService; +import com.google.api.gax.grpc.testing.MockServiceHelper; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.util.Arrays; +import java.util.UUID; +import java.util.logging.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class SimpleWriterTest { + private static final Logger log = + Logger.getLogger(com.google.cloud.bigquery.storage.v1.StreamWriterTest.class.getName()); + private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/s1"; + private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/s2"; + private static final String TEST_TRACE_ID = "DATAFLOW:job_id"; + private FakeScheduledExecutorService fakeExecutor; + private FakeBigQueryWrite testBigQueryWrite; + private static MockServiceHelper serviceHelper; + private BigQueryWriteClient client; + private final TableFieldSchema FOO = + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.STRING) + .setMode(TableFieldSchema.Mode.NULLABLE) + .setName("foo") + .build(); + private final TableFieldSchema BAR = + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.STRING) + .setMode(TableFieldSchema.Mode.NULLABLE) + .setName("bar") + .build(); + + public SimpleWriterTest() throws DescriptorValidationException {} + + @Before + public void setUp() throws Exception { + testBigQueryWrite = new FakeBigQueryWrite(); + serviceHelper = + new MockServiceHelper( + UUID.randomUUID().toString(), Arrays.asList(testBigQueryWrite)); + serviceHelper.start(); + fakeExecutor = new FakeScheduledExecutorService(); + testBigQueryWrite.setExecutor(fakeExecutor); + client = + BigQueryWriteClient.create( + BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider(serviceHelper.createChannelProvider()) + .build()); + } + + @After + public void tearDown() throws Exception { + log.info("tearDown called"); + client.close(); + serviceHelper.stop(); + } + + @Test + public void testGoodWrites() throws Exception {} + + @Test + public void testBadWrites() throws Exception {} + + @Test + public void testWriterCacheExpired() throws Exception {} + + @Test + public void testBuilderParams() throws Exception {} +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 6972813e20..fe46e0e989 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -1296,4 +1296,52 @@ public void testMultiplexingMixedLocation() assertEquals("us", streamWriter2.getLocation()); assertEquals("eu", streamWriter3.getLocation()); } + + @Test + public void testSimpleWriter() + throws IOException, InterruptedException, ExecutionException, DescriptorValidationException { + String table1 = + String.format( + "projects/%s/datasets/%s/tables/%s", + ServiceOptions.getDefaultProjectId(), DATASET, TABLE); + String table2 = + String.format( + "projects/%s/datasets/%s/tables/%s", + ServiceOptions.getDefaultProjectId(), DATASET_EU, TABLE); + + SimpleWriter simpleWriter = SimpleWriter.newBuilder(BigQueryWriteClient.create()).build(); + JSONObject foo = new JSONObject(); + foo.put("foo", "aaa"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + ApiFuture r1 = simpleWriter.append(table1, jsonArr); + foo = new JSONObject(); + foo.put("foo", "bbb"); + jsonArr = new JSONArray(); + jsonArr.put(foo); + ApiFuture r2 = simpleWriter.append(table2, jsonArr); + foo = new JSONObject(); + foo.put("foo", "ccc"); + jsonArr = new JSONArray(); + jsonArr.put(foo); + ApiFuture r3 = simpleWriter.append(table1, jsonArr); + r1.get(); + r2.get(); + r3.get(); + TableResult result = + bigquery.listTableData(tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); + Iterator iter = result.getValues().iterator(); + FieldValueList currentRow = iter.next(); + assertEquals("aaa", currentRow.get(0).getStringValue()); + currentRow = iter.next(); + assertEquals("ccc", currentRow.get(0).getStringValue()); + assertFalse(iter.hasNext()); + result = + bigquery.listTableData( + tableInfoEU.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); + iter = result.getValues().iterator(); + currentRow = iter.next(); + assertEquals("bbb", currentRow.get(0).getStringValue()); + assertFalse(iter.hasNext()); + } }