Skip to content

Commit 20ca8d1

Browse files
committed
integration with pipe
1 parent 944a342 commit 20ca8d1

File tree

65 files changed

+704
-143
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+704
-143
lines changed

iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,9 @@ private static void processSetParams() {
402402
// ImportTsFileRemotely
403403
ImportTsFileRemotely.setHost(host);
404404
ImportTsFileRemotely.setPort(port);
405+
// TODO: Figure out how to fetch userId here
405406
ImportTsFileRemotely.setUsername(username);
407+
ImportTsFileRemotely.setCliHostname(host);
406408
ImportTsFileRemotely.setPassword(password);
407409
ImportTsFileRemotely.setValidateTsFile(verify);
408410

iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
6868
private static String host;
6969
private static String port;
7070

71+
private static String userId = "-1";
7172
private static String username = SessionConfig.DEFAULT_USER;
73+
private static String cliHostname = "";
7274
private static String password = SessionConfig.DEFAULT_PASSWORD;
7375
private static boolean validateTsFile;
7476

@@ -184,7 +186,9 @@ private Map<String, String> constructParamsMap() {
184186
PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
185187
Boolean.toString(true));
186188
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, LOAD_STRATEGY);
189+
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USER_ID, userId);
187190
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
191+
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLI_HOSTNAME, cliHostname);
188192
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
189193
params.put(
190194
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
@@ -339,10 +343,18 @@ public static void setPort(final String port) {
339343
ImportTsFileRemotely.port = port;
340344
}
341345

346+
public static void setUserId(String userId) {
347+
ImportTsFileRemotely.userId = userId;
348+
}
349+
342350
public static void setUsername(final String username) {
343351
ImportTsFileRemotely.username = username;
344352
}
345353

354+
public static void setCliHostname(String cliHostname) {
355+
ImportTsFileRemotely.cliHostname = cliHostname;
356+
}
357+
346358
public static void setPassword(final String password) {
347359
ImportTsFileRemotely.password = password;
348360
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public PipeConfigRegionSnapshotEvent() {
108108

109109
public PipeConfigRegionSnapshotEvent(
110110
final String snapshotPath, final String templateFilePath, final CNSnapshotFileType type) {
111-
this(snapshotPath, templateFilePath, type, null, 0, null, null, null, null, true);
111+
this(snapshotPath, templateFilePath, type, null, 0, null, null, null, null, null, null, true);
112112
}
113113

114114
public PipeConfigRegionSnapshotEvent(
@@ -120,15 +120,19 @@ public PipeConfigRegionSnapshotEvent(
120120
final PipeTaskMeta pipeTaskMeta,
121121
final TreePattern treePattern,
122122
final TablePattern tablePattern,
123+
final String userId,
123124
final String userName,
125+
final String cliHostname,
124126
final boolean skipIfNoPrivileges) {
125127
super(
126128
pipeName,
127129
creationTime,
128130
pipeTaskMeta,
129131
treePattern,
130132
tablePattern,
133+
userId,
131134
userName,
135+
cliHostname,
132136
skipIfNoPrivileges,
133137
PipeConfigNodeResourceManager.snapshot());
134138
this.snapshotPath = snapshotPath;
@@ -191,7 +195,9 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
191195
final PipeTaskMeta pipeTaskMeta,
192196
final TreePattern treePattern,
193197
final TablePattern tablePattern,
198+
final String userId,
194199
final String userName,
200+
final String cliHostname,
195201
final boolean skipIfNoPrivileges,
196202
final long startTime,
197203
final long endTime) {
@@ -204,7 +210,9 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
204210
pipeTaskMeta,
205211
treePattern,
206212
tablePattern,
213+
userId,
207214
userName,
215+
cliHostname,
208216
skipIfNoPrivileges);
209217
}
210218

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public PipeConfigRegionWritePlanEvent() {
4242

4343
public PipeConfigRegionWritePlanEvent(
4444
final ConfigPhysicalPlan configPhysicalPlan, final boolean isGeneratedByPipe) {
45-
this(configPhysicalPlan, null, 0, null, null, null, null, true, isGeneratedByPipe);
45+
this(configPhysicalPlan, null, 0, null, null, null, null, null, null, true, isGeneratedByPipe);
4646
}
4747

4848
public PipeConfigRegionWritePlanEvent(
@@ -52,7 +52,9 @@ public PipeConfigRegionWritePlanEvent(
5252
final PipeTaskMeta pipeTaskMeta,
5353
final TreePattern treePattern,
5454
final TablePattern tablePattern,
55+
final String userId,
5556
final String userName,
57+
final String cliHostname,
5658
final boolean skipIfNoPrivileges,
5759
final boolean isGeneratedByPipe) {
5860
super(
@@ -61,7 +63,9 @@ public PipeConfigRegionWritePlanEvent(
6163
pipeTaskMeta,
6264
treePattern,
6365
tablePattern,
66+
userId,
6467
userName,
68+
cliHostname,
6569
skipIfNoPrivileges,
6670
isGeneratedByPipe);
6771
this.configPhysicalPlan = configPhysicalPlan;
@@ -78,7 +82,9 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
7882
final PipeTaskMeta pipeTaskMeta,
7983
final TreePattern treePattern,
8084
final TablePattern tablePattern,
85+
final String userId,
8186
final String userName,
87+
final String cliHostname,
8288
final boolean skipIfNoPrivileges,
8389
final long startTime,
8490
final long endTime) {
@@ -89,7 +95,9 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
8995
pipeTaskMeta,
9096
treePattern,
9197
tablePattern,
98+
userId,
9299
userName,
100+
cliHostname,
93101
skipIfNoPrivileges,
94102
false);
95103
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/client/IoTDBConfigNodeSyncClientManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.confignode.manager.pipe.sink.client;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23+
import org.apache.iotdb.commons.audit.UserEntity;
2324
import org.apache.iotdb.commons.conf.CommonDescriptor;
2425
import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClientManager;
2526
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferHandshakeV2Req;
@@ -41,7 +42,7 @@ public IoTDBConfigNodeSyncClientManager(
4142
/* The following parameters are used locally. */
4243
String loadBalanceStrategy,
4344
/* The following parameters are used to handshake with the receiver. */
44-
String username,
45+
UserEntity userEntity,
4546
String password,
4647
boolean shouldReceiverConvertOnTypeMismatch,
4748
String loadTsFileStrategy,
@@ -54,7 +55,7 @@ public IoTDBConfigNodeSyncClientManager(
5455
trustStorePwd,
5556
false,
5657
loadBalanceStrategy,
57-
username,
58+
userEntity,
5859
password,
5960
shouldReceiverConvertOnTypeMismatch,
6061
loadTsFileStrategy,

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,9 @@ protected byte[] generateHandShakeV2Payload() throws IOException {
7676
Boolean.toString(shouldReceiverConvertOnTypeMismatch));
7777
params.put(
7878
PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, loadTsFileStrategy);
79+
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USER_ID, userId);
7980
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
81+
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLI_HOSTNAME, cliHostname);
8082
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
8183
params.put(
8284
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
2323
import org.apache.iotdb.common.rpc.thrift.TSStatus;
24+
import org.apache.iotdb.commons.audit.UserEntity;
2425
import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient;
2526
import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClientManager;
2627
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFilePieceReq;
@@ -70,7 +71,7 @@ protected IoTDBSyncClientManager constructClient(
7071
final boolean useLeaderCache,
7172
final String loadBalanceStrategy,
7273
/* The following parameters are used to handshake with the receiver. */
73-
final String username,
74+
final UserEntity userEntity,
7475
final String password,
7576
final boolean shouldReceiverConvertOnTypeMismatch,
7677
final String loadTsFileStrategy,
@@ -82,7 +83,7 @@ protected IoTDBSyncClientManager constructClient(
8283
Objects.nonNull(trustStorePath) ? ConfigNodeConfig.addHomeDir(trustStorePath) : null,
8384
trustStorePwd,
8485
loadBalanceStrategy,
85-
username,
86+
userEntity,
8687
password,
8788
shouldReceiverConvertOnTypeMismatch,
8889
loadTsFileStrategy,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/DNAuditLogger.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ public class DNAuditLogger extends AbstractAuditLogger {
101101
new SessionInfo(
102102
0,
103103
new UserEntity(
104-
AuthorityChecker.SUPER_USER_ID,
105-
AuthorityChecker.SUPER_USER,
104+
AuthorityChecker.INTERNAL_AUDIT_USER_ID,
105+
AuthorityChecker.INTERNAL_AUDIT_USER,
106106
IoTDBDescriptor.getInstance().getConfig().getInternalAddress()),
107107
ZoneId.systemDefault());
108108

@@ -239,7 +239,7 @@ public void createViewIfNecessary() {
239239
new InternalClientSession(
240240
String.format(
241241
"%s_%s", DNAuditLogger.class.getSimpleName(), SystemConstant.AUDIT_DATABASE));
242-
session.setUsername(AuthorityChecker.SUPER_USER);
242+
session.setUsername(AuthorityChecker.INTERNAL_AUDIT_USER);
243243
session.setZoneId(ZoneId.systemDefault());
244244
session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0);
245245
session.setDatabaseName(SystemConstant.AUDIT_DATABASE);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ public class AuthorityChecker {
7272
public static int SUPER_USER_ID = 0;
7373
public static String SUPER_USER = CommonDescriptor.getInstance().getConfig().getAdminName();
7474

75+
public static int INTERNAL_AUDIT_USER_ID = -1;
76+
public static String INTERNAL_AUDIT_USER = "__auditor";
77+
7578
public static String ANY_SCOPE = "any";
7679

7780
public static final TSStatus SUCCEED = RpcUtils.SUCCESS_STATUS;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.pipe.agent.task.connection;
2121

22+
import org.apache.iotdb.commons.audit.UserEntity;
2223
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
2324
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
2425
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
@@ -177,7 +178,11 @@ private void parseAndCollectEvent(final PipeDeleteDataNodeEvent deleteDataEvent)
177178
.flatMap(
178179
planNode ->
179180
IoTDBSchemaRegionSource.TABLE_PRIVILEGE_PARSE_VISITOR.process(
180-
planNode, deleteDataEvent.getUserName())))
181+
planNode,
182+
new UserEntity(
183+
Long.parseLong(deleteDataEvent.getUserId()),
184+
deleteDataEvent.getUserName(),
185+
deleteDataEvent.getCliHostname()))))
181186
.map(
182187
planNode ->
183188
new PipeDeleteDataNodeEvent(
@@ -187,7 +192,9 @@ private void parseAndCollectEvent(final PipeDeleteDataNodeEvent deleteDataEvent)
187192
deleteDataEvent.getPipeTaskMeta(),
188193
deleteDataEvent.getTreePattern(),
189194
deleteDataEvent.getTablePattern(),
195+
deleteDataEvent.getUserId(),
190196
deleteDataEvent.getUserName(),
197+
deleteDataEvent.getCliHostname(),
191198
deleteDataEvent.isSkipIfNoPrivileges(),
192199
deleteDataEvent.isGeneratedByPipe()))
193200
.ifPresent(

0 commit comments

Comments
 (0)