@@ -149,50 +149,67 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType) {
 protected Map<?, ?> convertMap(Object sourceValue, Schema sourceSchema, Types.MapType targetType) {
     if (sourceValue instanceof GenericData.Array) {
         GenericData.Array<?> arrayValue = (GenericData.Array<?>) sourceValue;
         // Handle map represented as an array of key-value records
         Map<Object, Object> recordMap = new HashMap<>(arrayValue.size());
-        Schema kvSchema = sourceSchema.getElementType();
-        if (kvSchema.getType() == Schema.Type.UNION) {
-            kvSchema = kvSchema.getTypes().stream()
-                .filter(s -> s.getType() == Schema.Type.RECORD)
-                .findFirst()
-                .orElseThrow(() -> new IllegalStateException(
-                    "Map element UNION schema does not contain a RECORD type: " + sourceSchema.getElementType()));
-        }
+        Schema kvSchema = resolveUnionElement(sourceSchema.getElementType(), Schema.Type.RECORD,
+            "Map element UNION schema does not contain a RECORD type");
         Schema.Field keyField = kvSchema.getFields().get(0);
         Schema.Field valueField = kvSchema.getFields().get(1);
+        if (keyField == null || valueField == null) {
+            throw new IllegalStateException("Map entry schema missing key/value fields: " + kvSchema);
+        }
+        Schema keySchema = keyField.schema();
+        Schema valueSchema = valueField.schema();
+        Type keyType = targetType.keyType();
+        Type valueType = targetType.valueType();
         for (Object element : arrayValue) {
             if (element == null) {
                 continue;
             }
             GenericRecord record = (GenericRecord) element;
-            Object key = convert(record.get(keyField.pos()), keyField.schema(), targetType.keyType());
-            Object value = convert(record.get(valueField.pos()), valueField.schema(), targetType.valueType());
+            Object key = convert(record.get(keyField.pos()), keySchema, keyType);
+            Object value = convert(record.get(valueField.pos()), valueSchema, valueType);
             recordMap.put(key, value);
         }
         return recordMap;
     } else {
-        Schema mapSchema = sourceSchema;
-        if (mapSchema.getType() == Schema.Type.UNION) {
-            mapSchema = mapSchema.getTypes().stream()
-                .filter(s -> s.getType() == Schema.Type.MAP)
-                .findFirst()
-                .orElseThrow(() -> new IllegalStateException(
-                    "UNION schema does not contain a Map type: " + sourceSchema.getElementType()));
-        }
-        // Handle standard Java Map
+        Schema mapSchema = resolveUnionElement(sourceSchema, Schema.Type.MAP,
+            "UNION schema does not contain a MAP type");
         Map<?, ?> sourceMap = (Map<?, ?>) sourceValue;
-        Map<Object, Object> adaptedMap = new HashMap<>();
+        Map<Object, Object> adaptedMap = new HashMap<>(sourceMap.size());
+        Schema valueSchema = mapSchema.getValueType();
+        Type keyType = targetType.keyType();
+        Type valueType = targetType.valueType();
         for (Map.Entry<?, ?> entry : sourceMap.entrySet()) {
-            // Avro map keys are always strings
-            Object key = convert(entry.getKey(), STRING_SCHEMA_INSTANCE, targetType.keyType());
-            Object value = convert(entry.getValue(), mapSchema.getValueType(), targetType.valueType());
+            Object rawKey = entry.getKey();
+            Object key = convert(rawKey, STRING_SCHEMA_INSTANCE, keyType);
+            Object value = convert(entry.getValue(), valueSchema, valueType);
             adaptedMap.put(key, value);
         }
         return adaptedMap;
     }
 }

+private Schema resolveUnionElement(Schema schema, Schema.Type expectedType, String errorMessage) {
+    Schema resolved = schema;
+    if (schema.getType() == Schema.Type.UNION) {
+        resolved = null;
+        for (Schema unionMember : schema.getTypes()) {
+            if (unionMember.getType() == expectedType) {
+                resolved = unionMember;
+                break;
+            }
+        }
+        if (resolved == null) {
+            throw new IllegalStateException(errorMessage + ": " + schema);
+        }
+    }
+    return resolved;
+}
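
The new resolveUnionElement helper replaces the two stream-based union filters with a single loop and a uniform error message. A minimal sketch of the behavior it implements, using only the Avro Schema API (the enclosing converter class is not shown on this page, so the helper call is quoted in a comment):

import java.util.Arrays;

import org.apache.avro.Schema;

public class ResolveUnionSketch {
    public static void main(String[] args) {
        // A nullable Avro map: ["null", {"type": "map", "values": "string"}]
        Schema nullableMap = Schema.createUnion(Arrays.asList(
            Schema.create(Schema.Type.NULL),
            Schema.createMap(Schema.create(Schema.Type.STRING))));

        // resolveUnionElement(nullableMap, Schema.Type.MAP, "UNION schema does not contain a MAP type")
        // unwraps the union to its MAP branch, or throws IllegalStateException if no branch matches.
        Schema resolved = null;
        for (Schema member : nullableMap.getTypes()) {
            if (member.getType() == Schema.Type.MAP) {
                resolved = member;
                break;
            }
        }
        System.out.println(resolved); // {"type":"map","values":"string"}
    }
}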
10 changes: 7 additions & 3 deletions core/src/main/java/kafka/automq/table/binder/RecordBinder.java
@@ -217,7 +217,7 @@ public Object get(int pos) {
     }

     FieldMapping mapping = fieldMappings[pos];
-    if (mapping == null || !avroRecord.hasField(mapping.avroKey())) {
+    if (mapping == null) {
         return null;
     }

Expand Down Expand Up @@ -298,8 +298,12 @@ private long calculateMapFieldCount(Object map, Types.MapType mapType) {

long total = 1;
if (map instanceof Map) {
for (Map.Entry<?, ?> entry : ((Map<?, ?>) map).entrySet()) {
total += FieldMetric.count(entry.getKey().toString());
Map<?, ?> typedMap = (Map<?, ?>) map;
if (typedMap.isEmpty()) {
return total;
}
for (Map.Entry<?, ?> entry : typedMap.entrySet()) {
total += calculateFieldCount(entry.getKey(), mapType.keyType());
total += calculateFieldCount(entry.getValue(), mapType.valueType());
}
}
Expand Down
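
With this change, map keys are costed through calculateFieldCount with the Iceberg key type instead of always going through toString() and the string metric. A standalone sketch of the resulting counting shape (the real calculateFieldCount overloads are not shown on this page, so the per-entry costs below are stand-ins):

import java.util.Map;

public class MapCountSketch {
    // Stand-in for RecordBinder.calculateMapFieldCount: each key and value is
    // charged one unit here, where the real code dispatches on the Iceberg
    // key/value types.
    static long calculateMapFieldCount(Object map) {
        long total = 1; // the map field itself
        if (map instanceof Map) {
            Map<?, ?> typedMap = (Map<?, ?>) map;
            if (typedMap.isEmpty()) {
                return total; // empty maps short-circuit
            }
            for (Map.Entry<?, ?> entry : typedMap.entrySet()) {
                total += 1; // key cost via mapType.keyType()
                total += 1; // value cost via mapType.valueType()
            }
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(calculateMapFieldCount(Map.of()));                 // 1
        System.out.println(calculateMapFieldCount(Map.of("a", 1L, "b", 2L))); // 5
    }
}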
43 changes: 29 additions & 14 deletions core/src/main/java/kafka/automq/table/metric/FieldMetric.java
@@ -23,40 +23,55 @@

 import java.nio.ByteBuffer;

-public class FieldMetric {
+public final class FieldMetric {
+
+    private static final int STRING_BASE_COST = 3; // base cost for small strings
+    private static final int STRING_UNIT_BYTES = 32; // granularity for string scaling
+    private static final int STRING_UNIT_STEP = 1; // aggressive scaling for long strings
+
+    private static final int BINARY_BASE_COST = 4; // small binary payloads slightly heavier than primitives
+    private static final int BINARY_UNIT_BYTES = 32; // granularity for binary buffers
+    private static final int BINARY_UNIT_STEP = 1; // scaling factor for binary payloads
+
+    private FieldMetric() {
+    }

     public static int count(CharSequence value) {
         if (value == null) {
             return 0;
         }
-        if (value instanceof Utf8) {
-            return (((Utf8) value).getByteLength() + 23) / 24;
-        }
-        if (value.isEmpty()) {
-            return 1;
-        }
-        return (value.length() + 23) / 24;
+        int lengthBytes = value instanceof Utf8
+            ? ((Utf8) value).getByteLength()
+            : value.length();
+
+        if (lengthBytes <= STRING_UNIT_BYTES) {
+            return STRING_BASE_COST;
+        }
+        int segments = (lengthBytes + STRING_UNIT_BYTES - 1) / STRING_UNIT_BYTES;
+        return STRING_BASE_COST + (segments - 1) * STRING_UNIT_STEP;
     }

     public static int count(ByteBuffer value) {
         if (value == null) {
             return 0;
         }
         int remaining = value.remaining();
-        if (remaining == 0) {
-            return 1;
-        }
-        return (remaining + 31) >> 5;
+        if (remaining <= BINARY_UNIT_BYTES) {
+            return BINARY_BASE_COST;
+        }
+        int segments = (remaining + BINARY_UNIT_BYTES - 1) / BINARY_UNIT_BYTES;
+        return BINARY_BASE_COST + (segments - 1) * BINARY_UNIT_STEP;
     }

     public static int count(byte[] value) {
         if (value == null) {
             return 0;
         }
-        if (value.length == 0) {
-            return 1;
-        }
-        return (value.length + 31) >> 5;
+        int length = value.length;
+        if (length <= BINARY_UNIT_BYTES) {
+            return BINARY_BASE_COST;
+        }
+        int segments = (length + BINARY_UNIT_BYTES - 1) / BINARY_UNIT_BYTES;
+        return BINARY_BASE_COST + (segments - 1) * BINARY_UNIT_STEP;
     }

 }
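
The constants above define a simple piecewise-linear cost curve: anything up to one 32-byte unit pays the base cost, and each further unit adds one step. A few spot checks against the new formulas (expected values computed by hand from the diff):

import java.nio.ByteBuffer;

import kafka.automq.table.metric.FieldMetric;

public class FieldMetricSpotChecks {
    public static void main(String[] args) {
        System.out.println(FieldMetric.count(""));                      // 3: <= 32 bytes pays STRING_BASE_COST
        System.out.println(FieldMetric.count("x".repeat(33)));          // 4: 2 segments -> 3 + 1
        System.out.println(FieldMetric.count("x".repeat(100)));         // 6: 4 segments -> 3 + 3
        System.out.println(FieldMetric.count(ByteBuffer.allocate(33))); // 5: 2 segments -> 4 + 1
        System.out.println(FieldMetric.count((byte[]) null));           // 0: nulls are free
    }
}

Note the behavioral shift for empty values: the old code returned 1, while the new floor is the base cost (3 for strings, 4 for binary).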
49 changes: 37 additions & 12 deletions core/src/main/java/kafka/automq/table/perf/BenchmarkResult.java
@@ -19,29 +19,31 @@

 package kafka.automq.table.perf;

-import java.util.concurrent.TimeUnit;
-
 public class BenchmarkResult {
     private final String formatName;
     private final String dataTypeName;
-    private final long durationMs;
+    private final long durationNs;
     private final long recordsProcessed;
+    private final long fieldCount;
     private final String errorMessage;

-    private BenchmarkResult(String formatName, String dataTypeName, long durationMs, long recordsProcessed, String errorMessage) {
+    private BenchmarkResult(String formatName, String dataTypeName, long durationNs, long recordsProcessed, long fieldCount,
+        String errorMessage) {
         this.formatName = formatName;
         this.dataTypeName = dataTypeName;
-        this.durationMs = durationMs;
+        this.durationNs = durationNs;
         this.recordsProcessed = recordsProcessed;
+        this.fieldCount = fieldCount;
         this.errorMessage = errorMessage;
     }

-    public static BenchmarkResult success(String formatName, String dataTypeName, long durationMs, long recordsProcessed) {
-        return new BenchmarkResult(formatName, dataTypeName, durationMs, recordsProcessed, null);
+    public static BenchmarkResult success(String formatName, String dataTypeName, long durationNs,
+        long recordsProcessed, long fieldCount) {
+        return new BenchmarkResult(formatName, dataTypeName, durationNs, recordsProcessed, fieldCount, null);
     }

     public static BenchmarkResult failure(String formatName, String dataTypeName, String errorMessage) {
-        return new BenchmarkResult(formatName, dataTypeName, 0, 0, errorMessage);
+        return new BenchmarkResult(formatName, dataTypeName, 0, 0, 0, errorMessage);
     }

     public String getFormatName() {
@@ -52,14 +54,22 @@ public String getDataTypeName() {
     return dataTypeName;
 }

+public long getDurationNs() {
+    return durationNs;
+}
+
 public long getDurationMs() {
-    return durationMs;
+    return durationNs / 1_000_000L;
 }

 public long getRecordsProcessed() {
     return recordsProcessed;
 }

+public long getFieldCount() {
+    return fieldCount;
+}
+
 public String getErrorMessage() {
     return errorMessage;
 }
@@ -69,17 +79,32 @@ public boolean isSuccess() {
 }

 public long getThroughput() {
+    long durationMs = getDurationMs();
     if (durationMs == 0) {
         return 0;
     }
-    return TimeUnit.SECONDS.toMillis(recordsProcessed) / durationMs;
+    return (recordsProcessed * 1000L) / durationMs;
 }

+public double getNsPerField() {
+    if (fieldCount == 0) {
+        return 0.0d;
+    }
+    return (double) durationNs / (double) fieldCount;
+}
+
+public double getNsPerRecord() {
+    if (recordsProcessed == 0) {
+        return 0.0d;
+    }
+    return (double) durationNs / (double) recordsProcessed;
+}
+
 @Override
 public String toString() {
     if (isSuccess()) {
-        return String.format("%s %s: %d ms, %d records, %d records/sec",
-            formatName, dataTypeName, durationMs, recordsProcessed, getThroughput());
+        return String.format("%s %s: %d ms, %d records, fieldCount=%d, ns/field=%.2f",
+            formatName, dataTypeName, getDurationMs(), recordsProcessed, fieldCount, getNsPerField());
     } else {
         return String.format("%s %s: FAILED - %s", formatName, dataTypeName, errorMessage);
     }
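
A sketch of how a benchmark driver would feed the nanosecond-based API; the format/type names and counts below are made up, and only BenchmarkResult's signatures come from this diff:

import java.util.concurrent.TimeUnit;

import kafka.automq.table.perf.BenchmarkResult;

public class BenchmarkResultUsage {
    public static void main(String[] args) {
        long durationNs = TimeUnit.MILLISECONDS.toNanos(250); // stand-in for a System.nanoTime() delta
        BenchmarkResult result = BenchmarkResult.success("avro", "nested", durationNs, 1_000_000L, 10_000_000L);

        System.out.println(result.getThroughput());  // 4000000 records/sec (1e6 records * 1000 / 250 ms)
        System.out.println(result.getNsPerField());  // 25.0
        System.out.println(result.getNsPerRecord()); // 250.0
        System.out.println(result); // avro nested: 250 ms, 1000000 records, fieldCount=10000000, ns/field=25.00
    }
}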