Skip to content

Commit

Permalink
[apache#737] flink-rss-support first commit.
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 committed May 19, 2024
1 parent 93f4347 commit cc2d8c7
Show file tree
Hide file tree
Showing 32 changed files with 4,111 additions and 0 deletions.
68 changes: 68 additions & 0 deletions client-flink/common/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.uniffle</groupId>
<artifactId>uniffle-parent</artifactId>
<version>0.10.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>rss-client-flink-common</artifactId>
<packaging>jar</packaging>
<name>Apache Uniffle Client (Flink Common)</name>

<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>shuffle-storage</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.26.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<exclusions>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.flink.buffer;

import org.apache.flink.runtime.io.network.buffer.Buffer;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** Buffer and the corresponding subpartition index. */
public class RssBufferWithSubpartition {

private final Buffer buffer;

private final int subpartitionIndex;

public RssBufferWithSubpartition(Buffer buffer, int subpartitionIndex) {
this.buffer = checkNotNull(buffer);
this.subpartitionIndex = subpartitionIndex;
}

public Buffer getBuffer() {
return buffer;
}

public int getSubpartitionIndex() {
return subpartitionIndex;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.flink.buffer;

import java.io.IOException;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;

/**
* Data of different channels can be appended to a {@link RssDataBuffer} and after the {@link
* RssDataBuffer} is full or finished, the appended data can be copied from it in channel index
* order.
*
* <p>The lifecycle of a {@link RssDataBuffer} can be: new, write, [read, reset, write], finish,
* read, release. There can be multiple [read, reset, write] operations before finish.
*
* <p>我们在完成这个类的时候,参考DataBuffer,但是因为我们要支持多个版本,没办法
*/
public interface RssDataBuffer {

/**
* Appends data of the specified channel to this {@link RssDataBuffer} and returns true if this
* {@link RssDataBuffer} is full.
*/
boolean append(ByteBuffer source, int targetSubpartition, Buffer.DataType dataType)
throws IOException;

/**
* Copies data in this {@link RssDataBuffer} to the target {@link MemorySegment} in channel index
* order and returns {@link RssBufferWithSubpartition} which contains the copied data and the
* corresponding channel index.
*/
RssBufferWithSubpartition getNextBuffer(@Nullable MemorySegment transitBuffer);

/** Returns the total number of records written to this {@link RssDataBuffer}. */
long numTotalRecords();

/** Returns the total number of bytes written to this {@link RssDataBuffer}. */
long numTotalBytes();

/** Returns true if not all data appended to this {@link RssDataBuffer} is consumed. */
boolean hasRemaining();

/** Finishes this {@link RssDataBuffer} which means no record can be appended any more. */
void finish();

/** Whether this {@link RssDataBuffer} is finished or not. */
boolean isFinished();

/** Releases this {@link RssDataBuffer} which releases all resources. */
void release();

/** Whether this {@link RssDataBuffer} is released or not. */
boolean isReleased();
}
Loading

0 comments on commit cc2d8c7

Please sign in to comment.