Commit

Code quality
vpinna80 committed Dec 22, 2023
1 parent 45550ac commit f8cab1a
Showing 5 changed files with 10 additions and 43 deletions.
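
In short: the visible hunks swap the parameter order of SparkUtils.getScalarFor so that the DataStructureComponent is passed first and the serialized value second, and every call site in the touched files is updated to match.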

This file was deleted.

@@ -103,7 +103,7 @@ public DataPointImpl decode(Row row, int start)
{
ScalarValue<?, ?, ?, ?>[] vals = new ScalarValue<?, ?, ?, ?>[components.length];
for (int i = 0; i < components.length; i++)
- vals[i] = getScalarFor(row.get(i + start), components[i]);
+ vals[i] = getScalarFor(components[i], row.get(i + start));

Object lineage = row.get(components.length + start);
if (lineage instanceof byte[])
@@ -495,19 +495,19 @@ private <TT> UDF2<Object, Object, Object[]> udfForComponent(DataStructureCompone
Serializable[] source = (Serializable[]) kryo.readClassAndObject(new Input((byte[]) newV));
result = Arrays.stream(source)
.map(v -> v instanceof byte[] ? kryo.readClassAndObject(new Input((byte[]) v)) : v)
- .map(v -> getScalarFor(v, comp))
+ .map(v -> getScalarFor(comp, v))
.collect(toList());
}
else if (newV instanceof Seq)
{
@SuppressWarnings("unchecked")
SeqOps<Object, Seq<Object>, Seq<Object>> ravV = (SeqOps<Object, Seq<Object>, Seq<Object>>) newV;
- result = asJava(ravV.map(serialized -> getScalarFor(serialized, comp)));
+ result = asJava(ravV.map(serialized -> getScalarFor(comp, serialized)));
}
else
- result = getScalarFor(newV, comp);
+ result = getScalarFor(comp, newV);

- Collection<ScalarValue<?, ?, ?, ?>> finished = finisher.apply((TT) result, getScalarFor(oldV, comp));
+ Collection<ScalarValue<?, ?, ?, ?>> finished = finisher.apply((TT) result, getScalarFor(comp, oldV));

return finished.stream()
.map(ScalarValue::get)
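
Context for the branch above: a value reaching the UDF may arrive as a Kryo-serialized byte[] that has to be decoded before it is converted into a ScalarValue. A minimal, self-contained sketch of that Kryo round trip (illustrative only, not project code; the class name and sample value are made up):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class KryoRoundTripSketch
{
	public static void main(String[] args)
	{
		Kryo kryo = new Kryo();
		kryo.setRegistrationRequired(false);

		// Serialize a sample value into a byte[], as it would reach the UDF
		Output output = new Output(64, -1);
		kryo.writeClassAndObject(output, "sample value");
		byte[] serialized = output.toBytes();

		// Decode it back, mirroring kryo.readClassAndObject(new Input((byte[]) newV)) above
		Object value = kryo.readClassAndObject(new Input(serialized));
		System.out.println(value); // prints: sample value
	}
}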
@@ -544,7 +544,7 @@ public DataSet aggr(DataSetMetadata structure, Set<DataStructureComponent<Identi

MapGroupsFunction<Row, Row, Row> aggregator = (keyRow, s) -> {
Map<DataStructureComponent<Identifier, ?, ?>, ScalarValue<?, ?, ?, ?>> keyValues = keys.stream()
- .collect(toMapWithValues(c -> getScalarFor(keyRow.getAs(c.getName()), c)));
+ .collect(toMapWithValues(c -> getScalarFor(c, keyRow.getAs(c.getName()))));
return StreamSupport.stream(new SparkSpliterator(s, bufferSize), !Utils.SEQUENTIAL)
.map(encoder::decode)
.collect(collectingAndThen(groupCollector, r -> resultEncoder.encode(finisher.apply(r, keyValues))));
@@ -603,7 +603,7 @@ public <A, T, TT> Stream<T> streamByKeys(Set<DataStructureComponent<Identifier,
// decode Row[] from the UDF into List<DataPoint>
.map(group -> Arrays.stream(group)
.map(array -> IntStream.range(0, array.length - 1)
- .mapToObj(i -> new SimpleEntry<>(resultComponents.get(i), getScalarFor(array[i], resultComponents.get(i))))
+ .mapToObj(i -> new SimpleEntry<>(resultComponents.get(i), getScalarFor(resultComponents.get(i), array[i])))
.collect(toDataPoint((Lineage) array[array.length - 1], getMetadata())))
.collect(toList()))
.map(out -> (T) out);
@@ -673,7 +673,7 @@ private static <TT, A, T> MapGroupsFunction<Row, Row, Serializable[][]> groupMap

Map<DataStructureComponent<Identifier, ?, ?>, ScalarValue<?, ?, ?, ?>> keyMap = new HashMap<>();
for (int i = 0; i < keyRow.size(); i++)
- keyMap.put(sortedKeys[i], getScalarFor(keyRow.get(i), sortedKeys[i]));
+ keyMap.put(sortedKeys[i], getScalarFor(sortedKeys[i], keyRow.get(i)));

// Each group is mapped to an array of rows where each row is an array of values
Serializable[][] array = ((Collection<?>) finisher.apply(before, keyMap)).stream()
@@ -136,7 +136,7 @@ public class SparkUtils
return DataStructureComponentImpl.of(field.name(), role, domain);
}

- public static ScalarValue<?, ?, ?, ?> getScalarFor(Object serialized, DataStructureComponent<?, ?, ?> component)
+ public static ScalarValue<?, ?, ?, ?> getScalarFor(DataStructureComponent<?, ?, ?> component, Object serialized)
{
SerFunction<Object, ScalarValue<?, ?, ?, ?>> builder = null;
ValueDomainSubset<?, ?> domain;
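
The hunk above is the heart of the commit: getScalarFor now takes the component first and the serialized value second. A sketch of a call site under the new order (project imports are omitted; the variables component and row are placeholders, not taken from the commit):

// Hypothetical call site after this change; 'component' is a DataStructureComponent<?, ?, ?>
// and 'row' is an org.apache.spark.sql.Row, both assumed to be in scope.
ScalarValue<?, ?, ?, ?> scalar = SparkUtils.getScalarFor(component, row.getAs(component.getName()));
// Before this commit the same call read: getScalarFor(row.getAs(component.getName()), component)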
@@ -138,7 +138,7 @@ public Encoder<A> bufferEncoder()
@Override
public A reduce(A acc, Serializable value)
{
- coll.accumulator().accept(acc, SparkUtils.getScalarFor(value, oldComp));
+ coll.accumulator().accept(acc, SparkUtils.getScalarFor(oldComp, value));
return acc;
}

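
The last hunk feeds each decoded value into a plain java.util.stream.Collector from inside a Spark Aggregator's reduce. A minimal, self-contained sketch of driving a Collector's accumulator by hand in the same way (stock JDK collector, not the project's classes):

import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class CollectorByHandSketch
{
	// Analogous to Aggregator.reduce(acc, value): push one value at a time into the accumulator
	static <T, A, R> R collectByHand(Collector<T, A, R> coll, Iterable<T> values)
	{
		A acc = coll.supplier().get();
		for (T v : values)
			coll.accumulator().accept(acc, v);
		return coll.finisher().apply(acc);
	}

	public static void main(String[] args)
	{
		List<String> out = collectByHand(Collectors.toList(), List.of("a", "b"));
		System.out.println(out); // prints: [a, b]
	}
}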
