diff --git a/src/codegen.c b/src/codegen.c index 7249f7883..103920780 100644 --- a/src/codegen.c +++ b/src/codegen.c @@ -202,6 +202,7 @@ build_array_devtype_info(TypeCacheEntry *tcache, const char *ext_name) dtype->type_sizeof = sizeof(xpu_array_t); dtype->type_alignof = __alignof__(xpu_array_t); dtype->type_hashfunc = NULL; //devtype_array_hash; + dtype->type_element = elem; /* type equality functions */ dtype->type_eqfunc = get_opcode(tcache->eq_opr); dtype->type_cmpfunc = tcache->cmp_proc; @@ -1753,6 +1754,147 @@ codegen_casewhen_expression(codegen_context *context, return 0; } +/* + * codegen_scalar_array_op_expression + */ +static int +codegen_scalar_array_op_expression(codegen_context *context, + StringInfo buf, + ScalarArrayOpExpr *sa_op) +{ + Expr *expr_a, *expr_s; + devtype_info *dtype_a, *dtype_s, *dtype_e; + devfunc_info *dfunc; + Oid type_oid; + Oid func_oid; + Oid argtypes[2]; + int pos = -1, __pos = -1; + uint32_t slot_id; + kern_expression kexp; + + if (list_length(sa_op->args) != 2) + { + __Elog("ScalarArrayOpExpr is not binary operator, not supported"); + return -1; + } + expr_a = linitial(sa_op->args); + type_oid = exprType((Node *)expr_a); + dtype_a = pgstrom_devtype_lookup(type_oid); + if (!dtype_a) + __Elog("type %s is not device supported", format_type_be(type_oid)); + + expr_s = lsecond(sa_op->args); + type_oid = exprType((Node *)expr_s); + dtype_s = pgstrom_devtype_lookup(type_oid); + if (!dtype_s) + __Elog("type %s is not device supported", format_type_be(type_oid)); + + if (dtype_s->type_element == NULL && + dtype_a->type_element != NULL) + { + func_oid = get_opcode(sa_op->opno); + } + else if (dtype_s->type_element != NULL && + dtype_a->type_element == NULL) + { + /* swap arguments */ + Expr *expr_temp = expr_a; + devtype_info *dtype_temp = dtype_a; + Oid opcode; + + expr_a = expr_s; + dtype_a = dtype_s; + expr_s = expr_temp; + dtype_s = dtype_temp; + opcode = get_commutator(sa_op->opno); + func_oid = get_opcode(opcode); + } + else + { + 
__Elog("ScalarArrayOpExpr must be 'SCALAR = %s ARRAY' form", + sa_op->useOr ? "ANY" : "ALL"); + } + dtype_e = dtype_a->type_element; + argtypes[0] = dtype_s->type_oid; + argtypes[1] = dtype_e->type_oid; + dfunc = __pgstrom_devfunc_lookup(func_oid, + 2, argtypes, + sa_op->inputcollid); + if (!dfunc) + __Elog("function %s is not device supported", + format_procedure(func_oid)); + if (dfunc->func_rettype->type_oid != BOOLOID || + dfunc->func_nargs != 2) + __Elog("function %s is not a binary boolean function", + format_procedure(func_oid)); + /* allocation of kvars slot */ + slot_id = list_length(context->kvars_depth); + context->kvars_depth = lappend_int(context->kvars_depth, -1); + context->kvars_resno = lappend_int(context->kvars_resno, -1); + context->kvars_types = lappend_oid(context->kvars_types, InvalidOid); + context->kvars_exprs = lappend(context->kvars_exprs, /* dummy */ + makeNullConst(argtypes[0], -1, InvalidOid)); + + if (buf) + pos = __appendZeroStringInfo(buf, offsetof(kern_expression, + u.saop.data)); + /* 1st arg - array-expression to be walked on */ + if (codegen_expression_walker(context, buf, expr_a) < 0) + return -1; + /* 2nd arg - expression to be saved */ + if (buf) + { + devtype_info *dtype_e = dtype_a->type_element; + int __off; + + memset(&kexp, 0, sizeof(kexp)); + kexp.exptype = dfunc->func_rettype->type_code; + kexp.expflags = context->kexp_flags; + kexp.opcode = dfunc->func_code; + kexp.nr_args = 2; + kexp.args_offset = offsetof(kern_expression, u.data); + __pos = __appendBinaryStringInfo(buf, &kexp, + offsetof(kern_expression, u.data)); + + /* 1st arg of the 2nd arg (comparator function) */ + memset(&kexp, 0, sizeof(kexp)); + kexp.exptype = dtype_e->type_code; + kexp.expflags = context->kexp_flags; + kexp.opcode = FuncOpCode__VarExpr; + kexp.nr_args = 0; + kexp.u.v.var_typlen = dtype_e->type_length; + kexp.u.v.var_typbyval = dtype_e->type_byval; + kexp.u.v.var_typalign = dtype_e->type_align; + kexp.u.v.var_slot_id = slot_id; + __off = 
__appendBinaryStringInfo(buf, &kexp, SizeOfKernExprVar); + __appendKernExpMagicAndLength(buf, __off); + } + /* 2nd arg of the 2nd arg (comparator function) */ + if (codegen_expression_walker(context, buf, expr_s) < 0) + return -1; + if (buf) + { + memset(&kexp, 0, sizeof(kexp)); + kexp.exptype = TypeOpCode__bool; + kexp.expflags = context->kexp_flags; + kexp.opcode = (sa_op->useOr + ? FuncOpCode__ScalarArrayOpAny + : FuncOpCode__ScalarArrayOpAll); + kexp.nr_args = 2; + kexp.args_offset = offsetof(kern_expression, u.saop.data); + kexp.u.saop.slot_id = slot_id; + kexp.u.saop.slot_bufsz = dtype_e->type_sizeof; + kexp.u.saop.elem_byval = dtype_e->type_byval; + kexp.u.saop.elem_align = dtype_e->type_align; + kexp.u.saop.elem_len = dtype_e->type_length; + memcpy(buf->data + pos, &kexp, offsetof(kern_expression, + u.saop.data)); + __appendKernExpMagicAndLength(buf, __pos); + __appendKernExpMagicAndLength(buf, pos); + } + return 0; +} + /* * is_expression_equals_tlist * @@ -1870,9 +2012,10 @@ codegen_expression_walker(codegen_context *context, return codegen_relabel_expression(context, buf, (RelabelType *)expr); case T_CaseExpr: return codegen_casewhen_expression(context, buf, (CaseExpr *)expr); -// case T_CaseTestExpr: - case T_CoerceToDomain: case T_ScalarArrayOpExpr: + return codegen_scalar_array_op_expression(context, buf, + (ScalarArrayOpExpr *)expr); + case T_CoerceToDomain: default: __Elog("not a supported expression type: %s", nodeToString(expr)); } @@ -3429,7 +3572,10 @@ __xpucode_to_cstring(StringInfo buf, List *dcontext) /* optionsl */ { const kern_expression *karg; - int i, pos; + devfunc_info *dfunc; + devtype_info *dtype; + const char *label; + int i, pos; switch (kexp->opcode) { @@ -3589,6 +3735,17 @@ __xpucode_to_cstring(StringInfo buf, appendStringInfo(buf, "}"); return; + case FuncOpCode__ScalarArrayOpAny: + case FuncOpCode__ScalarArrayOpAll: + Assert(kexp->nr_args == 2); + if (kexp->opcode == FuncOpCode__ScalarArrayOpAny) + label = "Any"; + else + label 
/*
 * __extract_heap_tuple_attr
 *
 * Stores one heap-tuple attribute into the kvars slot given by
 * kvdef->var_slot_id.
 *
 *   kcxt   - kernel context that owns kvars_slot[] / kvars_class[]
 *   kds    - data store the tuple belongs to
 *   cmeta  - column metadata of the attribute
 *   kvdef  - definition of the destination slot
 *   addr   - address of the attribute value, or NULL for SQL NULL
 *
 * Returns false (after STROM_ELOG) if the slot id is out of range or the
 * attribute length is not supported; otherwise true.
 */
STATIC_FUNCTION(bool)
__extract_heap_tuple_attr(kern_context *kcxt,
						  const kern_data_store *kds,
						  const kern_colmeta *cmeta,
						  const kern_vars_defitem *kvdef,
						  const char *addr)
{
	uint32_t	slot_id = kvdef->var_slot_id;

	if (slot_id >= kcxt->kvars_nslots)
	{
		STROM_ELOG(kcxt, "kvars::slot_id is out of range");
		return false;
	}
	else if (!addr)
	{
		kcxt->kvars_class[slot_id] = KVAR_CLASS__NULL;
	}
	else if (cmeta->atttypkind == TYPE_KIND__ARRAY)
	{
		/* special case if xpu_array_t */
		xpu_array_t	   *array;

		assert(kvdef->var_slot_off > 0 &&
			   kvdef->var_slot_off + sizeof(xpu_array_t) <= kcxt->kvars_nbytes);
		assert((char *)kds + cmeta->kds_offset == (char *)cmeta);
		array = (xpu_array_t *)
			((char *)kcxt->kvars_slot + kvdef->var_slot_off);
		memset(array, 0, sizeof(xpu_array_t));
		array->expr_ops     = &xpu_array_ops;
		array->length       = -1;
		array->u.heap.value = (const varlena *)addr;
		/*
		 * Publish the datum built above. Without this the slot keeps
		 * whatever class/pointer it had before this call.
		 * NOTE(review): uses the ptr + KVAR_CLASS__XPU_DATUM convention;
		 * confirm the readers of this slot expect it.
		 */
		kcxt->kvars_slot[slot_id].ptr = array;
		kcxt->kvars_class[slot_id] = KVAR_CLASS__XPU_DATUM;
	}
	else if (cmeta->atttypkind == TYPE_KIND__COMPOSITE)
	{
		/* special case if xpu_composite_t */
		xpu_composite_t *comp;

		assert(kvdef->var_slot_off > 0 &&
			   kvdef->var_slot_off + sizeof(xpu_composite_t) <= kcxt->kvars_nbytes);
		assert((char *)kds + cmeta->kds_offset == (char *)cmeta);
		comp = (xpu_composite_t *)
			((char *)kcxt->kvars_slot + kvdef->var_slot_off);
		comp->expr_ops    = &xpu_composite_ops;
		comp->comp_typid  = cmeta->atttypid;
		comp->comp_typmod = cmeta->atttypmod;
		comp->rowidx      = 0;
		comp->nfields     = cmeta->num_subattrs;
		comp->smeta       = kds->colmeta + cmeta->idx_subattrs;
		comp->value       = (const varlena *)addr;
		/* publish the datum; see the note in the array branch */
		kcxt->kvars_slot[slot_id].ptr = comp;
		kcxt->kvars_class[slot_id] = KVAR_CLASS__XPU_DATUM;
	}
	else if (cmeta->attbyval)
	{
		/* pass-by-value: copy the value itself into the slot */
		assert(cmeta->attlen > 0 && cmeta->attlen <= sizeof(kern_variable));
		kcxt->kvars_class[slot_id] = KVAR_CLASS__INLINE;
		memcpy(&kcxt->kvars_slot[slot_id], addr, cmeta->attlen);
	}
	else if (cmeta->attlen > 0)
	{
		/* fixed-length by reference: the class carries the byte length */
		kcxt->kvars_class[slot_id] = cmeta->attlen;
		kcxt->kvars_slot[slot_id].ptr = (void *)addr;
	}
	else if (cmeta->attlen == -1)
	{
		kcxt->kvars_class[slot_id] = KVAR_CLASS__VARLENA;
		kcxt->kvars_slot[slot_id].ptr = (void *)addr;
	}
	else
	{
		STROM_ELOG(kcxt, "not a supported attribute length");
		return false;
	}
	return true;
}
/*
 * kern_extract_heap_tuple
 *
 * Loads the attributes listed in kvars_items[0 .. kvars_nloads-1] from the
 * heap tuple 'htup' into the kvars slots of 'kcxt'.
 *
 * NOTE(review): the loops below consume kvars_items in a single forward
 * pass, so they appear to assume that system attributes (var_resno < 0)
 * come first, followed by user columns in ascending var_resno order -
 * confirm against the code that builds kvars_items.
 *
 * Items that do not match any column of the tuple are set to NULL.
 * Returns false if any attribute could not be extracted, otherwise true.
 */
STATIC_FUNCTION(bool)
kern_extract_heap_tuple(kern_context *kcxt,
						const kern_data_store *kds,
						const HeapTupleHeaderData *htup,
						const kern_vars_defitem *kvars_items,
						int kvars_nloads)
{
	const kern_vars_defitem *kvdef = kvars_items;
	uint32_t	offset = htup->t_hoff;
	int			resno = 1;
	int			kvars_count = 0;
	int			ncols = Min(htup->t_infomask2 & HEAP_NATTS_MASK, kds->ncols);
	bool		heap_hasnull = ((htup->t_infomask & HEAP_HASNULL) != 0);

	/* extract system attributes, if required */
	while (kvars_count < kvars_nloads &&
		   kvdef->var_resno < 0)
	{
		/* the function returns bool, so the failure must be reported as such */
		if (!__extract_heap_tuple_sysattr(kcxt, kds, htup, kvdef))
			return false;
		kvdef++;
		kvars_count++;
	}
	/* try attcacheoff shortcut, if available. */
	if (!heap_hasnull)
	{
		while (kvars_count < kvars_nloads &&
			   kvdef->var_resno > 0 &&
			   kvdef->var_resno <= ncols)
		{
			const kern_colmeta *cmeta = &kds->colmeta[kvdef->var_resno-1];
			char	   *addr;

			/* no cached offset for this column; continue on the slow path */
			if (cmeta->attcacheoff < 0)
				break;
			offset = htup->t_hoff + cmeta->attcacheoff;
			addr = (char *)htup + offset;
			if (!__extract_heap_tuple_attr(kcxt, kds, cmeta, kvdef, addr))
				return false;
			/* next resno */
			resno = kvdef->var_resno + 1;
			/* leave 'offset' just past this attribute for the slow path */
			if (cmeta->attlen > 0)
				offset += cmeta->attlen;
			else
				offset += VARSIZE_ANY(addr);
			kvdef++;
			kvars_count++;
		}
	}

	/* extract slow path */
	while (resno <= ncols && kvars_count < kvars_nloads)
	{
		const kern_colmeta *cmeta = &kds->colmeta[resno-1];
		char	   *addr;

		if (heap_hasnull && att_isnull(resno-1, htup->t_bits))
		{
			addr = NULL;
		}
		else
		{
			/*
			 * Fixed-length values are always aligned. Variable-length ones
			 * are aligned only if VARATT_NOT_PAD_BYTE() is false at the
			 * current offset.
			 */
			if (cmeta->attlen > 0)
				offset = TYPEALIGN(cmeta->attalign, offset);
			else if (!VARATT_NOT_PAD_BYTE((char *)htup + offset))
				offset = TYPEALIGN(cmeta->attalign, offset);

			addr = ((char *)htup + offset);
			if (cmeta->attlen > 0)
				offset += cmeta->attlen;
			else
				offset += VARSIZE_ANY(addr);
		}

		/* every column is walked, but only the requested ones are stored */
		if (kvdef->var_resno == resno)
		{
			if (!__extract_heap_tuple_attr(kcxt, kds, cmeta, kvdef, addr))
				return false;
			kvdef++;
			kvars_count++;
		}
		resno++;
	}
	/* fill-up by NULLs for the remained slot */
	while (kvars_count < kvars_nloads)
	{
		uint32_t	slot_id = kvdef->var_slot_id;

		if (slot_id < kcxt->kvars_nslots)
		{
			kcxt->kvars_slot[slot_id].ptr = NULL;
			kcxt->kvars_class[slot_id] = KVAR_CLASS__NULL;
		}
		kvdef++;
		kvars_count++;
	}
	return true;
}
/*
 * __arrow_fetch_int_datum
 *
 * Reads the kds_index-th value of an Arrow::Int column into 'kvar' as an
 * inline value, according to cmeta->attopts.integer.bitWidth (8/16/32/64).
 *
 * If the column has no values buffer, or kds_index is beyond its length,
 * *vclass is set to KVAR_CLASS__NULL and true is returned.
 * Returns false (after STROM_ELOG) only for an unsupported bitWidth.
 */
STATIC_FUNCTION(bool)
__arrow_fetch_int_datum(kern_context *kcxt,
						const kern_data_store *kds,
						const kern_colmeta *cmeta,
						uint32_t kds_index,
						kern_variable *kvar,
						int *vclass)
{
	size_t		values_length = __kds_unpack(cmeta->values_length);

	assert(cmeta->extra_offset == 0 &&
		   cmeta->extra_length == 0);
	switch (cmeta->attopts.integer.bitWidth)
	{
		case 8:
			if (cmeta->values_offset &&
				sizeof(uint8_t) * (kds_index+1) <= values_length)
			{
				uint8_t	   *base = (uint8_t *)
					((char *)kds + __kds_unpack(cmeta->values_offset));
				kvar->u8 = base[kds_index];
				*vclass = KVAR_CLASS__INLINE;
				return true;
			}
			break;
		case 16:
			if (cmeta->values_offset &&
				sizeof(uint16_t) * (kds_index+1) <= values_length)
			{
				uint16_t   *base = (uint16_t *)
					((char *)kds + __kds_unpack(cmeta->values_offset));
				kvar->u16 = base[kds_index];
				*vclass = KVAR_CLASS__INLINE;
				return true;
			}
			break;
		case 32:
			if (cmeta->values_offset &&
				sizeof(uint32_t) * (kds_index+1) <= values_length)
			{
				uint32_t   *base = (uint32_t *)
					((char *)kds + __kds_unpack(cmeta->values_offset));
				kvar->u32 = base[kds_index];
				*vclass = KVAR_CLASS__INLINE;
				return true;
			}
			break;
		case 64:
			if (cmeta->values_offset &&
				sizeof(uint64_t) * (kds_index+1) <= values_length)
			{
				uint64_t   *base = (uint64_t *)
					((char *)kds + __kds_unpack(cmeta->values_offset));
				kvar->u64 = base[kds_index];
				*vclass = KVAR_CLASS__INLINE;
				/*
				 * Must return here like the other widths; falling out of
				 * the switch would overwrite *vclass with NULL.
				 */
				return true;
			}
			break;
		default:
			STROM_ELOG(kcxt, "Arrow::Int unsupported bitWidth");
			return false;
	}
	/* values buffer is missing, or kds_index is out of its range */
	*vclass = KVAR_CLASS__NULL;
	return true;
}
kern_data_store *kds, + const kern_colmeta *cmeta, + uint32_t kds_index, + kern_variable *kvar, + int *vclass) { - const kern_expression *karg; - int i; + size_t values_length = __kds_unpack(cmeta->values_length); - for (i=0, karg = KEXP_FIRST_ARG(kexp); - i < kexp->nr_args; - i++, karg = KEXP_NEXT_ARG(karg)) + assert(cmeta->extra_offset == 0 && + cmeta->extra_length == 0); + switch (cmeta->attopts.floating_point.precision) { - assert(__KEXP_IS_VALID(kexp, karg) && kexp->exptype == karg->exptype); - if (!EXEC_KERN_EXPRESSION(kcxt, karg, __result)) - return false; - if (!XPU_DATUM_ISNULL(__result)) - return true; - } - __result->expr_ops = NULL; - return true; -} - -STATIC_FUNCTION(bool) -pgfn_LeastExpr(XPU_PGFUNCTION_ARGS) -{ - const xpu_datum_operators *kexp_ops = kexp->expr_ops; - const kern_expression *karg; - xpu_datum_t *temp; - int comp; - int i, sz = kexp_ops->xpu_type_sizeof; - - temp = (xpu_datum_t *)alloca(sz); - memset(temp, 0, sz); - __result->expr_ops = NULL; - for (i=0, karg = KEXP_FIRST_ARG(kexp); - i < kexp->nr_args; - i++, karg = KEXP_NEXT_ARG(karg)) - { - assert(__KEXP_IS_VALID(kexp, karg) && kexp->exptype == karg->exptype); - if (!EXEC_KERN_EXPRESSION(kcxt, karg, temp)) + case ArrowPrecision__Half: + if (cmeta->values_offset && + sizeof(float2_t) * (kds_index+1) <= cmeta->values_length) + { + float2_t *base = (float2_t *) + ((char *)kds + __kds_unpack(cmeta->values_offset)); + kvar->fp16 = base[kds_index]; + *vclass = KVAR_CLASS__INLINE; + return true; + } + break; + case ArrowPrecision__Single: + if (cmeta->values_offset && + sizeof(float4_t) * (kds_index+1) <= values_length) + { + float4_t *base = (float4_t *) + ((char *)kds + __kds_unpack(cmeta->values_offset)); + kvar->fp32 = base[kds_index]; + *vclass = KVAR_CLASS__INLINE; + return true; + } + break; + case ArrowPrecision__Double: + if (cmeta->values_offset && + sizeof(float8_t) * (kds_index+1) <= values_length) + { + float8_t *base = (float8_t *) + ((char *)kds + 
/*
 * __arrow_fetch_decimal_datum
 *
 * Converts the kds_index-th value of a 128-bit Arrow::Decimal column into
 * an xpu_numeric_t placed at 'slot_buf', and points 'kvar' at it with the
 * class KVAR_CLASS__XPU_DATUM.
 *
 * If the values buffer is missing or too short for kds_index, the result
 * is KVAR_CLASS__NULL. Only a bitWidth other than 128 makes it fail.
 */
STATIC_FUNCTION(bool)
__arrow_fetch_decimal_datum(kern_context *kcxt,
							const kern_data_store *kds,
							const kern_colmeta *cmeta,
							uint32_t kds_index,
							kern_variable *kvar,
							int *vclass,
							char *slot_buf)
{
	const int128_t *values;
	xpu_numeric_t  *numeric;

	assert(cmeta->extra_offset == 0 &&
		   cmeta->extra_length == 0 &&
		   slot_buf != NULL);
	if (cmeta->attopts.decimal.bitWidth != 128)
	{
		STROM_ELOG(kcxt, "Arrow::Decimal unsupported bitWidth");
		return false;
	}

	/* nothing to fetch: report SQL NULL */
	if (!cmeta->values_offset ||
		sizeof(int128_t) * (kds_index+1) > __kds_unpack(cmeta->values_length))
	{
		*vclass = KVAR_CLASS__NULL;
		return true;
	}

	values = (const int128_t *)
		((const char *)kds + __kds_unpack(cmeta->values_offset));
	/* the values buffer is expected to be aligned to the element size */
	assert((((uintptr_t)values) & (sizeof(int128_t)-1)) == 0);

	numeric = (xpu_numeric_t *)slot_buf;
	set_normalized_numeric(numeric, values[kds_index],
						   cmeta->attopts.decimal.scale);
	*vclass = KVAR_CLASS__XPU_DATUM;
	kvar->ptr = numeric;
	return true;
}
/*
 * __arrow_fetch_time_datum
 *
 * Reads the kds_index-th value of an Arrow::Time column and stores it in
 * kvar->i64 as an inline value. The stored element is 32-bit for the
 * Second and MilliSecond units and 64-bit for MicroSecond and NanoSecond.
 * The raw value is multiplied by 1000000 (Second) or 1000 (MilliSecond),
 * kept as is (MicroSecond), or divided by 1000 (NanoSecond).
 *
 * A missing or too-short values buffer gives KVAR_CLASS__NULL.
 * An unknown unit is the only failure case.
 */
STATIC_FUNCTION(bool)
__arrow_fetch_time_datum(kern_context *kcxt,
						 const kern_data_store *kds,
						 const kern_colmeta *cmeta,
						 uint32_t kds_index,
						 kern_variable *kvar,
						 int *vclass)
{
	size_t		buf_len = __kds_unpack(cmeta->values_length);
	size_t		elem_sz;
	int64_t		multiplier = 1;
	int64_t		divisor = 1;
	int64_t		raw;
	const char *src;

	assert(cmeta->extra_offset == 0 &&
		   cmeta->extra_length == 0);

	/* pick the element width and the scaling rule for this unit */
	switch (cmeta->attopts.time.unit)
	{
		case ArrowTimeUnit__Second:
			elem_sz = sizeof(int32_t);
			multiplier = 1000000L;
			break;
		case ArrowTimeUnit__MilliSecond:
			elem_sz = sizeof(int32_t);
			multiplier = 1000L;
			break;
		case ArrowTimeUnit__MicroSecond:
			elem_sz = sizeof(int64_t);
			break;
		case ArrowTimeUnit__NanoSecond:
			elem_sz = sizeof(int64_t);
			divisor = 1000L;
			break;
		default:
			STROM_ELOG(kcxt, "unknown unit size of Arrow::Time");
			return false;
	}

	/* no values buffer, or the index is beyond it */
	if (!cmeta->values_offset ||
		elem_sz * (kds_index+1) > buf_len)
	{
		*vclass = KVAR_CLASS__NULL;
		return true;
	}

	src = (const char *)kds + __kds_unpack(cmeta->values_offset);
	if (elem_sz == sizeof(int32_t))
		raw = (int64_t)((const int32_t *)src)[kds_index];
	else
		raw = ((const int64_t *)src)[kds_index];

	/* at most one of multiplier / divisor differs from 1 */
	kvar->i64 = raw * multiplier / divisor;
	*vclass = KVAR_CLASS__INLINE;
	return true;
}
if (vclass >= 0) - { - int sz = Min(vclass, cmeta->attlen); + break; - if (buffer) - { - if (sz > 0) - memcpy(buffer, kvar->ptr, sz); - if (sz < cmeta->attlen) - memset(buffer + sz, 0, cmeta->attlen - sz); - } - } - else - { - STROM_ELOG(kcxt, "Bug? unexpected kvar-class for fixed-length datum"); - return -1; - } - nbytes = cmeta->attlen; - } - else if (cmeta->attlen == -1) - { - if (vclass >= 0) + case ArrowTimeUnit__MilliSecond: + if (cmeta->values_offset && + sizeof(uint64_t) * (kds_index+1) <= values_length) { - nbytes = VARHDRSZ + vclass; - if (buffer) - { - if (vclass > 0) - memcpy(buffer+VARHDRSZ, kvar->ptr, vclass); - SET_VARSIZE(buffer, nbytes); - } + uint64_t *base = (uint64_t *) + ((char *)kds + __kds_unpack(cmeta->values_offset)); + kvar->u64 = base[kds_index] * 1000L - + (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY; + *vclass = KVAR_CLASS__INLINE; + return true; } - else if (vclass == KVAR_CLASS__VARLENA) + break; + + case ArrowTimeUnit__MicroSecond: + if (cmeta->values_offset && + sizeof(uint64_t) * (kds_index+1) <= values_length) { - nbytes = VARSIZE_ANY(kvar->ptr); - if (buffer) - memcpy(buffer, kvar->ptr, nbytes); - if (VARATT_IS_EXTERNAL(kvar->ptr)) - t_infomask |= HEAP_HASEXTERNAL; + uint64_t *base = (uint64_t *) + ((char *)kds + __kds_unpack(cmeta->values_offset)); + kvar->u64 = base[kds_index] - + (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY; + *vclass = KVAR_CLASS__INLINE; + return true; } - else + break; + + case ArrowTimeUnit__NanoSecond: + if (cmeta->values_offset && + sizeof(uint64_t) * (kds_index+1) <= values_length) { - STROM_ELOG(kcxt, "Bug? unexpected kvar-class for varlena datum"); - return -1; + uint64_t *base = (uint64_t *) + ((char *)kds + __kds_unpack(cmeta->values_offset)); + kvar->u64 = base[kds_index] / 1000L - + (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY; + *vclass = KVAR_CLASS__INLINE; + return true; } - t_infomask |= HEAP_HASVARWIDTH; - } - else - { - STROM_ELOG(kcxt, "Bug? 
unsupported attribute-length"); - return -1; - } - /* set not-null bit, if valid */ - if (htup && t_hasnull) - htup->t_bits[j>>3] |= (1<<(j & 7)); - t_hoff = t_next + nbytes; - } - if (htup) - { - int ctid_slot = kproj->u.proj.ctid_slot; - - /* assign ctid, if any */ - if (ctid_slot >= 0 && - ctid_slot < kcxt->kvars_nslots && - kcxt->kvars_class[ctid_slot] == sizeof(ItemPointerData)) - { - memcpy(&htup->t_ctid, - kcxt->kvars_slot[ctid_slot].ptr, - sizeof(ItemPointerData)); - } - else - { - ItemPointerSetInvalid(&htup->t_ctid); - } - htup->t_infomask = t_infomask; - SET_VARSIZE(&htup->t_choice.t_datum, t_hoff); - } - return t_hoff; -} - -EXTERN_FUNCTION(int) -kern_estimate_heaptuple(kern_context *kcxt, - const kern_expression *kproj, - const kern_data_store *kds_dst) -{ - const kern_expression *karg; - int i, sz; + break; - for (i=0, karg = KEXP_FIRST_ARG(kproj); - i < kproj->nr_args; - i++, karg = KEXP_NEXT_ARG(karg)) - { - assert(__KEXP_IS_VALID(kproj, karg) && - karg->opcode == FuncOpCode__SaveExpr); - if (!EXEC_KERN_EXPRESSION(kcxt, karg, NULL)) - return -1; + default: + STROM_ELOG(kcxt, "unknown unit size of Arrow::Timestamp"); + return false; } - /* then, estimate the length */ - sz = kern_form_heaptuple(kcxt, kproj, kds_dst, NULL); - if (sz < 0) - return -1; - return MAXALIGN(offsetof(kern_tupitem, htup) + sz); + *vclass = KVAR_CLASS__NULL; + return true; } STATIC_FUNCTION(bool) -pgfn_Projection(XPU_PGFUNCTION_ARGS) +__arrow_fetch_interval_datum(kern_context *kcxt, + const kern_data_store *kds, + const kern_colmeta *cmeta, + uint32_t kds_index, + kern_variable *kvar, + int *vclass, + char *slot_buf) { - /* - * FuncOpExpr_Projection should be handled by kern_estimate_heaptuple() - * and kern_form_heaptuple() by the caller. 
- */ - STROM_ELOG(kcxt, "pgfn_Projection is not implemented"); - return false; -} + size_t values_length = __kds_unpack(cmeta->values_length); -STATIC_FUNCTION(bool) -pgfn_HashValue(XPU_PGFUNCTION_ARGS) -{ - const kern_expression *karg; - xpu_int4_t *result = (xpu_int4_t *)__result; - xpu_datum_t *datum = (xpu_datum_t *)alloca(64); - int i, datum_sz = 64; - uint32_t hash = 0xffffffffU; - - for (i=0, karg = KEXP_FIRST_ARG(kexp); - i < kexp->nr_args; - i++, karg = KEXP_NEXT_ARG(karg)) + assert(cmeta->extra_offset == 0 && + cmeta->extra_length == 0 && + slot_buf != NULL); + switch (cmeta->attopts.interval.unit) { - const xpu_datum_operators *expr_ops = karg->expr_ops; - uint32_t __hash; - - if (expr_ops->xpu_type_sizeof > datum_sz) - { - datum_sz = expr_ops->xpu_type_sizeof; - datum = (xpu_datum_t *)alloca(datum_sz); - } - if (!EXEC_KERN_EXPRESSION(kcxt, karg, datum)) + case ArrowIntervalUnit__Year_Month: + if (cmeta->values_offset && + sizeof(uint32_t) * (kds_index+1) <= values_length) + { + xpu_interval_t *iv = (xpu_interval_t *)slot_buf; + uint32_t *base = (uint32_t *) + ((char *)kds + __kds_unpack(cmeta->values_offset)); + iv->value.month = base[kds_index]; + iv->value.day = 0; + iv->value.time = 0; + *vclass = KVAR_CLASS__XPU_DATUM; + kvar->ptr = iv; + return true; + } + break; + case ArrowIntervalUnit__Day_Time: + if (cmeta->values_offset && + sizeof(uint32_t) * 2 * (kds_index+1) <= values_length) + { + xpu_interval_t *iv = (xpu_interval_t *)slot_buf; + uint32_t *base = (uint32_t *) + ((char *)kds + __kds_unpack(cmeta->values_offset)); + iv->value.month = 0; + iv->value.day = base[2*kds_index]; + iv->value.time = base[2*kds_index+1]; + *vclass = KVAR_CLASS__XPU_DATUM; + kvar->ptr = iv; + return true; + } + break; + default: + STROM_ELOG(kcxt, "unknown unit-size of Arrow::Interval"); return false; - if (!XPU_DATUM_ISNULL(datum)) - { - if (!expr_ops->xpu_datum_hash(kcxt, &__hash, datum)) - return false; - hash ^= __hash; - } } - hash ^= 0xffffffffU; - - 
result->expr_ops = &xpu_int4_ops; - result->value = hash; + *vclass = KVAR_CLASS__NULL; return true; } STATIC_FUNCTION(bool) -pgfn_SaveExpr(XPU_PGFUNCTION_ARGS) +__arrow_fetch_fixed_size_binary_datum(kern_context *kcxt, + const kern_data_store *kds, + const kern_colmeta *cmeta, + uint32_t kds_index, + kern_variable *kvar, + int *vclass) { - const kern_expression *karg = KEXP_FIRST_ARG(kexp); - const xpu_datum_operators *expr_ops = kexp->expr_ops; - xpu_datum_t *result = __result; - uint32_t slot_id = kexp->u.save.slot_id; - uint32_t slot_off = kexp->u.save.slot_off; - xpu_datum_t *slot_buf = NULL; + unsigned int unitsz = cmeta->attopts.fixed_size_binary.byteWidth; - assert(slot_id < kcxt->kvars_nslots); - assert(kexp->nr_args == 1 && - kexp->exptype == karg->exptype); - if (slot_off > 0) - { - assert(slot_off + expr_ops->xpu_type_sizeof <= kcxt->kvars_nbytes); - slot_buf = (xpu_datum_t *)((char *)kcxt->kvars_slot + slot_off); - } - /* SaveExpr accept NULL result buffer! */ - if (!result) - { - if (slot_buf) - result = slot_buf; - else - result = (xpu_datum_t *)alloca(expr_ops->xpu_type_sizeof); - } - /* Run the expression */ - if (!EXEC_KERN_EXPRESSION(kcxt, karg, result)) - return false; - if (XPU_DATUM_ISNULL(result)) - { - kcxt->kvars_class[slot_id] = KVAR_CLASS__NULL; - } - else if (slot_buf) - { - if (slot_buf != result) - memcpy(slot_buf, result, expr_ops->xpu_type_sizeof); - kcxt->kvars_class[slot_id] = KVAR_CLASS__XPU_DATUM; - kcxt->kvars_slot[slot_id].ptr = slot_buf; - } - else + assert(cmeta->extra_offset == 0 && + cmeta->extra_length == 0); + if (cmeta->values_offset && + unitsz * (kds_index+1) <= __kds_unpack(cmeta->values_length)) { - if (!expr_ops->xpu_datum_store(kcxt, result, - &kcxt->kvars_class[slot_id], - &kcxt->kvars_slot[slot_id])) - return false; + char *base = ((char *)kds + __kds_unpack(cmeta->values_offset)); + + kvar->ptr = base + unitsz * kds_index; + *vclass = unitsz; + return true; } + *vclass = KVAR_CLASS__NULL; return true; } 
STATIC_FUNCTION(bool) -pgfn_JoinQuals(XPU_PGFUNCTION_ARGS) +__arrow_fetch_variable_datum(kern_context *kcxt, + const kern_data_store *kds, + const kern_colmeta *cmeta, + uint32_t kds_index, + bool is_large_offset, + kern_variable *kvar, + int *vclass) { - const kern_expression *karg; - xpu_int4_t *result = (xpu_int4_t *)__result; - int i, status = 1; + uint64_t start, end; + char *extra; - assert(kexp->exptype == TypeOpCode__bool); - for (i=0, karg = KEXP_FIRST_ARG(kexp); - i < kexp->nr_args; - i++, karg = KEXP_NEXT_ARG(karg)) + if (arrow_fetch_secondary_index(kcxt, kds, kds_index, + cmeta->values_offset, + cmeta->values_length, + is_large_offset, + &start, &end)) { - xpu_bool_t datum; - - if (status < 0 && (karg->expflags & KEXP_FLAG__IS_PUSHED_DOWN) != 0) - continue; - if (!EXEC_KERN_EXPRESSION(kcxt, karg, &datum)) - return false; - if (XPU_DATUM_ISNULL(&datum) || !datum.value) + /* sanity checks */ + if (start > end || end - start >= 0x40000000UL || + end > __kds_unpack(cmeta->extra_length)) { - /* - * NOTE: Even if JoinQual returns 'unmatched' status, we need - * to check whether the pure JOIN ... ON clause is satisfied, - * or not, if OUTER JOIN case. - * '-1' means JoinQual is not matched, because of the pushed- - * down qualifiers from WHERE-clause, not JOIN ... ON. 
- */ - if ((karg->expflags & KEXP_FLAG__IS_PUSHED_DOWN) == 0) - { - status = 0; - break; - } - status = -1; + STROM_ELOG(kcxt, "Arrow variable data corruption"); + return false; } + extra = (char *)kds + __kds_unpack(cmeta->extra_offset); + kvar->ptr = extra + start; + *vclass = (end - start); + return true; } - result->expr_ops = kexp->expr_ops; - result->value = status; - return true; -} - -STATIC_FUNCTION(bool) -pgfn_GiSTEval(XPU_PGFUNCTION_ARGS) -{ - STROM_ELOG(kcxt, "pgfn_GiSTEval should not be called as a normal kernel expression"); return false; } STATIC_FUNCTION(bool) -pgfn_Packed(XPU_PGFUNCTION_ARGS) +__arrow_fetch_array_datum(kern_context *kcxt, + const kern_data_store *kds, + const kern_colmeta *cmeta, + uint32_t kds_index, + bool is_large_offset, + kern_variable *kvar, + int *vclass, + char *slot_buf) { - STROM_ELOG(kcxt, "pgfn_Packed should not be called as a normal kernel expression"); + uint64_t start, end; + + assert(cmeta->idx_subattrs < kds->nr_colmeta && + cmeta->num_subattrs == 1 && + slot_buf != NULL); + if (arrow_fetch_secondary_index(kcxt, kds, kds_index, + cmeta->values_offset, + cmeta->values_length, + is_large_offset, + &start, &end)) + { + xpu_array_t *array = (xpu_array_t *)slot_buf; + + /* sanity checks */ + if (start > end) + { + STROM_ELOG(kcxt, "Arrow::List secondary index corruption"); + return false; + } + array->expr_ops = &xpu_array_ops; + array->length = end - start; + array->u.arrow.start = start; + array->u.arrow.smeta = &kds->colmeta[cmeta->idx_subattrs]; + assert(cmeta->num_subattrs == 1); + *vclass = KVAR_CLASS__XPU_DATUM; + kvar->ptr = array; + return true; + } return false; } STATIC_FUNCTION(bool) -pgfn_AggFuncs(XPU_PGFUNCTION_ARGS) +__arrow_fetch_composite_datum(kern_context *kcxt, + const kern_data_store *kds, + const kern_colmeta *cmeta, + uint32_t kds_index, + kern_variable *kvar, + int *vclass, + char *slot_buf) { - STROM_ELOG(kcxt, "pgfn_AggFuncs should not be called as a normal kernel expression"); - return false; 
+ xpu_composite_t *comp = (xpu_composite_t *)slot_buf; + + comp->expr_ops = &xpu_composite_ops; + comp->comp_typid = cmeta->atttypid; + comp->comp_typmod = cmeta->atttypmod; + comp->rowidx = kds_index; + comp->nfields = cmeta->num_subattrs; + comp->smeta = &kds->colmeta[cmeta->idx_subattrs]; + comp->value = NULL; + *vclass = KVAR_CLASS__XPU_DATUM; + kvar->ptr = comp; + return true; } -/* ---------------------------------------------------------------- - * - * LoadVars / Projection - * - * ---------------------------------------------------------------- - */ STATIC_FUNCTION(bool) -__extract_heap_tuple_attr(kern_context *kcxt, - const kern_data_store *kds, - const kern_colmeta *cmeta, - const kern_vars_defitem *kvdef, - const char *addr) +__kern_extract_arrow_field(kern_context *kcxt, + const kern_data_store *kds, + const kern_colmeta *cmeta, + uint32_t kds_index, + kern_variable *kvar, + int *vclass, + char *slot_buf) { - uint32_t slot_id = kvdef->var_slot_id; - - if (slot_id >= kcxt->kvars_nslots) - { - STROM_ELOG(kcxt, "kvars::slot_id is out of range"); - return false; - } - else if (!addr) - { - kcxt->kvars_class[slot_id] = KVAR_CLASS__NULL; - } - else if (cmeta->atttypkind == TYPE_KIND__ARRAY) + switch (cmeta->attopts.tag) { - /* special case if xpu_array_t */ - xpu_array_t *array; + case ArrowType__Bool: + if (!__arrow_fetch_bool_datum(kcxt, kds, cmeta, + kds_index, + kvar, vclass)) + return false; + break; - assert(kvdef->var_slot_off > 0 && - kvdef->var_slot_off + sizeof(xpu_array_t) <= kcxt->kvars_nbytes); - assert((char *)kds + cmeta->kds_offset == (char *)cmeta); - array = (xpu_array_t *) - ((char *)kcxt->kvars_slot + kvdef->var_slot_off); - memset(array, 0, sizeof(xpu_array_t)); - array->expr_ops = &xpu_array_ops; - array->length = -1; - array->u.heap.attbyval = cmeta->attbyval; - array->u.heap.attalign = cmeta->attalign; - array->u.heap.attlen = cmeta->attlen; - array->u.heap.value = (const varlena *)addr; - } - else if (cmeta->atttypkind == 
TYPE_KIND__COMPOSITE) - { - /* special case if xpu_composite_t */ - xpu_composite_t *comp; + case ArrowType__Int: + if (!__arrow_fetch_int_datum(kcxt, kds, cmeta, + kds_index, + kvar, vclass)) + return false; + break; + + case ArrowType__FloatingPoint: + if (!__arrow_fetch_float_datum(kcxt, kds, cmeta, + kds_index, + kvar, vclass)) + return false; + break; - assert(kvdef->var_slot_off > 0 && - kvdef->var_slot_off + sizeof(xpu_composite_t) <= kcxt->kvars_nbytes); - assert((char *)kds + cmeta->kds_offset == (char *)cmeta); - comp = (xpu_composite_t *) - ((char *)kcxt->kvars_slot + kvdef->var_slot_off); - comp->expr_ops = &xpu_composite_ops; - comp->comp_typid = cmeta->atttypid; - comp->comp_typmod = cmeta->atttypmod; - comp->rowidx = 0; - comp->nfields = cmeta->num_subattrs; - comp->smeta = kds->colmeta + cmeta->idx_subattrs;; - comp->value = (const varlena *)addr; - } - else if (cmeta->attbyval) - { - assert(cmeta->attlen > 0 && cmeta->attlen <= sizeof(kern_variable)); - kcxt->kvars_class[slot_id] = KVAR_CLASS__INLINE; - memcpy(&kcxt->kvars_slot[slot_id], addr, cmeta->attlen); - } - else if (cmeta->attlen > 0) - { - kcxt->kvars_class[slot_id] = cmeta->attlen; - kcxt->kvars_slot[slot_id].ptr = (void *)addr; - } - else if (cmeta->attlen == -1) - { - kcxt->kvars_class[slot_id] = KVAR_CLASS__VARLENA; - kcxt->kvars_slot[slot_id].ptr = (void *)addr; - } - else - { - STROM_ELOG(kcxt, "not a supported attribute length"); - return false; + case ArrowType__Decimal: + if (!__arrow_fetch_decimal_datum(kcxt, kds, cmeta, + kds_index, + kvar, vclass, + slot_buf)) + return false; + break; + + case ArrowType__Date: + if (!__arrow_fetch_date_datum(kcxt, kds, cmeta, + kds_index, + kvar, vclass)) + return false; + break; + + case ArrowType__Time: + if (!__arrow_fetch_time_datum(kcxt, kds, cmeta, + kds_index, + kvar, vclass)) + return false; + break; + + case ArrowType__Timestamp: + if (!__arrow_fetch_timestamp_datum(kcxt, kds, cmeta, + kds_index, + kvar, vclass)) + return false; + 
break; + + case ArrowType__Interval: + if (!__arrow_fetch_interval_datum(kcxt, kds, cmeta, + kds_index, + kvar, vclass, + slot_buf)) + return false; + break; + + case ArrowType__FixedSizeBinary: + if (!__arrow_fetch_fixed_size_binary_datum(kcxt, kds, cmeta, + kds_index, + kvar, vclass)) + return false; + break; + + case ArrowType__Utf8: + case ArrowType__Binary: + if (!__arrow_fetch_variable_datum(kcxt, kds, cmeta, + kds_index, + false, + kvar, vclass)) + return false; + break; + + case ArrowType__LargeUtf8: + case ArrowType__LargeBinary: + if (!__arrow_fetch_variable_datum(kcxt, kds, cmeta, + kds_index, + true, + kvar, vclass)) + return false; + break; + + case ArrowType__List: + if (!__arrow_fetch_array_datum(kcxt, kds, cmeta, + kds_index, + false, + kvar, vclass, + slot_buf)) + return false; + break; + + case ArrowType__LargeList: + if (!__arrow_fetch_array_datum(kcxt, kds, cmeta, + kds_index, + true, + kvar, vclass, + slot_buf)) + return false; + break; + + case ArrowType__Struct: + if (!__arrow_fetch_composite_datum(kcxt, kds, cmeta, + kds_index, + kvar, vclass, + slot_buf)) + return false; + break; + default: + STROM_ELOG(kcxt, "Unsupported Apache Arrow type"); + return false; } return true; } INLINE_FUNCTION(bool) -__extract_heap_tuple_sysattr(kern_context *kcxt, - const kern_data_store *kds, - const HeapTupleHeaderData *htup, - const kern_vars_defitem *kvdef) +__extract_arrow_tuple_sysattr(kern_context *kcxt, + const kern_data_store *kds, + uint32_t kds_index, + const kern_vars_defitem *kvdef) { - uint32_t slot_id = kvdef->var_slot_id; + static ItemPointerData __invalid_ctid__ = {{0,0},0}; + uint32_t slot_id = kvdef->var_slot_id; /* out of range? 
*/ if (slot_id >= kcxt->kvars_nslots) @@ -878,20 +982,20 @@ __extract_heap_tuple_sysattr(kern_context *kcxt, switch (kvdef->var_resno) { case SelfItemPointerAttributeNumber: - kcxt->kvars_slot[slot_id].ptr = (void *)&htup->t_ctid; + kcxt->kvars_slot[slot_id].ptr = (void *)&__invalid_ctid__; kcxt->kvars_class[slot_id] = sizeof(ItemPointerData); break; case MinTransactionIdAttributeNumber: - kcxt->kvars_slot[slot_id].u32 = htup->t_choice.t_heap.t_xmin; + kcxt->kvars_slot[slot_id].u32 = FrozenTransactionId; kcxt->kvars_class[slot_id] = KVAR_CLASS__INLINE; break; case MaxTransactionIdAttributeNumber: - kcxt->kvars_slot[slot_id].u32 = htup->t_choice.t_heap.t_xmax; + kcxt->kvars_slot[slot_id].u32 = InvalidTransactionId; kcxt->kvars_class[slot_id] = KVAR_CLASS__INLINE; break; case MinCommandIdAttributeNumber: case MaxCommandIdAttributeNumber: - kcxt->kvars_slot[slot_id].u32 = htup->t_choice.t_heap.t_field3.t_cid; + kcxt->kvars_slot[slot_id].u32 = FirstCommandId; kcxt->kvars_class[slot_id] = KVAR_CLASS__INLINE; break; case TableOidAttributeNumber: @@ -906,97 +1010,64 @@ __extract_heap_tuple_sysattr(kern_context *kcxt, } STATIC_FUNCTION(bool) -kern_extract_heap_tuple(kern_context *kcxt, - const kern_data_store *kds, - const HeapTupleHeaderData *htup, - const kern_vars_defitem *kvars_items, - int kvars_nloads) +kern_extract_arrow_tuple(kern_context *kcxt, + kern_data_store *kds, + uint32_t kds_index, + const kern_vars_defitem *kvars_items, + int kvars_nloads) { const kern_vars_defitem *kvdef = kvars_items; - uint32_t offset = htup->t_hoff; - int resno = 1; - int kvars_count = 0; - int ncols = Min(htup->t_infomask2 & HEAP_NATTS_MASK, kds->ncols); - bool heap_hasnull = ((htup->t_infomask & HEAP_HASNULL) != 0); + int kvars_count = 0; - /* extract system attributes, if rquired */ + assert(kds->format == KDS_FORMAT_ARROW); + /* fillup invalid values for system attribute, if any */ while (kvars_count < kvars_nloads && kvdef->var_resno < 0) { - if 
(!__extract_heap_tuple_sysattr(kcxt, kds, htup, kvdef)) - return; + if (!__extract_arrow_tuple_sysattr(kcxt, kds, kds_index, kvdef)) + return false; kvdef++; kvars_count++; } - /* try attcacheoff shortcut, if available. */ - if (!heap_hasnull) - { - while (kvars_count < kvars_nloads && - kvdef->var_resno > 0 && - kvdef->var_resno <= ncols) - { - const kern_colmeta *cmeta = &kds->colmeta[kvdef->var_resno-1]; - char *addr; - - if (cmeta->attcacheoff < 0) - break; - offset = htup->t_hoff + cmeta->attcacheoff; - addr = (char *)htup + offset; - if (!__extract_heap_tuple_attr(kcxt, kds, cmeta, kvdef, addr)) - return false; - /* next resno */ - resno = kvdef->var_resno + 1; - if (cmeta->attlen > 0) - offset += cmeta->attlen; - else - offset += VARSIZE_ANY(addr); - kvdef++; - kvars_count++; - } - } - /* extract slow path */ - while (resno <= ncols && kvars_count < kvars_nloads) + while (kvars_count < kvars_nloads && + kvdef->var_resno <= kds->ncols) { - const kern_colmeta *cmeta = &kds->colmeta[resno-1]; - char *addr; + kern_colmeta *cmeta = &kds->colmeta[kvdef->var_resno-1]; + uint32_t slot_id = kvdef->var_slot_id; + char *slot_buf = kcxt_slot_buf(kcxt, kvdef->var_slot_off); - if (heap_hasnull && att_isnull(resno-1, htup->t_bits)) + if (cmeta->nullmap_offset == 0 || + arrow_bitmap_check(kds, kds_index, + cmeta->nullmap_offset, + cmeta->nullmap_length)) { - addr = NULL; + if (!__kern_extract_arrow_field(kcxt, + kds, + cmeta, + kds_index, + &kcxt->kvars_slot[slot_id], + &kcxt->kvars_class[slot_id], + slot_buf)) + return false; } else { - if (cmeta->attlen > 0) - offset = TYPEALIGN(cmeta->attalign, offset); - else if (!VARATT_NOT_PAD_BYTE((char *)htup + offset)) - offset = TYPEALIGN(cmeta->attalign, offset); - - addr = ((char *)htup + offset); - if (cmeta->attlen > 0) - offset += cmeta->attlen; - else - offset += VARSIZE_ANY(addr); + kcxt->kvars_class[slot_id] = KVAR_CLASS__NULL; + kcxt->kvars_slot[slot_id].ptr = NULL; } + kvdef++; + kvars_count++; + } + /* other fields, 
which refers out of range, are NULL */ + while (kvars_count < kvars_nloads) + { + uint32_t slot_id = kvdef->var_slot_id; - if (kvdef->var_resno == resno) + if (slot_id < kcxt->kvars_nslots) { - if (!__extract_heap_tuple_attr(kcxt, kds, cmeta, kvdef, addr)) - return false; - kvdef++; - kvars_count++; - } - resno++; - } - /* fill-up by NULLs for the remained slot */ - while (kvars_count < kvars_nloads) - { - uint32_t slot_id = kvdef->var_slot_id; - - if (slot_id < kcxt->kvars_nslots) - { - kcxt->kvars_slot[slot_id].ptr = NULL; kcxt->kvars_class[slot_id] = KVAR_CLASS__NULL; + kcxt->kvars_slot[slot_id].ptr = NULL; } kvdef++; kvars_count++; @@ -1004,860 +1075,1017 @@ kern_extract_heap_tuple(kern_context *kcxt, return true; } +/* ---------------------------------------------------------------- + * + * Device-side Expression Support Routines + * + * ---------------------------------------------------------------- + */ + /* - * Routines to extract Arrow data store + * Const Expression */ -INLINE_FUNCTION(bool) -arrow_bitmap_check(kern_data_store *kds, - uint32_t kds_index, - uint32_t bitmap_offset, - uint32_t bitmap_length) +STATIC_FUNCTION(bool) +pgfn_ConstExpr(XPU_PGFUNCTION_ARGS) { - uint8_t *bitmap; - uint8_t mask = (1<<(kds_index & 7)); - uint32_t idx = (kds_index >> 3); - - if (bitmap_offset == 0 || /* no bitmap */ - bitmap_length == 0 || /* no bitmap */ - idx >= __kds_unpack(bitmap_length)) /* out of range */ - return false; - bitmap = (uint8_t *)kds + __kds_unpack(bitmap_offset); + if (!kexp->u.c.const_isnull) + { + const xpu_datum_operators *expr_ops = kexp->expr_ops; + int typlen = expr_ops->xpu_type_length; + kern_variable kvar; - return (bitmap[idx] & mask) != 0; + kvar.ptr = (void *)kexp->u.c.const_value; + if (typlen >= 0) + return expr_ops->xpu_datum_ref(kcxt, __result, typlen, &kvar); + else if (typlen == -1) + return expr_ops->xpu_datum_ref(kcxt, __result, KVAR_CLASS__VARLENA, &kvar); + else + { + STROM_ELOG(kcxt, "Bug? 
ConstExpr has unknown type length"); + return false; + } + } + __result->expr_ops = NULL; + return true; } STATIC_FUNCTION(bool) -arrow_fetch_secondary_index(kern_context *kcxt, - kern_data_store *kds, - uint32_t kds_index, - uint32_t values_offset, - uint32_t values_length, - bool is_large_offset, - uint64_t *p_start, - uint64_t *p_end) +pgfn_ParamExpr(XPU_PGFUNCTION_ARGS) { - if (!values_offset || !values_length) - { - STROM_ELOG(kcxt, "Arrow variable index/buffer is missing"); - return false; - } + kern_session_info *session = kcxt->session; + uint32_t param_id = kexp->u.p.param_id; - if (is_large_offset) + if (param_id < session->nparams && session->poffset[param_id] != 0) { - uint64_t *base = (uint64_t *)((char *)kds + __kds_unpack(values_offset)); + const xpu_datum_operators *expr_ops = kexp->expr_ops; + int typlen = expr_ops->xpu_type_length; + kern_variable kvar; - if (sizeof(uint64_t) * (kds_index+2) > __kds_unpack(values_length)) + kvar.ptr = ((char *)session + session->poffset[param_id]); + if (typlen >= 0) + return expr_ops->xpu_datum_ref(kcxt, __result, typlen, &kvar); + else if (typlen == -1) + return expr_ops->xpu_datum_ref(kcxt, __result, KVAR_CLASS__VARLENA, &kvar); + else { - STROM_ELOG(kcxt, "Arrow variable index[64bit] out of range"); + STROM_ELOG(kcxt, "Bug? 
ParamExpr has unknown type length"); return false; } - *p_start = base[kds_index]; - *p_end = base[kds_index+1]; } - else + __result->expr_ops = NULL; + return true; +} + +STATIC_FUNCTION(bool) +pgfn_VarExpr(XPU_PGFUNCTION_ARGS) +{ + uint32_t slot_id = kexp->u.v.var_slot_id; + + if (slot_id < kcxt->kvars_nslots) { - uint32_t *base = (uint32_t *)((char *)kds + __kds_unpack(values_offset)); + const xpu_datum_operators *expr_ops = kexp->expr_ops; + kern_variable *kvar = &kcxt->kvars_slot[slot_id]; + int vclass = kcxt->kvars_class[slot_id]; - if (sizeof(uint32_t) * (kds_index+2) > __kds_unpack(values_length)) + switch (vclass) { - STROM_ELOG(kcxt, "Arrow variable index[32bit] out of range"); - return false; - } - *p_start = base[kds_index]; - *p_end = base[kds_index+1]; + case KVAR_CLASS__NULL: + __result->expr_ops = NULL; + return true; + + case KVAR_CLASS__XPU_DATUM: + assert(((const xpu_datum_t *)kvar->ptr)->expr_ops == expr_ops); + memcpy(__result, kvar->ptr, expr_ops->xpu_type_sizeof); + return true; + default: + if (vclass < 0) + { + STROM_ELOG(kcxt, "Bug? KVAR_CLASS__* mismatch"); + return false; + } + case KVAR_CLASS__INLINE: + case KVAR_CLASS__VARLENA: + return expr_ops->xpu_datum_ref(kcxt, __result, vclass, kvar); + } } - return true; + STROM_ELOG(kcxt, "Bug? 
slot_id is out of range"); + return false; } STATIC_FUNCTION(bool) -__arrow_fetch_bool_datum(kern_context *kcxt, - kern_data_store *kds, - kern_colmeta *cmeta, - uint32_t kds_index, - kern_variable *kvar, - int *vclass) +pgfn_BoolExprAnd(XPU_PGFUNCTION_ARGS) { - assert(cmeta->extra_offset == 0 && - cmeta->extra_length == 0); - kvar->i8 = arrow_bitmap_check(kds, kds_index, - cmeta->values_offset, - cmeta->values_length); - *vclass = KVAR_CLASS__INLINE; + xpu_bool_t *result = (xpu_bool_t *)__result; + int i; + bool anynull = false; + const kern_expression *karg; + + assert(kexp->exptype == TypeOpCode__bool); + for (i=0, karg=KEXP_FIRST_ARG(kexp); + i < kexp->nr_args; + i++, karg=KEXP_NEXT_ARG(karg)) + { + xpu_bool_t status; + + assert(KEXP_IS_VALID(karg, bool)); + if (!EXEC_KERN_EXPRESSION(kcxt, karg, &status)) + return false; + if (XPU_DATUM_ISNULL(&status)) + anynull = true; + else if (!status.value) + { + result->expr_ops = kexp->expr_ops; + result->value = false; + return true; + } + } + result->expr_ops = (anynull ? 
NULL : kexp->expr_ops); + result->value = true; return true; } STATIC_FUNCTION(bool) -__arrow_fetch_int_datum(kern_context *kcxt, - kern_data_store *kds, - kern_colmeta *cmeta, - uint32_t kds_index, - kern_variable *kvar, - int *vclass) +pgfn_BoolExprOr(XPU_PGFUNCTION_ARGS) { - size_t values_length = __kds_unpack(cmeta->values_length); + xpu_bool_t *result = (xpu_bool_t *)__result; + int i; + bool anynull = false; + const kern_expression *karg; - assert(cmeta->extra_offset == 0 && - cmeta->extra_length == 0); - switch (cmeta->attopts.integer.bitWidth) + assert(kexp->exptype == TypeOpCode__bool); + for (i=0, karg=KEXP_FIRST_ARG(kexp); + i < kexp->nr_args; + i++, karg=KEXP_NEXT_ARG(karg)) { - case 8: - if (cmeta->values_offset && - sizeof(uint8_t) * (kds_index+1) <= values_length) - { - uint8_t *base = (uint8_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - kvar->u8 = base[kds_index]; - *vclass = KVAR_CLASS__INLINE; - return true; - } - break; - case 16: - if (cmeta->values_offset && - sizeof(uint16_t) * (kds_index+1) <= values_length) - { - uint16_t *base = (uint16_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - kvar->u16 = base[kds_index]; - *vclass = KVAR_CLASS__INLINE; - return true; - } - break; - case 32: - if (cmeta->values_offset && - sizeof(uint32_t) * (kds_index+1) <= values_length) - { - uint32_t *base = (uint32_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - kvar->u32 = base[kds_index]; - *vclass = KVAR_CLASS__INLINE; - return true; - } - break; - case 64: - if (cmeta->values_offset && - sizeof(uint64_t) * (kds_index+1) <= values_length) - { - uint64_t *base = (uint64_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - kvar->u64 = base[kds_index]; - *vclass = KVAR_CLASS__INLINE; - } - break; - default: - STROM_ELOG(kcxt, "Arrow::Int unsupported bitWidth"); + xpu_bool_t status; + + assert(KEXP_IS_VALID(karg, bool)); + if (!EXEC_KERN_EXPRESSION(kcxt, karg, &status)) return false; + if 
(XPU_DATUM_ISNULL(&status)) + anynull = true; + else if (status.value) + { + result->expr_ops = kexp->expr_ops; + result->value = true; + return true; + } } - *vclass = KVAR_CLASS__NULL; - return true; + result->expr_ops = (anynull ? NULL : kexp->expr_ops); + result->value = false; + return true; } STATIC_FUNCTION(bool) -__arrow_fetch_float_datum(kern_context *kcxt, - kern_data_store *kds, - kern_colmeta *cmeta, - uint32_t kds_index, - kern_variable *kvar, - int *vclass) +pgfn_BoolExprNot(XPU_PGFUNCTION_ARGS) { - size_t values_length = __kds_unpack(cmeta->values_length); - - assert(cmeta->extra_offset == 0 && - cmeta->extra_length == 0); - switch (cmeta->attopts.floating_point.precision) - { - case ArrowPrecision__Half: - if (cmeta->values_offset && - sizeof(float2_t) * (kds_index+1) <= cmeta->values_length) - { - float2_t *base = (float2_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - kvar->fp16 = base[kds_index]; - *vclass = KVAR_CLASS__INLINE; - return true; - } - break; - case ArrowPrecision__Single: - if (cmeta->values_offset && - sizeof(float4_t) * (kds_index+1) <= values_length) - { - float4_t *base = (float4_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - kvar->fp32 = base[kds_index]; - *vclass = KVAR_CLASS__INLINE; - return true; - } - break; - case ArrowPrecision__Double: - if (cmeta->values_offset && - sizeof(float8_t) * (kds_index+1) <= values_length) - { - float8_t *base = (float8_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - kvar->fp64 = base[kds_index]; - *vclass = KVAR_CLASS__INLINE; - return true; - } - break; - default: - STROM_ELOG(kcxt, "Arrow::FloatingPoint unsupported precision"); - return false; - } - *vclass = KVAR_CLASS__NULL; - return true; -} + xpu_bool_t *result = (xpu_bool_t *)__result; + xpu_bool_t status; + const kern_expression *karg = KEXP_FIRST_ARG(kexp); -STATIC_FUNCTION(bool) -__arrow_fetch_decimal_datum(kern_context *kcxt, - kern_data_store *kds, - kern_colmeta *cmeta, - uint32_t 
kds_index, - uint32_t slot_off, - kern_variable *kvar, - int *vclass) -{ - assert(cmeta->extra_offset == 0 && - cmeta->extra_length == 0); - if (cmeta->attopts.decimal.bitWidth != 128) - { - STROM_ELOG(kcxt, "Arrow::Decimal unsupported bitWidth"); + assert(kexp->exptype == TypeOpCode__bool && + kexp->nr_args == 1 && KEXP_IS_VALID(karg, bool)); + if (!EXEC_KERN_EXPRESSION(kcxt, karg, &status)) return false; - } - if (cmeta->values_offset && - sizeof(int128_t) * (kds_index+1) <= __kds_unpack(cmeta->values_length)) - { - xpu_numeric_t *num = (xpu_numeric_t *)((char *)kcxt->kvars_slot + slot_off); - int128_t *base = (int128_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - - assert((((uintptr_t)base) & (sizeof(int128_t)-1)) == 0); - set_normalized_numeric(num, base[kds_index], - cmeta->attopts.decimal.scale); - *vclass = KVAR_CLASS__XPU_DATUM; - kvar->ptr = num; - } + if (XPU_DATUM_ISNULL(&status)) + result->expr_ops = NULL; else { - *vclass = KVAR_CLASS__NULL; + result->expr_ops = kexp->expr_ops; + result->value = !status.value; /* NOT of the evaluated argument */ } return true; } STATIC_FUNCTION(bool) -__arrow_fetch_date_datum(kern_context *kcxt, - kern_data_store *kds, - kern_colmeta *cmeta, - uint32_t kds_index, - kern_variable *kvar, - int *vclass) +pgfn_NullTestExpr(XPU_PGFUNCTION_ARGS) { - size_t values_length = __kds_unpack(cmeta->values_length); + xpu_bool_t *result = (xpu_bool_t *)__result; + xpu_datum_t *status; + const kern_expression *karg = KEXP_FIRST_ARG(kexp); - assert(cmeta->extra_offset == 0 && - cmeta->extra_length == 0); - switch (cmeta->attopts.date.unit) + assert(kexp->exptype == TypeOpCode__bool && + kexp->nr_args == 1 && __KEXP_IS_VALID(kexp, karg)); + status = (xpu_datum_t *)alloca(karg->expr_ops->xpu_type_sizeof); + if (!EXEC_KERN_EXPRESSION(kcxt, karg, status)) + return false; + result->expr_ops = kexp->expr_ops; + switch (kexp->opcode) { - case ArrowDateUnit__Day: - if (cmeta->values_offset && - sizeof(uint32_t) * (kds_index+1) <= values_length) - { - uint32_t 
*base = (uint32_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - kvar->u32 = base[kds_index] - - (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE); - *vclass = KVAR_CLASS__INLINE; - return true; - } + case FuncOpCode__NullTestExpr_IsNull: + result->value = XPU_DATUM_ISNULL(status); break; - - case ArrowDateUnit__MilliSecond: - if (cmeta->values_offset && - sizeof(uint64_t) * (kds_index+1) <= values_length) - { - uint64_t *base = (uint64_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - kvar->u32 = base[kds_index] / (SECS_PER_DAY * 1000) - - (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE); - *vclass = KVAR_CLASS__INLINE; - return true; - } + case FuncOpCode__NullTestExpr_IsNotNull: + result->value = !XPU_DATUM_ISNULL(status); break; - default: - STROM_ELOG(kcxt, "unknown unit size of Arrow::Date"); + STROM_ELOG(kcxt, "corrupted kernel expression"); return false; } - *vclass = KVAR_CLASS__NULL; return true; } STATIC_FUNCTION(bool) -__arrow_fetch_time_datum(kern_context *kcxt, - kern_data_store *kds, - kern_colmeta *cmeta, - uint32_t kds_index, - kern_variable *kvar, - int *vclass) +pgfn_BoolTestExpr(XPU_PGFUNCTION_ARGS) { - size_t values_length = __kds_unpack(cmeta->values_length); + xpu_bool_t *result = (xpu_bool_t *)__result; + xpu_bool_t status; + const kern_expression *karg = KEXP_FIRST_ARG(kexp); - assert(cmeta->extra_offset == 0 && - cmeta->extra_length == 0); - switch (cmeta->attopts.time.unit) + assert(kexp->exptype == TypeOpCode__bool && + kexp->nr_args == 1 && KEXP_IS_VALID(karg, bool)); + if (!EXEC_KERN_EXPRESSION(kcxt, karg, &status)) + return false; + result->expr_ops = kexp->expr_ops; + switch (kexp->opcode) { - case ArrowTimeUnit__Second: - if (cmeta->values_offset && - sizeof(int32_t) * (kds_index+1) <= values_length) - { - int32_t *base = (int32_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - kvar->i64 = (int64_t)base[kds_index] * 1000000L; - *vclass = KVAR_CLASS__INLINE; - return true; - } + case 
FuncOpCode__BoolTestExpr_IsTrue: + result->value = (!XPU_DATUM_ISNULL(&status) && status.value); break; - - case ArrowTimeUnit__MilliSecond: - if (cmeta->values_offset && - sizeof(int32_t) * (kds_index+1) <= values_length) - { - int32_t *base = (int32_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - kvar->i64 = (int64_t)base[kds_index] * 1000L; - *vclass = KVAR_CLASS__INLINE; - return true; - } + case FuncOpCode__BoolTestExpr_IsNotTrue: + result->value = (XPU_DATUM_ISNULL(&status) || !status.value); break; - - case ArrowTimeUnit__MicroSecond: - if (cmeta->values_offset && - sizeof(int64_t) * (kds_index+1) <= values_length) - { - int64_t *base = (int64_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - kvar->i64 = base[kds_index]; - *vclass = KVAR_CLASS__INLINE; - return true; - } + case FuncOpCode__BoolTestExpr_IsFalse: + result->value = (!XPU_DATUM_ISNULL(&status) && !status.value); break; - - case ArrowTimeUnit__NanoSecond: - if (cmeta->values_offset && - sizeof(int64_t) * (kds_index+1) <= values_length) - { - int64_t *base = (int64_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - kvar->i64 = base[kds_index] / 1000L; - *vclass = KVAR_CLASS__INLINE; - return true; - } + case FuncOpCode__BoolTestExpr_IsNotFalse: + result->value = (XPU_DATUM_ISNULL(&status) || status.value); + break; + case FuncOpCode__BoolTestExpr_IsUnknown: + result->value = XPU_DATUM_ISNULL(&status); + break; + case FuncOpCode__BoolTestExpr_IsNotUnknown: + result->value = !XPU_DATUM_ISNULL(&status); break; - default: - STROM_ELOG(kcxt, "unknown unit size of Arrow::Time"); + STROM_ELOG(kcxt, "corrupted kernel expression"); return false; } - *vclass = KVAR_CLASS__NULL; return true; } STATIC_FUNCTION(bool) -__arrow_fetch_timestamp_datum(kern_context *kcxt, - kern_data_store *kds, - kern_colmeta *cmeta, - uint32_t kds_index, - kern_variable *kvar, - int *vclass) - +pgfn_CoalesceExpr(XPU_PGFUNCTION_ARGS) { - size_t values_length = __kds_unpack(cmeta->values_length); + 
const kern_expression *karg; + int i; - assert(cmeta->extra_offset == 0 && - cmeta->extra_length == 0); - switch (cmeta->attopts.time.unit) + for (i=0, karg = KEXP_FIRST_ARG(kexp); + i < kexp->nr_args; + i++, karg = KEXP_NEXT_ARG(karg)) { - case ArrowTimeUnit__Second: - if (cmeta->values_offset && - sizeof(uint64_t) * (kds_index+1) <= values_length) - { - uint64_t *base = (uint64_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - kvar->u64 = base[kds_index] * 1000000L - - (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY; - *vclass = KVAR_CLASS__INLINE; - return true; - } - break; + assert(__KEXP_IS_VALID(kexp, karg) && kexp->exptype == karg->exptype); + if (!EXEC_KERN_EXPRESSION(kcxt, karg, __result)) + return false; + if (!XPU_DATUM_ISNULL(__result)) + return true; + } + __result->expr_ops = NULL; + return true; +} - case ArrowTimeUnit__MilliSecond: - if (cmeta->values_offset && - sizeof(uint64_t) * (kds_index+1) <= values_length) - { - uint64_t *base = (uint64_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - kvar->u64 = base[kds_index] * 1000L - - (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY; - *vclass = KVAR_CLASS__INLINE; - return true; - } - break; - - case ArrowTimeUnit__MicroSecond: - if (cmeta->values_offset && - sizeof(uint64_t) * (kds_index+1) <= values_length) - { - uint64_t *base = (uint64_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - kvar->u64 = base[kds_index] - - (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY; - *vclass = KVAR_CLASS__INLINE; - return true; - } - break; - - case ArrowTimeUnit__NanoSecond: - if (cmeta->values_offset && - sizeof(uint64_t) * (kds_index+1) <= values_length) - { - uint64_t *base = (uint64_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - kvar->u64 = base[kds_index] / 1000L - - (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY; - *vclass = KVAR_CLASS__INLINE; - return true; - } - break; +STATIC_FUNCTION(bool) 
+pgfn_LeastExpr(XPU_PGFUNCTION_ARGS) +{ + const xpu_datum_operators *kexp_ops = kexp->expr_ops; + const kern_expression *karg; + xpu_datum_t *temp; + int comp; + int i, sz = kexp_ops->xpu_type_sizeof; - default: - STROM_ELOG(kcxt, "unknown unit size of Arrow::Timestamp"); + temp = (xpu_datum_t *)alloca(sz); + memset(temp, 0, sz); + __result->expr_ops = NULL; + for (i=0, karg = KEXP_FIRST_ARG(kexp); + i < kexp->nr_args; + i++, karg = KEXP_NEXT_ARG(karg)) + { + assert(__KEXP_IS_VALID(kexp, karg) && kexp->exptype == karg->exptype); + if (!EXEC_KERN_EXPRESSION(kcxt, karg, temp)) return false; + if (XPU_DATUM_ISNULL(temp)) + continue; + + if (XPU_DATUM_ISNULL(__result)) + { + memcpy(__result, temp, sz); + } + else + { + if (!kexp_ops->xpu_datum_comp(kcxt, &comp, __result, temp)) + return false; + if (comp > 0) + memcpy(__result, temp, sz); + } } - *vclass = KVAR_CLASS__NULL; return true; } STATIC_FUNCTION(bool) -__arrow_fetch_interval_datum(kern_context *kcxt, - kern_data_store *kds, - kern_colmeta *cmeta, - uint32_t kds_index, - uint32_t slot_off, - kern_variable *kvar, - int *vclass) +pgfn_GreatestExpr(XPU_PGFUNCTION_ARGS) { - size_t values_length = __kds_unpack(cmeta->values_length); + const xpu_datum_operators *kexp_ops = kexp->expr_ops; + const kern_expression *karg; + xpu_datum_t *temp; + int comp; + int i, sz = kexp_ops->xpu_type_sizeof; - assert(cmeta->extra_offset == 0 && - cmeta->extra_length == 0); - switch (cmeta->attopts.interval.unit) + temp = (xpu_datum_t *)alloca(sz); + memset(temp, 0, sz); + __result->expr_ops = NULL; + for (i=0, karg = KEXP_FIRST_ARG(kexp); + i < kexp->nr_args; + i++, karg = KEXP_NEXT_ARG(karg)) { - case ArrowIntervalUnit__Year_Month: - if (cmeta->values_offset && - sizeof(uint32_t) * (kds_index+1) <= values_length) - { - xpu_interval_t *iv = (xpu_interval_t *) - ((char *)kcxt->kvars_slot + slot_off); - uint32_t *base = (uint32_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - iv->value.month = base[kds_index]; - 
iv->value.day = 0; - iv->value.time = 0; - *vclass = KVAR_CLASS__XPU_DATUM; - kvar->ptr = iv; - return true; - } - break; - case ArrowIntervalUnit__Day_Time: - if (cmeta->values_offset && - sizeof(uint32_t) * 2 * (kds_index+1) <= values_length) - { - xpu_interval_t *iv = (xpu_interval_t *) - ((char *)kcxt->kvars_slot + slot_off); - uint32_t *base = (uint32_t *) - ((char *)kds + __kds_unpack(cmeta->values_offset)); - iv->value.month = 0; - iv->value.day = base[2*kds_index]; - iv->value.time = base[2*kds_index+1]; - *vclass = KVAR_CLASS__XPU_DATUM; - kvar->ptr = iv; - return true; - } - break; - default: - STROM_ELOG(kcxt, "unknown unit-size of Arrow::Interval"); + assert(__KEXP_IS_VALID(kexp, karg) && kexp->exptype == karg->exptype); + if (!EXEC_KERN_EXPRESSION(kcxt, karg, temp)) return false; + if (XPU_DATUM_ISNULL(temp)) + continue; + + if (XPU_DATUM_ISNULL(__result)) + { + memcpy(__result, temp, sz); + } + else + { + if (!kexp_ops->xpu_datum_comp(kcxt, &comp, __result, temp)) + return false; + if (comp < 0) + memcpy(__result, temp, sz); + } } - *vclass = KVAR_CLASS__NULL; return true; } STATIC_FUNCTION(bool) -__arrow_fetch_fixed_size_binary_datum(kern_context *kcxt, - kern_data_store *kds, - kern_colmeta *cmeta, - uint32_t kds_index, - kern_variable *kvar, - int *vclass) +pgfn_CaseWhenExpr(XPU_PGFUNCTION_ARGS) { - unsigned int unitsz = cmeta->attopts.fixed_size_binary.byteWidth; + const kern_expression *karg; + xpu_datum_t *comp = NULL; + xpu_datum_t *temp = NULL; + int i, temp_sz = 0; - assert(cmeta->extra_offset == 0 && - cmeta->extra_length == 0); - if (cmeta->values_offset && - unitsz * (kds_index+1) <= __kds_unpack(cmeta->values_length)) + /* CASE expression, if any */ + if (kexp->u.casewhen.case_comp) { - char *base = ((char *)kds + __kds_unpack(cmeta->values_offset)); - - kvar->ptr = base + unitsz * kds_index; - *vclass = unitsz; - return true; + karg = (const kern_expression *) + ((char *)kexp + kexp->u.casewhen.case_comp); + assert(__KEXP_IS_VALID(kexp, 
karg)); + comp = (xpu_datum_t *)alloca(karg->expr_ops->xpu_type_sizeof); + if (!EXEC_KERN_EXPRESSION(kcxt, karg, comp)) + return false; } - *vclass = KVAR_CLASS__NULL; - return true; -} + /* evaluate each WHEN-clauses */ + assert((kexp->nr_args % 2) == 0); + for (i = 0, karg=KEXP_FIRST_ARG(kexp); + i < kexp->nr_args; + i += 2, karg=KEXP_NEXT_ARG(karg)) + { + bool matched = false; + assert(__KEXP_IS_VALID(kexp, karg)); + if (comp) + { + int status; -STATIC_FUNCTION(bool) -__arrow_fetch_variable_datum(kern_context *kcxt, - kern_data_store *kds, - kern_colmeta *cmeta, - uint32_t kds_index, - bool is_large_offset, - kern_variable *kvar, - int *vclass) -{ - uint64_t start, end; - char *extra; + if (temp_sz < karg->expr_ops->xpu_type_sizeof) + { + temp_sz = karg->expr_ops->xpu_type_sizeof + 32; + temp = (xpu_datum_t *)alloca(temp_sz); + } + if (!EXEC_KERN_EXPRESSION(kcxt, karg, temp)) + return false; + if (!karg->expr_ops->xpu_datum_comp(kcxt, &status, comp, temp)) + return false; + if (status == 0) + matched = true; + } + else + { + xpu_bool_t status; - if (arrow_fetch_secondary_index(kcxt, kds, kds_index, - cmeta->values_offset, - cmeta->values_length, - is_large_offset, - &start, &end)) - { - /* sanity checks */ - if (start > end || end - start >= 0x40000000UL || - end > __kds_unpack(cmeta->extra_length)) + if (!EXEC_KERN_EXPRESSION(kcxt, karg, &status)) + return false; + if (!XPU_DATUM_ISNULL(&status) && status.value) + matched = true; + } + + karg = KEXP_NEXT_ARG(karg); + assert(__KEXP_IS_VALID(kexp, karg)); + if (matched) { - STROM_ELOG(kcxt, "Arrow variable data corruption"); - return false; + assert(kexp->exptype == karg->exptype); + if (!EXEC_KERN_EXPRESSION(kcxt, karg, __result)) + return false; + /* OK */ + return true; } - extra = (char *)kds + __kds_unpack(cmeta->extra_offset); - kvar->ptr = extra + start; - *vclass = (end - start); - return true; } - return false; + + /* ELSE clause, if any */ + if (kexp->u.casewhen.case_else == 0) + __result->expr_ops = 
NULL; + else + { + karg = (const kern_expression *) + ((char *)kexp + kexp->u.casewhen.case_else); + assert(__KEXP_IS_VALID(kexp, karg) && kexp->exptype == karg->exptype); + if (!EXEC_KERN_EXPRESSION(kcxt, karg, __result)) + return false; + } + return true; } +/* + * ScalarArrayOpExpr + */ STATIC_FUNCTION(bool) -__arrow_fetch_array_datum(kern_context *kcxt, - kern_data_store *kds, - kern_colmeta *cmeta, - uint32_t kds_index, - bool is_large_offset, - uint32_t slot_off, - kern_variable *kvar, - int *vclass) +__ScalarArrayOpHeap(kern_context *kcxt, + xpu_bool_t *result, + const kern_expression *kexp, + const kern_expression *kcmp, + xpu_array_t *aval) { - uint64_t start, end; - - assert(cmeta->idx_subattrs < kds->nr_colmeta && - cmeta->num_subattrs == 1); - if (arrow_fetch_secondary_index(kcxt, kds, kds_index, - cmeta->values_offset, - cmeta->values_length, - is_large_offset, - &start, &end)) + const __ArrayTypeData *ar = (const __ArrayTypeData *)VARDATA_ANY(aval->u.heap.value); + uint8_t *nullmap = __pg_array_nullmap(ar); + char *base = __pg_array_dataptr(ar); + int ndim = __pg_array_ndim(ar); + uint32_t nitems = 0; + uint32_t offset = 0; + uint32_t slot_id = kexp->u.saop.slot_id; + kern_variable *kvar = &kcxt->kvars_slot[slot_id]; + int *vclass = &kcxt->kvars_class[slot_id]; + bool use_any = (kexp->opcode == FuncOpCode__ScalarArrayOpAny); + bool meet_nulls = false; + + /* determine the number of items */ + if (ndim > 0) + { + nitems = __pg_array_dim(ar, 0); + for (int k=1; k < ndim; k++) + nitems *= __pg_array_dim(ar,k); + } + /* walk on the array */ + for (uint32_t i=0; i < nitems; i++) { - xpu_array_t *array = (xpu_array_t *) - ((char *)kcxt->kvars_slot + slot_off); - /* sanity checks */ - if (start > end) + xpu_bool_t status; + char *addr; + + /* datum reference */ + if (nullmap && att_isnull(i, nullmap)) { - STROM_ELOG(kcxt, "Arrow::List secondary index corruption"); + *vclass = KVAR_CLASS__NULL; + kvar->ptr = NULL; + } + else + { + if (kexp->u.saop.elem_len > 
0) + offset = TYPEALIGN(kexp->u.saop.elem_align, offset); + else if (!VARATT_NOT_PAD_BYTE(base + offset)) + offset = TYPEALIGN(kexp->u.saop.elem_align, offset); + addr = base + offset; + + if (kexp->u.saop.elem_byval) + { + assert(kexp->u.saop.elem_len > 0 && + kexp->u.saop.elem_len <= sizeof(kern_variable)); + *vclass = KVAR_CLASS__INLINE; + memcpy(kvar, addr, kexp->u.saop.elem_len); + offset += kexp->u.saop.elem_len; + } + else if (kexp->u.saop.elem_len > 0) + { + *vclass = kexp->u.saop.elem_len; + kvar->ptr = (void *)addr; + offset += kexp->u.saop.elem_len; + } + else if (kexp->u.saop.elem_len == -1) + { + *vclass = KVAR_CLASS__VARLENA; + kvar->ptr = (void *)addr; + offset += VARSIZE_ANY(addr); + } + else + { + STROM_ELOG(kcxt, "not a supported attribute length"); + return false; + } + } + /* call the comparator */ + if (!EXEC_KERN_EXPRESSION(kcxt, kcmp, &status)) return false; + if (!XPU_DATUM_ISNULL(&status)) + { + if (use_any) + { + if (status.value) + { + result->expr_ops = &xpu_bool_ops; + result->value = true; + return true; + } + } + else + { + if (!status.value) + { + result->expr_ops = &xpu_bool_ops; + result->value = false; + return true; /* ALL is decided by the first false element */ + } + } + } + else + { + meet_nulls = true; } - array->expr_ops = &xpu_array_ops; - array->length = end - start; - array->u.arrow.start = start; - array->u.arrow.smeta = &kds->colmeta[cmeta->idx_subattrs]; - assert(cmeta->num_subattrs == 1); - *vclass = KVAR_CLASS__XPU_DATUM; - kvar->ptr = array; - return true; } - return false; + + if (meet_nulls) + result->expr_ops = NULL; + else + { + result->expr_ops = &xpu_bool_ops; + result->value = !use_any; + } + return true; } -INLINE_FUNCTION(bool) -__arrow_fetch_composite_datum(kern_context *kcxt, - kern_data_store *kds, - kern_colmeta *cmeta, - uint32_t kds_index, - uint32_t slot_off, - kern_variable *kvar, - int *vclass) +STATIC_FUNCTION(bool) +__ScalarArrayOpArrow(kern_context *kcxt, + xpu_bool_t *result, + const kern_expression *kexp, + const kern_expression *kcmp, + 
xpu_array_t *aval) { - xpu_composite_t *comp = (xpu_composite_t *) - ((char *)kcxt->kvars_slot + slot_off); + const kern_colmeta *smeta = aval->u.arrow.smeta; + const kern_data_store *kds; + uint32_t slot_id = kexp->u.saop.slot_id; + char *slot_buf = NULL; + bool use_any = (kexp->opcode == FuncOpCode__ScalarArrayOpAny); + bool meet_nulls = false; + + result->value = !use_any; + kds = (const kern_data_store *) + ((char *)smeta - smeta->kds_offset); + if (kexp->u.saop.slot_bufsz > 0) + slot_buf = (char *)alloca(kexp->u.saop.slot_bufsz); + for (int k=0; k < aval->length; k++) + { + uint32_t index = aval->u.arrow.start + k; + xpu_bool_t status; - comp->expr_ops = &xpu_composite_ops; - comp->comp_typid = cmeta->atttypid; - comp->comp_typmod = cmeta->atttypmod; - comp->rowidx = kds_index; - comp->nfields = cmeta->num_subattrs; - comp->smeta = &kds->colmeta[cmeta->idx_subattrs]; - comp->value = NULL; - *vclass = KVAR_CLASS__XPU_DATUM; - kvar->ptr = comp; + if (smeta->nullmap_offset == 0 || + arrow_bitmap_check(kds, index, + smeta->nullmap_offset, + smeta->nullmap_length)) + { + if (!__kern_extract_arrow_field(kcxt, + kds, + smeta, + index, + &kcxt->kvars_slot[slot_id], + &kcxt->kvars_class[slot_id], + slot_buf)) + return false; + } + else + { + kcxt->kvars_class[slot_id] = KVAR_CLASS__NULL; + kcxt->kvars_slot[slot_id].ptr = NULL; + } + /* call the comparator */ + if (!EXEC_KERN_EXPRESSION(kcxt, kcmp, &status)) + return false; + if (!XPU_DATUM_ISNULL(&status)) + { + if (use_any) + { + if (status.value) + { + result->expr_ops = &xpu_bool_ops; + result->value = true; + return true; /* ANY is decided by the first true element */ + } + } + else + { + if (!status.value) + { + result->expr_ops = &xpu_bool_ops; + result->value = false; + return true; /* ALL is decided by the first false element */ + } + } + } + else + { + meet_nulls = true; + } + } + + if (meet_nulls) + result->expr_ops = NULL; + else + { + result->expr_ops = &xpu_bool_ops; + result->value = !use_any; + } return true; } -INLINE_FUNCTION(bool) -__kern_extract_arrow_field(kern_context *kcxt, - kern_data_store *kds, - 
kern_colmeta *cmeta, - uint32_t kds_index, - uint32_t slot_off, - kern_variable *kvar, - int *vclass) +STATIC_FUNCTION(bool) +pgfn_ScalarArrayOp(XPU_PGFUNCTION_ARGS) { - switch (cmeta->attopts.tag) + xpu_bool_t *result = (xpu_bool_t *)__result; + xpu_array_t aval; + uint32_t slot_id = kexp->u.saop.slot_id; + const kern_expression *karg; + + assert(kexp->exptype == TypeOpCode__bool && + kexp->nr_args == 2 && + slot_id < kcxt->kvars_nslots); + memset(result, 0, sizeof(xpu_bool_t)); + + /* fetch array value */ + karg = KEXP_FIRST_ARG(kexp); + assert(KEXP_IS_VALID(karg, array)); + if (!EXEC_KERN_EXPRESSION(kcxt, karg, &aval)) + return false; + /* comparator expression */ + karg = KEXP_NEXT_ARG(karg); + assert(KEXP_IS_VALID(karg, bool)); + if (aval.length < 0) { - case ArrowType__Bool: - assert(slot_off == 0); - if (!__arrow_fetch_bool_datum(kcxt, kds, cmeta, - kds_index, - kvar, vclass)) - return false; - break; + if (!__ScalarArrayOpHeap(kcxt, result, kexp, karg, &aval)) + return false; + } + else + { + if (!__ScalarArrayOpArrow(kcxt, result, kexp, karg, &aval)) + return false; + } + /* cleanup the slot */ + kcxt->kvars_class[slot_id] = KVAR_CLASS__NULL; + kcxt->kvars_slot[slot_id].ptr = NULL; + return true; +} - case ArrowType__Int: - assert(slot_off == 0); - if (!__arrow_fetch_int_datum(kcxt, kds, cmeta, - kds_index, - kvar, vclass)) - return false; - break; - - case ArrowType__FloatingPoint: - assert(slot_off == 0); - if (!__arrow_fetch_float_datum(kcxt, kds, cmeta, - kds_index, - kvar, vclass)) - return false; - break; +/* ---------------------------------------------------------------- + * + * Routines to support Projection + * + * ---------------------------------------------------------------- + */ +PUBLIC_FUNCTION(int) +kern_form_heaptuple(kern_context *kcxt, + const kern_expression *kproj, + const kern_data_store *kds_dst, + HeapTupleHeaderData *htup) +{ + uint32_t t_hoff; + uint32_t t_next; + uint16_t t_infomask = 0; + bool t_hasnull = false; + int nattrs = 
kproj->u.proj.nattrs; - case ArrowType__Decimal: - if (!__arrow_fetch_decimal_datum(kcxt, kds, cmeta, - kds_index, - slot_off, - kvar, vclass)) - return false; - break; + if (kds_dst && kds_dst->ncols < nattrs) + nattrs = kds_dst->ncols; + /* has any NULL attributes? */ + for (int j=0; j < nattrs; j++) + { + uint32_t slot_id = kproj->u.proj.desc[j].slot_id; - case ArrowType__Date: - assert(slot_off == 0); - if (!__arrow_fetch_date_datum(kcxt, kds, cmeta, - kds_index, - kvar, vclass)) - return false; - break; - - case ArrowType__Time: - assert(slot_off == 0); - if (!__arrow_fetch_time_datum(kcxt, kds, cmeta, - kds_index, - kvar, vclass)) - return false; + assert(slot_id < kcxt->kvars_nslots); + if (kcxt->kvars_class[slot_id] == KVAR_CLASS__NULL) + { + t_infomask |= HEAP_HASNULL; + t_hasnull = true; break; + } + } - case ArrowType__Timestamp: - assert(slot_off == 0); - if (!__arrow_fetch_timestamp_datum(kcxt, kds, cmeta, - kds_index, - kvar, vclass)) - return false; - break; + /* set up headers */ + t_hoff = offsetof(HeapTupleHeaderData, t_bits); + if (t_hasnull) + t_hoff += BITMAPLEN(nattrs); + t_hoff = MAXALIGN(t_hoff); - case ArrowType__Interval: - if (!__arrow_fetch_interval_datum(kcxt, kds, cmeta, - kds_index, - slot_off, - kvar, vclass)) - return false; - break; + if (htup) + { + memset(htup, 0, t_hoff); + htup->t_choice.t_datum.datum_typmod = kds_dst->tdtypmod; + htup->t_choice.t_datum.datum_typeid = kds_dst->tdtypeid; + htup->t_ctid.ip_blkid.bi_hi = 0xffff; /* InvalidBlockNumber */ + htup->t_ctid.ip_blkid.bi_lo = 0xffff; + htup->t_ctid.ip_posid = 0; /* InvalidOffsetNumber */ + htup->t_infomask2 = (nattrs & HEAP_NATTS_MASK); + htup->t_hoff = t_hoff; + } - case ArrowType__FixedSizeBinary: - assert(slot_off == 0); - if (!__arrow_fetch_fixed_size_binary_datum(kcxt, kds, cmeta, - kds_index, - kvar, vclass)) - return false; - break; + /* walk on the columns */ + for (int j=0; j < nattrs; j++) + { + const kern_colmeta *cmeta = &kds_dst->colmeta[j]; + const 
kern_projection_desc *pdesc = &kproj->u.proj.desc[j]; + const kern_variable *kvar = &kcxt->kvars_slot[pdesc->slot_id]; + int vclass = kcxt->kvars_class[pdesc->slot_id]; + int nbytes; + char *buffer = NULL; - case ArrowType__Utf8: - case ArrowType__Binary: - assert(slot_off == 0); - if (!__arrow_fetch_variable_datum(kcxt, kds, cmeta, - kds_index, - false, - kvar, vclass)) - return false; - break; + if (vclass == KVAR_CLASS__NULL) + continue; + /* adjust alignment */ + t_next = TYPEALIGN(cmeta->attalign, t_hoff); + if (htup) + { + if (t_next > t_hoff) + memset((char *)htup + t_hoff, 0, t_next - t_hoff); + buffer = (char *)htup + t_next; + } - case ArrowType__LargeUtf8: - case ArrowType__LargeBinary: - assert(slot_off == 0); - if (!__arrow_fetch_variable_datum(kcxt, kds, cmeta, - kds_index, - true, - kvar, vclass)) - return false; - break; + if (vclass == KVAR_CLASS__XPU_DATUM) + { + const xpu_datum_t *xdatum = (const xpu_datum_t *)kvar->ptr; - case ArrowType__List: - if (!__arrow_fetch_array_datum(kcxt, kds, cmeta, - kds_index, - false, - slot_off, - kvar, vclass)) - return false; - break; + assert(xdatum->expr_ops != NULL); + nbytes = xdatum->expr_ops->xpu_datum_write(kcxt, buffer, xdatum); + if (nbytes < 0) + return -1; + } + else if (cmeta->attlen > 0) + { + if (vclass == KVAR_CLASS__INLINE) + { + assert(cmeta->attlen <= sizeof(kern_variable)); + if (buffer) + memcpy(buffer, kvar, cmeta->attlen); + } + else if (vclass >= 0) + { + int sz = Min(vclass, cmeta->attlen); - case ArrowType__LargeList: - if (!__arrow_fetch_array_datum(kcxt, kds, cmeta, - kds_index, - false, - slot_off, - kvar, vclass)) - return false; - break; + if (buffer) + { + if (sz > 0) + memcpy(buffer, kvar->ptr, sz); + if (sz < cmeta->attlen) + memset(buffer + sz, 0, cmeta->attlen - sz); + } + } + else + { + STROM_ELOG(kcxt, "Bug? 
unexpected kvar-class for fixed-length datum"); + return -1; + } + nbytes = cmeta->attlen; + } + else if (cmeta->attlen == -1) + { + if (vclass >= 0) + { + nbytes = VARHDRSZ + vclass; + if (buffer) + { + if (vclass > 0) + memcpy(buffer+VARHDRSZ, kvar->ptr, vclass); + SET_VARSIZE(buffer, nbytes); + } + } + else if (vclass == KVAR_CLASS__VARLENA) + { + nbytes = VARSIZE_ANY(kvar->ptr); + if (buffer) + memcpy(buffer, kvar->ptr, nbytes); + if (VARATT_IS_EXTERNAL(kvar->ptr)) + t_infomask |= HEAP_HASEXTERNAL; + } + else + { + STROM_ELOG(kcxt, "Bug? unexpected kvar-class for varlena datum"); + return -1; + } + t_infomask |= HEAP_HASVARWIDTH; + } + else + { + STROM_ELOG(kcxt, "Bug? unsupported attribute-length"); + return -1; + } + /* set not-null bit, if valid */ + if (htup && t_hasnull) + htup->t_bits[j>>3] |= (1<<(j & 7)); + t_hoff = t_next + nbytes; + } + if (htup) + { + int ctid_slot = kproj->u.proj.ctid_slot; - case ArrowType__Struct: - if (!__arrow_fetch_composite_datum(kcxt, kds, cmeta, - kds_index, - slot_off, - kvar, vclass)) - return false; - break; - default: - STROM_ELOG(kcxt, "Unsupported Apache Arrow type"); - return false; + /* assign ctid, if any */ + if (ctid_slot >= 0 && + ctid_slot < kcxt->kvars_nslots && + kcxt->kvars_class[ctid_slot] == sizeof(ItemPointerData)) + { + memcpy(&htup->t_ctid, + kcxt->kvars_slot[ctid_slot].ptr, + sizeof(ItemPointerData)); + } + else + { + ItemPointerSetInvalid(&htup->t_ctid); + } + htup->t_infomask = t_infomask; + SET_VARSIZE(&htup->t_choice.t_datum, t_hoff); } - return true; + return t_hoff; +} + +EXTERN_FUNCTION(int) +kern_estimate_heaptuple(kern_context *kcxt, + const kern_expression *kproj, + const kern_data_store *kds_dst) +{ + const kern_expression *karg; + int i, sz; + + for (i=0, karg = KEXP_FIRST_ARG(kproj); + i < kproj->nr_args; + i++, karg = KEXP_NEXT_ARG(karg)) + { + assert(__KEXP_IS_VALID(kproj, karg) && + karg->opcode == FuncOpCode__SaveExpr); + if (!EXEC_KERN_EXPRESSION(kcxt, karg, NULL)) + return -1; + } + 
/* then, estimate the length */ + sz = kern_form_heaptuple(kcxt, kproj, kds_dst, NULL); + if (sz < 0) + return -1; + return MAXALIGN(offsetof(kern_tupitem, htup) + sz); +} + +STATIC_FUNCTION(bool) +pgfn_Projection(XPU_PGFUNCTION_ARGS) +{ + /* + * FuncOpExpr_Projection should be handled by kern_estimate_heaptuple() + * and kern_form_heaptuple() by the caller. + */ + STROM_ELOG(kcxt, "pgfn_Projection is not implemented"); + return false; } -INLINE_FUNCTION(bool) -__extract_arrow_tuple_sysattr(kern_context *kcxt, - const kern_data_store *kds, - uint32_t kds_index, - const kern_vars_defitem *kvdef) +STATIC_FUNCTION(bool) +pgfn_HashValue(XPU_PGFUNCTION_ARGS) { - static ItemPointerData __invalid_ctid__ = {{0,0},0}; - uint32_t slot_id = kvdef->var_slot_id; + const kern_expression *karg; + xpu_int4_t *result = (xpu_int4_t *)__result; + xpu_datum_t *datum = (xpu_datum_t *)alloca(64); + int i, datum_sz = 64; + uint32_t hash = 0xffffffffU; - /* out of range? */ - if (slot_id >= kcxt->kvars_nslots) - return true; - switch (kvdef->var_resno) + for (i=0, karg = KEXP_FIRST_ARG(kexp); + i < kexp->nr_args; + i++, karg = KEXP_NEXT_ARG(karg)) { - case SelfItemPointerAttributeNumber: - kcxt->kvars_slot[slot_id].ptr = (void *)&__invalid_ctid__; - kcxt->kvars_class[slot_id] = sizeof(ItemPointerData); - break; - case MinTransactionIdAttributeNumber: - kcxt->kvars_slot[slot_id].u32 = FrozenTransactionId; - kcxt->kvars_class[slot_id] = KVAR_CLASS__INLINE; - break; - case MaxTransactionIdAttributeNumber: - kcxt->kvars_slot[slot_id].u32 = InvalidTransactionId; - kcxt->kvars_class[slot_id] = KVAR_CLASS__INLINE; - break; - case MinCommandIdAttributeNumber: - case MaxCommandIdAttributeNumber: - kcxt->kvars_slot[slot_id].u32 = FirstCommandId; - kcxt->kvars_class[slot_id] = KVAR_CLASS__INLINE; - break; - case TableOidAttributeNumber: - kcxt->kvars_slot[slot_id].u32 = kds->table_oid; - kcxt->kvars_class[slot_id] = KVAR_CLASS__INLINE; - break; - default: - STROM_ELOG(kcxt, "not a supported system 
attribute reference"); + const xpu_datum_operators *expr_ops = karg->expr_ops; + uint32_t __hash; + + if (expr_ops->xpu_type_sizeof > datum_sz) + { + datum_sz = expr_ops->xpu_type_sizeof; + datum = (xpu_datum_t *)alloca(datum_sz); + } + if (!EXEC_KERN_EXPRESSION(kcxt, karg, datum)) return false; + if (!XPU_DATUM_ISNULL(datum)) + { + if (!expr_ops->xpu_datum_hash(kcxt, &__hash, datum)) + return false; + hash ^= __hash; + } } + hash ^= 0xffffffffU; + + result->expr_ops = &xpu_int4_ops; + result->value = hash; return true; } STATIC_FUNCTION(bool) -kern_extract_arrow_tuple(kern_context *kcxt, - kern_data_store *kds, - uint32_t kds_index, - const kern_vars_defitem *kvars_items, - int kvars_nloads) +pgfn_SaveExpr(XPU_PGFUNCTION_ARGS) { - const kern_vars_defitem *kvdef = kvars_items; - int kvars_count = 0; + const kern_expression *karg = KEXP_FIRST_ARG(kexp); + const xpu_datum_operators *expr_ops = kexp->expr_ops; + xpu_datum_t *result = __result; + uint32_t slot_id = kexp->u.save.slot_id; + uint32_t slot_off = kexp->u.save.slot_off; + xpu_datum_t *slot_buf = NULL; - assert(kds->format == KDS_FORMAT_ARROW); - /* fillup invalid values for system attribute, if any */ - while (kvars_count < kvars_nloads && - kvdef->var_resno < 0) + assert(slot_id < kcxt->kvars_nslots); + assert(kexp->nr_args == 1 && + kexp->exptype == karg->exptype); + if (slot_off > 0) { - if (!__extract_arrow_tuple_sysattr(kcxt, kds, kds_index, kvdef)) - return false; - kvdef++; - kvars_count++; + assert(slot_off + expr_ops->xpu_type_sizeof <= kcxt->kvars_nbytes); + slot_buf = (xpu_datum_t *)((char *)kcxt->kvars_slot + slot_off); } - - while (kvars_count < kvars_nloads && - kvdef->var_resno <= kds->ncols) + /* SaveExpr accept NULL result buffer! 
*/ + if (!result) { - kern_colmeta *cmeta = &kds->colmeta[kvdef->var_resno-1]; - uint32_t slot_id = kvdef->var_slot_id; - - assert(slot_id < kcxt->kvars_nslots); - if (cmeta->nullmap_offset == 0 || - arrow_bitmap_check(kds, kds_index, - cmeta->nullmap_offset, - cmeta->nullmap_length)) - { - if (!__kern_extract_arrow_field(kcxt, - kds, - cmeta, - kds_index, - kvdef->var_slot_off, - &kcxt->kvars_slot[slot_id], - &kcxt->kvars_class[slot_id])) - return false; - } + if (slot_buf) + result = slot_buf; else - { - kcxt->kvars_class[slot_id] = KVAR_CLASS__NULL; - kcxt->kvars_slot[slot_id].ptr = NULL; - } - kvdef++; - kvars_count++; + result = (xpu_datum_t *)alloca(expr_ops->xpu_type_sizeof); } - /* other fields, which refers out of range, are NULL */ - while (kvars_count < kvars_nloads) + /* Run the expression */ + if (!EXEC_KERN_EXPRESSION(kcxt, karg, result)) + return false; + if (XPU_DATUM_ISNULL(result)) { - uint32_t slot_id = kvdef->var_slot_id; + kcxt->kvars_class[slot_id] = KVAR_CLASS__NULL; + } + else if (slot_buf) + { + if (slot_buf != result) + memcpy(slot_buf, result, expr_ops->xpu_type_sizeof); + kcxt->kvars_class[slot_id] = KVAR_CLASS__XPU_DATUM; + kcxt->kvars_slot[slot_id].ptr = slot_buf; + } + else + { + if (!expr_ops->xpu_datum_store(kcxt, result, + &kcxt->kvars_class[slot_id], + &kcxt->kvars_slot[slot_id])) + return false; + } + return true; +} - if (slot_id < kcxt->kvars_nslots) +STATIC_FUNCTION(bool) +pgfn_JoinQuals(XPU_PGFUNCTION_ARGS) +{ + const kern_expression *karg; + xpu_int4_t *result = (xpu_int4_t *)__result; + int i, status = 1; + + assert(kexp->exptype == TypeOpCode__bool); + for (i=0, karg = KEXP_FIRST_ARG(kexp); + i < kexp->nr_args; + i++, karg = KEXP_NEXT_ARG(karg)) + { + xpu_bool_t datum; + + if (status < 0 && (karg->expflags & KEXP_FLAG__IS_PUSHED_DOWN) != 0) + continue; + if (!EXEC_KERN_EXPRESSION(kcxt, karg, &datum)) + return false; + if (XPU_DATUM_ISNULL(&datum) || !datum.value) { - kcxt->kvars_class[slot_id] = KVAR_CLASS__NULL; - 
kcxt->kvars_slot[slot_id].ptr = NULL;
+			/*
+			 * NOTE: Even if JoinQual returns 'unmatched' status, we need
+			 * to check whether the pure JOIN ... ON clause is satisfied,
+			 * or not, if OUTER JOIN case.
+			 * '-1' means JoinQual is not matched, because of the pushed-
+			 * down qualifiers from WHERE-clause, not JOIN ... ON.
+			 */
+			if ((karg->expflags & KEXP_FLAG__IS_PUSHED_DOWN) == 0)
+			{
+				status = 0;
+				break;
+			}
+			status = -1;
 		}
-		kvdef++;
-		kvars_count++;
 	}
+	result->expr_ops = kexp->expr_ops;
+	result->value = status;
 	return true;
 }
+STATIC_FUNCTION(bool)
+pgfn_GiSTEval(XPU_PGFUNCTION_ARGS)
+{
+	STROM_ELOG(kcxt, "pgfn_GiSTEval should not be called as a normal kernel expression");
+	return false;
+}
+
+STATIC_FUNCTION(bool)
+pgfn_Packed(XPU_PGFUNCTION_ARGS)
+{
+	STROM_ELOG(kcxt, "pgfn_Packed should not be called as a normal kernel expression");
+	return false;
+}
+
+STATIC_FUNCTION(bool)
+pgfn_AggFuncs(XPU_PGFUNCTION_ARGS)
+{
+	STROM_ELOG(kcxt, "pgfn_AggFuncs should not be called as a normal kernel expression");
+	return false;
+}
+
 /* ------------------------------------------------------------
  *
  * Extract GpuCache tuples
@@ -2368,31 +2596,155 @@ ExecGiSTIndexPostQuals(kern_context *kcxt,
  */
 STATIC_FUNCTION(bool)
 xpu_array_datum_ref(kern_context *kcxt,
-					xpu_datum_t *result,
+					xpu_datum_t *__result,
 					int vclass,
 					const kern_variable *kvar)
 {
-	STROM_ELOG(kcxt, "xpu_array_datum_ref is not implemented");
-	return false;
+	xpu_array_t *result = (xpu_array_t *)__result;
+
+	/* Only a varlena reference (heap array) can be loaded; fail on anything else. */
+	if (vclass != KVAR_CLASS__VARLENA)
+	{
+		STROM_ELOG(kcxt, "unexpected vclass for device array data type.");
+		return false;
+	}
+	/* length < 0 marks the heap form; xpu_array_datum_store/write test for it */
+	result->expr_ops = &xpu_array_ops;
+	result->length = -1;
+	result->u.heap.value = (const varlena *)kvar->ptr;
+	return true;
 }
 STATIC_FUNCTION(bool)
 xpu_array_datum_store(kern_context *kcxt,
-					  const xpu_datum_t *arg,
+					  const xpu_datum_t *__arg,
 					  int *p_vclass,
 					  kern_variable *p_kvar)
 {
-	STROM_ELOG(kcxt, "xpu_array_datum_store is not implemented");
-	return false;
+	const 
xpu_array_t *arg = (const xpu_array_t *)__arg;
+
+	if (XPU_DATUM_ISNULL(arg))
+	{
+		*p_vclass = KVAR_CLASS__NULL;
+	}
+	else if (arg->length < 0)
+	{
+		*p_vclass = KVAR_CLASS__VARLENA;
+		p_kvar->ptr = (void *)arg->u.heap.value;
+	}
+	else
+	{
+		STROM_ELOG(kcxt, "unable to use intermediation results of xpu_array_t");
+		return false;
+	}
+	return true;
 }
 STATIC_FUNCTION(int)
 xpu_array_datum_write(kern_context *kcxt,
 					  char *buffer,
-					  const xpu_datum_t *xdatum)
+					  const xpu_datum_t *__arg)
 {
-	STROM_ELOG(kcxt, "xpu_array_datum_write is not implemented");
-	return -1;
+	const xpu_array_t *arg = (const xpu_array_t *)__arg;
+	int			nbytes;
+	/* Returns bytes written (or needed, when buffer is NULL); 0 for a NULL datum; -1 on error. */
+	if (XPU_DATUM_ISNULL(arg))
+		return 0;
+	if (arg->length < 0)
+	{
+		nbytes = VARSIZE_ANY(arg->u.heap.value);	/* heap form: copied as-is */
+		if (buffer)
+			memcpy(buffer, arg->u.heap.value, nbytes);
+	}
+	else
+	{
+		const kern_colmeta *smeta = arg->u.arrow.smeta;
+		const kern_data_store *kds;
+		uint8_t	   *nullmap = NULL;
+		kern_variable kvar;
+		int			vclass;
+		char	   *slot_buf = NULL;
+
+		kds = (const kern_data_store *)
+			((char *)smeta - smeta->kds_offset);
+		if (smeta->dtype_sizeof > 0)
+			slot_buf = (char *)alloca(smeta->dtype_sizeof);
+		nbytes = (VARHDRSZ +
+				  offsetof(__ArrayTypeData, data[2]) +
+				  MAXALIGN(BITMAPLEN(arg->length)));
+		if (buffer)
+		{
+			__ArrayTypeData *arr = (__ArrayTypeData *)(buffer + VARHDRSZ);
+			/* NOTE(review): dataoffset stays 0 even when a nullmap is emitted, and the varlena header is not written here -- confirm callers handle both */
+			memset(arr, 0, nbytes - VARHDRSZ);
+			arr->ndim = 1;
+			arr->elemtype = smeta->atttypid;
+			arr->data[0] = arg->length;
+			arr->data[1] = 1;
+			if (smeta->nullmap_offset != 0)
+				nullmap = (uint8_t *)&arr->data[2];
+		}
+
+		for (int k=0; k < arg->length; k++)
+		{
+			uint32_t	index = arg->u.arrow.start + k;
+
+			if (smeta->nullmap_offset != 0 &&
+				!arrow_bitmap_check(kds, index,
+									smeta->nullmap_offset,
+									smeta->nullmap_length))
+				continue;	/* NULL element: its nullmap bit stays 0 */
+			if (!__kern_extract_arrow_field(kcxt,
+											kds,
+											smeta,
+											index,
+											&kvar,
+											&vclass,
+											slot_buf))
+				return -1;	/* int result: 'false' would read as 0 bytes, not as an error */
+			if (vclass != KVAR_CLASS__NULL)
+			{
+				if (nullmap)
+					nullmap[k>>3] |= (1<<(k & 7));
+
+				nbytes = TYPEALIGN(smeta->attalign, nbytes);
+				if (vclass == KVAR_CLASS__INLINE)
+				{
+					if (buffer)
+						memcpy(buffer + nbytes, &kvar, smeta->attlen);
+					nbytes += smeta->attlen;
+				}
+				else if (vclass == KVAR_CLASS__VARLENA)
+				{
+					int		sz = VARSIZE_ANY(kvar.ptr);
+
+					if (buffer)
+						memcpy(buffer + nbytes, kvar.ptr, sz);
+					nbytes += sz;
+				}
+				else if (vclass == KVAR_CLASS__XPU_DATUM)
+				{
+					xpu_datum_t *xdatum = (xpu_datum_t *)kvar.ptr;
+					char   *dst = (buffer ? buffer + nbytes : NULL);
+					int		sz;
+
+					sz = xdatum->expr_ops->xpu_datum_write(kcxt, dst, xdatum);
+					if (sz < 0)
+						return -1;	/* propagate the element writer's failure as a negative size */
+					nbytes += sz;
+				}
+				else
+				{
+					assert(vclass > 0);	/* remaining classes: vclass itself is the byte length copied */
+
+					if (buffer)
+						memcpy(buffer + nbytes, kvar.ptr, vclass);
+					nbytes += vclass;
+				}
+			}
+		}
+	}
+	return nbytes;
 }
 STATIC_FUNCTION(bool)
@@ -2477,6 +2829,8 @@ PGSTROM_SQLTYPE_OPERATORS(composite,false,8,-1);
 	{ TypeOpCode__##NAME, &xpu_##NAME##_ops },
 PUBLIC_DATA xpu_type_catalog_entry builtin_xpu_types_catalog[] = {
 #include "xpu_opcodes.h"
+	//{ TypeOpCode__composite, &xpu_composite_ops },
+	{ TypeOpCode__array, &xpu_array_ops },
 	{ TypeOpCode__Invalid, NULL }
 };
@@ -2506,6 +2860,8 @@ PUBLIC_DATA xpu_function_catalog_entry builtin_xpu_functions_catalog[] = {
 	{FuncOpCode__LeastExpr, pgfn_LeastExpr},
 	{FuncOpCode__GreatestExpr, pgfn_GreatestExpr},
 	{FuncOpCode__CaseWhenExpr, pgfn_CaseWhenExpr},
+	{FuncOpCode__ScalarArrayOpAny, pgfn_ScalarArrayOp},
+	{FuncOpCode__ScalarArrayOpAll, pgfn_ScalarArrayOp},
 #include "xpu_opcodes.h"
 	{FuncOpCode__Projection, pgfn_Projection},
 	{FuncOpCode__LoadVars, pgfn_LoadVars},
diff --git a/src/xpu_common.h b/src/xpu_common.h
index 0823aff48..4ed5a8330 100644
--- a/src/xpu_common.h
+++ b/src/xpu_common.h
@@ -250,6 +250,8 @@ typedef enum {
 	FuncOpCode__LeastExpr,
 	FuncOpCode__GreatestExpr,
 	FuncOpCode__CaseWhenExpr,
+	FuncOpCode__ScalarArrayOpAny,
+	FuncOpCode__ScalarArrayOpAll,
 #include "xpu_opcodes.h"
 	/* for projection */
 	FuncOpCode__Projection = 9999,
@@ -413,6 +415,17 @@ kcxt_reset(kern_context *kcxt)
 	kcxt->vlpos = 
kcxt->vlbuf; } +INLINE_FUNCTION(char *) +kcxt_slot_buf(kern_context *kcxt, uint32_t slot_off) +{ + if (slot_off == 0) + return NULL; /* offset 0 yields no buffer */ + assert(slot_off >= (sizeof(kern_variable) * kcxt->kvars_nslots + + sizeof(int) * kcxt->kvars_nslots) && + slot_off < kcxt->kvars_nbytes); /* offset must lie past the kern_variable and int arrays and below kvars_nbytes */ + return ((char *)kcxt->kvars_slot + slot_off); +} + INLINE_FUNCTION(void) __strncpy(char *d, const char *s, uint32_t n) { @@ -460,6 +473,8 @@ struct kern_colmeta { int8_t atttypkind; /* copy of kds->format */ char kds_format; + /* copy of sizeof(xpu_xxxx_t) if any */ + int16_t dtype_sizeof; /* * offset from kds for the reverse reference. * kds = (kern_data_store *)((char *)cmeta - cmeta->kds_offset) @@ -1521,9 +1536,6 @@ struct xpu_array_t { int32_t length; union { struct { - bool attbyval; - int8_t attalign; - int16_t attlen; const varlena *value; } heap; struct { @@ -1535,6 +1547,64 @@ struct xpu_array_t { typedef struct xpu_array_t xpu_array_t; EXTERN_DATA xpu_datum_operators xpu_array_ops; +/* inline accessors for the payload of a heap array */ +typedef struct +{ + int32_t ndim; /* # of dimensions */ + int32_t dataoffset; /* offset to data, or 0 if no bitmap */ + Oid elemtype; /* element type OID */ + uint32_t data[1]; /* 2 * ndim words precede the null bitmap; presumably dimension sizes then lower bounds -- TODO confirm */ +} __ArrayTypeData; /* payload of ArrayType */ + +INLINE_FUNCTION(int32_t) +__pg_array_ndim(const __ArrayTypeData *ar) +{ + return __Fetch(&ar->ndim); +} +INLINE_FUNCTION(int32_t) +__pg_array_dataoff(const __ArrayTypeData *ar) +{ + return __Fetch(&ar->dataoffset); +} +INLINE_FUNCTION(bool) +__pg_array_hasnull(const __ArrayTypeData *ar) +{ + return (__pg_array_dataoff(ar) != 0); +} +INLINE_FUNCTION(int32_t) +__pg_array_dim(const __ArrayTypeData *ar, int k) +{ + return __Fetch(&ar->data[k]); +} +INLINE_FUNCTION(uint8_t *) +__pg_array_nullmap(const __ArrayTypeData *ar) +{ + uint32_t dataoff = __pg_array_dataoff(ar); + + if (dataoff > 0) + { + int32_t ndim = __pg_array_ndim(ar); + + return (uint8_t *)((char *)&ar->data[2 * ndim]); /* bitmap begins right after the 2 * ndim words of data[] */ + } + return NULL; /* dataoffset == 0: no bitmap */ +} +INLINE_FUNCTION(char *) 
+__pg_array_dataptr(const __ArrayTypeData *ar) +{ + uint32_t dataoff = __pg_array_dataoff(ar); + + if (dataoff == 0) + { /* no bitmap: element data starts at the MAXALIGN'ed end of header plus data[2 * ndim] */ + int32_t ndim = __pg_array_ndim(ar); + + dataoff = MAXALIGN(VARHDRSZ + offsetof(__ArrayTypeData, + data[2 * ndim])); + } + assert(dataoff >= VARHDRSZ + offsetof(__ArrayTypeData, data)); + return (char *)ar + dataoff - VARHDRSZ; /* dataoff includes VARHDRSZ while 'ar' is used as the payload pointer; NOTE(review): confirm callers pass the payload address */ +} + /* * xpu_composite_t - composite type support * @@ -1743,22 +1813,30 @@ struct kern_expression uint32_t case_comp; /* key value to be compared, if any */ uint32_t case_else; /* ELSE clause, if any */ char data[1] __MAXALIGNED__; - } casewhen; + } casewhen; /* Case-When */ + struct { + uint32_t slot_id; /* temporary slot-id */ + uint32_t slot_bufsz; /* if >0, alloca(slot_bufsz) */ + bool elem_byval; /* attbyval of the element type */ + int8_t elem_align; /* attalign of the element type */ + int16_t elem_len; /* attlen of the element type */ + char data[1] __MAXALIGNED__; + } saop; /* ScalarArrayOp */ struct { int depth; int nloads; kern_vars_defitem kvars[1]; } load; /* VarLoads */ struct { - int gist_depth; /* special depth for GiST index */ - uint32_t gist_oid; /* OID of GiST index (for EXPLAIN) */ - kern_vars_defitem ivar; /* index item reference */ - char data[1] __MAXALIGNED__; + int gist_depth; /* special depth for GiST index */ + uint32_t gist_oid; /* OID of GiST index (for EXPLAIN) */ + kern_vars_defitem ivar; /* index item reference */ + char data[1] __MAXALIGNED__; } gist; /* GiSTEval */ struct { uint32_t slot_id; /* destination slot-id */ uint32_t slot_off; /* kvars-slot buffer offset, if needed */ - char data[1]; + char data[1] __MAXALIGNED__; } save; /* SaveExpr */ struct { int nattrs;