| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984 |
- use std::collections::HashMap;
- use std::mem::size_of;
- use std::str::FromStr;
- use std::sync::Arc;
- use bytes::Bytes;
- use either::{Either, Left, Right};
- use futures::{Future, StreamExt};
- use libp2p::identify::Event::Received;
- use libp2p::identity::Keypair;
- use libp2p::kad::NoKnownPeers;
- use libp2p::peer_store::Store;
- use libp2p::request_response::Event::*;
- use libp2p::request_response::Message::*;
- use libp2p::request_response::{self};
- use libp2p::swarm::SwarmEvent;
- use libp2p::{
- allow_block_list, connection_limits, memory_connection_limits, Multiaddr, PeerId, Swarm,
- };
- use nockapp::driver::{IODriverFn, NockAppHandle, PokeResult};
- use nockapp::noun::slab::NounSlab;
- use nockapp::utils::make_tas;
- use nockapp::utils::scry::*;
- use nockapp::wire::{Wire, WireRepr};
- use nockapp::{AtomExt, NockAppError, NounExt};
- use nockvm::noun::{Atom, Noun, D, T};
- use nockvm_macros::tas;
- use serde_bytes::ByteBuf;
- use tokio::sync::{mpsc, Mutex};
- use tokio::task::{AbortHandle, JoinError, JoinSet};
- use tracing::{debug, error, info, instrument, trace, warn};
- use crate::metrics::NockchainP2PMetrics;
- use crate::p2p::*;
- use crate::p2p_util::{log_fail2ban_ipv4, log_fail2ban_ipv6, MessageTracker, PeerIdExt};
- use crate::tip5_util::tip5_hash_to_base58;
- //TODO This wire is a placeholder for now. The libp2p driver is entangled with the other types of nockchain pokes
- //for historical reasons, and should be disentangled in the future.
- pub enum NockchainWire {
- Local,
- }
- impl Wire for NockchainWire {
- const VERSION: u64 = 1;
- const SOURCE: &'static str = "nc";
- }
- #[derive(Debug)]
- pub enum Libp2pWire {
- Gossip(PeerId),
- Response(PeerId),
- }
- impl Libp2pWire {
- fn verb(&self) -> &'static str {
- match self {
- Libp2pWire::Gossip(_) => "gossip",
- Libp2pWire::Response(_) => "response",
- }
- }
- fn peer_id(&self) -> &PeerId {
- match self {
- Libp2pWire::Gossip(peer_id) => peer_id,
- Libp2pWire::Response(peer_id) => peer_id,
- }
- }
- }
- impl Wire for Libp2pWire {
- const VERSION: u64 = 1;
- const SOURCE: &'static str = "libp2p";
- fn to_wire(&self) -> WireRepr {
- let tags = vec![self.verb().into(), "peer-id".into(), self.peer_id().to_base58().into()];
- WireRepr::new(Libp2pWire::SOURCE, Libp2pWire::VERSION, tags)
- }
- }
- enum EffectType {
- Gossip,
- Request,
- LiarPeer,
- LiarBlockId,
- Track,
- Seen,
- Unknown,
- }
- impl EffectType {
- fn from_noun_slab(noun_slab: &NounSlab) -> Self {
- let Ok(effect_cell) = (unsafe { noun_slab.root().as_cell() }) else {
- return EffectType::Unknown;
- };
- let head = effect_cell.head();
- let Ok(atom) = head.as_atom() else {
- return EffectType::Unknown;
- };
- let bytes = atom
- .to_bytes_until_nul()
- .expect("failed to strip null bytes");
- match bytes.as_slice() {
- b"gossip" => EffectType::Gossip,
- b"request" => EffectType::Request,
- b"liar-peer" => EffectType::LiarPeer,
- b"liar-block-id" => EffectType::LiarBlockId,
- b"track" => EffectType::Track,
- b"seen" => EffectType::Seen,
- _ => EffectType::Unknown,
- }
- }
- }
- struct TrackedJoinSet<T> {
- inner: JoinSet<T>,
- tasks: HashMap<String, AbortHandle>,
- }
- impl<T: 'static> TrackedJoinSet<T> {
- fn new() -> Self {
- Self {
- inner: JoinSet::new(),
- tasks: HashMap::new(),
- }
- }
- fn spawn(&mut self, name: String, task: impl Future<Output = T> + Send + 'static)
- where
- T: Send + 'static,
- {
- let handle = self.inner.spawn(task);
- self.tasks.insert(name, handle);
- }
- async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
- let result = self.inner.join_next().await;
- if result.is_some() {
- // Remove the completed task from our tracking
- self.tasks.retain(|_, v| !v.is_finished());
- }
- result
- }
- // Keep this around for debugging
- #[allow(dead_code)]
- fn get_running_tasks(&self) -> Vec<String> {
- self.tasks.keys().cloned().collect()
- }
- }
- const POKE_VERSION: u64 = 0;
- #[instrument(skip(keypair, bind, allowed, limits, memory_limits, equix_builder))]
- pub fn make_libp2p_driver(
- keypair: Keypair,
- bind: Vec<Multiaddr>,
- allowed: Option<allow_block_list::Behaviour<allow_block_list::AllowedPeers>>,
- limits: connection_limits::ConnectionLimits,
- memory_limits: Option<memory_connection_limits::Behaviour>,
- initial_peers: &[Multiaddr],
- equix_builder: equix::EquiXBuilder,
- init_complete_tx: Option<tokio::sync::oneshot::Sender<()>>,
- ) -> IODriverFn {
- let initial_peers = Vec::from(initial_peers);
- Box::new(|mut handle| {
- let metrics = NockchainP2PMetrics::register(gnort::global_metrics_registry())
- .expect("Failed to register metrics!");
- Box::pin(async move {
- let mut swarm =
- match crate::p2p::start_swarm(keypair, bind, allowed, limits, memory_limits) {
- Ok(swarm) => swarm,
- Err(e) => {
- error!("Could not create swarm: {}", e);
- let (_, handle_clone) = handle.dup();
- tokio::spawn(async move {
- if let Err(e) = handle_clone.exit.exit(1).await {
- error!("Failed to send exit signal: {}", e);
- }
- });
- return Err(NockAppError::OtherError);
- }
- };
- let (swarm_tx, mut swarm_rx) = mpsc::channel::<SwarmAction>(1000); // number needs to be high enough to send gossips to peers
- let mut join_set = TrackedJoinSet::<Result<(), NockAppError>>::new();
- let message_tracker = Arc::new(Mutex::new(MessageTracker::new()));
- let mut kad_bootstrap = tokio::time::interval(KADEMLIA_BOOTSTRAP_INTERVAL);
- let mut initial_peer_retries_remaining = INITIAL_PEER_RETRIES;
- dial_initial_peers(&mut swarm, &initial_peers)?;
- if let Some(tx) = init_complete_tx {
- let _ = tx.send(());
- debug!("libp2p driver initialization complete signal sent");
- }
- loop {
- tokio::select! {
- Ok(noun_slab) = handle.next_effect() => {
- let _span = tracing::trace_span!("broadcast").entered();
- let swarm_tx_clone = swarm_tx.clone();
- let equix_builder_clone = equix_builder.clone();
- let local_peer_id = *swarm.local_peer_id();
- let connected_peers: Vec<PeerId> = swarm.connected_peers().cloned().collect();
- let message_tracker_clone = Arc::clone(&message_tracker); // Clone the Arc, not the MessageTracker
- join_set.spawn("handle_effect".to_string(), async move {
- handle_effect(noun_slab, swarm_tx_clone, equix_builder_clone, local_peer_id, connected_peers, message_tracker_clone).await
- });
- },
- Some(event) = swarm.next() => {
- match event {
- SwarmEvent::NewListenAddr { address, .. } => {
- info!("SEvent: Listening on {address:?}");
- },
- SwarmEvent::Behaviour(NockchainEvent::Identify(Received { connection_id: _, peer_id, info })) => {
- trace!("SEvent: identify_received");
- identify_received(&mut swarm, peer_id, info)?;
- },
- SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
- debug!("SEvent: {peer_id} is new friend via: {endpoint:?}");
- },
- SwarmEvent::ConnectionClosed { peer_id, endpoint, cause, .. } => {
- info!("SEvent: friendship ended with {peer_id} via: {endpoint:?}. cause: {cause:?}");
- // Clean up the message tracker when a peer disconnects
- let mut tracker = message_tracker.lock().await;
- tracker.remove_peer(&peer_id);
- },
- SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, error, .. } => {
- error!("SEvent: Failed incoming connection from {} to {}: {}",
- send_back_addr, local_addr, error);
- },
- SwarmEvent::Behaviour(NockchainEvent::RequestResponse(Message { connection_id: _, peer, message })) => {
- trace!("SEvent: received RequestResponse");
- let _span = tracing::debug_span!("SwarmEvent::Behavior(NockchainEvent::RequestResponse(…))").entered();
- let swarm_tx_clone = swarm_tx.clone();
- let mut equix_builder_clone = equix_builder.clone();
- let local_peer_id = *swarm.local_peer_id();
- // We have to dup and move a handle back into `handle` to propitiate the borrow checker
- let (orig_handle, request_response_handle) = handle.dup();
- handle = orig_handle;
- let metrics = metrics.clone();
- let message_tracker_clone = Arc::clone(&message_tracker); // Clone the Arc, not the MessageTracker
- join_set.spawn("handle_request_response".to_string(), async move {
- handle_request_response(peer, message, swarm_tx_clone, &mut equix_builder_clone, local_peer_id, request_response_handle, metrics, message_tracker_clone).await
- });
- },
- SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
- error!("Failed outgoing connection to {:?}: {}", peer_id, error);
- },
- SwarmEvent::IncomingConnection {
- local_addr,
- send_back_addr,
- connection_id,
- ..
- } => {
- debug!("SEvent: Incoming connection from {local_addr:?} to {send_back_addr:?} with {connection_id:?}");
- },
- SwarmEvent::Dialing { peer_id, connection_id } => {
- debug!("SEvent: Dialing {peer_id:?} {connection_id}");
- },
- _ => {
- // Handle other swarm events
- trace!("SEvent: other swarm event {:?}", event);
- }
- }
- },
- Some(swarm_action) = swarm_rx.recv() => {
- // We do this because Swarm doesn't implement Send, and so we can't pass it into the tasks
- // being spawned in the match cases above.
- match swarm_action {
- SwarmAction::SendRequest { peer_id, request } => {
- trace!("SAction: SendRequest: {peer_id}");
- let _ = swarm.behaviour_mut().request_response.send_request(&peer_id, request);
- },
- SwarmAction::SendResponse { channel, response } => {
- trace!("SAction: SendResponse");
- let _ = swarm.behaviour_mut().request_response.send_response(channel, response);
- },
- SwarmAction::BlockPeer { peer_id } => {
- warn!("SAction: Blocking peer {peer_id}");
- // Block the peer in the allow_block_list
- swarm.behaviour_mut().allow_block_list.block_peer(peer_id);
- {
- // get peer IP address from the swarm
- let peer_addresses = swarm.behaviour_mut().peer_store.store().addresses_of_peer(&peer_id);
- if let Some(peer_multi_addrs) = peer_addresses {
- for multi_addr in peer_multi_addrs {
- for protocol in multi_addr.iter() {
- match protocol {
- libp2p::core::multiaddr::Protocol::Ip4(ip) => {
- log_fail2ban_ipv4(&peer_id, &ip);
- },
- libp2p::core::multiaddr::Protocol::Ip6(ip) => {
- log_fail2ban_ipv6(&peer_id, &ip);
- },
- // TODO: Dns?
- _ => {}
- }
- }
- }
- } else {
- error!("Failed to get peer IP address for peer id: {peer_id}");
- };
- }
- // Disconnect the peer if they're currently connected
- let _ = swarm.disconnect_peer_id(peer_id);
- },
- }
- },
- _ = kad_bootstrap.tick() => {
- // If we don't have any peers, we should retry dialing our initial peers
- if let Err(NoKnownPeers())= swarm.behaviour_mut().kad.bootstrap() {
- if initial_peer_retries_remaining > 0 {
- info!("Failed to bootstrap: {}", NoKnownPeers());
- initial_peer_retries_remaining -= 1;
- dial_initial_peers(&mut swarm, &initial_peers)?;
- } else {
- warn!("Failed to bootstrap after {} retries, will not attempt to redial initial peers.", INITIAL_PEER_RETRIES);
- }
- }
- },
- Some(result) = join_set.join_next() => {
- if let Err(e) = result {
- error!("Task error: {:?}", e);
- }
- },
- }
- }
- })
- })
- }
- #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
- /// Network struct (in serde/CBOR) for requests
- pub enum NockchainRequest {
- /// Request a block or TX from another node, carry PoW
- Request {
- pow: equix::SolutionByteArray,
- nonce: u64,
- message: ByteBuf,
- },
- /// Gossip a block or TX to another node
- Gossip { message: ByteBuf },
- }
- impl NockchainRequest {
- /// Make a new "request" which gossips a block or a TX
- fn new_gossip(message: &NounSlab) -> NockchainRequest {
- let message_bytes = ByteBuf::from(message.jam().as_ref());
- NockchainRequest::Gossip {
- message: message_bytes,
- }
- }
- /// Make a new request for a block or a TX
- fn new_request(
- builder: &mut equix::EquiXBuilder,
- local_peer_id: &libp2p::PeerId,
- remote_peer_id: &libp2p::PeerId,
- message: &NounSlab,
- ) -> NockchainRequest {
- let message_bytes = ByteBuf::from(message.jam().as_ref());
- let local_peer_bytes = (*local_peer_id).to_bytes();
- let remote_peer_bytes = (*remote_peer_id).to_bytes();
- let mut pow_buf = Vec::with_capacity(
- size_of::<u64>()
- + local_peer_bytes.len()
- + remote_peer_bytes.len()
- + message_bytes.len(),
- );
- pow_buf.extend_from_slice(&[0; size_of::<u64>()][..]);
- pow_buf.extend_from_slice(&local_peer_bytes[..]);
- pow_buf.extend_from_slice(&remote_peer_bytes[..]);
- pow_buf.extend_from_slice(&message_bytes[..]);
- let mut nonce = 0u64;
- let sol_bytes = loop {
- {
- let nonce_buf = &mut pow_buf[0..size_of::<u64>()];
- nonce_buf.copy_from_slice(&nonce.to_le_bytes()[..]);
- }
- if let Ok(sols) = builder.solve(&pow_buf[..]) {
- if !sols.is_empty() {
- break sols[0].to_bytes();
- }
- }
- nonce += 1;
- };
- NockchainRequest::Request {
- pow: sol_bytes,
- nonce,
- message: message_bytes,
- }
- }
- /// Verify the EquiX PoW attached to a request
- fn verify_pow(
- &self,
- builder: &mut equix::EquiXBuilder,
- local_peer_id: &libp2p::PeerId,
- remote_peer_id: &libp2p::PeerId,
- ) -> Result<(), equix::Error> {
- match self {
- NockchainRequest::Request {
- pow,
- nonce,
- message,
- } => {
- // This looks backwards, but it's because which node is local and which is remote
- // is swapped between generation at the sender and verification at the receiver.
- let local_peer_bytes = (*remote_peer_id).to_bytes();
- let remote_peer_bytes = (*local_peer_id).to_bytes();
- let nonce_bytes = nonce.to_le_bytes();
- let mut pow_buf = Vec::with_capacity(
- size_of::<u64>()
- + local_peer_bytes.len()
- + remote_peer_bytes.len()
- + message.len(),
- );
- pow_buf.extend_from_slice(&nonce_bytes[..]);
- pow_buf.extend_from_slice(&local_peer_bytes[..]);
- pow_buf.extend_from_slice(&remote_peer_bytes[..]);
- pow_buf.extend_from_slice(&message[..]);
- builder.verify_bytes(&pow_buf[..], pow)
- }
- NockchainRequest::Gossip { message: _ } => Ok(()),
- }
- }
- }
- #[derive(Debug, serde::Serialize, serde::Deserialize)]
- /// Responses to Nockchain requests
- pub enum NockchainResponse {
- /// The requested block or raw-tx
- Result { message: ByteBuf },
- /// If the request was a gossip, no actual response is needed
- Ack,
- }
- impl NockchainResponse {
- fn new_response_result(message: impl AsRef<[u8]>) -> NockchainResponse {
- let message_bytes: &[u8] = message.as_ref();
- let message_bytebuf = ByteBuf::from(message_bytes.to_vec());
- NockchainResponse::Result {
- message: message_bytebuf,
- }
- }
- }
- // fn emit_fail2ban(peer_ip: u128) -> Result<(), NockAppError> {
- // // get peer ip address
- // let peer_ip = peer_id.to_base58();
- // }
- async fn handle_effect(
- noun_slab: NounSlab,
- swarm_tx: mpsc::Sender<SwarmAction>,
- equix_builder: equix::EquiXBuilder,
- local_peer_id: PeerId,
- connected_peers: Vec<PeerId>,
- message_tracker: Arc<Mutex<MessageTracker>>,
- ) -> Result<(), NockAppError> {
- match EffectType::from_noun_slab(&noun_slab) {
- EffectType::Gossip => {
- // Get the tail of the gossip effect (after %gossip head)
- let mut tail_slab = NounSlab::new();
- let gossip_cell = unsafe { noun_slab.root().as_cell()?.tail() };
- // Skip version number
- // TODO: add version negotiation, reject unknown/incompatible versions
- let data_cell = gossip_cell.as_cell()?.tail();
- tail_slab.copy_into(data_cell);
- // Check if this is a heard-block gossip
- let gossip_noun = unsafe { tail_slab.root() };
- if let Ok(data_cell) = gossip_noun.as_cell() {
- if data_cell.head().eq_bytes(b"heard-block") {
- trace!("Gossip effect for heard-block, clearing block cache");
- let mut tracker = message_tracker.lock().await;
- tracker.block_cache.clear();
- }
- }
- let gossip_request = NockchainRequest::new_gossip(&tail_slab);
- for peer_id in connected_peers.clone() {
- let gossip_request_clone = gossip_request.clone();
- swarm_tx
- .send(SwarmAction::SendRequest {
- peer_id,
- request: gossip_request_clone,
- })
- .await
- .map_err(|_e| NockAppError::OtherError)?;
- }
- }
- EffectType::Request => {
- // Extract request details to check if it's a peer-specific request
- let request_cell = unsafe { noun_slab.root().as_cell()? };
- let request_body = request_cell.tail().as_cell()?;
- let request_type = request_body.head().as_direct()?;
- let target_peers = if request_type.data() == tas!(b"block") {
- let block_cell = request_body.tail().as_cell()?;
- if block_cell.head().eq_bytes(b"elders") {
- // Extract peer ID from elders request
- let elders_cell = block_cell.tail().as_cell()?;
- let peer_id_atom = elders_cell.tail().as_atom()?;
- if let Ok(bytes) = peer_id_atom.to_bytes_until_nul() {
- if let Ok(peer_id) = PeerId::from_bytes(&bytes) {
- vec![peer_id]
- } else {
- connected_peers.clone()
- }
- } else {
- connected_peers.clone()
- }
- } else {
- connected_peers.clone()
- }
- } else {
- connected_peers.clone()
- };
- for peer_id in target_peers {
- let local_peer_id_clone = local_peer_id;
- let mut equix_builder_clone = equix_builder.clone();
- let request = NockchainRequest::new_request(
- &mut equix_builder_clone, &local_peer_id_clone, &peer_id, &noun_slab,
- );
- swarm_tx
- .send(SwarmAction::SendRequest { peer_id, request })
- .await
- .map_err(|_e| NockAppError::OtherError)?;
- }
- }
- EffectType::LiarPeer => {
- let effect_cell = unsafe { noun_slab.root().as_cell()? };
- let peer_id_atom = effect_cell.tail().as_atom().map_err(|_| {
- NockAppError::IoError(std::io::Error::new(
- std::io::ErrorKind::Other,
- "Expected peer ID atom in liar-peer effect",
- ))
- })?;
- let bytes = peer_id_atom
- .to_bytes_until_nul()
- .expect("failed to strip null bytes");
- let peer_id_str = String::from_utf8(bytes).map_err(|_| {
- NockAppError::IoError(std::io::Error::new(
- std::io::ErrorKind::Other,
- "Invalid UTF-8 in peer ID",
- ))
- })?;
- let peer_id = PeerId::from_str(&peer_id_str).map_err(|_| {
- NockAppError::IoError(std::io::Error::new(
- std::io::ErrorKind::Other,
- "Invalid peer ID format",
- ))
- })?;
- swarm_tx
- .send(SwarmAction::BlockPeer { peer_id })
- .await
- .map_err(|_| NockAppError::OtherError)?;
- }
- EffectType::LiarBlockId => {
- let effect_cell = unsafe { noun_slab.root().as_cell()? };
- let block_id = effect_cell.tail();
- // Add the bad block ID
- let mut tracker = message_tracker.lock().await;
- let peers_to_ban = tracker.process_bad_block_id(block_id)?;
- // Ban each peer that sent this block
- for peer_id in peers_to_ban {
- swarm_tx
- .send(SwarmAction::BlockPeer { peer_id })
- .await
- .map_err(|_| NockAppError::OtherError)?;
- }
- }
- EffectType::Track => {
- let effect_cell = unsafe { noun_slab.root().as_cell()? };
- let track_cell = effect_cell.tail().as_cell()?;
- let action = track_cell.head();
- if action.eq_bytes(b"add") {
- // Handle [%track %add block-id peer-id]
- let data_cell = track_cell.tail().as_cell()?;
- let block_id = data_cell.head();
- let peer_id_atom = data_cell.tail().as_atom()?;
- // Convert peer_id from base58 string to PeerId
- let Ok(peer_id) = PeerId::from_noun(peer_id_atom.as_noun()) else {
- return Err(NockAppError::OtherError);
- };
- // Add to message tracker
- let mut tracker = message_tracker.lock().await;
- tracker.track_block_id_and_peer(block_id, peer_id)?;
- } else if action.eq_bytes(b"remove") {
- // Handle [%track %remove block-id]
- let block_id = track_cell.tail();
- // Remove from message tracker
- let mut tracker = message_tracker.lock().await;
- tracker.remove_block_id(block_id)?;
- } else {
- return Err(NockAppError::IoError(std::io::Error::new(
- std::io::ErrorKind::Other,
- "Invalid track action",
- )));
- }
- }
- EffectType::Seen => {
- let effect_cell = unsafe { noun_slab.root().as_cell()? };
- let seen_cell = effect_cell.tail().as_cell()?;
- let seen_type = seen_cell.head();
- if seen_type.eq_bytes(b"block") {
- let block_id = seen_cell.tail().as_cell()?;
- let mut tracker = message_tracker.lock().await;
- let block_id_str = tip5_hash_to_base58(block_id.as_noun())
- .expect("failed to convert block ID to base58");
- debug!("seen block id: {:?}", &block_id_str);
- tracker.seen_blocks.insert(block_id_str);
- } else if seen_type.eq_bytes(b"tx") {
- let tx_id = seen_cell.tail().as_cell()?;
- let mut tracker = message_tracker.lock().await;
- let tx_id_str = tip5_hash_to_base58(tx_id.as_noun())
- .expect("failed to convert tx ID to base58");
- tracker.seen_txs.insert(tx_id_str);
- }
- }
- EffectType::Unknown => {
- // This isn't unexpected - any effect that this driver doesn't handle
- // will hit this case.
- }
- }
- Ok(())
- }
- // TODO: Wrap some of this up.
- #[allow(clippy::too_many_arguments)]
- async fn handle_request_response(
- peer: PeerId,
- message: request_response::Message<NockchainRequest, NockchainResponse>,
- swarm_tx: mpsc::Sender<SwarmAction>,
- equix_builder: &mut equix::EquiXBuilder,
- local_peer_id: PeerId,
- nockapp: NockAppHandle,
- metrics: NockchainP2PMetrics,
- message_tracker: Arc<Mutex<MessageTracker>>,
- ) -> Result<(), NockAppError> {
- trace!("handle_request_response peer: {peer}");
- match message {
- Request {
- request, channel, ..
- } => {
- let Ok(()) = request.verify_pow(equix_builder, &local_peer_id, &peer) else {
- warn!("bad libp2p powork from {peer}, blocking!");
- swarm_tx
- .send(SwarmAction::BlockPeer { peer_id: peer })
- .await
- .map_err(|_| NockAppError::OtherError)?;
- return Ok(());
- };
- trace!("handle_request_response: powork verified");
- let mut request_slab = NounSlab::new();
- match request {
- NockchainRequest::Request {
- pow: _,
- nonce: _,
- message,
- } => {
- trace!("handle_request_response: Request received");
- let message_bytes = Bytes::from(message.to_vec());
- let request_noun = request_slab.cue_into(message_bytes)?;
- let (scry_res_slab, cache_hit) = if let Ok(Some(cache_result)) = {
- let mut tracker = message_tracker.lock().await;
- tracker.check_cache(&request_noun, &metrics).await
- } {
- debug!("found cached response for request");
- (cache_result, true)
- } else {
- let scry_slab = request_to_scry_slab(&request_noun)?;
- let Some(scry_res_slab) = (match nockapp.try_peek(scry_slab).await {
- Ok(Some(res_slab)) => {
- metrics.requests_peeked_some.increment();
- Some(res_slab)
- }
- Ok(None) => {
- metrics.requests_peeked_none.increment();
- trace!(
- "No data found for incoming request from: {}, request type: {:?}",
- peer,
- request_noun.as_cell()?.tail().as_cell().map(|c| c.head())
- );
- None
- }
- Err(NockAppError::MPSCFullError(act)) => {
- metrics.requests_dropped.increment();
- trace!(
- "handle_request_response: Request dropped due to backpressure"
- );
- Err(NockAppError::MPSCFullError(act))?
- }
- Err(err) => {
- metrics.requests_erred.increment();
- trace!("handle_request_response: Error getting response");
- Err(err)?
- }
- }) else {
- return Ok(());
- };
- (scry_res_slab, false)
- };
- let Ok(request_cell) = request_noun.as_cell() else {
- error!("request noun not a cell");
- return Err(NockAppError::OtherError);
- };
- let Ok(scry_tag) = request_cell.tail().as_cell()?.head().as_direct() else {
- error!("request tag axis not an atom");
- return Err(NockAppError::OtherError);
- };
- trace!("handle_request_response: request cell parsed");
- let mut res_slab = NounSlab::new();
- let response = match scry_tag.data() {
- tas!(b"block") => {
- trace!("handle_request_response: block tag");
- let scry_res = unsafe { scry_res_slab.root() };
- // Extract the request type from under %block
- let request_type = request_cell
- .tail()
- .as_cell()
- .and_then(|c| c.tail().as_cell())
- .map(|c| c.head().eq_bytes(b"elders"))
- .map_err(|_| {
- NockAppError::IoError(std::io::Error::new(
- std::io::ErrorKind::Other,
- "invalid block request structure",
- ))
- })?;
- // Use heard-elders for elders requests, heard-block otherwise
- let heard_type = if request_type {
- "heard-elders"
- } else {
- "heard-block"
- };
- match create_scry_response(scry_res, heard_type, &mut res_slab) {
- Left(()) => {
- trace!(
- "No data found for incoming block request, type: {}",
- heard_type
- );
- return Ok(());
- }
- Right(result) => {
- // cache response
- if !cache_hit && heard_type == "heard-block" {
- let height = request_cell
- .tail()
- .as_cell()?
- .tail()
- .as_cell()?
- .tail()
- .as_direct()?
- .data();
- let mut tracker = message_tracker.lock().await;
- tracker.block_cache.insert(height, scry_res_slab.clone());
- debug!("cacheing block request by height={:?}", height);
- }
- result?
- }
- }
- }
- tas!(b"raw-tx") => {
- trace!("handle_request_response: raw-tx tag");
- let scry_res = unsafe { scry_res_slab.root() };
- match create_scry_response(scry_res, "heard-tx", &mut res_slab) {
- Left(()) => {
- trace!("No data found for incoming raw-tx request");
- return Ok(());
- }
- Right(result) => {
- if !cache_hit {
- let tx_id = request_cell.tail().as_cell()?.tail();
- let tx_id_str = tip5_hash_to_base58(tx_id)?;
- let mut tracker = message_tracker.lock().await;
- debug!("cacheing tx request by id={:?}", tx_id_str);
- tracker.tx_cache.insert(tx_id_str, scry_res_slab.clone());
- }
- result?
- }
- }
- }
- tag => {
- error!("Unknown request tag: {:?}", tag);
- return Err(NockAppError::OtherError);
- }
- };
- swarm_tx
- .send(SwarmAction::SendResponse { channel, response })
- .await
- .map_err(|_| NockAppError::OtherError)?;
- }
- NockchainRequest::Gossip { message } => {
- trace!("handle_request_response: Gossip received");
- let message_bytes = Bytes::from(message.to_vec());
- let request_noun = request_slab.cue_into(message_bytes)?;
- trace!("handle_request_response: Gossip noun parsed");
- let send_response: tokio::task::JoinHandle<Result<(), NockAppError>> =
- tokio::spawn(async move {
- let response = NockchainResponse::Ack;
- swarm_tx
- .send(SwarmAction::SendResponse { channel, response })
- .await
- .map_err(|_| NockAppError::OtherError)?;
- Ok(())
- });
- let poke_kernel = tokio::task::spawn(async move {
- let head = request_noun.as_cell()?.head();
- if head.eq_bytes(b"heard-block") {
- let page = request_noun.as_cell()?.tail();
- let block_id = page.as_cell()?.head();
- let block_id_str = tip5_hash_to_base58(block_id)?;
- let tracker = message_tracker.lock().await;
- if tracker.seen_blocks.contains(&block_id_str) {
- debug!("Block already seen, not processing: {:?}", block_id_str);
- metrics.block_seen_cache_hits.increment();
- return Ok(());
- } else {
- debug!("block not seen, processing: {:?}", block_id_str);
- metrics.block_seen_cache_misses.increment();
- }
- }
- if head.eq_bytes(b"heard-tx") {
- let raw_tx = request_noun.as_cell()?.tail();
- let tx_id = raw_tx.as_cell()?.head();
- let tracker = message_tracker.lock().await;
- let tx_id_str = tip5_hash_to_base58(tx_id)?;
- if tracker.seen_txs.contains(&tx_id_str) {
- debug!("Tx already seen, not processing: {:?}", tx_id_str);
- metrics.tx_seen_cache_hits.increment();
- return Ok(());
- } else {
- debug!("tx not seen, processing: {:?}", tx_id_str);
- metrics.tx_seen_cache_misses.increment();
- }
- }
- let request_fact = prepend_tas(
- &mut request_slab,
- "fact",
- vec![D(POKE_VERSION), request_noun],
- )?;
- request_slab.set_root(request_fact);
- let wire = Libp2pWire::Gossip(peer);
- trace!(
- "Poking kernel with wire: {:?} noun: {:?}",
- wire,
- nockvm::noun::FullDebugCell(unsafe { &request_slab.root().as_cell()? })
- );
- match nockapp.try_poke(wire.to_wire(), request_slab).await {
- Ok(PokeResult::Ack) => {
- metrics.gossip_acked.increment();
- }
- Ok(PokeResult::Nack) => {
- metrics.gossip_nacked.increment();
- trace!("handle_request_response: gossip poke nacked");
- return Ok(());
- }
- Err(NockAppError::MPSCFullError(act)) => {
- metrics.gossip_dropped.increment();
- trace!(
- "handle_request_response: gossip poke dropped due to backpressure"
- );
- return Err(NockAppError::MPSCFullError(act));
- }
- Err(err) => {
- metrics.gossip_erred.increment();
- trace!("handle_request_response: Poke errored");
- return Err(err);
- }
- };
- trace!("handle_request_response: Poke successful");
- Ok(())
- });
- send_response.await??;
- poke_kernel.await??;
- }
- }
- }
- Response { response, .. } => match response {
- NockchainResponse::Result { message } => {
- trace!("handle_request_response: Response result received");
- let mut response_slab = NounSlab::new();
- let message_bytes = Bytes::from(message.to_vec());
- let response_noun = response_slab.cue_into(message_bytes)?;
- trace!("Received response from peer");
- trace!(
- "Response noun: {:?}",
- nockvm::noun::FullDebugCell(&response_noun.as_cell()?)
- );
- let response_fact = prepend_tas(
- &mut response_slab,
- "fact",
- vec![D(POKE_VERSION), response_noun],
- )?;
- response_slab.set_root(response_fact);
- let wire = Libp2pWire::Response(peer);
- match nockapp.try_poke(wire.to_wire(), response_slab).await {
- Ok(PokeResult::Ack) => {
- metrics.responses_acked.increment();
- }
- Ok(PokeResult::Nack) => {
- metrics.responses_nacked.increment();
- trace!("handle_request_response: Poke failed");
- return Ok(());
- }
- Err(NockAppError::MPSCFullError(act)) => {
- trace!("handle_request_response: Response dropped due to backpressure.");
- metrics.responses_dropped.increment();
- return Err(NockAppError::MPSCFullError(act));
- }
- Err(_) => {
- trace!("handle_request_response: Error poking with response");
- metrics.responses_erred.increment();
- trace!("Error sending poke")
- }
- }
- trace!("handle_request_response: Poke successful");
- }
- NockchainResponse::Ack => {
- trace!("Received acknowledgement from peer {}", peer);
- }
- },
- }
- Ok(())
- }
- /// Converts a request noun into a scry path that can be used to query the Nockchain state.
- ///
- /// The request noun is expected to be in the format:
- /// `[%request [type data]]` where:
- /// - `type` can be either "block" or "raw-tx"
- /// - For "block" type:
- /// - `data` can be either `[%by-height height]` or `[%elders block-id peer-id]`
- /// - For "raw-tx" type:
- /// - `data` must be `[%by-id id]`
- ///
- /// # Arguments
- /// * `noun` - The request noun to convert
- ///
- /// # Returns
- /// * `Ok(NounSlab)` - A noun slab containing the constructed scry path
- /// * `Err(NockAppError)` - If the request format is invalid or unknown
- ///
- /// # Examples
- /// For a block by height request:
- /// [%request [%block [%by-height 123]]] -> [%heavy-n 123 0]
- /// For a block by id request:
- /// [%request [%block [%elders [1 2 3 4 5] abcDEF]]] -> [%elders base58-block-id peer-id 0]
- /// For a raw transaction request:
- /// [%request [%raw-tx [%by-id [1 2 3 4 5]]]] -> [%raw-transaction base58-tx-id 0]
- fn request_to_scry_slab(noun: &Noun) -> Result<NounSlab, NockAppError> {
- let Ok(request) = noun.as_cell() else {
- return Err(NockAppError::IoError(std::io::Error::new(
- std::io::ErrorKind::Other,
- "Unknown request - not a cell",
- )));
- };
- let Ok(tag) = request.head().as_direct() else {
- return Err(NockAppError::IoError(std::io::Error::new(
- std::io::ErrorKind::Other,
- "Unknown request - not a direct",
- )));
- };
- if tag.data() != tas!(b"request") {
- return Err(NockAppError::IoError(std::io::Error::new(
- std::io::ErrorKind::Other,
- "Unknown request - not a request",
- )));
- }
- let mut scry_path_slab = NounSlab::new();
- let request_body = request.tail().as_cell()?;
- if request_body.head().eq_bytes(b"block") {
- let Ok(tail_cell) = request_body.tail().as_cell() else {
- return Err(NockAppError::IoError(std::io::Error::new(
- std::io::ErrorKind::Other,
- "Invalid block request",
- )));
- };
- if tail_cell.head().eq_bytes(b"by-height") && tail_cell.tail().is_atom() {
- let pax = T(
- &mut scry_path_slab,
- &[D(tas!(b"heavy-n")), tail_cell.tail(), D(0)],
- );
- scry_path_slab.set_root(pax);
- trace!(
- "block by-height: {:?}",
- nockvm::noun::DebugPath(&pax.as_cell()?)
- );
- return Ok(scry_path_slab);
- } else if tail_cell.head().eq_bytes(b"elders") {
- let Ok(elders_cell) = tail_cell.tail().as_cell() else {
- return Err(NockAppError::IoError(std::io::Error::new(
- std::io::ErrorKind::Other,
- "Invalid elders request",
- )));
- };
- let block_id_b58 = tip5_hash_to_base58(elders_cell.head())?;
- let block_id_atom =
- Atom::from_value(&mut scry_path_slab, block_id_b58).unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- let peer_id = elders_cell.tail();
- let pax = T(
- &mut scry_path_slab,
- &[D(tas!(b"elders")), block_id_atom.as_noun(), peer_id, D(0)],
- );
- debug!(
- "block elders: {:?}",
- nockvm::noun::DebugPath(&pax.as_cell()?)
- );
- scry_path_slab.set_root(pax);
- return Ok(scry_path_slab);
- }
- } else if request_body.head().eq_bytes(b"raw-tx") {
- let Ok(tail_cell) = request_body.tail().as_cell() else {
- return Err(NockAppError::IoError(std::io::Error::new(
- std::io::ErrorKind::Other,
- "Invalid raw-tx request",
- )));
- };
- if tail_cell.head().eq_bytes(b"by-id") {
- let tx_id_b58 = tip5_hash_to_base58(tail_cell.tail())?;
- let tx_id_atom =
- Atom::from_value(&mut scry_path_slab, tx_id_b58).unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- let raw_tx_tag = make_tas(&mut scry_path_slab, "raw-transaction").as_noun();
- let pax = T(
- &mut scry_path_slab,
- &[raw_tx_tag, tx_id_atom.as_noun(), D(0)],
- );
- debug!("tx by-id: {:?}", nockvm::noun::DebugPath(&pax.as_cell()?));
- scry_path_slab.set_root(pax);
- return Ok(scry_path_slab);
- }
- }
- // log the head of the request body
- Err(NockAppError::IoError(std::io::Error::new(
- std::io::ErrorKind::Other,
- format!("Unknown request - {:?}", request_body.head()),
- )))
- }
- /// Creates a response to a scry request by processing the scry result noun
- ///
- /// # Arguments
- /// * `scry_res` - The noun containing the scry result to process
- /// * `heard_type` - The type of request that was heard (as a string)
- /// * `res_slab` - Mutable reference to the noun slab for storing the response
- ///
- /// # Returns
- /// Either:
- /// - `Left(())` if the scry path was bad or nothing was found
- /// - `Right(Ok(NockchainResponse))` containing the successful response
- /// - `Right(Err(NockAppError))` if there was an error processing the result
- fn create_scry_response(
- scry_res: &Noun,
- heard_type: &str,
- res_slab: &mut NounSlab,
- ) -> Either<(), Result<NockchainResponse, NockAppError>> {
- match ScryResult::from(scry_res) {
- ScryResult::BadPath => {
- warn!("Bad scry path");
- Left(())
- }
- ScryResult::Nothing => {
- trace!("Nothing found at scry path");
- Left(())
- }
- ScryResult::Some(x) => {
- let nouns = vec![x];
- if let Ok(response_noun) = prepend_tas(res_slab, heard_type, nouns) {
- res_slab.set_root(response_noun);
- Right(Ok(NockchainResponse::new_response_result(res_slab.jam())))
- } else {
- error!("Failed to prepend tas to response noun");
- Right(Err(NockAppError::OtherError))
- }
- }
- ScryResult::Invalid => {
- error!("Invalid scry result");
- Right(Err(NockAppError::OtherError))
- }
- }
- }
- /// Prepends a @tas to one or more Nouns.
- ///
- /// # Arguments
- /// * `slab` - The NounSlab containing the noun
- /// * `tas_str` - The tag string to prepend
- /// * `nouns` - The Nouns to include
- ///
- /// # Returns
- /// The noun with @tas prepended
- fn prepend_tas(slab: &mut NounSlab, tas_str: &str, nouns: Vec<Noun>) -> Result<Noun, NockAppError> {
- let tas_atom = Atom::from_value(slab, tas_str)?;
- // Create a cell with the tag and all provided nouns
- let mut cell_elements = Vec::with_capacity(nouns.len() + 1);
- cell_elements.push(tas_atom.as_noun());
- cell_elements.extend(nouns);
- Ok(T(slab, &cell_elements))
- }
- #[cfg(test)]
- mod tests {
- use nockapp::noun::slab::NounSlab;
- use nockvm::noun::{D, T};
- use nockvm_macros::tas;
- use super::*;
- #[test]
- #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
- fn test_request_to_scry_slab() {
- // Test block by-height request
- {
- let mut slab = NounSlab::new();
- let height = 123u64;
- let by_height_tas = make_tas(&mut slab, "by-height");
- let by_height = T(&mut slab, &[by_height_tas.as_noun(), D(height)]);
- let block_cell = T(&mut slab, &[D(tas!(b"block")), by_height]);
- let request = T(&mut slab, &[D(tas!(b"request")), block_cell]);
- slab.set_root(request);
- let result_slab = request_to_scry_slab(unsafe { slab.root() }).unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- let result = unsafe { result_slab.root() };
- assert!(result.is_cell());
- let result_cell = result.as_cell().unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- assert!(result_cell.head().eq_bytes(b"heavy-n"));
- // Get the tail cell and check its components
- let tail_cell = result_cell.tail().as_cell().unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- let height_atom = tail_cell.head().as_atom().unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- assert_eq!(
- height_atom.as_u64().unwrap_or_else(|err| {
- panic!(
- "Panicked with {err:?} at {}:{} (git sha: {:?})",
- file!(),
- line!(),
- option_env!("GIT_SHA")
- )
- }),
- height
- );
- let tail_atom = tail_cell.tail().as_atom().unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- assert_eq!(
- tail_atom.as_u64().unwrap_or_else(|err| {
- panic!(
- "Panicked with {err:?} at {}:{} (git sha: {:?})",
- file!(),
- line!(),
- option_env!("GIT_SHA")
- )
- }),
- 0
- );
- }
- // Test invalid request (not a cell)
- {
- let mut slab = NounSlab::new();
- slab.set_root(D(123));
- let result = request_to_scry_slab(unsafe { slab.root() });
- assert!(result.is_err());
- }
- // Test elders request
- {
- let mut slab = NounSlab::new();
- // Create a 5-tuple [1 2 3 4 5] for the block ID
- let five_tuple = T(&mut slab, &[D(1), D(2), D(3), D(4), D(5)]);
- // Create a random peer ID and store its bytes
- let peer_id = PeerId::random();
- let peer_id_atom =
- Atom::from_value(&mut slab, peer_id.to_base58()).unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- let elders_cell = T(&mut slab, &[five_tuple, peer_id_atom.as_noun()]);
- let elders_tas = D(tas!(b"elders"));
- let inner_cell = T(&mut slab, &[elders_tas, elders_cell]);
- let block_cell = T(&mut slab, &[D(tas!(b"block")), inner_cell]);
- let request = T(&mut slab, &[D(tas!(b"request")), block_cell]);
- slab.set_root(request);
- let result_slab = request_to_scry_slab(unsafe { slab.root() }).unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- let result = unsafe { result_slab.root() };
- // Verify the structure: [%elders block_id_b58 peer_id 0]
- assert!(result.is_cell());
- let result_cell = result.as_cell().unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- // Check %elders tag
- assert!(result_cell.head().eq_bytes(b"elders"));
- // Get the tail cell
- let tail_cell = result_cell.tail().as_cell().unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- // Check block ID (should be base58 encoded)
- let block_id_atom = tail_cell.head().as_atom().unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- let block_id_bytes = block_id_atom.to_bytes_until_nul().unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- let block_id_str = String::from_utf8(block_id_bytes).unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- // Get the expected base58 string
- let expected_b58 = tip5_hash_to_base58(five_tuple).unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- assert_eq!(block_id_str, expected_b58);
- // Check peer ID
- let peer_cell = tail_cell.tail().as_cell().unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- let peer_id_result = peer_cell.head().as_atom().unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- let peer_bytes = peer_id_result.to_bytes_until_nul().unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- let peer_str = String::from_utf8(peer_bytes).unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- assert_eq!(peer_str, peer_id.to_base58());
- // Check final 0
- assert_eq!(
- peer_cell
- .tail()
- .as_direct()
- .unwrap_or_else(|err| {
- panic!(
- "Panicked with {err:?} at {}:{} (git sha: {:?})",
- file!(),
- line!(),
- option_env!("GIT_SHA")
- )
- })
- .data(),
- 0
- );
- }
- // Test invalid elders request (not a cell)
- {
- let mut slab = NounSlab::new();
- let invalid_request = T(
- &mut slab,
- &[D(tas!(b"request")), D(tas!(b"block")), D(tas!(b"elders"))],
- );
- slab.set_root(invalid_request);
- let result = request_to_scry_slab(unsafe { slab.root() });
- assert!(result.is_err());
- }
- }
- #[test]
- #[cfg_attr(miri, ignore)] // equix uses a foreign function so miri fails this tes
- fn test_equix_pow_verification() {
- use equix::EquiXBuilder;
- // Create EquiX builder - new() doesn't return Result
- let mut builder = EquiXBuilder::new();
- // Create test peer IDs
- let local_peer_id = PeerId::random();
- let remote_peer_id = PeerId::random();
- // Create test message
- let message = ByteBuf::from(vec![1, 2, 3, 4, 5]);
- // Create valid request with correct PoW
- let valid_request =
- NockchainRequest::new_request(&mut builder, &local_peer_id, &remote_peer_id, &{
- let mut slab = NounSlab::new();
- let message_noun = Atom::from_value(&mut slab, &message[..])
- .expect("Failed to create message atom")
- .as_noun();
- slab.set_root(message_noun);
- slab
- });
- // Verify the valid request
- match &valid_request {
- NockchainRequest::Request {
- pow,
- nonce,
- message: _,
- } => {
- // Test successful verification
- let result = valid_request.verify_pow(
- &mut builder, &remote_peer_id, // Note: peers are swapped for verification
- &local_peer_id,
- );
- assert!(result.is_ok(), "Valid PoW should verify successfully");
- // Test failed verification with tampered nonce
- let tampered_request = NockchainRequest::Request {
- pow: *pow,
- nonce: nonce + 1, // Tamper with the nonce
- message: message.clone(),
- };
- let result =
- tampered_request.verify_pow(&mut builder, &remote_peer_id, &local_peer_id);
- assert!(result.is_err(), "Tampered nonce should fail verification");
- // Test failed verification with wrong peer order
- let result = valid_request.verify_pow(
- &mut builder, &local_peer_id, // Wrong order - not swapped
- &remote_peer_id,
- );
- assert!(result.is_err(), "Wrong peer order should fail verification");
- }
- _ => panic!("Expected Request variant"),
- }
- // Test that gossip requests always verify successfully
- let gossip_request = NockchainRequest::Gossip {
- message: message.clone(),
- };
- let result = gossip_request.verify_pow(&mut builder, &remote_peer_id, &local_peer_id);
- assert!(
- result.is_ok(),
- "Gossip requests should always verify successfully"
- );
- }
- #[tokio::test]
- #[cfg_attr(miri, ignore)]
- async fn test_liar_peer_effect() {
- use equix::EquiXBuilder;
- use tokio::sync::mpsc;
- // Create a test peer ID and convert to base58
- let peer_id = PeerId::random();
- let peer_id_base58 = peer_id.to_base58();
- // Create the liar-peer effect noun
- let mut effect_slab = NounSlab::new();
- let liar_peer_atom = Atom::from_value(&mut effect_slab, "liar-peer")
- .expect("Failed to create liar-peer atom");
- let peer_id_atom = Atom::from_value(&mut effect_slab, peer_id_base58)
- .expect("Failed to create peer ID atom");
- let effect = T(
- &mut effect_slab,
- &[liar_peer_atom.as_noun(), peer_id_atom.as_noun()],
- );
- effect_slab.set_root(effect);
- // Create channel to receive SwarmAction
- let (swarm_tx, mut swarm_rx) = mpsc::channel(1);
- // Call handle_effect with the liar-peer effect
- let result = handle_effect(
- effect_slab,
- swarm_tx,
- EquiXBuilder::new(),
- PeerId::random(), // local peer ID (not relevant for this test)
- vec![], // connected peers (not relevant for this test)
- Arc::new(Mutex::new(MessageTracker::new())),
- )
- .await;
- // Verify the function succeeded
- assert!(result.is_ok(), "handle_effect should succeed");
- // Verify that a BlockPeer action was sent with the correct peer ID
- match swarm_rx.recv().await {
- Some(SwarmAction::BlockPeer {
- peer_id: blocked_peer,
- }) => {
- assert_eq!(blocked_peer, peer_id, "Wrong peer ID was blocked");
- }
- other => panic!("Expected BlockPeer action, got {:?}", other),
- }
- // Verify no more actions were sent
- assert!(swarm_rx.try_recv().is_err(), "Should only send one action");
- }
- #[tokio::test]
- #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
- async fn test_track_add_effect() {
- use equix::EquiXBuilder;
- use tokio::sync::mpsc;
- // Create test peer ID
- let peer_id = PeerId::random();
- let peer_id_base58 = peer_id.to_base58();
- // Create the track add effect noun
- let mut effect_slab = NounSlab::new();
- let track_atom = make_tas(&mut effect_slab, "track");
- let add_atom = make_tas(&mut effect_slab, "add");
- // Create block ID as [1 2 3 4 5]
- let block_id_tuple = T(&mut effect_slab, &[D(1), D(2), D(3), D(4), D(5)]);
- let peer_id_atom = Atom::from_value(&mut effect_slab, peer_id_base58)
- .expect("Failed to create peer ID atom");
- // Build the noun structure: [%track %add block-id peer-id]
- let data_cell = T(&mut effect_slab, &[block_id_tuple, peer_id_atom.as_noun()]);
- let add_cell = T(&mut effect_slab, &[add_atom.as_noun(), data_cell]);
- let track_cell = T(&mut effect_slab, &[track_atom.as_noun(), add_cell]);
- effect_slab.set_root(track_cell);
- // Create message tracker and other required components
- let message_tracker = Arc::new(Mutex::new(MessageTracker::new()));
- let (swarm_tx, _swarm_rx) = mpsc::channel(1);
- // Call handle_effect with the track add effect
- let result = handle_effect(
- effect_slab.clone(), // test fails if we don't clone
- swarm_tx,
- EquiXBuilder::new(),
- PeerId::random(), // local peer ID (not relevant for this test)
- vec![], // connected peers (not relevant for this test)
- message_tracker.clone(),
- )
- .await;
- // Verify the function succeeded
- assert!(result.is_ok(), "handle_effect should succeed");
- // Get the expected block ID string (base58 of [1 2 3 4 5])
- let block_id_str = tip5_hash_to_base58(block_id_tuple).unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- // Verify the message tracker state
- let tracker = message_tracker.lock().await;
- // Check block_id_to_peers mapping
- let peers = tracker.get_peers_for_block_id(block_id_tuple);
- assert!(
- peers.contains(&peer_id),
- "Peer ID should be associated with block ID"
- );
- // Check peer_to_block_ids mapping
- let block_ids = tracker.get_block_ids_for_peer(peer_id);
- assert!(
- block_ids.contains(&block_id_str),
- "Block ID should be associated with peer ID"
- );
- }
- #[tokio::test]
- #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
- async fn test_track_remove_effect() {
- use equix::EquiXBuilder;
- use tokio::sync::mpsc;
- // Create test peer ID
- let peer_id = PeerId::random();
- // Create a message tracker and add an entry that we'll later remove
- let message_tracker = Arc::new(Mutex::new(MessageTracker::new()));
- // Create block ID as [1 2 3 4 5]
- let mut setup_slab = NounSlab::new();
- let block_id_tuple = T(&mut setup_slab, &[D(1), D(2), D(3), D(4), D(5)]);
- {
- let mut tracker = message_tracker.lock().await;
- tracker
- .track_block_id_and_peer(block_id_tuple, peer_id)
- .unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- // Verify it was added correctly
- assert!(tracker.is_tracking_block_id(block_id_tuple));
- assert!(tracker.is_tracking_peer(peer_id));
- }
- // Now create the track remove effect noun
- let mut effect_slab = NounSlab::new();
- let track_atom = make_tas(&mut effect_slab, "track");
- let remove_atom = make_tas(&mut effect_slab, "remove");
- // Copy the block ID tuple to the effect slab
- let block_id_tuple_in_effect = T(&mut effect_slab, &[D(1), D(2), D(3), D(4), D(5)]);
- // Build the noun structure: [%track %remove block-id]
- let remove_cell = T(
- &mut effect_slab,
- &[remove_atom.as_noun(), block_id_tuple_in_effect],
- );
- let track_cell = T(&mut effect_slab, &[track_atom.as_noun(), remove_cell]);
- effect_slab.set_root(track_cell);
- // Create channel for SwarmAction (not used in this test)
- let (swarm_tx, _swarm_rx) = mpsc::channel(1);
- // Call handle_effect with the track remove effect
- let result = handle_effect(
- effect_slab,
- swarm_tx,
- EquiXBuilder::new(),
- PeerId::random(), // local peer ID (not relevant for this test)
- vec![], // connected peers (not relevant for this test)
- message_tracker.clone(),
- )
- .await;
- // Verify the function succeeded
- assert!(result.is_ok(), "handle_effect should succeed");
- // Verify the message tracker state after removal
- let tracker = message_tracker.lock().await;
- // Check that the block ID was removed from block_id_to_peers
- assert!(
- !tracker.is_tracking_block_id(block_id_tuple),
- "Block ID should be removed"
- );
- // Check that the peer's entry in peer_to_block_ids is also removed
- // (since this was the only block ID associated with the peer)
- assert!(
- !tracker.is_tracking_peer(peer_id),
- "Peer ID should be removed since it has no more block IDs"
- );
- }
- #[tokio::test]
- #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
- async fn test_liar_block_id_effect() {
- use equix::EquiXBuilder;
- use tokio::sync::mpsc;
- println!("Starting test_liar_block_id_effect");
- // Create test peer IDs
- let bad_peer1 = PeerId::random();
- let bad_peer2 = PeerId::random();
- let good_peer = PeerId::random();
- println!(
- "Created peer_ids: bad1={}, bad2={}, good={}",
- bad_peer1, bad_peer2, good_peer
- );
- // Create a message tracker and add entries
- let message_tracker = Arc::new(Mutex::new(MessageTracker::new()));
- // Create block IDs
- let mut setup_slab = NounSlab::new();
- // Bad block ID as [1 2 3 4 5]
- let bad_block_id = T(&mut setup_slab, &[D(1), D(2), D(3), D(4), D(5)]);
- // Good block ID as [6 7 8 9 10]
- let good_block_id = T(&mut setup_slab, &[D(6), D(7), D(8), D(9), D(10)]);
- println!("Created block_ids");
- {
- let mut tracker = message_tracker.lock().await;
- println!("Tracking block_ids and peers");
- // Associate bad_peer1 with the bad block
- tracker
- .track_block_id_and_peer(bad_block_id, bad_peer1)
- .unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- // Associate bad_peer2 with the bad block
- tracker
- .add_peer_if_tracking_block_id(bad_block_id, bad_peer2)
- .unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- // Associate good_peer with a different block
- tracker
- .track_block_id_and_peer(good_block_id, good_peer)
- .unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- // Verify tracking is working
- assert!(tracker.is_tracking_block_id(bad_block_id));
- assert!(tracker.is_tracking_block_id(good_block_id));
- assert!(tracker.is_tracking_peer(bad_peer1));
- assert!(tracker.is_tracking_peer(bad_peer2));
- assert!(tracker.is_tracking_peer(good_peer));
- println!("Verified tracking is working");
- }
- // Now create the liar-block-id effect noun for the bad block
- let mut effect_slab = NounSlab::new();
- let liar_block_id_atom = Atom::from_value(&mut effect_slab, "liar-block-id")
- .expect("Failed to create liar-block-id atom");
- // Copy the bad block ID tuple to the effect slab
- let bad_block_id_in_effect = T(&mut effect_slab, &[D(1), D(2), D(3), D(4), D(5)]);
- // Build the noun structure: [%liar-block-id bad-block-id]
- let effect = T(
- &mut effect_slab,
- &[liar_block_id_atom.as_noun(), bad_block_id_in_effect],
- );
- effect_slab.set_root(effect);
- println!("Created liar-block-id effect");
- // Create channel for SwarmAction
- let (swarm_tx, mut swarm_rx) = mpsc::channel(10); // Increased capacity for multiple actions
- println!("Created swarm channel");
- // Call handle_effect with the liar-block-id effect
- println!("Calling handle_effect");
- let result = handle_effect(
- effect_slab,
- swarm_tx,
- EquiXBuilder::new(),
- PeerId::random(), // local peer ID (not relevant for this test)
- vec![], // connected peers (not relevant for this test)
- message_tracker.clone(),
- )
- .await;
- println!("handle_effect result: {:?}", result);
- // Verify the function succeeded
- assert!(result.is_ok(), "handle_effect should succeed");
- println!("Verified handle_effect succeeded");
- // Collect all the block actions
- let mut blocked_peers = Vec::new();
- while let Ok(action) = swarm_rx.try_recv() {
- match action {
- SwarmAction::BlockPeer { peer_id } => {
- println!("Received BlockPeer action for peer: {}", peer_id);
- blocked_peers.push(peer_id);
- }
- other => {
- println!("Unexpected action received: {:?}", other);
- panic!("Expected BlockPeer action, got {:?}", other);
- }
- }
- }
- // Verify both bad peers were blocked
- assert_eq!(
- blocked_peers.len(),
- 2,
- "Should have blocked exactly 2 peers"
- );
- assert!(
- blocked_peers.contains(&bad_peer1),
- "bad_peer1 should be blocked"
- );
- assert!(
- blocked_peers.contains(&bad_peer2),
- "bad_peer2 should be blocked"
- );
- assert!(
- !blocked_peers.contains(&good_peer),
- "good_peer should not be blocked"
- );
- println!("Verified correct peers were blocked");
- // Verify the bad block ID was removed from the tracker
- {
- let tracker = message_tracker.lock().await;
- // Bad block should be removed
- assert!(
- !tracker.is_tracking_block_id(bad_block_id),
- "Bad block ID should be removed"
- );
- // Good block should still be tracked
- assert!(
- tracker.is_tracking_block_id(good_block_id),
- "Good block ID should still be tracked"
- );
- // Bad peers should be removed
- assert!(
- !tracker.is_tracking_peer(bad_peer1),
- "bad_peer1 should be removed from tracker"
- );
- assert!(
- !tracker.is_tracking_peer(bad_peer2),
- "bad_peer2 should be removed from tracker"
- );
- // Good peer should still be tracked
- assert!(
- tracker.is_tracking_peer(good_peer),
- "good_peer should still be tracked"
- );
- println!("Verified tracker state is correct after processing effect");
- }
- }
- #[tokio::test]
- #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
- async fn test_seen_block_effect() {
- use equix::EquiXBuilder;
- use tokio::sync::mpsc;
- let mut effect_slab = NounSlab::new();
- let block_id = T(&mut effect_slab, &[D(1), D(2), D(3), D(4), D(5)]);
- let block_id_str = tip5_hash_to_base58(block_id).unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- let effect = T(
- &mut effect_slab,
- &[D(tas!(b"seen")), D(tas!(b"block")), block_id],
- );
- effect_slab.set_root(effect);
- let (swarm_tx, _) = mpsc::channel(1);
- let message_tracker = Arc::new(Mutex::new(MessageTracker::new()));
- let message_tracker_clone = Arc::clone(&message_tracker); // Clone the Arc, not the MessageTracker
- let result = handle_effect(
- effect_slab,
- swarm_tx,
- EquiXBuilder::new(),
- PeerId::random(), // local peer ID (not relevant for this test)
- vec![], // connected peers (not relevant for this test)
- message_tracker_clone,
- )
- .await;
- assert!(result.is_ok(), "handle_effect should succeed");
- // Verify that the block id was added to the seen_blocks set
- let tracker = message_tracker.lock().await;
- let contains = tracker.seen_blocks.contains(&block_id_str);
- assert!(contains, "Block ID should be marked as seen");
- }
- #[tokio::test]
- #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
- async fn test_seen_tx_effect() {
- use equix::EquiXBuilder;
- use tokio::sync::mpsc;
- let mut effect_slab = NounSlab::new();
- let tx_id = T(&mut effect_slab, &[D(1), D(2), D(3), D(4), D(5)]);
- let tx_id_str = tip5_hash_to_base58(tx_id).unwrap_or_else(|_| {
- panic!(
- "Called `expect()` at {}:{} (git sha: {})",
- file!(),
- line!(),
- option_env!("GIT_SHA").unwrap_or("unknown")
- )
- });
- let effect = T(&mut effect_slab, &[D(tas!(b"seen")), D(tas!(b"tx")), tx_id]);
- effect_slab.set_root(effect);
- let (swarm_tx, _) = mpsc::channel(1);
- let message_tracker = Arc::new(Mutex::new(MessageTracker::new()));
- let message_tracker_clone = Arc::clone(&message_tracker); // Clone the Arc, not the MessageTracker
- let result = handle_effect(
- effect_slab,
- swarm_tx,
- EquiXBuilder::new(),
- PeerId::random(), // local peer ID (not relevant for this test)
- vec![], // connected peers (not relevant for this test)
- message_tracker_clone,
- )
- .await;
- assert!(result.is_ok(), "handle_effect should succeed");
- // Verify that the tx id was added to the seen_blocks set
- let tracker = message_tracker.lock().await;
- let contains = tracker.seen_txs.contains(&tx_id_str);
- assert!(contains, "tx ID should be marked as seen");
- }
- }
- fn dial_initial_peers(
- swarm: &mut Swarm<NockchainBehaviour>,
- peers: &[Multiaddr],
- ) -> Result<(), NockAppError> {
- for peer in peers {
- let peer = peer.clone();
- swarm.dial(peer.clone()).map_err(|e| {
- error!("Failed to dial initial peer {}: {}", peer, e);
- NockAppError::OtherError
- })?;
- }
- Ok(())
- }
|