Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ endif
# ========== Docker Install/Uninstall Targets ==========

# Valid service targets for docker install/uninstall
VALID_SERVICE_TARGETS := datamate backend frontend runtime mineru "deer-flow" milvus "label-studio"
VALID_SERVICE_TARGETS := datamate backend frontend runtime mineru "deer-flow" milvus "label-studio" "data-juicer" dj

# Generic docker service install target
.PHONY: %-docker-install
Expand All @@ -263,6 +263,8 @@ VALID_SERVICE_TARGETS := datamate backend frontend runtime mineru "deer-flow" mi
REGISTRY=$(REGISTRY) docker compose -f deployment/docker/deer-flow/docker-compose.yml up -d; \
elif [ "$*" = "milvus" ]; then \
docker compose -f deployment/docker/milvus/docker-compose.yml up -d; \
elif [ "$*" = "data-juicer" ] || [ "$*" = "dj" ]; then \
REGISTRY=$(REGISTRY) && docker compose -f deployment/docker/datamate/docker-compose.yml up -d datamate-data-juicer; \
else \
$(call docker-compose-service,$*,up -d,deployment/docker/datamate); \
fi
Expand Down Expand Up @@ -300,14 +302,16 @@ VALID_SERVICE_TARGETS := datamate backend frontend runtime mineru "deer-flow" mi
else \
docker compose -f deployment/docker/milvus/docker-compose.yml down; \
fi; \
elif [ "$*" = "data-juicer" ] || [ "$*" = "dj" ]; then \
$(call docker-compose-service,datamate-data-juicer,down,deployment/docker/datamate); \
else \
$(call docker-compose-service,$*,down,deployment/docker/datamate); \
fi

# ========== Kubernetes Install/Uninstall Targets ==========

# Valid k8s targets
VALID_K8S_TARGETS := mineru datamate deer-flow milvus label-studio
VALID_K8S_TARGETS := mineru datamate deer-flow milvus label-studio data-juicer dj

# Generic k8s install target
.PHONY: %-k8s-install
Expand All @@ -334,6 +338,8 @@ VALID_K8S_TARGETS := mineru datamate deer-flow milvus label-studio
helm upgrade milvus deployment/helm/milvus -n $(NAMESPACE) --install; \
elif [ "$*" = "label-studio" ]; then \
helm upgrade label-studio deployment/helm/label-studio -n $(NAMESPACE) --install; \
elif [ "$*" = "data-juicer" ] || [ "$*" = "dj" ]; then \
kubectl apply -f deployment/kubernetes/data-juicer/deploy.yaml -n $(NAMESPACE); \
fi

# Generic k8s uninstall target
Expand All @@ -357,6 +363,8 @@ VALID_K8S_TARGETS := mineru datamate deer-flow milvus label-studio
helm uninstall milvus -n $(NAMESPACE) --ignore-not-found; \
elif [ "$*" = "label-studio" ]; then \
helm uninstall label-studio -n $(NAMESPACE) --ignore-not-found; \
elif [ "$*" = "data-juicer" ] || [ "$*" = "dj" ]; then \
kubectl delete -f deployment/kubernetes/data-juicer/deploy.yaml -n $(NAMESPACE); \
fi

# ========== Upgrade Targets ==========
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public CleaningTaskDto createTask(CreateCleaningTaskRequest request) {
cleanTaskValidator.checkNameDuplication(request.getName());
cleanTaskValidator.checkInputAndOutput(request.getInstance());

ExecutorType executorType = cleanTaskValidator.checkAndGetExecutorType(request.getInstance());

CreateDatasetRequest createDatasetRequest = new CreateDatasetRequest();
createDatasetRequest.setName(request.getDestDatasetName());
createDatasetRequest.setDatasetType(DatasetType.valueOf(request.getDestDatasetType()));
Expand All @@ -131,7 +133,7 @@ public CleaningTaskDto createTask(CreateCleaningTaskRequest request) {

operatorInstanceRepo.insertInstance(taskId, request.getInstance());

prepareTask(task, request.getInstance());
prepareTask(task, request.getInstance(), executorType);
scanDataset(taskId, request.getSrcDatasetId());
taskScheduler.executeTask(taskId);
return task;
Expand Down Expand Up @@ -209,20 +211,20 @@ public void executeTask(String taskId) {
taskScheduler.executeTask(taskId);
}

private void prepareTask(CleaningTaskDto task, List<OperatorInstanceDto> instances) {
private void prepareTask(CleaningTaskDto task, List<OperatorInstanceDto> instances, ExecutorType executorType) {
List<OperatorDto> allOperators = operatorRepo.findAllOperators();
Map<String, OperatorDto> defaultSettings = allOperators.stream()
Map<String, OperatorDto> operatorDtoMap = allOperators.stream()
.collect(Collectors.toMap(OperatorDto::getId, Function.identity()));

TaskProcess process = new TaskProcess();
process.setInstanceId(task.getId());
process.setDatasetId(task.getDestDatasetId());
process.setExecutorType(executorType.getValue());
process.setDatasetPath(FLOW_PATH + "/" + task.getId() + "/dataset.jsonl");
process.setExportPath(DATASET_PATH + "/" + task.getDestDatasetId());
process.setExecutorType(ExecutorType.DATAMATE.getValue());
process.setProcess(instances.stream()
.map(instance -> {
OperatorDto operatorDto = defaultSettings.get(instance.getId());
OperatorDto operatorDto = operatorDtoMap.get(instance.getId());
Map<String, Object> stringObjectMap = getDefaultValue(operatorDto);
stringObjectMap.putAll(instance.getOverrides());
Map<String, Object> runtime = getRuntime(operatorDto);
Expand All @@ -240,7 +242,7 @@ private void prepareTask(CleaningTaskDto task, List<OperatorInstanceDto> instanc
options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK);
Yaml yaml = new Yaml(options);

File file = new File(FLOW_PATH + "/" + process.getInstanceId() + "/process.yaml");
File file = new File(FLOW_PATH + "/" + task.getId() + "/process.yaml");
file.getParentFile().mkdirs();

try (FileWriter writer = new FileWriter(file)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public List<CleaningTemplateDto> getTemplates(String keywords) {
@Transactional
public CleaningTemplateDto createTemplate(CreateCleaningTemplateRequest request) {
cleanTaskValidator.checkInputAndOutput(request.getInstance());
cleanTaskValidator.checkAndGetExecutorType(request.getInstance());
CleaningTemplateDto template = new CleaningTemplateDto();
String templateId = UUID.randomUUID().toString();
template.setId(templateId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ public enum CleanErrorCode implements ErrorCode {
*/
DUPLICATE_TASK_NAME("clean.0001", "清洗任务名称重复"),

IN_AND_OUT_NOT_MATCH("clean.0002", "算子输入输出不匹配");
OPERATOR_LIST_EMPTY("clean.0002", "任务列表为空"),

IN_AND_OUT_NOT_MATCH("clean.0003", "算子输入输出不匹配"),

EXECUTOR_NOT_MATCH("clean.0004", "算子执行器不匹配");

private final String code;
private final String message;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package com.datamate.cleaning.infrastructure.validator;

import com.datamate.cleaning.common.enums.ExecutorType;
import com.datamate.cleaning.common.exception.CleanErrorCode;
import com.datamate.cleaning.domain.repository.CleaningTaskRepository;
import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.datamate.common.setting.application.SysParamApplicationService;
import com.datamate.operator.domain.contants.OperatorConstant;
import lombok.RequiredArgsConstructor;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

Expand All @@ -19,6 +23,8 @@
public class CleanTaskValidator {
private final CleaningTaskRepository cleaningTaskRepo;

private final SysParamApplicationService sysParamApplicationService;

private final Pattern UUID_PATTERN = Pattern.compile(
"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
);
Expand Down Expand Up @@ -51,4 +57,28 @@ public void checkTaskId(String id) {
throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER);
}
}

/**
 * Verifies that all operators of a cleaning pipeline target the same executor and
 * returns the executor type the pipeline must run on.
 *
 * <p>An operator counts as a Data-Juicer operator when its category list contains
 * {@link OperatorConstant#CATEGORY_DATA_JUICER_ID}. An operator whose category list
 * is {@code null} or empty is treated as a non-Data-Juicer operator.
 *
 * @param operators the ordered operator instances of the pipeline
 * @return the executor type read from the {@code DATA_JUICER_EXECUTOR} system parameter
 *         when the operators are Data-Juicer operators, otherwise {@link ExecutorType#DATAMATE}
 * @throws BusinessException with {@link CleanErrorCode#OPERATOR_LIST_EMPTY} when
 *         {@code operators} is {@code null} or empty, or with
 *         {@link CleanErrorCode#EXECUTOR_NOT_MATCH} when two adjacent operators
 *         disagree on being Data-Juicer operators
 */
public ExecutorType checkAndGetExecutorType(List<OperatorInstanceDto> operators) {
    if (CollectionUtils.isEmpty(operators)) {
        throw BusinessException.of(CleanErrorCode.OPERATOR_LIST_EMPTY);
    }
    for (int i = 1; i < operators.size(); i++) {
        OperatorInstanceDto front = operators.get(i - 1);
        OperatorInstanceDto back = operators.get(i);
        if (isDataJuicerOperator(front) != isDataJuicerOperator(back)) {
            throw BusinessException.of(CleanErrorCode.EXECUTOR_NOT_MATCH,
                    String.format(Locale.ROOT, "ops(name: [%s, %s]) executor does not match",
                            front.getName(), back.getName()));
        }
    }
    // All operators agree at this point, so the first one decides the executor.
    // The null-safe helper is used here too: a single operator without categories
    // must fall back to DATAMATE instead of failing with a NullPointerException.
    if (isDataJuicerOperator(operators.getFirst())) {
        return ExecutorType.fromValue(sysParamApplicationService.getParamByKey("DATA_JUICER_EXECUTOR"));
    }
    return ExecutorType.DATAMATE;
}

/** Returns whether the operator's categories include the Data-Juicer category; null-safe on the category list. */
private boolean isDataJuicerOperator(OperatorInstanceDto operator) {
    List<String> categories = operator.getCategories();
    return CollectionUtils.isNotEmpty(categories)
            && categories.contains(OperatorConstant.CATEGORY_DATA_JUICER_ID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class OperatorInstanceDto {

private String outputs;

private List<Integer> categories;
private List<String> categories;

private Map<String, Object> overrides = new HashMap<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class OperatorConstant {

public static String CATEGORY_PREDEFINED_ID = "96a3b07a-3439-4557-a835-525faad60ca3";

public static String CATEGORY_DATA_JUICER_ID = "79b385b4-fde8-4617-bcba-02a176938996";

public static Map<String, String> CATEGORY_MAP = new HashMap<>();

static {
Expand Down
15 changes: 15 additions & 0 deletions deployment/docker/datamate/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,21 @@ services:
- "6379:6379"
networks: [ datamate ]

# Data-Juicer service: runs the datajuicer/data-juicer:v1.4.4 image and starts it
# with `uvicorn service:app --host 0.0.0.0`.
datamate-data-juicer:
container_name: datamate-data-juicer
image: datajuicer/data-juicer:v1.4.4
# Restart the container only when it exits with a failure.
restart: on-failure
command:
- uvicorn
- service:app
- --host
- "0.0.0.0"
# Named volumes mounted at /dataset and /flow inside the container.
volumes:
- dataset_volume:/dataset
- flow_volume:/flow
networks: [ datamate ]
# Assigned to the `data-juicer` compose profile.
# NOTE(review): with a profile set, a plain `docker compose up` presumably skips
# this service unless the profile is enabled or the service is named explicitly — confirm.
profiles: [ data-juicer ]

volumes:
dataset_volume:
name: datamate-dataset-volume
Expand Down
12 changes: 0 additions & 12 deletions deployment/helm/datamate/charts/ray-cluster/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,3 @@ Name of image
{{- $name }}:{{ $tag }}
{{- end }}
{{- end }}

{{/*
Name of sidecar image
*/}}
{{- define "ray-cluster-sidecar.image" -}}
{{- $name := default (printf "%s:%s" .Values.image.repository .Values.image.tag) .Values.head.sidecarContainers.image }}
{{- if .Values.global.image.repository }}
{{- .Values.global.image.repository | trimSuffix "/" }}/{{ $name }}
{{- else }}
{{- $name }}
{{- end }}
{{- end }}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ spec:
securityContext:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- $defult := printf "%s:%s" .Values.image.repository .Values.image.tag }}
{{- $defult := include "ray-cluster.image" . -}}
{{- with .Values.head.sidecarContainers }}
{{- range $index, $container := . }}
{{- $image := default $defult $container.image -}}
Expand Down Expand Up @@ -313,10 +313,14 @@ spec:
- name: ray-worker
{{- if $values.image }}
image: {{ $values.image.repository }}:{{ $values.image.tag }}
{{- if $values.image.pullPolicy }}
imagePullPolicy: {{ $values.image.pullPolicy }}
{{- else }}
image: {{ $.Values.image.repository }}:{{ $.Values.image.tag }}
imagePullPolicy: {{ $.Values.image.pullPolicy }}
imagePullPolicy: {{ default $.Values.image.pullPolicy $.Values.global.image.pullPolicy }}
{{- end }}
{{- else }}
image: {{ include "ray-cluster.image" $ }}
imagePullPolicy: {{ default $.Values.image.pullPolicy $.Values.global.image.pullPolicy }}
{{- end }}
{{- with $values.command }}
command:
Expand Down
6 changes: 5 additions & 1 deletion deployment/helm/datamate/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ ray-cluster:
subPath: site-packages
sidecarContainers:
- name: runtime
image: datamate-runtime
imagePullPolicy: IfNotPresent
args: *runtimeArgs
env: *runtimeEnv
Expand Down Expand Up @@ -338,6 +337,9 @@ ray-cluster:
- *flowVolume
- *logVolume
- *operatorVolume
- name: ascend
hostPath:
path: /usr/local/Ascend
volumeMounts:
- mountPath: /tmp/ray
name: log-volume
Expand All @@ -352,3 +354,5 @@ ray-cluster:
- mountPath: /usr/local/lib/ops/site-packages
name: operator-volume
subPath: site-packages
- mountPath: /usr/local/Ascend
name: ascend
74 changes: 74 additions & 0 deletions deployment/kubernetes/data-juicer/deploy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Deployment for the Data-Juicer workload: one replica of the
# datajuicer/data-juicer:v1.4.4 image, started with `uvicorn service:app --host 0.0.0.0`.
apiVersion: apps/v1
kind: Deployment
metadata:
name: datamate-data-juicer
labels:
app: datamate
tier: data-juicer
spec:
replicas: 1
# The selector must match the pod template labels below (app + tier).
selector:
matchLabels:
app: datamate
tier: data-juicer
template:
metadata:
labels:
app: datamate
tier: data-juicer
spec:
containers:
- name: data-juicer
image: datajuicer/data-juicer:v1.4.4
imagePullPolicy: IfNotPresent
command:
- uvicorn
args:
- service:app
- --host
- "0.0.0.0"
# NOTE(review): no --port argument is passed; 8000 presumably matches the
# server's default listening port — confirm.
ports:
- containerPort: 8000
# Small requests with much larger limits (8 CPU / 32Gi).
resources:
limits:
cpu: 8
memory: 32Gi
requests:
cpu: 100m
memory: 100Mi
# Dataset and flow volumes are mounted at /dataset and /flow; the log volume is
# mounted through its `data-juicer` sub-path.
volumeMounts:
- name: dataset-volume
mountPath: /dataset
- name: log-volume
mountPath: /var/log/datamate/data-juicer
subPath: data-juicer
- name: flow-volume
mountPath: /flow
# All three volumes are backed by PersistentVolumeClaims referenced by name;
# the claims themselves are not defined in this manifest.
volumes:
- name: dataset-volume
persistentVolumeClaim:
claimName: datamate-dataset-pvc
- name: flow-volume
persistentVolumeClaim:
claimName: datamate-flow-pvc
- name: log-volume
persistentVolumeClaim:
claimName: datamate-log-pvc

---
# ClusterIP Service for the Data-Juicer pods: forwards TCP port 8000 to
# port 8000 of pods labelled app=datamate, tier=data-juicer.
apiVersion: v1
kind: Service
metadata:
name: datamate-data-juicer
labels:
app: datamate
tier: data-juicer
spec:
type: ClusterIP
ports:
- port: 8000
targetPort: 8000
protocol: TCP
# Selects pods by the app + tier label pair.
selector:
app: datamate
tier: data-juicer
1 change: 1 addition & 0 deletions frontend/src/pages/DataCleansing/Create/CreateTask.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export default function CleansingTaskCreate() {
...item.defaultParams,
...item.overrides,
},
categories: item.categories,
inputs: item.inputs,
outputs: item.outputs,
})),
Expand Down
1 change: 1 addition & 0 deletions frontend/src/pages/DataCleansing/Create/CreateTemplate.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export default function CleansingTemplateCreate() {
...item.defaultParams,
...item.overrides,
},
categories: item.categories,
inputs: item.inputs,
outputs: item.outputs,
})),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ const OperatorLibrary: React.FC<OperatorLibraryProps> = ({
<div className="pb-4 border-b border-gray-200">
<span className="flex items-center font-semibold text-base">
<Layers className="w-4 h-4 mr-2" />
算子库({filteredOperators.length})
算子库
</span>
</div>
<div className="flex flex-col h-full pt-4 pr-4 overflow-hidden">
Expand Down
Loading
Loading