Skip to content

Commit

Permalink
support Arrow using loaders.gl
Browse files Browse the repository at this point in the history
Signed-off-by: Xun Li <[email protected]>
  • Loading branch information
lixun910 committed Nov 1, 2023
1 parent 52a4f09 commit 6150d79
Show file tree
Hide file tree
Showing 19 changed files with 189 additions and 792 deletions.
12 changes: 11 additions & 1 deletion examples/webpack.config.local.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ function makeLocalDevConfig(env, EXAMPLE_DIR = LIB_DIR, externals = {}) {
use: ['source-map-loader'],
enforce: 'pre',
exclude: [/node_modules\/react-palm/, /node_modules\/react-data-grid/]
},
{
test: /\.mjs$/,
include: /node_modules/,
type: 'javascript/auto'
}
]
},
Expand Down Expand Up @@ -250,7 +255,12 @@ function addBabelSettings(env, config, exampleDir) {
...config.module,
rules: [
...config.module.rules.filter(r => r.loader !== 'babel-loader'),
makeBabelRule(env, exampleDir)
makeBabelRule(env, exampleDir),
{
test: /\.mjs$/,
include: /node_modules/,
type: 'javascript/auto'
}
]
}
};
Expand Down
11 changes: 11 additions & 0 deletions jest.setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,14 @@ jest.mock('@kepler.gl/utils', () => ({
hasPortableWidth: jest.fn(),
hasMobileWidth: jest.fn()
}));

// Arrow needs the TextEncoder / TextDecoder APIs, which jsdom does not
// provide; every Node version we support exposes them via the `util` module.
if (
  typeof globalThis.TextEncoder === 'undefined' ||
  typeof globalThis.TextDecoder === 'undefined'
) {
  const {TextEncoder, TextDecoder} = require('util');
  globalThis.TextEncoder = TextEncoder;
  globalThis.TextDecoder = TextDecoder;
}
1 change: 1 addition & 0 deletions src/deckgl-layers/babel.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const PRESETS = ['@babel/preset-env', '@babel/preset-react', '@babel/preset-type
const PLUGINS = [
['@babel/plugin-transform-typescript', {isTSX: true, allowDeclareFields: true}],
'@babel/plugin-transform-modules-commonjs',
'@babel/plugin-proposal-class-properties',
'@babel/plugin-proposal-optional-chaining',
[
'@babel/transform-runtime',
Expand Down
84 changes: 73 additions & 11 deletions src/layers/src/arrow-layer/arrow-layer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,44 @@
// THE SOFTWARE.

import {BinaryFeatures} from '@loaders.gl/schema';
import {getBinaryGeometriesFromArrow, parseGeometryFromArrow} from '@loaders.gl/arrow';
import {GeoJsonLayer as DeckGLGeoJsonLayer} from '@deck.gl/layers';
import {HIGHLIGH_COLOR_3D} from '@kepler.gl/constants';
import {KeplerTable} from '@kepler.gl/table';
import {
GEOARROW_COLUMN_METADATA_KEY,
DataContainerInterface,
getBinaryGeometriesFromArrow,
parseGeometryFromArrow
} from '@kepler.gl/utils';
import {DataContainerInterface} from '@kepler.gl/utils';
import {FilterArrowExtension} from '@kepler.gl/deckgl-layers';
import GeoJsonLayer, {SUPPORTED_ANALYZER_TYPES} from '../geojson-layer/geojson-layer';

/**
 * Expand a bounding box using a sample of flat GeoArrow coordinates.
 *
 * Visits at most roughly `sampleSize` evenly spaced coordinates (every
 * `sampleStep`-th one) and widens the bounds to include them. Returns a new
 * tuple; the `bounds` argument is never mutated.
 *
 * @param flatCoords interleaved coordinate values, e.g. [x0, y0, x1, y1, ...]
 * @param nDim values per coordinate (2 for xy, 3 for xyz)
 * @param bounds current bounds as [minLng, minLat, maxLng, maxLat]
 * @param sampleSize approximate number of coordinates to sample (default 100)
 * @returns updated bounds as [minLng, minLat, maxLng, maxLat]
 */
function updateBoundsFromGeoArrowSamples(
  flatCoords: Float64Array,
  nDim: number,
  bounds: [number, number, number, number],
  sampleSize: number = 100
): [number, number, number, number] {
  const numberOfFeatures = flatCoords.length / nDim;
  // sample roughly `sampleSize` coordinates, but always at least every one
  const sampleStep = Math.max(Math.floor(numberOfFeatures / sampleSize), 1);

  const newBounds: [number, number, number, number] = [...bounds];
  for (let i = 0; i < numberOfFeatures; i += sampleStep) {
    const lng = flatCoords[i * nDim];
    const lat = flatCoords[i * nDim + 1];
    // BUGFIX: compare against the running minimum (newBounds[0]), not the
    // original bounds[0]; the old check let any value below the initial
    // bound overwrite the minimum, so the LAST such longitude won instead
    // of the smallest one. The other three branches already used newBounds.
    if (lng < newBounds[0]) {
      newBounds[0] = lng;
    }
    if (lat < newBounds[1]) {
      newBounds[1] = lat;
    }
    if (lng > newBounds[2]) {
      newBounds[2] = lng;
    }
    if (lat > newBounds[3]) {
      newBounds[3] = lat;
    }
  }

  return newBounds;
}

export default class ArrowLayer extends GeoJsonLayer {
binaryFeatures: BinaryFeatures[];
dataContainer: DataContainerInterface | null;
Expand Down Expand Up @@ -147,12 +173,33 @@ export default class ArrowLayer extends GeoJsonLayer {
updateLayerMeta(dataContainer: DataContainerInterface) {
this.dataContainer = dataContainer;
const {geojson} = this.config.columns;
const geoColumn = dataContainer.getColumn(geojson.fieldIdx);

const geoColumn = dataContainer.getColumn?.(geojson.fieldIdx);
const arrowField = dataContainer.getField?.(geojson.fieldIdx);

const encoding = arrowField?.metadata?.get('ARROW:extension:name');
// create binary data from arrow data for GeoJsonLayer
const {binaryGeometries, bounds, featureTypes} = getBinaryGeometriesFromArrow(geoColumn);
const {binaryGeometries, featureTypes} = getBinaryGeometriesFromArrow(
geoColumn,
encoding
);
this.binaryFeatures = binaryGeometries;

// TODO: this should be removed once fix was applied in loaders.gl
let bounds : [number, number, number, number] = [Infinity, Infinity, -Infinity, -Infinity]
binaryGeometries.forEach(b => {
const coords = featureTypes.polygon
? b.polygons?.positions
: featureTypes.point
? b.points?.positions
: b.lines?.positions;
bounds = updateBoundsFromGeoArrowSamples(
coords?.value as Float64Array,
coords?.size || 2,
bounds
);
});

// since there is no feature.properties.radius, we set fixedRadius to false
const fixedRadius = false;
this.updateMeta({bounds, fixedRadius, featureTypes});
Expand All @@ -173,15 +220,30 @@ export default class ArrowLayer extends GeoJsonLayer {
// NOTE: this could be done in Deck.gl getPickingInfo(params) and binaryToGeojson()
if (this.isLayerHovered(objectInfo) && objectInfo.index >= 0 && this.dataContainer) {
const {geojson} = this.config.columns;
const col = this.dataContainer.getColumn(geojson.fieldIdx);
const rawGeometry = col.get(objectInfo.index);

const col = this.dataContainer.getColumn?.(geojson.fieldIdx);
const rawGeometry = col?.get(objectInfo.index);

const field = this.dataContainer.getField?.(geojson.fieldIdx);
const encoding = field?.metadata?.get('ARROW:extension:name');

const hoveredFeature = parseGeometryFromArrow({
encoding: col.metadata?.get(GEOARROW_COLUMN_METADATA_KEY),
encoding,
data: rawGeometry
});

const properties = this.dataContainer.rowAsArray(objectInfo.index).reduce((prev, cur, i) => {
if (i !== geojson.fieldIdx) {
const fieldName = this.dataContainer?.getField?.(i).name;
prev[fieldName] = cur;
}
return prev;
}, {});

return {
...hoveredFeature,
properties: {
...properties,
index: objectInfo.index
}
};
Expand Down
47 changes: 14 additions & 33 deletions src/processors/src/data-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,13 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

import {Table as ApacheArrowTable, Field as ArrowField} from 'apache-arrow';
import {Table as ApacheArrowTable, Field as ArrowField, RecordBatch} from 'apache-arrow';
import {csvParseRows} from 'd3-dsv';
import {Console} from 'global/console';
import {DATA_TYPES as AnalyzerDATA_TYPES} from 'type-analyzer';
import normalize from '@mapbox/geojson-normalize';
import {
ALL_FIELD_TYPES,
DATASET_FORMATS,
GUIDES_FILE_FORMAT_DOC,
ARROW_GEO_METADATA_KEY
} from '@kepler.gl/constants';
import {ALL_FIELD_TYPES, DATASET_FORMATS, GUIDES_FILE_FORMAT_DOC} from '@kepler.gl/constants';
import {ProcessorResult, Field} from '@kepler.gl/types';
import {
GEOARROW_COLUMN_METADATA_KEY,
arrowDataTypeToAnalyzerDataType,
arrowDataTypeToFieldType,
notNullorUndefined,
Expand Down Expand Up @@ -405,63 +398,50 @@ export function processKeplerglDataset(
* @param arrowTable the arrow table to parse
* @returns dataset containing `fields` and `rows` or null
*/
export function processArrowTable(arrowTable: ApacheArrowTable): ProcessorResult | null {
if (!arrowTable) {
export function processArrowTable(arrowBatches: RecordBatch[]): ProcessorResult | null {
if (arrowBatches.length === 0) {
return null;
}
const metadata = arrowTable.schema.metadata;
// get geometry columns if metadata has key 'geo'
let geometryColumns = [];
if (metadata.get(ARROW_GEO_METADATA_KEY) !== undefined) {
// load geo metadata
// parse metadata string to JSON object
const geoMeta = JSON.parse(metadata.get(ARROW_GEO_METADATA_KEY) || '');
// check if schema_version in geoMeta equals to '0.1.0'
const SCHEMA_VERSION = '0.1.0';
if (geoMeta.schema_version !== SCHEMA_VERSION) {
Console.warn('Apache Arrow schema version not supported');
}
// get all geometry columns
geometryColumns = geoMeta.columns;
}

const arrowTable = new ApacheArrowTable(arrowBatches);
const fields: Field[] = [];

// parse fields
arrowTable.schema.fields.forEach((field: ArrowField, index: number) => {
const isGeometryColumn =
geometryColumns[field.name] !== undefined ||
field.metadata.get(GEOARROW_COLUMN_METADATA_KEY)?.startsWith('geoarrow');
const isGeometryColumn = field.metadata.get('ARROW:extension:name')?.startsWith('geoarrow');
fields.push({
name: field.name,
id: field.name,
displayName: field.name,
format: '',
fieldIdx: index,
type: isGeometryColumn ? ALL_FIELD_TYPES.geoarrow: arrowDataTypeToFieldType(field.type),
type: isGeometryColumn ? ALL_FIELD_TYPES.geoarrow : arrowDataTypeToFieldType(field.type),
analyzerType: isGeometryColumn
? AnalyzerDATA_TYPES.GEOMETRY
: arrowDataTypeToAnalyzerDataType(field.type),
valueAccessor: (dc: any) => d => {
return dc.valueAt(d.index, index);
}
},
metadata: field.metadata
});
});

const cols = [...Array(arrowTable.numCols).keys()].map(i => arrowTable.getChildAt(i));
// return empty rows and use raw arrow table to construct column-wise data container
return {fields, rows: [], rawData: arrowTable};
return {info: {format: 'Arrow'}, fields, rows: [], cols, metadata: arrowTable.schema.metadata};
}

export const DATASET_HANDLERS = {
[DATASET_FORMATS.row]: processRowObject,
[DATASET_FORMATS.geojson]: processGeojson,
[DATASET_FORMATS.csv]: processCsvData,
[DATASET_FORMATS.arrow]: processArrowTable,
[DATASET_FORMATS.keplergl]: processKeplerglDataset
};

export const Processors: {
processGeojson: typeof processGeojson;
processCsvData: typeof processCsvData;
processArrowTable: typeof processArrowTable;
processRowObject: typeof processRowObject;
processKeplerglJSON: typeof processKeplerglJSON;
processKeplerglDataset: typeof processKeplerglDataset;
Expand All @@ -471,6 +451,7 @@ export const Processors: {
} = {
processGeojson,
processCsvData,
processArrowTable,
processRowObject,
processKeplerglJSON,
processKeplerglDataset,
Expand Down
39 changes: 29 additions & 10 deletions src/processors/src/file-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

import {Table as ArrowTable} from 'apache-arrow';
import {parseInBatches} from '@loaders.gl/core';
import {JSONLoader, _JSONPath} from '@loaders.gl/json';
import {CSVLoader} from '@loaders.gl/csv';
import {processGeojson, processKeplerglJSON, processRowObject} from './data-processor';
import {ArrowLoader} from '@loaders.gl/arrow';
import {processArrowTable, processGeojson, processKeplerglJSON, processRowObject} from './data-processor';
import {generateHashId, isPlainObject} from '@kepler.gl/utils';
import {DATASET_FORMATS} from '@kepler.gl/constants';
import {Loader} from '@loaders.gl/loader-utils';
Expand All @@ -40,6 +42,10 @@ const CSV_LOADER_OPTIONS = {
dynamicTyping: false // not working for now
};

const ARROW_LOADER_OPTIONS = {
shape: 'arrow-table'
};

const JSON_LOADER_OPTIONS = {
shape: 'object-row-table',
// instruct loaders.gl on what json paths to stream
Expand All @@ -58,6 +64,14 @@ export type ProcessFileDataContent = {
metadata?: Map<string, string>;
};

/**
 * Type guard: is the given value an Apache Arrow `Table` instance?
 *
 * @param table candidate value of unknown shape
 * @returns true when `table` is an apache-arrow `Table`
 */
export function isArrowTable(table: any): table is ArrowTable {
  // `instanceof` already yields false for null/undefined, so no extra
  // truthiness guard or Boolean() wrapper is needed
  return table instanceof ArrowTable;
}

/**
 * Check whether parsed file content looks like Arrow data: a non-empty
 * array of record batches, each exposing `data` and `schema` properties.
 *
 * @param data parsed content of a loaded file
 * @returns true when `data` is a non-empty array of Arrow-batch-shaped objects
 */
export function isArrowData(data: any): boolean {
  // BUGFIX: optional chaining guards the empty-array case; the unchecked
  // `data[0].data` access threw a TypeError when `data` was `[]`
  return Array.isArray(data) && Boolean(data[0]?.data && data[0]?.schema);
}

export function isGeoJson(json: unknown): json is Feature | FeatureCollection {
// json can be feature collection
// or single feature
Expand Down Expand Up @@ -94,7 +108,7 @@ export async function* makeProgressIterator(
let rowCount = 0;

for await (const batch of asyncIterator) {
const rowCountInBatch = (batch.data && batch.data.length) || 0;
const rowCountInBatch = (batch.data && (batch.data.length || batch.length)) || 0;
rowCount += rowCountInBatch;
const percent = Number.isFinite(batch.bytesUsed) ? batch.bytesUsed / info.size : null;

Expand All @@ -116,14 +130,13 @@ export async function* readBatch(
): AsyncGenerator {
let result = null;
const batches = <any>[];

for await (const batch of asyncIterator) {
// Last batch will have this special type and will provide all the root
// properties of the parsed document.
// Only json parse will have `FINAL_RESULT`
if (batch.batchType === BATCH_TYPE.FINAL_RESULT) {
if (batch.container) {
result = {...batch.container};
result = { ...batch.container };
}
// Set the streamed data correctly is Batch json path is set
// and the path streamed is not the top level object (jsonpath = '$')
Expand All @@ -136,14 +149,15 @@ export async function* readBatch(
result = batches;
}
} else {
for (let i = 0; i < batch.data?.length; i++) {
batches.push(batch.data[i]);
const batchData = isArrowTable(batch.data) ? batch.data.batches : batch.data;
for (let i = 0; i < batchData?.length; i++) {
batches.push(batchData[i]);
}
}

yield {
...batch,
...(batch.schema ? {headers: Object.keys(batch.schema)} : {}),
...(batch.schema ? { headers: Object.keys(batch.schema) } : {}),
fileName,
// if dataset is CSV, data is set to the raw batches
data: result ? result : batches
Expand All @@ -161,9 +175,10 @@ export async function readFileInBatches({
loaders: Loader[];
loadOptions: any;
}): Promise<AsyncGenerator> {
loaders = [JSONLoader, CSVLoader, ...loaders];
loaders = [JSONLoader, CSVLoader, ArrowLoader, ...loaders];
loadOptions = {
csv: CSV_LOADER_OPTIONS,
arrow: ARROW_LOADER_OPTIONS,
json: JSON_LOADER_OPTIONS,
metadata: true,
...loadOptions
Expand All @@ -183,11 +198,15 @@ export function processFileData({
fileCache: FileCacheItem[];
}): Promise<FileCacheItem[]> {
return new Promise((resolve, reject) => {
const {data} = content;
let {data} = content;
let format: string | undefined;
let processor: Function | undefined;

if (isKeplerGlMap(data)) {
if (isArrowData(data)) {
format = DATASET_FORMATS.arrow;
processor = processArrowTable;
}
else if (isKeplerGlMap(data)) {
format = DATASET_FORMATS.keplergl;
processor = processKeplerglJSON;
} else if (isRowObject(data)) {
Expand Down
Loading

0 comments on commit 6150d79

Please sign in to comment.