Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions scripts/tests/calibnet_export_check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ fi

echo "Exporting zstd compressed snapshot in $format format"
$FOREST_CLI_PATH snapshot export --format "$format" > snapshot_export.log 2>&1 &

echo "Testing that export is in progress"
for ((i=1; i<=retries; i++)); do
output=$($FOREST_CLI_PATH snapshot export-status --format json)
Expand Down Expand Up @@ -62,7 +61,33 @@ echo "Exporting zstd compressed snapshot at genesis"
$FOREST_CLI_PATH snapshot export --tipset 0 --format "$format"

echo "Exporting zstd compressed snapshot in $format format"
$FOREST_CLI_PATH snapshot export --format "$format"
# Start an export in the background and record the CLI's PID so that this
# CLI process can be killed further down.
$FOREST_CLI_PATH snapshot export --format "$format" &
EXPORT_CMD_PID=$!
# Fixed delay before probing - presumably gives the export job time to start.
# NOTE(review): timing-based; confirm 5s is sufficient on slow CI runners.
sleep 5
# another export job should be disallowed
# `|| true` discards the CLI's exit status; only the captured stdout+stderr
# text is inspected below.
export_error=$($FOREST_CLI_PATH snapshot export 2>&1 || true)
if echo "$export_error" | grep -q "active chain export job has started"; then
echo "verified another export job is disallowed"
else
echo "another export job should be disallowed"
echo "output was: $export_error"
exit 1
fi
# another export-diff job should be disallowed
# Same probe as above, using `export-diff` with fixed epochs; the check only
# looks for the "active chain export job" message in the output.
export_diff_error=$($FOREST_CLI_PATH snapshot export-diff --from 11000 --to 10100 -d 900 2>&1 || true)
if echo "$export_diff_error" | grep -q "active chain export job has started"; then
echo "verified another export-diff job is disallowed"
else
echo "another export-diff job should be disallowed"
echo "output was: $export_diff_error"
exit 1
fi
# Killing the CLI should not cancel the export
echo "killing cli command"
kill -KILL $EXPORT_CMD_PID
# Wait on the same export job
# NOTE(review): `--wait` presumably blocks until the export finishes - confirm
# against the `export-status` help text.
echo "waiting on export-status"
$FOREST_CLI_PATH snapshot export-status --wait

$FOREST_CLI_PATH shutdown --force

Expand Down
63 changes: 62 additions & 1 deletion scripts/tests/calibnet_export_diff_check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,73 @@ source "$(dirname "$0")/harness.sh"

forest_init "$@"

# Polling parameters used by the status-check loops below: maximum number of
# attempts and the pause (in seconds) between attempts.
retries=30
sleep_interval=0.5

# Take everything after the first ':' on the "Database path:" line of
# `db stats`; `xargs` trims the surrounding whitespace.
db_path=$($FOREST_TOOL_PATH db stats --chain calibnet | grep "Database path:" | cut -d':' -f2- | xargs)
# Pick the last `*.car.zst` entry listed under the database's car_db directory.
snapshot=$(find "$db_path/car_db"/*.car.zst | tail -n 1)
# `forest_query_epoch` is not defined in this script - presumably provided by
# the sourced harness.sh; verify there.
snapshot_epoch=$(forest_query_epoch "$snapshot")

echo "Exporting diff snapshot @ $snapshot_epoch with forest-cli snapshot export-diff"
$FOREST_CLI_PATH snapshot export-diff --from "$snapshot_epoch" --to "$((snapshot_epoch - 900))" -d 900 -o diff1
$FOREST_CLI_PATH snapshot export-diff --from "$snapshot_epoch" --to "$((snapshot_epoch - 900))" -d 900 -o diff1 &

echo "Testing that export is in progress"
# Poll `export-status` until its JSON `.exporting` field reads "true".
# Makes at most $retries attempts with $sleep_interval seconds between them.
for ((i=1; i<=retries; i++)); do
output=$($FOREST_CLI_PATH snapshot export-status --format json)
is_exporting=$(echo "$output" | jq -r '.exporting')
if [ "$is_exporting" == "true" ]; then
break
fi
# Last attempt and still not exporting: fail the test.
if [ $i -eq $retries ]; then
echo "export should be in progress"
exit 1
fi
sleep $sleep_interval
done

# Ask the node to cancel the export that was started in the background.
$FOREST_CLI_PATH snapshot export-cancel

echo "Testing that export has been cancelled"
# Poll `export-status` until it reports `.exporting` == "false" and
# `.cancelled` == "true". Makes at most $retries attempts with
# $sleep_interval seconds between them.
for ((i=1; i<=retries; i++)); do
output=$($FOREST_CLI_PATH snapshot export-status --format json)
is_exporting=$(echo "$output" | jq -r '.exporting')
is_cancelled=$(echo "$output" | jq -r '.cancelled')
if [ "$is_exporting" == "false" ] && [ "$is_cancelled" == "true" ]; then
break
fi
# Last attempt and the cancelled state was never observed: fail the test.
if [ $i -eq $retries ]; then
echo "export should be cancelled"
exit 1
fi
sleep $sleep_interval
done

echo "Exporting diff snapshot @ $snapshot_epoch with forest-cli snapshot export-diff"
# Start a diff export in the background and record the CLI's PID so that this
# CLI process can be killed further down.
$FOREST_CLI_PATH snapshot export-diff --from "$snapshot_epoch" --to "$((snapshot_epoch - 900))" -d 900 -o diff1 &
EXPORT_CMD_PID=$!
# Fixed delay before probing - presumably gives the export job time to start.
# NOTE(review): timing-based; confirm 5s is sufficient on slow CI runners.
sleep 5
# another export job should be disallowed
# `|| true` discards the CLI's exit status; only the captured stdout+stderr
# text is inspected below.
export_error=$($FOREST_CLI_PATH snapshot export 2>&1 || true)
if echo "$export_error" | grep -q "active chain export job has started"; then
echo "verified another export job is disallowed"
else
echo "another export job should be disallowed"
echo "output was: $export_error"
exit 1
fi
# another export-diff job should be disallowed
# Same probe as above, using `export-diff` with fixed epochs; the check only
# looks for the "active chain export job" message in the output.
export_diff_error=$($FOREST_CLI_PATH snapshot export-diff --from 11000 --to 10100 -d 900 2>&1 || true)
if echo "$export_diff_error" | grep -q "active chain export job has started"; then
echo "verified another export-diff job is disallowed"
else
echo "another export-diff job should be disallowed"
echo "output was: $export_diff_error"
exit 1
fi
# Killing the CLI should not cancel the export
kill -KILL $EXPORT_CMD_PID
# Wait on the same export job
# NOTE(review): `--wait` presumably blocks until the export finishes - confirm
# against the `export-status` help text.
$FOREST_CLI_PATH snapshot export-status --wait

$FOREST_CLI_PATH shutdown --force

Expand Down
97 changes: 32 additions & 65 deletions src/cli/subcommands/snapshot_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,18 @@
use crate::chain::FilecoinSnapshotVersion;
use crate::chain_sync::chain_muxer::DEFAULT_RECENT_STATE_ROOTS;
use crate::cli_shared::snapshot::{self, TrustedVendor};
use crate::db::car::forest::new_forest_car_temp_path_in;
use crate::db::car::forest::tmp_exporting_forest_car_path;
use crate::networks::calibnet;
use crate::prelude::*;
use crate::rpc::chain::ForestChainExportDiffParams;
use crate::rpc::types::ApiExportResult;
use crate::rpc::{self, chain::ForestChainExportParams, prelude::*};
use crate::shim::policy::policy_constants::CHAIN_FINALITY;
use anyhow::Context as _;
use chrono::DateTime;
use clap::Subcommand;
use indicatif::{ProgressBar, ProgressStyle};
use std::{
path::{Path, PathBuf},
time::Duration,
};
use tokio::io::AsyncWriteExt;
use std::{path::PathBuf, time::Duration};
use tokio_util::sync::CancellationToken;

#[derive(Debug, Clone, clap::ValueEnum)]
pub enum Format {
Expand Down Expand Up @@ -120,7 +117,7 @@ impl SnapshotCommands {
ChainHead::call(&client, ()).await?
};

let output_path = match output_path.is_dir() {
let output_path = std::path::absolute(match output_path.is_dir() {
true => output_path.join(snapshot::filename(
TrustedVendor::Forest,
chain_name,
Expand All @@ -132,16 +129,14 @@ impl SnapshotCommands {
true,
)),
false => output_path.clone(),
};

let output_dir = output_path.parent().context("invalid output path")?;
let temp_path = new_forest_car_temp_path_in(output_dir)?;
})
.context("failed to make output path absolute")?;

let params = ForestChainExportParams {
version: format,
epoch: tipset.epoch(),
recent_roots: depth,
output_path: temp_path.to_path_buf(),
output_path: output_path.clone(),
tipset_keys: tipset.key().clone().into(),
include_receipts: false,
include_events: false,
Expand All @@ -158,7 +153,7 @@ impl SnapshotCommands {
).with_message(format!("Exporting v{} snapshot to {} ...", format as u64, output_path.display()));
pb.enable_steady_tick(std::time::Duration::from_millis(80));
let handle = tokio::spawn({
let path: PathBuf = (&temp_path).into();
let path = tmp_exporting_forest_car_path(&output_path);
let pb = pb.clone();
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
async move {
Expand All @@ -181,21 +176,8 @@ impl SnapshotCommands {
pb.finish();
_ = handle.await;

if !dry_run {
match export_result.clone() {
ApiExportResult::Done(hash_opt) => {
// Move the file first; prevents orphaned checksum on persist error.
temp_path.persist(&output_path)?;
if let Some(hash) = hash_opt {
save_checksum(&output_path, hash).await?;
}
}
ApiExportResult::Cancelled => { /* no file to persist on cancel */ }
}
}

match export_result {
ApiExportResult::Done(_) => {
ApiExportResult::Done => {
Comment thread
coderabbitai[bot] marked this conversation as resolved.
println!("Export completed.");
}
ApiExportResult::Cancelled => {
Expand Down Expand Up @@ -245,13 +227,12 @@ impl SnapshotCommands {
if result.cancelled {
pb.set_message("Export cancelled");
pb.abandon();

return Ok(());
}
let position = (result.progress.clamp(0.0, 1.0) * 10000.0).trunc() as u64;
pb.set_position(position);

if position >= 10000 {
if !result.exporting {
break;
}
tokio::time::sleep(Duration::from_millis(500)).await;
Expand Down Expand Up @@ -303,18 +284,16 @@ impl SnapshotCommands {
let depth = depth.unwrap_or_else(|| from - to);
anyhow::ensure!(depth > 0, "depth must be positive");

let output_path = match output_path.is_dir() {
let output_path = std::path::absolute(match output_path.is_dir() {
true => output_path.join(format!(
"forest_snapshot_diff_{chain_name}_{from}_{to}+{depth}.car.zst"
)),
false => output_path.clone(),
};

let output_dir = output_path.parent().context("invalid output path")?;
let temp_path = new_forest_car_temp_path_in(output_dir)?;
})
.context("failed to make output path absolute")?;

let params = ForestChainExportDiffParams {
output_path: temp_path.to_path_buf(),
output_path: output_path.clone(),
from,
to,
depth,
Expand All @@ -327,55 +306,43 @@ impl SnapshotCommands {
.expect("indicatif template must be valid"),
).with_message(format!("Exporting {} ...", output_path.display()));
pb.enable_steady_tick(std::time::Duration::from_millis(80));
let cancellation_token = CancellationToken::new();
// Make sure token is cancelled on error path
let _cancellation_token_drop_guard = cancellation_token.drop_guard_ref();
let handle = tokio::spawn({
let path: PathBuf = (&temp_path).into();
let cancellation_token = cancellation_token.clone();
let path = tmp_exporting_forest_car_path(&output_path);
let pb = pb.clone();
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
async move {
loop {
while !cancellation_token.is_cancelled() {
interval.tick().await;
if let Ok(meta) = std::fs::metadata(&path) {
pb.set_position(meta.len());
}
}
}
});

// Manually construct RpcRequest because snapshot export could
// take a few hours on mainnet
client
let export_result = client
.call(ForestChainExportDiff::request((params,))?.with_timeout(Duration::MAX))
.await?;

handle.abort();
// cancel before `handle.await` to avoid deadlock
cancellation_token.cancel();
pb.finish();
_ = handle.await;
Comment thread
coderabbitai[bot] marked this conversation as resolved.

temp_path.persist(output_path)?;
println!("Export completed.");
match export_result {
ApiExportResult::Done => {
println!("Export completed.");
}
ApiExportResult::Cancelled => {
println!("Export cancelled.");
}
}
Ok(())
}
}
}
}

/// Prints hex-encoded representation of SHA-256 checksum and saves it to a file
/// with the same name but with a `.sha256sum` extension.
async fn save_checksum(source: &Path, encoded_hash: String) -> anyhow::Result<()> {
let checksum_file_content = format!(
"{encoded_hash} {}\n",
source
.file_name()
.and_then(std::ffi::OsStr::to_str)
.context("Failed to retrieve file name while saving checksum")?
);

let checksum_path = PathBuf::from(source).with_extension("sha256sum");

let mut checksum_file = tokio::fs::File::create(&checksum_path).await?;
checksum_file
.write_all(checksum_file_content.as_bytes())
.await?;
checksum_file.flush().await?;
Ok(())
}
14 changes: 13 additions & 1 deletion src/db/car/forest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use integer_encoding::VarIntReader;
use nunny::Vec as NonEmpty;
use positioned_io::{Cursor, ReadAt, Size as _, SizeCursor};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::OnceLock;
use std::task::Poll;
use std::time::Duration;
Expand Down Expand Up @@ -508,6 +508,18 @@ pub fn new_forest_car_temp_path_in(
.into_temp_path())
}

pub fn tmp_exporting_forest_car_path(output_path: &Path) -> PathBuf {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: let's add a simple test here. The code is kind of obvious but obvious things managed to bite us in the past. Plus, we want coverage to be high.

let mut p = output_path.to_owned();
p.add_extension("tmp");
p
}

/// Returns `output_path` with `sha256sum` appended as an additional extension.
///
/// Existing extensions are kept, e.g. `snap.car.zst` becomes
/// `snap.car.zst.sha256sum`. The input path itself is not modified.
pub fn forest_car_sha256sum_path(output_path: &Path) -> PathBuf {
    let mut checksum_path = PathBuf::from(output_path);
    checksum_path.add_extension("sha256sum");
    checksum_path
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
4 changes: 4 additions & 0 deletions src/ipld/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ impl ChainExportGuard {

impl Drop for ChainExportGuard {
fn drop(&mut self) {
// In case some tasks are waiting on this token
if !self.cancellation_token.is_cancelled() {
self.cancellation_token.cancel();
}
Comment on lines +90 to +92

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check if it's cancelled? Might be a TOCTOU issue

end_export()
}
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ mod prelude {
pub use ahash::{HashMapExt as _, HashSetExt as _};
pub use anyhow::Context as _;
pub use cid::Cid;
pub use futures::FutureExt as _;
pub use itertools::Itertools as _;
pub use std::{ops::Deref as _, sync::Arc};
}
Expand Down
Loading
Loading