Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions auron-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
Expand Down
11 changes: 8 additions & 3 deletions auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -71,12 +72,16 @@ public static void putResource(String key, Object value) {
}

public static FSDataInputWrapper openFileAsDataInputWrapper(FileSystem fs, String path) throws Exception {
// the path is a URI string, so we need to convert it to a URI object
return FSDataInputWrapper.wrap(fs.open(new Path(new URI(path))));
return FSDataInputWrapper.wrap(fs.open(toInputPath(path)));
}
Comment on lines 72 to 74

public static FSDataOutputWrapper createFileAsDataOutputWrapper(FileSystem fs, String path) throws Exception {
return FSDataOutputWrapper.wrap(fs.create(new Path(new URI(path))));
return FSDataOutputWrapper.wrap(fs.create(new Path(path)));
}

private static Path toInputPath(String path) throws URISyntaxException {
String safePath = path.indexOf('#') >= 0 ? path.replace("#", "%23") : path;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems a little brittle; why do we need this? can we do something like so?

 public static FSDataInputWrapper openFileAsDataInputWrapper(FileSystem fs, String path) throws Exception {
     return FSDataInputWrapper.wrap(fs.open(new Path(path)));
 }

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I went with this approach.

return new Path(new URI(safePath));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two issues here:

  1. Inconsistent fix: createFileAsDataOutputWrapper was changed to new Path(path) (the correct simple fix), but openFileAsDataInputWrapper still goes through new URI(...) after escaping #. If new Path(path) is correct for writes, the same change should work for reads — the PR description itself says the fix is "change from new Path(new URI(path)) to new Path(path)". Why does the read path need a different approach?

  2. Double-encoding bug: path.replace("#", "%23") will corrupt a path that already contains a literal %23 (i.e. a percent-encoded #) by turning it into %2523. If the simpler new Path(path) works for writes, applying it uniformly to reads as well would fix both issues at once.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I moved the normalization earlier and removed the read-side workaround.

}

public static long getDirectMemoryUsed() {
Expand Down
106 changes: 106 additions & 0 deletions auron-core/src/test/java/org/apache/auron/jni/JniBridgeTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.auron.jni;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.junit.jupiter.api.Test;

public class JniBridgeTest {

@Test
public void testFileWrappersPreserveLiteralHashInHdfsPath() throws Exception {
String path = "hdfs://mycluster/tmp/channel=wx_repro#mini/spark-submit-123/part-00000.json";
CapturingFileSystem cfs = new CapturingFileSystem();

JniBridge.openFileAsDataInputWrapper(cfs, path).close();
JniBridge.createFileAsDataOutputWrapper(cfs, path).close();

assertPathPreservesHash(cfs.openedPath);
assertPathPreservesHash(cfs.createdPath);
}

@Test
public void testFileWrappersHandleReadUriAndWriteRawPercentPaths() throws Exception {
String readPath = "file:/tmp/t1/part=test%2520test/part-00000.parquet";
String writePath = "file:/tmp/t1/part=test%20test/part-00001.parquet";
CapturingFileSystem cfs = new CapturingFileSystem();

JniBridge.openFileAsDataInputWrapper(cfs, readPath).close();
JniBridge.createFileAsDataOutputWrapper(cfs, writePath).close();

assertEquals(
"/tmp/t1/part=test%20test/part-00000.parquet",
cfs.openedPath.toUri().getPath());
assertEquals(
"/tmp/t1/part=test%20test/part-00001.parquet",
cfs.createdPath.toUri().getPath());
}

private static void assertPathPreservesHash(Path path) {
assertEquals(
"/tmp/channel=wx_repro#mini/spark-submit-123/part-00000.json",
path.toUri().getPath());
assertNull(path.toUri().getFragment());
}

private static class CapturingFileSystem extends RawLocalFileSystem {
private final Statistics statistics = new Statistics("hdfs");
private Path openedPath;
private Path createdPath;

@Override
public FSDataInputStream open(Path path) throws IOException {
openedPath = path;
return new FSDataInputStream(new EmptyFSInputStream());
}

@Override
public FSDataOutputStream create(Path path) throws IOException {
createdPath = path;
return new FSDataOutputStream(new ByteArrayOutputStream(), statistics);
}
}

private static class EmptyFSInputStream extends FSInputStream {
@Override
public void seek(long pos) {}

@Override
public long getPos() {
return 0;
}

@Override
public boolean seekToNewSource(long targetPos) {
return false;
}

@Override
public int read() {
return -1;
}
}
}
Loading