Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Janusgraph bigtable rows exceeds the limit 256MiB when exported via Dataflow in Parquet format #4704

Open
opan opened this issue Oct 24, 2024 · 0 comments

Comments

@opan
Copy link

opan commented Oct 24, 2024

Hi team,

Currently, we are using Janusgraph with Bigtable as the storage backend. And we wanted to export the data out of Bigtable using Dataflow in a Parquet format to cloud storage. But during the process it failed because some of the rows size too large that exceeds the limit with the following error messages:

Error message from worker: java.io.IOException: Failed to start reading from source: BigtableSource{config=BigtableConfig{projectId=gopay-ds-staging, instanceId=risk-serving-bt, appProfileId=default, userAgent=null, emulator=null}, readOptions=BigtableReadOptions{tableId=risk-serving-bt-batch-feature-engine, rowFilter=null, keyRanges=[ByteKeyRange{startKey=[39adad4f015489a062715f5f637573746f6d65725f6167655f796561725f5f696e665f32645f5f637573746f6d65725f5f6e756d657269635f5f6461696c795f5f76b1], endKey=[3a34898871a2d37c2add4a2c502c568a3d3c84378375f55aad094c4a683b6775cb50f7dab18254bf3059ebe0c8f64a87effcc14d107f1d7a6cc1c384a391aa079281a1]}], maxBufferElementCount=null, attemptTimeout=null, operationTimeout=null, waitTimeout=null}, estimatedSizeBytes=67108864}
	org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:634)
	org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381)
	org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211)
	org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
	org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
	org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:304)
	org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:276)
	org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:206)
	org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:150)
	org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:130)
	org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:117)
	java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.google.api.gax.rpc.FailedPreconditionException: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Error while reading table 'projects/gopay-ds-staging/instances/risk-serving-bt/tables/risk-serving-bt-batch-feature-engine' : Read returned 269MiB from row '9\255\255O\001T\211\240bq__customer_age_year__inf_2d__customer__numeric__daily_...(length 67)' which exceeds the limit of 256MiB. Make sure you are setting an appropriate request filter to retrieve only recent versions and only the columns you want. If columns are accumulating more versions than you need to read, you can also create a garbage collection policy: https://cloud.google.com/bigtable/docs/configuring-garbage-collection#versions
	com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:102)
	com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)
	com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:86)
	com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
	com.google.api.gax.grpc.ExceptionResponseObserver.onErrorImpl(ExceptionResponseObserver.java:82)
	com.google.api.gax.rpc.StateCheckingResponseObserver.onError(StateCheckingResponseObserver.java:84)
	com.google.api.gax.grpc.GrpcDirectStreamController$ResponseObserverAdapter.onClose(GrpcDirectStreamController.java:148)
	io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:570)
	io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	com.google.cloud.bigtable.data.v2.stub.metrics.ConnectionErrorCountInterceptor$1$1.onClose(ConnectionErrorCountInterceptor.java:66)
	io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	com.google.cloud.bigtable.data.v2.stub.CookiesInterceptor$UpdateCookieListener.onClose(CookiesInterceptor.java:92)
	io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	com.google.api.gax.grpc.GrpcMetadataHandlerInterceptor$1$1.onClose(GrpcMetadataHandlerInterceptor.java:76)
	io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	io.grpc.census.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:814)
	io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	io.grpc.census.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:494)
	io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
	io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
	io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
	io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
	io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	... 3 more
	Suppressed: java.lang.RuntimeException: Asynchronous task failed
		at com.google.api.gax.rpc.ServerStreamIterator.hasNext(ServerStreamIterator.java:105)
		at org.apache.beam.sdk.io.gcp.bigtable.BigtableServiceImpl$BigtableReaderImpl.advance(BigtableServiceImpl.java:193)
		at org.apache.beam.sdk.io.gcp.bigtable.BigtableServiceImpl$BigtableReaderImpl.start(BigtableServiceImpl.java:188)
		at org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$BigtableReader.start(BigtableIO.java:2029)
		at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:631)
		at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381)
		at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211)
		at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
		at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
		at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:304)
		at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:276)
		at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:206)
		at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:150)
		at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:130)
		at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:117)
		at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
		at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
		... 3 more
Caused by: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Error while reading table 'projects/gopay-ds-staging/instances/risk-serving-bt/tables/risk-serving-bt-batch-feature-engine' : Read returned 269MiB from row '9\255\255O\001T\211\240bq__customer_age_year__inf_2d__customer__numeric__daily_...(length 67)' which exceeds the limit of 256MiB. Make sure you are setting an appropriate request filter to retrieve only recent versions and only the columns you want. If columns are accumulating more versions than you need to read, you can also create a garbage collection policy: https://cloud.google.com/bigtable/docs/configuring-garbage-collection#versions
	io.grpc.Status.asRuntimeException(Status.java:533)
	... 34 more

We have talked with GCP support if there is a workaround for this and they suggest to change the GC policy of the columns in the table. But since the rows and columns structure are created and managed directly by Janusgraph, we have concern that if we modify/change the GC policy, it might corrupt the data.

Our question is, is there a way to configure the size of the rows in janusgraph? Or is it possible to configure the GC policy directly from Janusgraph?

Do let me know if I posted this in a wrong section.

Column families that have large row size:

risk-serving-bt-feature-engine 

family {
  name: "l"
  locality_group: "user_flash"
  administrator: "chubby!mdb/cloud-bigtable-internal-bigtable-administrators-prod"
  administrator: "chubby!user/cloud-bigtable"
  writer: "chubby!mdb/cloud-bigtable-internal-bigtable-writers-prod"
  writer: "chubby!user/cloud-bigtable"
  reader: "chubby!mdb/cloud-bigtable-internal-bigtable-readers-prod"
  reader: "chubby!user/cloud-bigtable"
  gcexpr: "(age() > 604800000000 || version() > 1)"
}

janusgraph version: 0.6.4
storage backend: bigtable

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant