nc.rs 80 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984
  1. use std::collections::HashMap;
  2. use std::mem::size_of;
  3. use std::str::FromStr;
  4. use std::sync::Arc;
  5. use bytes::Bytes;
  6. use either::{Either, Left, Right};
  7. use futures::{Future, StreamExt};
  8. use libp2p::identify::Event::Received;
  9. use libp2p::identity::Keypair;
  10. use libp2p::kad::NoKnownPeers;
  11. use libp2p::peer_store::Store;
  12. use libp2p::request_response::Event::*;
  13. use libp2p::request_response::Message::*;
  14. use libp2p::request_response::{self};
  15. use libp2p::swarm::SwarmEvent;
  16. use libp2p::{
  17. allow_block_list, connection_limits, memory_connection_limits, Multiaddr, PeerId, Swarm,
  18. };
  19. use nockapp::driver::{IODriverFn, NockAppHandle, PokeResult};
  20. use nockapp::noun::slab::NounSlab;
  21. use nockapp::utils::make_tas;
  22. use nockapp::utils::scry::*;
  23. use nockapp::wire::{Wire, WireRepr};
  24. use nockapp::{AtomExt, NockAppError, NounExt};
  25. use nockvm::noun::{Atom, Noun, D, T};
  26. use nockvm_macros::tas;
  27. use serde_bytes::ByteBuf;
  28. use tokio::sync::{mpsc, Mutex};
  29. use tokio::task::{AbortHandle, JoinError, JoinSet};
  30. use tracing::{debug, error, info, instrument, trace, warn};
  31. use crate::metrics::NockchainP2PMetrics;
  32. use crate::p2p::*;
  33. use crate::p2p_util::{log_fail2ban_ipv4, log_fail2ban_ipv6, MessageTracker, PeerIdExt};
  34. use crate::tip5_util::tip5_hash_to_base58;
  35. //TODO This wire is a placeholder for now. The libp2p driver is entangled with the other types of nockchain pokes
  36. //for historical reasons, and should be disentangled in the future.
  37. pub enum NockchainWire {
  38. Local,
  39. }
  40. impl Wire for NockchainWire {
  41. const VERSION: u64 = 1;
  42. const SOURCE: &'static str = "nc";
  43. }
  44. #[derive(Debug)]
  45. pub enum Libp2pWire {
  46. Gossip(PeerId),
  47. Response(PeerId),
  48. }
  49. impl Libp2pWire {
  50. fn verb(&self) -> &'static str {
  51. match self {
  52. Libp2pWire::Gossip(_) => "gossip",
  53. Libp2pWire::Response(_) => "response",
  54. }
  55. }
  56. fn peer_id(&self) -> &PeerId {
  57. match self {
  58. Libp2pWire::Gossip(peer_id) => peer_id,
  59. Libp2pWire::Response(peer_id) => peer_id,
  60. }
  61. }
  62. }
  63. impl Wire for Libp2pWire {
  64. const VERSION: u64 = 1;
  65. const SOURCE: &'static str = "libp2p";
  66. fn to_wire(&self) -> WireRepr {
  67. let tags = vec![self.verb().into(), "peer-id".into(), self.peer_id().to_base58().into()];
  68. WireRepr::new(Libp2pWire::SOURCE, Libp2pWire::VERSION, tags)
  69. }
  70. }
  71. enum EffectType {
  72. Gossip,
  73. Request,
  74. LiarPeer,
  75. LiarBlockId,
  76. Track,
  77. Seen,
  78. Unknown,
  79. }
  80. impl EffectType {
  81. fn from_noun_slab(noun_slab: &NounSlab) -> Self {
  82. let Ok(effect_cell) = (unsafe { noun_slab.root().as_cell() }) else {
  83. return EffectType::Unknown;
  84. };
  85. let head = effect_cell.head();
  86. let Ok(atom) = head.as_atom() else {
  87. return EffectType::Unknown;
  88. };
  89. let bytes = atom
  90. .to_bytes_until_nul()
  91. .expect("failed to strip null bytes");
  92. match bytes.as_slice() {
  93. b"gossip" => EffectType::Gossip,
  94. b"request" => EffectType::Request,
  95. b"liar-peer" => EffectType::LiarPeer,
  96. b"liar-block-id" => EffectType::LiarBlockId,
  97. b"track" => EffectType::Track,
  98. b"seen" => EffectType::Seen,
  99. _ => EffectType::Unknown,
  100. }
  101. }
  102. }
  103. struct TrackedJoinSet<T> {
  104. inner: JoinSet<T>,
  105. tasks: HashMap<String, AbortHandle>,
  106. }
  107. impl<T: 'static> TrackedJoinSet<T> {
  108. fn new() -> Self {
  109. Self {
  110. inner: JoinSet::new(),
  111. tasks: HashMap::new(),
  112. }
  113. }
  114. fn spawn(&mut self, name: String, task: impl Future<Output = T> + Send + 'static)
  115. where
  116. T: Send + 'static,
  117. {
  118. let handle = self.inner.spawn(task);
  119. self.tasks.insert(name, handle);
  120. }
  121. async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
  122. let result = self.inner.join_next().await;
  123. if result.is_some() {
  124. // Remove the completed task from our tracking
  125. self.tasks.retain(|_, v| !v.is_finished());
  126. }
  127. result
  128. }
  129. // Keep this around for debugging
  130. #[allow(dead_code)]
  131. fn get_running_tasks(&self) -> Vec<String> {
  132. self.tasks.keys().cloned().collect()
  133. }
  134. }
// Version number constant for pokes. It is not referenced in the visible part of
// this file — presumably attached to pokes built further down; TODO confirm.
  135. const POKE_VERSION: u64 = 0;
  136. #[instrument(skip(keypair, bind, allowed, limits, memory_limits, equix_builder))]
  137. pub fn make_libp2p_driver(
  138. keypair: Keypair,
  139. bind: Vec<Multiaddr>,
  140. allowed: Option<allow_block_list::Behaviour<allow_block_list::AllowedPeers>>,
  141. limits: connection_limits::ConnectionLimits,
  142. memory_limits: Option<memory_connection_limits::Behaviour>,
  143. initial_peers: &[Multiaddr],
  144. equix_builder: equix::EquiXBuilder,
  145. init_complete_tx: Option<tokio::sync::oneshot::Sender<()>>,
  146. ) -> IODriverFn {
  147. let initial_peers = Vec::from(initial_peers);
  148. Box::new(|mut handle| {
  149. let metrics = NockchainP2PMetrics::register(gnort::global_metrics_registry())
  150. .expect("Failed to register metrics!");
  151. Box::pin(async move {
  152. let mut swarm =
  153. match crate::p2p::start_swarm(keypair, bind, allowed, limits, memory_limits) {
  154. Ok(swarm) => swarm,
  155. Err(e) => {
  156. error!("Could not create swarm: {}", e);
  157. let (_, handle_clone) = handle.dup();
  158. tokio::spawn(async move {
  159. if let Err(e) = handle_clone.exit.exit(1).await {
  160. error!("Failed to send exit signal: {}", e);
  161. }
  162. });
  163. return Err(NockAppError::OtherError);
  164. }
  165. };
  166. let (swarm_tx, mut swarm_rx) = mpsc::channel::<SwarmAction>(1000); // number needs to be high enough to send gossips to peers
  167. let mut join_set = TrackedJoinSet::<Result<(), NockAppError>>::new();
  168. let message_tracker = Arc::new(Mutex::new(MessageTracker::new()));
  169. let mut kad_bootstrap = tokio::time::interval(KADEMLIA_BOOTSTRAP_INTERVAL);
  170. let mut initial_peer_retries_remaining = INITIAL_PEER_RETRIES;
  171. dial_initial_peers(&mut swarm, &initial_peers)?;
  172. if let Some(tx) = init_complete_tx {
  173. let _ = tx.send(());
  174. debug!("libp2p driver initialization complete signal sent");
  175. }
  176. loop {
  177. tokio::select! {
  178. Ok(noun_slab) = handle.next_effect() => {
  179. let _span = tracing::trace_span!("broadcast").entered();
  180. let swarm_tx_clone = swarm_tx.clone();
  181. let equix_builder_clone = equix_builder.clone();
  182. let local_peer_id = *swarm.local_peer_id();
  183. let connected_peers: Vec<PeerId> = swarm.connected_peers().cloned().collect();
  184. let message_tracker_clone = Arc::clone(&message_tracker); // Clone the Arc, not the MessageTracker
  185. join_set.spawn("handle_effect".to_string(), async move {
  186. handle_effect(noun_slab, swarm_tx_clone, equix_builder_clone, local_peer_id, connected_peers, message_tracker_clone).await
  187. });
  188. },
  189. Some(event) = swarm.next() => {
  190. match event {
  191. SwarmEvent::NewListenAddr { address, .. } => {
  192. info!("SEvent: Listening on {address:?}");
  193. },
  194. SwarmEvent::Behaviour(NockchainEvent::Identify(Received { connection_id: _, peer_id, info })) => {
  195. trace!("SEvent: identify_received");
  196. identify_received(&mut swarm, peer_id, info)?;
  197. },
  198. SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
  199. debug!("SEvent: {peer_id} is new friend via: {endpoint:?}");
  200. },
  201. SwarmEvent::ConnectionClosed { peer_id, endpoint, cause, .. } => {
  202. info!("SEvent: friendship ended with {peer_id} via: {endpoint:?}. cause: {cause:?}");
  203. // Clean up the message tracker when a peer disconnects
  204. let mut tracker = message_tracker.lock().await;
  205. tracker.remove_peer(&peer_id);
  206. },
  207. SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, error, .. } => {
  208. error!("SEvent: Failed incoming connection from {} to {}: {}",
  209. send_back_addr, local_addr, error);
  210. },
  211. SwarmEvent::Behaviour(NockchainEvent::RequestResponse(Message { connection_id: _, peer, message })) => {
  212. trace!("SEvent: received RequestResponse");
  213. let _span = tracing::debug_span!("SwarmEvent::Behavior(NockchainEvent::RequestResponse(…))").entered();
  214. let swarm_tx_clone = swarm_tx.clone();
  215. let mut equix_builder_clone = equix_builder.clone();
  216. let local_peer_id = *swarm.local_peer_id();
  217. // We have to dup and move a handle back into `handle` to propitiate the borrow checker
  218. let (orig_handle, request_response_handle) = handle.dup();
  219. handle = orig_handle;
  220. let metrics = metrics.clone();
  221. let message_tracker_clone = Arc::clone(&message_tracker); // Clone the Arc, not the MessageTracker
  222. join_set.spawn("handle_request_response".to_string(), async move {
  223. handle_request_response(peer, message, swarm_tx_clone, &mut equix_builder_clone, local_peer_id, request_response_handle, metrics, message_tracker_clone).await
  224. });
  225. },
  226. SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
  227. error!("Failed outgoing connection to {:?}: {}", peer_id, error);
  228. },
  229. SwarmEvent::IncomingConnection {
  230. local_addr,
  231. send_back_addr,
  232. connection_id,
  233. ..
  234. } => {
  235. debug!("SEvent: Incoming connection from {local_addr:?} to {send_back_addr:?} with {connection_id:?}");
  236. },
  237. SwarmEvent::Dialing { peer_id, connection_id } => {
  238. debug!("SEvent: Dialing {peer_id:?} {connection_id}");
  239. },
  240. _ => {
  241. // Handle other swarm events
  242. trace!("SEvent: other swarm event {:?}", event);
  243. }
  244. }
  245. },
  246. Some(swarm_action) = swarm_rx.recv() => {
  247. // We do this because Swarm doesn't implement Send, and so we can't pass it into the tasks
  248. // being spawned in the match cases above.
  249. match swarm_action {
  250. SwarmAction::SendRequest { peer_id, request } => {
  251. trace!("SAction: SendRequest: {peer_id}");
  252. let _ = swarm.behaviour_mut().request_response.send_request(&peer_id, request);
  253. },
  254. SwarmAction::SendResponse { channel, response } => {
  255. trace!("SAction: SendResponse");
  256. let _ = swarm.behaviour_mut().request_response.send_response(channel, response);
  257. },
  258. SwarmAction::BlockPeer { peer_id } => {
  259. warn!("SAction: Blocking peer {peer_id}");
  260. // Block the peer in the allow_block_list
  261. swarm.behaviour_mut().allow_block_list.block_peer(peer_id);
  262. {
  263. // get peer IP address from the swarm
  264. let peer_addresses = swarm.behaviour_mut().peer_store.store().addresses_of_peer(&peer_id);
  265. if let Some(peer_multi_addrs) = peer_addresses {
  266. for multi_addr in peer_multi_addrs {
  267. for protocol in multi_addr.iter() {
  268. match protocol {
  269. libp2p::core::multiaddr::Protocol::Ip4(ip) => {
  270. log_fail2ban_ipv4(&peer_id, &ip);
  271. },
  272. libp2p::core::multiaddr::Protocol::Ip6(ip) => {
  273. log_fail2ban_ipv6(&peer_id, &ip);
  274. },
  275. // TODO: Dns?
  276. _ => {}
  277. }
  278. }
  279. }
  280. } else {
  281. error!("Failed to get peer IP address for peer id: {peer_id}");
  282. };
  283. }
  284. // Disconnect the peer if they're currently connected
  285. let _ = swarm.disconnect_peer_id(peer_id);
  286. },
  287. }
  288. },
  289. _ = kad_bootstrap.tick() => {
  290. // If we don't have any peers, we should retry dialing our initial peers
  291. if let Err(NoKnownPeers())= swarm.behaviour_mut().kad.bootstrap() {
  292. if initial_peer_retries_remaining > 0 {
  293. info!("Failed to bootstrap: {}", NoKnownPeers());
  294. initial_peer_retries_remaining -= 1;
  295. dial_initial_peers(&mut swarm, &initial_peers)?;
  296. } else {
  297. warn!("Failed to bootstrap after {} retries, will not attempt to redial initial peers.", INITIAL_PEER_RETRIES);
  298. }
  299. }
  300. },
  301. Some(result) = join_set.join_next() => {
  302. if let Err(e) = result {
  303. error!("Task error: {:?}", e);
  304. }
  305. },
  306. }
  307. }
  308. })
  309. })
  310. }
  311. #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
  312. /// Network struct (in serde/CBOR) for requests
  313. pub enum NockchainRequest {
  314. /// Request a block or TX from another node, carry PoW
  315. Request {
  316. pow: equix::SolutionByteArray,
  317. nonce: u64,
  318. message: ByteBuf,
  319. },
  320. /// Gossip a block or TX to another node
  321. Gossip { message: ByteBuf },
  322. }
  323. impl NockchainRequest {
  324. /// Make a new "request" which gossips a block or a TX
  325. fn new_gossip(message: &NounSlab) -> NockchainRequest {
  326. let message_bytes = ByteBuf::from(message.jam().as_ref());
  327. NockchainRequest::Gossip {
  328. message: message_bytes,
  329. }
  330. }
  331. /// Make a new request for a block or a TX
  332. fn new_request(
  333. builder: &mut equix::EquiXBuilder,
  334. local_peer_id: &libp2p::PeerId,
  335. remote_peer_id: &libp2p::PeerId,
  336. message: &NounSlab,
  337. ) -> NockchainRequest {
  338. let message_bytes = ByteBuf::from(message.jam().as_ref());
  339. let local_peer_bytes = (*local_peer_id).to_bytes();
  340. let remote_peer_bytes = (*remote_peer_id).to_bytes();
  341. let mut pow_buf = Vec::with_capacity(
  342. size_of::<u64>()
  343. + local_peer_bytes.len()
  344. + remote_peer_bytes.len()
  345. + message_bytes.len(),
  346. );
  347. pow_buf.extend_from_slice(&[0; size_of::<u64>()][..]);
  348. pow_buf.extend_from_slice(&local_peer_bytes[..]);
  349. pow_buf.extend_from_slice(&remote_peer_bytes[..]);
  350. pow_buf.extend_from_slice(&message_bytes[..]);
  351. let mut nonce = 0u64;
  352. let sol_bytes = loop {
  353. {
  354. let nonce_buf = &mut pow_buf[0..size_of::<u64>()];
  355. nonce_buf.copy_from_slice(&nonce.to_le_bytes()[..]);
  356. }
  357. if let Ok(sols) = builder.solve(&pow_buf[..]) {
  358. if !sols.is_empty() {
  359. break sols[0].to_bytes();
  360. }
  361. }
  362. nonce += 1;
  363. };
  364. NockchainRequest::Request {
  365. pow: sol_bytes,
  366. nonce,
  367. message: message_bytes,
  368. }
  369. }
  370. /// Verify the EquiX PoW attached to a request
  371. fn verify_pow(
  372. &self,
  373. builder: &mut equix::EquiXBuilder,
  374. local_peer_id: &libp2p::PeerId,
  375. remote_peer_id: &libp2p::PeerId,
  376. ) -> Result<(), equix::Error> {
  377. match self {
  378. NockchainRequest::Request {
  379. pow,
  380. nonce,
  381. message,
  382. } => {
  383. // This looks backwards, but it's because which node is local and which is remote
  384. // is swapped between generation at the sender and verification at the receiver.
  385. let local_peer_bytes = (*remote_peer_id).to_bytes();
  386. let remote_peer_bytes = (*local_peer_id).to_bytes();
  387. let nonce_bytes = nonce.to_le_bytes();
  388. let mut pow_buf = Vec::with_capacity(
  389. size_of::<u64>()
  390. + local_peer_bytes.len()
  391. + remote_peer_bytes.len()
  392. + message.len(),
  393. );
  394. pow_buf.extend_from_slice(&nonce_bytes[..]);
  395. pow_buf.extend_from_slice(&local_peer_bytes[..]);
  396. pow_buf.extend_from_slice(&remote_peer_bytes[..]);
  397. pow_buf.extend_from_slice(&message[..]);
  398. builder.verify_bytes(&pow_buf[..], pow)
  399. }
  400. NockchainRequest::Gossip { message: _ } => Ok(()),
  401. }
  402. }
  403. }
  404. #[derive(Debug, serde::Serialize, serde::Deserialize)]
  405. /// Responses to Nockchain requests
  406. pub enum NockchainResponse {
  407. /// The requested block or raw-tx
  408. Result { message: ByteBuf },
  409. /// If the request was a gossip, no actual response is needed
  410. Ack,
  411. }
  412. impl NockchainResponse {
  413. fn new_response_result(message: impl AsRef<[u8]>) -> NockchainResponse {
  414. let message_bytes: &[u8] = message.as_ref();
  415. let message_bytebuf = ByteBuf::from(message_bytes.to_vec());
  416. NockchainResponse::Result {
  417. message: message_bytebuf,
  418. }
  419. }
  420. }
  421. // fn emit_fail2ban(peer_ip: u128) -> Result<(), NockAppError> {
  422. // // get peer ip address
  423. // let peer_ip = peer_id.to_base58();
  424. // }
  425. async fn handle_effect(
  426. noun_slab: NounSlab,
  427. swarm_tx: mpsc::Sender<SwarmAction>,
  428. equix_builder: equix::EquiXBuilder,
  429. local_peer_id: PeerId,
  430. connected_peers: Vec<PeerId>,
  431. message_tracker: Arc<Mutex<MessageTracker>>,
  432. ) -> Result<(), NockAppError> {
  433. match EffectType::from_noun_slab(&noun_slab) {
  434. EffectType::Gossip => {
  435. // Get the tail of the gossip effect (after %gossip head)
  436. let mut tail_slab = NounSlab::new();
  437. let gossip_cell = unsafe { noun_slab.root().as_cell()?.tail() };
  438. // Skip version number
  439. // TODO: add version negotiation, reject unknown/incompatible versions
  440. let data_cell = gossip_cell.as_cell()?.tail();
  441. tail_slab.copy_into(data_cell);
  442. // Check if this is a heard-block gossip
  443. let gossip_noun = unsafe { tail_slab.root() };
  444. if let Ok(data_cell) = gossip_noun.as_cell() {
  445. if data_cell.head().eq_bytes(b"heard-block") {
  446. trace!("Gossip effect for heard-block, clearing block cache");
  447. let mut tracker = message_tracker.lock().await;
  448. tracker.block_cache.clear();
  449. }
  450. }
  451. let gossip_request = NockchainRequest::new_gossip(&tail_slab);
  452. for peer_id in connected_peers.clone() {
  453. let gossip_request_clone = gossip_request.clone();
  454. swarm_tx
  455. .send(SwarmAction::SendRequest {
  456. peer_id,
  457. request: gossip_request_clone,
  458. })
  459. .await
  460. .map_err(|_e| NockAppError::OtherError)?;
  461. }
  462. }
  463. EffectType::Request => {
  464. // Extract request details to check if it's a peer-specific request
  465. let request_cell = unsafe { noun_slab.root().as_cell()? };
  466. let request_body = request_cell.tail().as_cell()?;
  467. let request_type = request_body.head().as_direct()?;
  468. let target_peers = if request_type.data() == tas!(b"block") {
  469. let block_cell = request_body.tail().as_cell()?;
  470. if block_cell.head().eq_bytes(b"elders") {
  471. // Extract peer ID from elders request
  472. let elders_cell = block_cell.tail().as_cell()?;
  473. let peer_id_atom = elders_cell.tail().as_atom()?;
  474. if let Ok(bytes) = peer_id_atom.to_bytes_until_nul() {
  475. if let Ok(peer_id) = PeerId::from_bytes(&bytes) {
  476. vec![peer_id]
  477. } else {
  478. connected_peers.clone()
  479. }
  480. } else {
  481. connected_peers.clone()
  482. }
  483. } else {
  484. connected_peers.clone()
  485. }
  486. } else {
  487. connected_peers.clone()
  488. };
  489. for peer_id in target_peers {
  490. let local_peer_id_clone = local_peer_id;
  491. let mut equix_builder_clone = equix_builder.clone();
  492. let request = NockchainRequest::new_request(
  493. &mut equix_builder_clone, &local_peer_id_clone, &peer_id, &noun_slab,
  494. );
  495. swarm_tx
  496. .send(SwarmAction::SendRequest { peer_id, request })
  497. .await
  498. .map_err(|_e| NockAppError::OtherError)?;
  499. }
  500. }
  501. EffectType::LiarPeer => {
  502. let effect_cell = unsafe { noun_slab.root().as_cell()? };
  503. let peer_id_atom = effect_cell.tail().as_atom().map_err(|_| {
  504. NockAppError::IoError(std::io::Error::new(
  505. std::io::ErrorKind::Other,
  506. "Expected peer ID atom in liar-peer effect",
  507. ))
  508. })?;
  509. let bytes = peer_id_atom
  510. .to_bytes_until_nul()
  511. .expect("failed to strip null bytes");
  512. let peer_id_str = String::from_utf8(bytes).map_err(|_| {
  513. NockAppError::IoError(std::io::Error::new(
  514. std::io::ErrorKind::Other,
  515. "Invalid UTF-8 in peer ID",
  516. ))
  517. })?;
  518. let peer_id = PeerId::from_str(&peer_id_str).map_err(|_| {
  519. NockAppError::IoError(std::io::Error::new(
  520. std::io::ErrorKind::Other,
  521. "Invalid peer ID format",
  522. ))
  523. })?;
  524. swarm_tx
  525. .send(SwarmAction::BlockPeer { peer_id })
  526. .await
  527. .map_err(|_| NockAppError::OtherError)?;
  528. }
  529. EffectType::LiarBlockId => {
  530. let effect_cell = unsafe { noun_slab.root().as_cell()? };
  531. let block_id = effect_cell.tail();
  532. // Add the bad block ID
  533. let mut tracker = message_tracker.lock().await;
  534. let peers_to_ban = tracker.process_bad_block_id(block_id)?;
  535. // Ban each peer that sent this block
  536. for peer_id in peers_to_ban {
  537. swarm_tx
  538. .send(SwarmAction::BlockPeer { peer_id })
  539. .await
  540. .map_err(|_| NockAppError::OtherError)?;
  541. }
  542. }
  543. EffectType::Track => {
  544. let effect_cell = unsafe { noun_slab.root().as_cell()? };
  545. let track_cell = effect_cell.tail().as_cell()?;
  546. let action = track_cell.head();
  547. if action.eq_bytes(b"add") {
  548. // Handle [%track %add block-id peer-id]
  549. let data_cell = track_cell.tail().as_cell()?;
  550. let block_id = data_cell.head();
  551. let peer_id_atom = data_cell.tail().as_atom()?;
  552. // Convert peer_id from base58 string to PeerId
  553. let Ok(peer_id) = PeerId::from_noun(peer_id_atom.as_noun()) else {
  554. return Err(NockAppError::OtherError);
  555. };
  556. // Add to message tracker
  557. let mut tracker = message_tracker.lock().await;
  558. tracker.track_block_id_and_peer(block_id, peer_id)?;
  559. } else if action.eq_bytes(b"remove") {
  560. // Handle [%track %remove block-id]
  561. let block_id = track_cell.tail();
  562. // Remove from message tracker
  563. let mut tracker = message_tracker.lock().await;
  564. tracker.remove_block_id(block_id)?;
  565. } else {
  566. return Err(NockAppError::IoError(std::io::Error::new(
  567. std::io::ErrorKind::Other,
  568. "Invalid track action",
  569. )));
  570. }
  571. }
  572. EffectType::Seen => {
  573. let effect_cell = unsafe { noun_slab.root().as_cell()? };
  574. let seen_cell = effect_cell.tail().as_cell()?;
  575. let seen_type = seen_cell.head();
  576. if seen_type.eq_bytes(b"block") {
  577. let block_id = seen_cell.tail().as_cell()?;
  578. let mut tracker = message_tracker.lock().await;
  579. let block_id_str = tip5_hash_to_base58(block_id.as_noun())
  580. .expect("failed to convert block ID to base58");
  581. debug!("seen block id: {:?}", &block_id_str);
  582. tracker.seen_blocks.insert(block_id_str);
  583. } else if seen_type.eq_bytes(b"tx") {
  584. let tx_id = seen_cell.tail().as_cell()?;
  585. let mut tracker = message_tracker.lock().await;
  586. let tx_id_str = tip5_hash_to_base58(tx_id.as_noun())
  587. .expect("failed to convert tx ID to base58");
  588. tracker.seen_txs.insert(tx_id_str);
  589. }
  590. }
  591. EffectType::Unknown => {
  592. // This isn't unexpected - any effect that this driver doesn't handle
  593. // will hit this case.
  594. }
  595. }
  596. Ok(())
  597. }
  598. // TODO: Wrap some of this up.
  599. #[allow(clippy::too_many_arguments)]
  600. async fn handle_request_response(
  601. peer: PeerId,
  602. message: request_response::Message<NockchainRequest, NockchainResponse>,
  603. swarm_tx: mpsc::Sender<SwarmAction>,
  604. equix_builder: &mut equix::EquiXBuilder,
  605. local_peer_id: PeerId,
  606. nockapp: NockAppHandle,
  607. metrics: NockchainP2PMetrics,
  608. message_tracker: Arc<Mutex<MessageTracker>>,
  609. ) -> Result<(), NockAppError> {
  610. trace!("handle_request_response peer: {peer}");
  611. match message {
  612. Request {
  613. request, channel, ..
  614. } => {
  615. let Ok(()) = request.verify_pow(equix_builder, &local_peer_id, &peer) else {
  616. warn!("bad libp2p powork from {peer}, blocking!");
  617. swarm_tx
  618. .send(SwarmAction::BlockPeer { peer_id: peer })
  619. .await
  620. .map_err(|_| NockAppError::OtherError)?;
  621. return Ok(());
  622. };
  623. trace!("handle_request_response: powork verified");
  624. let mut request_slab = NounSlab::new();
  625. match request {
  626. NockchainRequest::Request {
  627. pow: _,
  628. nonce: _,
  629. message,
  630. } => {
  631. trace!("handle_request_response: Request received");
  632. let message_bytes = Bytes::from(message.to_vec());
  633. let request_noun = request_slab.cue_into(message_bytes)?;
  634. let (scry_res_slab, cache_hit) = if let Ok(Some(cache_result)) = {
  635. let mut tracker = message_tracker.lock().await;
  636. tracker.check_cache(&request_noun, &metrics).await
  637. } {
  638. debug!("found cached response for request");
  639. (cache_result, true)
  640. } else {
  641. let scry_slab = request_to_scry_slab(&request_noun)?;
  642. let Some(scry_res_slab) = (match nockapp.try_peek(scry_slab).await {
  643. Ok(Some(res_slab)) => {
  644. metrics.requests_peeked_some.increment();
  645. Some(res_slab)
  646. }
  647. Ok(None) => {
  648. metrics.requests_peeked_none.increment();
  649. trace!(
  650. "No data found for incoming request from: {}, request type: {:?}",
  651. peer,
  652. request_noun.as_cell()?.tail().as_cell().map(|c| c.head())
  653. );
  654. None
  655. }
  656. Err(NockAppError::MPSCFullError(act)) => {
  657. metrics.requests_dropped.increment();
  658. trace!(
  659. "handle_request_response: Request dropped due to backpressure"
  660. );
  661. Err(NockAppError::MPSCFullError(act))?
  662. }
  663. Err(err) => {
  664. metrics.requests_erred.increment();
  665. trace!("handle_request_response: Error getting response");
  666. Err(err)?
  667. }
  668. }) else {
  669. return Ok(());
  670. };
  671. (scry_res_slab, false)
  672. };
  673. let Ok(request_cell) = request_noun.as_cell() else {
  674. error!("request noun not a cell");
  675. return Err(NockAppError::OtherError);
  676. };
  677. let Ok(scry_tag) = request_cell.tail().as_cell()?.head().as_direct() else {
  678. error!("request tag axis not an atom");
  679. return Err(NockAppError::OtherError);
  680. };
  681. trace!("handle_request_response: request cell parsed");
  682. let mut res_slab = NounSlab::new();
  683. let response = match scry_tag.data() {
  684. tas!(b"block") => {
  685. trace!("handle_request_response: block tag");
  686. let scry_res = unsafe { scry_res_slab.root() };
  687. // Extract the request type from under %block
  688. let request_type = request_cell
  689. .tail()
  690. .as_cell()
  691. .and_then(|c| c.tail().as_cell())
  692. .map(|c| c.head().eq_bytes(b"elders"))
  693. .map_err(|_| {
  694. NockAppError::IoError(std::io::Error::new(
  695. std::io::ErrorKind::Other,
  696. "invalid block request structure",
  697. ))
  698. })?;
  699. // Use heard-elders for elders requests, heard-block otherwise
  700. let heard_type = if request_type {
  701. "heard-elders"
  702. } else {
  703. "heard-block"
  704. };
  705. match create_scry_response(scry_res, heard_type, &mut res_slab) {
  706. Left(()) => {
  707. trace!(
  708. "No data found for incoming block request, type: {}",
  709. heard_type
  710. );
  711. return Ok(());
  712. }
  713. Right(result) => {
  714. // cache response
  715. if !cache_hit && heard_type == "heard-block" {
  716. let height = request_cell
  717. .tail()
  718. .as_cell()?
  719. .tail()
  720. .as_cell()?
  721. .tail()
  722. .as_direct()?
  723. .data();
  724. let mut tracker = message_tracker.lock().await;
  725. tracker.block_cache.insert(height, scry_res_slab.clone());
  726. debug!("cacheing block request by height={:?}", height);
  727. }
  728. result?
  729. }
  730. }
  731. }
  732. tas!(b"raw-tx") => {
  733. trace!("handle_request_response: raw-tx tag");
  734. let scry_res = unsafe { scry_res_slab.root() };
  735. match create_scry_response(scry_res, "heard-tx", &mut res_slab) {
  736. Left(()) => {
  737. trace!("No data found for incoming raw-tx request");
  738. return Ok(());
  739. }
  740. Right(result) => {
  741. if !cache_hit {
  742. let tx_id = request_cell.tail().as_cell()?.tail();
  743. let tx_id_str = tip5_hash_to_base58(tx_id)?;
  744. let mut tracker = message_tracker.lock().await;
  745. debug!("cacheing tx request by id={:?}", tx_id_str);
  746. tracker.tx_cache.insert(tx_id_str, scry_res_slab.clone());
  747. }
  748. result?
  749. }
  750. }
  751. }
  752. tag => {
  753. error!("Unknown request tag: {:?}", tag);
  754. return Err(NockAppError::OtherError);
  755. }
  756. };
  757. swarm_tx
  758. .send(SwarmAction::SendResponse { channel, response })
  759. .await
  760. .map_err(|_| NockAppError::OtherError)?;
  761. }
  762. NockchainRequest::Gossip { message } => {
  763. trace!("handle_request_response: Gossip received");
  764. let message_bytes = Bytes::from(message.to_vec());
  765. let request_noun = request_slab.cue_into(message_bytes)?;
  766. trace!("handle_request_response: Gossip noun parsed");
  767. let send_response: tokio::task::JoinHandle<Result<(), NockAppError>> =
  768. tokio::spawn(async move {
  769. let response = NockchainResponse::Ack;
  770. swarm_tx
  771. .send(SwarmAction::SendResponse { channel, response })
  772. .await
  773. .map_err(|_| NockAppError::OtherError)?;
  774. Ok(())
  775. });
  776. let poke_kernel = tokio::task::spawn(async move {
  777. let head = request_noun.as_cell()?.head();
  778. if head.eq_bytes(b"heard-block") {
  779. let page = request_noun.as_cell()?.tail();
  780. let block_id = page.as_cell()?.head();
  781. let block_id_str = tip5_hash_to_base58(block_id)?;
  782. let tracker = message_tracker.lock().await;
  783. if tracker.seen_blocks.contains(&block_id_str) {
  784. debug!("Block already seen, not processing: {:?}", block_id_str);
  785. metrics.block_seen_cache_hits.increment();
  786. return Ok(());
  787. } else {
  788. debug!("block not seen, processing: {:?}", block_id_str);
  789. metrics.block_seen_cache_misses.increment();
  790. }
  791. }
  792. if head.eq_bytes(b"heard-tx") {
  793. let raw_tx = request_noun.as_cell()?.tail();
  794. let tx_id = raw_tx.as_cell()?.head();
  795. let tracker = message_tracker.lock().await;
  796. let tx_id_str = tip5_hash_to_base58(tx_id)?;
  797. if tracker.seen_txs.contains(&tx_id_str) {
  798. debug!("Tx already seen, not processing: {:?}", tx_id_str);
  799. metrics.tx_seen_cache_hits.increment();
  800. return Ok(());
  801. } else {
  802. debug!("tx not seen, processing: {:?}", tx_id_str);
  803. metrics.tx_seen_cache_misses.increment();
  804. }
  805. }
  806. let request_fact = prepend_tas(
  807. &mut request_slab,
  808. "fact",
  809. vec![D(POKE_VERSION), request_noun],
  810. )?;
  811. request_slab.set_root(request_fact);
  812. let wire = Libp2pWire::Gossip(peer);
  813. trace!(
  814. "Poking kernel with wire: {:?} noun: {:?}",
  815. wire,
  816. nockvm::noun::FullDebugCell(unsafe { &request_slab.root().as_cell()? })
  817. );
  818. match nockapp.try_poke(wire.to_wire(), request_slab).await {
  819. Ok(PokeResult::Ack) => {
  820. metrics.gossip_acked.increment();
  821. }
  822. Ok(PokeResult::Nack) => {
  823. metrics.gossip_nacked.increment();
  824. trace!("handle_request_response: gossip poke nacked");
  825. return Ok(());
  826. }
  827. Err(NockAppError::MPSCFullError(act)) => {
  828. metrics.gossip_dropped.increment();
  829. trace!(
  830. "handle_request_response: gossip poke dropped due to backpressure"
  831. );
  832. return Err(NockAppError::MPSCFullError(act));
  833. }
  834. Err(err) => {
  835. metrics.gossip_erred.increment();
  836. trace!("handle_request_response: Poke errored");
  837. return Err(err);
  838. }
  839. };
  840. trace!("handle_request_response: Poke successful");
  841. Ok(())
  842. });
  843. send_response.await??;
  844. poke_kernel.await??;
  845. }
  846. }
  847. }
  848. Response { response, .. } => match response {
  849. NockchainResponse::Result { message } => {
  850. trace!("handle_request_response: Response result received");
  851. let mut response_slab = NounSlab::new();
  852. let message_bytes = Bytes::from(message.to_vec());
  853. let response_noun = response_slab.cue_into(message_bytes)?;
  854. trace!("Received response from peer");
  855. trace!(
  856. "Response noun: {:?}",
  857. nockvm::noun::FullDebugCell(&response_noun.as_cell()?)
  858. );
  859. let response_fact = prepend_tas(
  860. &mut response_slab,
  861. "fact",
  862. vec![D(POKE_VERSION), response_noun],
  863. )?;
  864. response_slab.set_root(response_fact);
  865. let wire = Libp2pWire::Response(peer);
  866. match nockapp.try_poke(wire.to_wire(), response_slab).await {
  867. Ok(PokeResult::Ack) => {
  868. metrics.responses_acked.increment();
  869. }
  870. Ok(PokeResult::Nack) => {
  871. metrics.responses_nacked.increment();
  872. trace!("handle_request_response: Poke failed");
  873. return Ok(());
  874. }
  875. Err(NockAppError::MPSCFullError(act)) => {
  876. trace!("handle_request_response: Response dropped due to backpressure.");
  877. metrics.responses_dropped.increment();
  878. return Err(NockAppError::MPSCFullError(act));
  879. }
  880. Err(_) => {
  881. trace!("handle_request_response: Error poking with response");
  882. metrics.responses_erred.increment();
  883. trace!("Error sending poke")
  884. }
  885. }
  886. trace!("handle_request_response: Poke successful");
  887. }
  888. NockchainResponse::Ack => {
  889. trace!("Received acknowledgement from peer {}", peer);
  890. }
  891. },
  892. }
  893. Ok(())
  894. }
  895. /// Converts a request noun into a scry path that can be used to query the Nockchain state.
  896. ///
  897. /// The request noun is expected to be in the format:
  898. /// `[%request [type data]]` where:
  899. /// - `type` can be either "block" or "raw-tx"
  900. /// - For "block" type:
  901. /// - `data` can be either `[%by-height height]` or `[%elders block-id peer-id]`
  902. /// - For "raw-tx" type:
  903. /// - `data` must be `[%by-id id]`
  904. ///
  905. /// # Arguments
  906. /// * `noun` - The request noun to convert
  907. ///
  908. /// # Returns
  909. /// * `Ok(NounSlab)` - A noun slab containing the constructed scry path
  910. /// * `Err(NockAppError)` - If the request format is invalid or unknown
  911. ///
  912. /// # Examples
  913. /// For a block by height request:
  914. /// [%request [%block [%by-height 123]]] -> [%heavy-n 123 0]
  915. /// For a block by id request:
  916. /// [%request [%block [%elders [1 2 3 4 5] abcDEF]]] -> [%elders base58-block-id peer-id 0]
  917. /// For a raw transaction request:
  918. /// [%request [%raw-tx [%by-id [1 2 3 4 5]]]] -> [%raw-transaction base58-tx-id 0]
  919. fn request_to_scry_slab(noun: &Noun) -> Result<NounSlab, NockAppError> {
  920. let Ok(request) = noun.as_cell() else {
  921. return Err(NockAppError::IoError(std::io::Error::new(
  922. std::io::ErrorKind::Other,
  923. "Unknown request - not a cell",
  924. )));
  925. };
  926. let Ok(tag) = request.head().as_direct() else {
  927. return Err(NockAppError::IoError(std::io::Error::new(
  928. std::io::ErrorKind::Other,
  929. "Unknown request - not a direct",
  930. )));
  931. };
  932. if tag.data() != tas!(b"request") {
  933. return Err(NockAppError::IoError(std::io::Error::new(
  934. std::io::ErrorKind::Other,
  935. "Unknown request - not a request",
  936. )));
  937. }
  938. let mut scry_path_slab = NounSlab::new();
  939. let request_body = request.tail().as_cell()?;
  940. if request_body.head().eq_bytes(b"block") {
  941. let Ok(tail_cell) = request_body.tail().as_cell() else {
  942. return Err(NockAppError::IoError(std::io::Error::new(
  943. std::io::ErrorKind::Other,
  944. "Invalid block request",
  945. )));
  946. };
  947. if tail_cell.head().eq_bytes(b"by-height") && tail_cell.tail().is_atom() {
  948. let pax = T(
  949. &mut scry_path_slab,
  950. &[D(tas!(b"heavy-n")), tail_cell.tail(), D(0)],
  951. );
  952. scry_path_slab.set_root(pax);
  953. trace!(
  954. "block by-height: {:?}",
  955. nockvm::noun::DebugPath(&pax.as_cell()?)
  956. );
  957. return Ok(scry_path_slab);
  958. } else if tail_cell.head().eq_bytes(b"elders") {
  959. let Ok(elders_cell) = tail_cell.tail().as_cell() else {
  960. return Err(NockAppError::IoError(std::io::Error::new(
  961. std::io::ErrorKind::Other,
  962. "Invalid elders request",
  963. )));
  964. };
  965. let block_id_b58 = tip5_hash_to_base58(elders_cell.head())?;
  966. let block_id_atom =
  967. Atom::from_value(&mut scry_path_slab, block_id_b58).unwrap_or_else(|_| {
  968. panic!(
  969. "Called `expect()` at {}:{} (git sha: {})",
  970. file!(),
  971. line!(),
  972. option_env!("GIT_SHA").unwrap_or("unknown")
  973. )
  974. });
  975. let peer_id = elders_cell.tail();
  976. let pax = T(
  977. &mut scry_path_slab,
  978. &[D(tas!(b"elders")), block_id_atom.as_noun(), peer_id, D(0)],
  979. );
  980. debug!(
  981. "block elders: {:?}",
  982. nockvm::noun::DebugPath(&pax.as_cell()?)
  983. );
  984. scry_path_slab.set_root(pax);
  985. return Ok(scry_path_slab);
  986. }
  987. } else if request_body.head().eq_bytes(b"raw-tx") {
  988. let Ok(tail_cell) = request_body.tail().as_cell() else {
  989. return Err(NockAppError::IoError(std::io::Error::new(
  990. std::io::ErrorKind::Other,
  991. "Invalid raw-tx request",
  992. )));
  993. };
  994. if tail_cell.head().eq_bytes(b"by-id") {
  995. let tx_id_b58 = tip5_hash_to_base58(tail_cell.tail())?;
  996. let tx_id_atom =
  997. Atom::from_value(&mut scry_path_slab, tx_id_b58).unwrap_or_else(|_| {
  998. panic!(
  999. "Called `expect()` at {}:{} (git sha: {})",
  1000. file!(),
  1001. line!(),
  1002. option_env!("GIT_SHA").unwrap_or("unknown")
  1003. )
  1004. });
  1005. let raw_tx_tag = make_tas(&mut scry_path_slab, "raw-transaction").as_noun();
  1006. let pax = T(
  1007. &mut scry_path_slab,
  1008. &[raw_tx_tag, tx_id_atom.as_noun(), D(0)],
  1009. );
  1010. debug!("tx by-id: {:?}", nockvm::noun::DebugPath(&pax.as_cell()?));
  1011. scry_path_slab.set_root(pax);
  1012. return Ok(scry_path_slab);
  1013. }
  1014. }
  1015. // log the head of the request body
  1016. Err(NockAppError::IoError(std::io::Error::new(
  1017. std::io::ErrorKind::Other,
  1018. format!("Unknown request - {:?}", request_body.head()),
  1019. )))
  1020. }
  1021. /// Creates a response to a scry request by processing the scry result noun
  1022. ///
  1023. /// # Arguments
  1024. /// * `scry_res` - The noun containing the scry result to process
  1025. /// * `heard_type` - The type of request that was heard (as a string)
  1026. /// * `res_slab` - Mutable reference to the noun slab for storing the response
  1027. ///
  1028. /// # Returns
  1029. /// Either:
  1030. /// - `Left(())` if the scry path was bad or nothing was found
  1031. /// - `Right(Ok(NockchainResponse))` containing the successful response
  1032. /// - `Right(Err(NockAppError))` if there was an error processing the result
  1033. fn create_scry_response(
  1034. scry_res: &Noun,
  1035. heard_type: &str,
  1036. res_slab: &mut NounSlab,
  1037. ) -> Either<(), Result<NockchainResponse, NockAppError>> {
  1038. match ScryResult::from(scry_res) {
  1039. ScryResult::BadPath => {
  1040. warn!("Bad scry path");
  1041. Left(())
  1042. }
  1043. ScryResult::Nothing => {
  1044. trace!("Nothing found at scry path");
  1045. Left(())
  1046. }
  1047. ScryResult::Some(x) => {
  1048. let nouns = vec![x];
  1049. if let Ok(response_noun) = prepend_tas(res_slab, heard_type, nouns) {
  1050. res_slab.set_root(response_noun);
  1051. Right(Ok(NockchainResponse::new_response_result(res_slab.jam())))
  1052. } else {
  1053. error!("Failed to prepend tas to response noun");
  1054. Right(Err(NockAppError::OtherError))
  1055. }
  1056. }
  1057. ScryResult::Invalid => {
  1058. error!("Invalid scry result");
  1059. Right(Err(NockAppError::OtherError))
  1060. }
  1061. }
  1062. }
  1063. /// Prepends a @tas to one or more Nouns.
  1064. ///
  1065. /// # Arguments
  1066. /// * `slab` - The NounSlab containing the noun
  1067. /// * `tas_str` - The tag string to prepend
  1068. /// * `nouns` - The Nouns to include
  1069. ///
  1070. /// # Returns
  1071. /// The noun with @tas prepended
  1072. fn prepend_tas(slab: &mut NounSlab, tas_str: &str, nouns: Vec<Noun>) -> Result<Noun, NockAppError> {
  1073. let tas_atom = Atom::from_value(slab, tas_str)?;
  1074. // Create a cell with the tag and all provided nouns
  1075. let mut cell_elements = Vec::with_capacity(nouns.len() + 1);
  1076. cell_elements.push(tas_atom.as_noun());
  1077. cell_elements.extend(nouns);
  1078. Ok(T(slab, &cell_elements))
  1079. }
  1080. #[cfg(test)]
  1081. mod tests {
  1082. use nockapp::noun::slab::NounSlab;
  1083. use nockvm::noun::{D, T};
  1084. use nockvm_macros::tas;
  1085. use super::*;
  1086. #[test]
  1087. #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
  1088. fn test_request_to_scry_slab() {
  1089. // Test block by-height request
  1090. {
  1091. let mut slab = NounSlab::new();
  1092. let height = 123u64;
  1093. let by_height_tas = make_tas(&mut slab, "by-height");
  1094. let by_height = T(&mut slab, &[by_height_tas.as_noun(), D(height)]);
  1095. let block_cell = T(&mut slab, &[D(tas!(b"block")), by_height]);
  1096. let request = T(&mut slab, &[D(tas!(b"request")), block_cell]);
  1097. slab.set_root(request);
  1098. let result_slab = request_to_scry_slab(unsafe { slab.root() }).unwrap_or_else(|_| {
  1099. panic!(
  1100. "Called `expect()` at {}:{} (git sha: {})",
  1101. file!(),
  1102. line!(),
  1103. option_env!("GIT_SHA").unwrap_or("unknown")
  1104. )
  1105. });
  1106. let result = unsafe { result_slab.root() };
  1107. assert!(result.is_cell());
  1108. let result_cell = result.as_cell().unwrap_or_else(|_| {
  1109. panic!(
  1110. "Called `expect()` at {}:{} (git sha: {})",
  1111. file!(),
  1112. line!(),
  1113. option_env!("GIT_SHA").unwrap_or("unknown")
  1114. )
  1115. });
  1116. assert!(result_cell.head().eq_bytes(b"heavy-n"));
  1117. // Get the tail cell and check its components
  1118. let tail_cell = result_cell.tail().as_cell().unwrap_or_else(|_| {
  1119. panic!(
  1120. "Called `expect()` at {}:{} (git sha: {})",
  1121. file!(),
  1122. line!(),
  1123. option_env!("GIT_SHA").unwrap_or("unknown")
  1124. )
  1125. });
  1126. let height_atom = tail_cell.head().as_atom().unwrap_or_else(|_| {
  1127. panic!(
  1128. "Called `expect()` at {}:{} (git sha: {})",
  1129. file!(),
  1130. line!(),
  1131. option_env!("GIT_SHA").unwrap_or("unknown")
  1132. )
  1133. });
  1134. assert_eq!(
  1135. height_atom.as_u64().unwrap_or_else(|err| {
  1136. panic!(
  1137. "Panicked with {err:?} at {}:{} (git sha: {:?})",
  1138. file!(),
  1139. line!(),
  1140. option_env!("GIT_SHA")
  1141. )
  1142. }),
  1143. height
  1144. );
  1145. let tail_atom = tail_cell.tail().as_atom().unwrap_or_else(|_| {
  1146. panic!(
  1147. "Called `expect()` at {}:{} (git sha: {})",
  1148. file!(),
  1149. line!(),
  1150. option_env!("GIT_SHA").unwrap_or("unknown")
  1151. )
  1152. });
  1153. assert_eq!(
  1154. tail_atom.as_u64().unwrap_or_else(|err| {
  1155. panic!(
  1156. "Panicked with {err:?} at {}:{} (git sha: {:?})",
  1157. file!(),
  1158. line!(),
  1159. option_env!("GIT_SHA")
  1160. )
  1161. }),
  1162. 0
  1163. );
  1164. }
  1165. // Test invalid request (not a cell)
  1166. {
  1167. let mut slab = NounSlab::new();
  1168. slab.set_root(D(123));
  1169. let result = request_to_scry_slab(unsafe { slab.root() });
  1170. assert!(result.is_err());
  1171. }
  1172. // Test elders request
  1173. {
  1174. let mut slab = NounSlab::new();
  1175. // Create a 5-tuple [1 2 3 4 5] for the block ID
  1176. let five_tuple = T(&mut slab, &[D(1), D(2), D(3), D(4), D(5)]);
  1177. // Create a random peer ID and store its bytes
  1178. let peer_id = PeerId::random();
  1179. let peer_id_atom =
  1180. Atom::from_value(&mut slab, peer_id.to_base58()).unwrap_or_else(|_| {
  1181. panic!(
  1182. "Called `expect()` at {}:{} (git sha: {})",
  1183. file!(),
  1184. line!(),
  1185. option_env!("GIT_SHA").unwrap_or("unknown")
  1186. )
  1187. });
  1188. let elders_cell = T(&mut slab, &[five_tuple, peer_id_atom.as_noun()]);
  1189. let elders_tas = D(tas!(b"elders"));
  1190. let inner_cell = T(&mut slab, &[elders_tas, elders_cell]);
  1191. let block_cell = T(&mut slab, &[D(tas!(b"block")), inner_cell]);
  1192. let request = T(&mut slab, &[D(tas!(b"request")), block_cell]);
  1193. slab.set_root(request);
  1194. let result_slab = request_to_scry_slab(unsafe { slab.root() }).unwrap_or_else(|_| {
  1195. panic!(
  1196. "Called `expect()` at {}:{} (git sha: {})",
  1197. file!(),
  1198. line!(),
  1199. option_env!("GIT_SHA").unwrap_or("unknown")
  1200. )
  1201. });
  1202. let result = unsafe { result_slab.root() };
  1203. // Verify the structure: [%elders block_id_b58 peer_id 0]
  1204. assert!(result.is_cell());
  1205. let result_cell = result.as_cell().unwrap_or_else(|_| {
  1206. panic!(
  1207. "Called `expect()` at {}:{} (git sha: {})",
  1208. file!(),
  1209. line!(),
  1210. option_env!("GIT_SHA").unwrap_or("unknown")
  1211. )
  1212. });
  1213. // Check %elders tag
  1214. assert!(result_cell.head().eq_bytes(b"elders"));
  1215. // Get the tail cell
  1216. let tail_cell = result_cell.tail().as_cell().unwrap_or_else(|_| {
  1217. panic!(
  1218. "Called `expect()` at {}:{} (git sha: {})",
  1219. file!(),
  1220. line!(),
  1221. option_env!("GIT_SHA").unwrap_or("unknown")
  1222. )
  1223. });
  1224. // Check block ID (should be base58 encoded)
  1225. let block_id_atom = tail_cell.head().as_atom().unwrap_or_else(|_| {
  1226. panic!(
  1227. "Called `expect()` at {}:{} (git sha: {})",
  1228. file!(),
  1229. line!(),
  1230. option_env!("GIT_SHA").unwrap_or("unknown")
  1231. )
  1232. });
  1233. let block_id_bytes = block_id_atom.to_bytes_until_nul().unwrap_or_else(|_| {
  1234. panic!(
  1235. "Called `expect()` at {}:{} (git sha: {})",
  1236. file!(),
  1237. line!(),
  1238. option_env!("GIT_SHA").unwrap_or("unknown")
  1239. )
  1240. });
  1241. let block_id_str = String::from_utf8(block_id_bytes).unwrap_or_else(|_| {
  1242. panic!(
  1243. "Called `expect()` at {}:{} (git sha: {})",
  1244. file!(),
  1245. line!(),
  1246. option_env!("GIT_SHA").unwrap_or("unknown")
  1247. )
  1248. });
  1249. // Get the expected base58 string
  1250. let expected_b58 = tip5_hash_to_base58(five_tuple).unwrap_or_else(|_| {
  1251. panic!(
  1252. "Called `expect()` at {}:{} (git sha: {})",
  1253. file!(),
  1254. line!(),
  1255. option_env!("GIT_SHA").unwrap_or("unknown")
  1256. )
  1257. });
  1258. assert_eq!(block_id_str, expected_b58);
  1259. // Check peer ID
  1260. let peer_cell = tail_cell.tail().as_cell().unwrap_or_else(|_| {
  1261. panic!(
  1262. "Called `expect()` at {}:{} (git sha: {})",
  1263. file!(),
  1264. line!(),
  1265. option_env!("GIT_SHA").unwrap_or("unknown")
  1266. )
  1267. });
  1268. let peer_id_result = peer_cell.head().as_atom().unwrap_or_else(|_| {
  1269. panic!(
  1270. "Called `expect()` at {}:{} (git sha: {})",
  1271. file!(),
  1272. line!(),
  1273. option_env!("GIT_SHA").unwrap_or("unknown")
  1274. )
  1275. });
  1276. let peer_bytes = peer_id_result.to_bytes_until_nul().unwrap_or_else(|_| {
  1277. panic!(
  1278. "Called `expect()` at {}:{} (git sha: {})",
  1279. file!(),
  1280. line!(),
  1281. option_env!("GIT_SHA").unwrap_or("unknown")
  1282. )
  1283. });
  1284. let peer_str = String::from_utf8(peer_bytes).unwrap_or_else(|_| {
  1285. panic!(
  1286. "Called `expect()` at {}:{} (git sha: {})",
  1287. file!(),
  1288. line!(),
  1289. option_env!("GIT_SHA").unwrap_or("unknown")
  1290. )
  1291. });
  1292. assert_eq!(peer_str, peer_id.to_base58());
  1293. // Check final 0
  1294. assert_eq!(
  1295. peer_cell
  1296. .tail()
  1297. .as_direct()
  1298. .unwrap_or_else(|err| {
  1299. panic!(
  1300. "Panicked with {err:?} at {}:{} (git sha: {:?})",
  1301. file!(),
  1302. line!(),
  1303. option_env!("GIT_SHA")
  1304. )
  1305. })
  1306. .data(),
  1307. 0
  1308. );
  1309. }
  1310. // Test invalid elders request (not a cell)
  1311. {
  1312. let mut slab = NounSlab::new();
  1313. let invalid_request = T(
  1314. &mut slab,
  1315. &[D(tas!(b"request")), D(tas!(b"block")), D(tas!(b"elders"))],
  1316. );
  1317. slab.set_root(invalid_request);
  1318. let result = request_to_scry_slab(unsafe { slab.root() });
  1319. assert!(result.is_err());
  1320. }
  1321. }
  1322. #[test]
  1323. #[cfg_attr(miri, ignore)] // equix uses a foreign function so miri fails this tes
  1324. fn test_equix_pow_verification() {
  1325. use equix::EquiXBuilder;
  1326. // Create EquiX builder - new() doesn't return Result
  1327. let mut builder = EquiXBuilder::new();
  1328. // Create test peer IDs
  1329. let local_peer_id = PeerId::random();
  1330. let remote_peer_id = PeerId::random();
  1331. // Create test message
  1332. let message = ByteBuf::from(vec![1, 2, 3, 4, 5]);
  1333. // Create valid request with correct PoW
  1334. let valid_request =
  1335. NockchainRequest::new_request(&mut builder, &local_peer_id, &remote_peer_id, &{
  1336. let mut slab = NounSlab::new();
  1337. let message_noun = Atom::from_value(&mut slab, &message[..])
  1338. .expect("Failed to create message atom")
  1339. .as_noun();
  1340. slab.set_root(message_noun);
  1341. slab
  1342. });
  1343. // Verify the valid request
  1344. match &valid_request {
  1345. NockchainRequest::Request {
  1346. pow,
  1347. nonce,
  1348. message: _,
  1349. } => {
  1350. // Test successful verification
  1351. let result = valid_request.verify_pow(
  1352. &mut builder, &remote_peer_id, // Note: peers are swapped for verification
  1353. &local_peer_id,
  1354. );
  1355. assert!(result.is_ok(), "Valid PoW should verify successfully");
  1356. // Test failed verification with tampered nonce
  1357. let tampered_request = NockchainRequest::Request {
  1358. pow: *pow,
  1359. nonce: nonce + 1, // Tamper with the nonce
  1360. message: message.clone(),
  1361. };
  1362. let result =
  1363. tampered_request.verify_pow(&mut builder, &remote_peer_id, &local_peer_id);
  1364. assert!(result.is_err(), "Tampered nonce should fail verification");
  1365. // Test failed verification with wrong peer order
  1366. let result = valid_request.verify_pow(
  1367. &mut builder, &local_peer_id, // Wrong order - not swapped
  1368. &remote_peer_id,
  1369. );
  1370. assert!(result.is_err(), "Wrong peer order should fail verification");
  1371. }
  1372. _ => panic!("Expected Request variant"),
  1373. }
  1374. // Test that gossip requests always verify successfully
  1375. let gossip_request = NockchainRequest::Gossip {
  1376. message: message.clone(),
  1377. };
  1378. let result = gossip_request.verify_pow(&mut builder, &remote_peer_id, &local_peer_id);
  1379. assert!(
  1380. result.is_ok(),
  1381. "Gossip requests should always verify successfully"
  1382. );
  1383. }
  1384. #[tokio::test]
  1385. #[cfg_attr(miri, ignore)]
  1386. async fn test_liar_peer_effect() {
  1387. use equix::EquiXBuilder;
  1388. use tokio::sync::mpsc;
  1389. // Create a test peer ID and convert to base58
  1390. let peer_id = PeerId::random();
  1391. let peer_id_base58 = peer_id.to_base58();
  1392. // Create the liar-peer effect noun
  1393. let mut effect_slab = NounSlab::new();
  1394. let liar_peer_atom = Atom::from_value(&mut effect_slab, "liar-peer")
  1395. .expect("Failed to create liar-peer atom");
  1396. let peer_id_atom = Atom::from_value(&mut effect_slab, peer_id_base58)
  1397. .expect("Failed to create peer ID atom");
  1398. let effect = T(
  1399. &mut effect_slab,
  1400. &[liar_peer_atom.as_noun(), peer_id_atom.as_noun()],
  1401. );
  1402. effect_slab.set_root(effect);
  1403. // Create channel to receive SwarmAction
  1404. let (swarm_tx, mut swarm_rx) = mpsc::channel(1);
  1405. // Call handle_effect with the liar-peer effect
  1406. let result = handle_effect(
  1407. effect_slab,
  1408. swarm_tx,
  1409. EquiXBuilder::new(),
  1410. PeerId::random(), // local peer ID (not relevant for this test)
  1411. vec![], // connected peers (not relevant for this test)
  1412. Arc::new(Mutex::new(MessageTracker::new())),
  1413. )
  1414. .await;
  1415. // Verify the function succeeded
  1416. assert!(result.is_ok(), "handle_effect should succeed");
  1417. // Verify that a BlockPeer action was sent with the correct peer ID
  1418. match swarm_rx.recv().await {
  1419. Some(SwarmAction::BlockPeer {
  1420. peer_id: blocked_peer,
  1421. }) => {
  1422. assert_eq!(blocked_peer, peer_id, "Wrong peer ID was blocked");
  1423. }
  1424. other => panic!("Expected BlockPeer action, got {:?}", other),
  1425. }
  1426. // Verify no more actions were sent
  1427. assert!(swarm_rx.try_recv().is_err(), "Should only send one action");
  1428. }
  1429. #[tokio::test]
  1430. #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
  1431. async fn test_track_add_effect() {
  1432. use equix::EquiXBuilder;
  1433. use tokio::sync::mpsc;
  1434. // Create test peer ID
  1435. let peer_id = PeerId::random();
  1436. let peer_id_base58 = peer_id.to_base58();
  1437. // Create the track add effect noun
  1438. let mut effect_slab = NounSlab::new();
  1439. let track_atom = make_tas(&mut effect_slab, "track");
  1440. let add_atom = make_tas(&mut effect_slab, "add");
  1441. // Create block ID as [1 2 3 4 5]
  1442. let block_id_tuple = T(&mut effect_slab, &[D(1), D(2), D(3), D(4), D(5)]);
  1443. let peer_id_atom = Atom::from_value(&mut effect_slab, peer_id_base58)
  1444. .expect("Failed to create peer ID atom");
  1445. // Build the noun structure: [%track %add block-id peer-id]
  1446. let data_cell = T(&mut effect_slab, &[block_id_tuple, peer_id_atom.as_noun()]);
  1447. let add_cell = T(&mut effect_slab, &[add_atom.as_noun(), data_cell]);
  1448. let track_cell = T(&mut effect_slab, &[track_atom.as_noun(), add_cell]);
  1449. effect_slab.set_root(track_cell);
  1450. // Create message tracker and other required components
  1451. let message_tracker = Arc::new(Mutex::new(MessageTracker::new()));
  1452. let (swarm_tx, _swarm_rx) = mpsc::channel(1);
  1453. // Call handle_effect with the track add effect
  1454. let result = handle_effect(
  1455. effect_slab.clone(), // test fails if we don't clone
  1456. swarm_tx,
  1457. EquiXBuilder::new(),
  1458. PeerId::random(), // local peer ID (not relevant for this test)
  1459. vec![], // connected peers (not relevant for this test)
  1460. message_tracker.clone(),
  1461. )
  1462. .await;
  1463. // Verify the function succeeded
  1464. assert!(result.is_ok(), "handle_effect should succeed");
  1465. // Get the expected block ID string (base58 of [1 2 3 4 5])
  1466. let block_id_str = tip5_hash_to_base58(block_id_tuple).unwrap_or_else(|_| {
  1467. panic!(
  1468. "Called `expect()` at {}:{} (git sha: {})",
  1469. file!(),
  1470. line!(),
  1471. option_env!("GIT_SHA").unwrap_or("unknown")
  1472. )
  1473. });
  1474. // Verify the message tracker state
  1475. let tracker = message_tracker.lock().await;
  1476. // Check block_id_to_peers mapping
  1477. let peers = tracker.get_peers_for_block_id(block_id_tuple);
  1478. assert!(
  1479. peers.contains(&peer_id),
  1480. "Peer ID should be associated with block ID"
  1481. );
  1482. // Check peer_to_block_ids mapping
  1483. let block_ids = tracker.get_block_ids_for_peer(peer_id);
  1484. assert!(
  1485. block_ids.contains(&block_id_str),
  1486. "Block ID should be associated with peer ID"
  1487. );
  1488. }
  1489. #[tokio::test]
  1490. #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
  1491. async fn test_track_remove_effect() {
  1492. use equix::EquiXBuilder;
  1493. use tokio::sync::mpsc;
  1494. // Create test peer ID
  1495. let peer_id = PeerId::random();
  1496. // Create a message tracker and add an entry that we'll later remove
  1497. let message_tracker = Arc::new(Mutex::new(MessageTracker::new()));
  1498. // Create block ID as [1 2 3 4 5]
  1499. let mut setup_slab = NounSlab::new();
  1500. let block_id_tuple = T(&mut setup_slab, &[D(1), D(2), D(3), D(4), D(5)]);
  1501. {
  1502. let mut tracker = message_tracker.lock().await;
  1503. tracker
  1504. .track_block_id_and_peer(block_id_tuple, peer_id)
  1505. .unwrap_or_else(|_| {
  1506. panic!(
  1507. "Called `expect()` at {}:{} (git sha: {})",
  1508. file!(),
  1509. line!(),
  1510. option_env!("GIT_SHA").unwrap_or("unknown")
  1511. )
  1512. });
  1513. // Verify it was added correctly
  1514. assert!(tracker.is_tracking_block_id(block_id_tuple));
  1515. assert!(tracker.is_tracking_peer(peer_id));
  1516. }
  1517. // Now create the track remove effect noun
  1518. let mut effect_slab = NounSlab::new();
  1519. let track_atom = make_tas(&mut effect_slab, "track");
  1520. let remove_atom = make_tas(&mut effect_slab, "remove");
  1521. // Copy the block ID tuple to the effect slab
  1522. let block_id_tuple_in_effect = T(&mut effect_slab, &[D(1), D(2), D(3), D(4), D(5)]);
  1523. // Build the noun structure: [%track %remove block-id]
  1524. let remove_cell = T(
  1525. &mut effect_slab,
  1526. &[remove_atom.as_noun(), block_id_tuple_in_effect],
  1527. );
  1528. let track_cell = T(&mut effect_slab, &[track_atom.as_noun(), remove_cell]);
  1529. effect_slab.set_root(track_cell);
  1530. // Create channel for SwarmAction (not used in this test)
  1531. let (swarm_tx, _swarm_rx) = mpsc::channel(1);
  1532. // Call handle_effect with the track remove effect
  1533. let result = handle_effect(
  1534. effect_slab,
  1535. swarm_tx,
  1536. EquiXBuilder::new(),
  1537. PeerId::random(), // local peer ID (not relevant for this test)
  1538. vec![], // connected peers (not relevant for this test)
  1539. message_tracker.clone(),
  1540. )
  1541. .await;
  1542. // Verify the function succeeded
  1543. assert!(result.is_ok(), "handle_effect should succeed");
  1544. // Verify the message tracker state after removal
  1545. let tracker = message_tracker.lock().await;
  1546. // Check that the block ID was removed from block_id_to_peers
  1547. assert!(
  1548. !tracker.is_tracking_block_id(block_id_tuple),
  1549. "Block ID should be removed"
  1550. );
  1551. // Check that the peer's entry in peer_to_block_ids is also removed
  1552. // (since this was the only block ID associated with the peer)
  1553. assert!(
  1554. !tracker.is_tracking_peer(peer_id),
  1555. "Peer ID should be removed since it has no more block IDs"
  1556. );
  1557. }
  1558. #[tokio::test]
  1559. #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
  1560. async fn test_liar_block_id_effect() {
  1561. use equix::EquiXBuilder;
  1562. use tokio::sync::mpsc;
  1563. println!("Starting test_liar_block_id_effect");
  1564. // Create test peer IDs
  1565. let bad_peer1 = PeerId::random();
  1566. let bad_peer2 = PeerId::random();
  1567. let good_peer = PeerId::random();
  1568. println!(
  1569. "Created peer_ids: bad1={}, bad2={}, good={}",
  1570. bad_peer1, bad_peer2, good_peer
  1571. );
  1572. // Create a message tracker and add entries
  1573. let message_tracker = Arc::new(Mutex::new(MessageTracker::new()));
  1574. // Create block IDs
  1575. let mut setup_slab = NounSlab::new();
  1576. // Bad block ID as [1 2 3 4 5]
  1577. let bad_block_id = T(&mut setup_slab, &[D(1), D(2), D(3), D(4), D(5)]);
  1578. // Good block ID as [6 7 8 9 10]
  1579. let good_block_id = T(&mut setup_slab, &[D(6), D(7), D(8), D(9), D(10)]);
  1580. println!("Created block_ids");
  1581. {
  1582. let mut tracker = message_tracker.lock().await;
  1583. println!("Tracking block_ids and peers");
  1584. // Associate bad_peer1 with the bad block
  1585. tracker
  1586. .track_block_id_and_peer(bad_block_id, bad_peer1)
  1587. .unwrap_or_else(|_| {
  1588. panic!(
  1589. "Called `expect()` at {}:{} (git sha: {})",
  1590. file!(),
  1591. line!(),
  1592. option_env!("GIT_SHA").unwrap_or("unknown")
  1593. )
  1594. });
  1595. // Associate bad_peer2 with the bad block
  1596. tracker
  1597. .add_peer_if_tracking_block_id(bad_block_id, bad_peer2)
  1598. .unwrap_or_else(|_| {
  1599. panic!(
  1600. "Called `expect()` at {}:{} (git sha: {})",
  1601. file!(),
  1602. line!(),
  1603. option_env!("GIT_SHA").unwrap_or("unknown")
  1604. )
  1605. });
  1606. // Associate good_peer with a different block
  1607. tracker
  1608. .track_block_id_and_peer(good_block_id, good_peer)
  1609. .unwrap_or_else(|_| {
  1610. panic!(
  1611. "Called `expect()` at {}:{} (git sha: {})",
  1612. file!(),
  1613. line!(),
  1614. option_env!("GIT_SHA").unwrap_or("unknown")
  1615. )
  1616. });
  1617. // Verify tracking is working
  1618. assert!(tracker.is_tracking_block_id(bad_block_id));
  1619. assert!(tracker.is_tracking_block_id(good_block_id));
  1620. assert!(tracker.is_tracking_peer(bad_peer1));
  1621. assert!(tracker.is_tracking_peer(bad_peer2));
  1622. assert!(tracker.is_tracking_peer(good_peer));
  1623. println!("Verified tracking is working");
  1624. }
  1625. // Now create the liar-block-id effect noun for the bad block
  1626. let mut effect_slab = NounSlab::new();
  1627. let liar_block_id_atom = Atom::from_value(&mut effect_slab, "liar-block-id")
  1628. .expect("Failed to create liar-block-id atom");
  1629. // Copy the bad block ID tuple to the effect slab
  1630. let bad_block_id_in_effect = T(&mut effect_slab, &[D(1), D(2), D(3), D(4), D(5)]);
  1631. // Build the noun structure: [%liar-block-id bad-block-id]
  1632. let effect = T(
  1633. &mut effect_slab,
  1634. &[liar_block_id_atom.as_noun(), bad_block_id_in_effect],
  1635. );
  1636. effect_slab.set_root(effect);
  1637. println!("Created liar-block-id effect");
  1638. // Create channel for SwarmAction
  1639. let (swarm_tx, mut swarm_rx) = mpsc::channel(10); // Increased capacity for multiple actions
  1640. println!("Created swarm channel");
  1641. // Call handle_effect with the liar-block-id effect
  1642. println!("Calling handle_effect");
  1643. let result = handle_effect(
  1644. effect_slab,
  1645. swarm_tx,
  1646. EquiXBuilder::new(),
  1647. PeerId::random(), // local peer ID (not relevant for this test)
  1648. vec![], // connected peers (not relevant for this test)
  1649. message_tracker.clone(),
  1650. )
  1651. .await;
  1652. println!("handle_effect result: {:?}", result);
  1653. // Verify the function succeeded
  1654. assert!(result.is_ok(), "handle_effect should succeed");
  1655. println!("Verified handle_effect succeeded");
  1656. // Collect all the block actions
  1657. let mut blocked_peers = Vec::new();
  1658. while let Ok(action) = swarm_rx.try_recv() {
  1659. match action {
  1660. SwarmAction::BlockPeer { peer_id } => {
  1661. println!("Received BlockPeer action for peer: {}", peer_id);
  1662. blocked_peers.push(peer_id);
  1663. }
  1664. other => {
  1665. println!("Unexpected action received: {:?}", other);
  1666. panic!("Expected BlockPeer action, got {:?}", other);
  1667. }
  1668. }
  1669. }
  1670. // Verify both bad peers were blocked
  1671. assert_eq!(
  1672. blocked_peers.len(),
  1673. 2,
  1674. "Should have blocked exactly 2 peers"
  1675. );
  1676. assert!(
  1677. blocked_peers.contains(&bad_peer1),
  1678. "bad_peer1 should be blocked"
  1679. );
  1680. assert!(
  1681. blocked_peers.contains(&bad_peer2),
  1682. "bad_peer2 should be blocked"
  1683. );
  1684. assert!(
  1685. !blocked_peers.contains(&good_peer),
  1686. "good_peer should not be blocked"
  1687. );
  1688. println!("Verified correct peers were blocked");
  1689. // Verify the bad block ID was removed from the tracker
  1690. {
  1691. let tracker = message_tracker.lock().await;
  1692. // Bad block should be removed
  1693. assert!(
  1694. !tracker.is_tracking_block_id(bad_block_id),
  1695. "Bad block ID should be removed"
  1696. );
  1697. // Good block should still be tracked
  1698. assert!(
  1699. tracker.is_tracking_block_id(good_block_id),
  1700. "Good block ID should still be tracked"
  1701. );
  1702. // Bad peers should be removed
  1703. assert!(
  1704. !tracker.is_tracking_peer(bad_peer1),
  1705. "bad_peer1 should be removed from tracker"
  1706. );
  1707. assert!(
  1708. !tracker.is_tracking_peer(bad_peer2),
  1709. "bad_peer2 should be removed from tracker"
  1710. );
  1711. // Good peer should still be tracked
  1712. assert!(
  1713. tracker.is_tracking_peer(good_peer),
  1714. "good_peer should still be tracked"
  1715. );
  1716. println!("Verified tracker state is correct after processing effect");
  1717. }
  1718. }
  1719. #[tokio::test]
  1720. #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
  1721. async fn test_seen_block_effect() {
  1722. use equix::EquiXBuilder;
  1723. use tokio::sync::mpsc;
  1724. let mut effect_slab = NounSlab::new();
  1725. let block_id = T(&mut effect_slab, &[D(1), D(2), D(3), D(4), D(5)]);
  1726. let block_id_str = tip5_hash_to_base58(block_id).unwrap_or_else(|_| {
  1727. panic!(
  1728. "Called `expect()` at {}:{} (git sha: {})",
  1729. file!(),
  1730. line!(),
  1731. option_env!("GIT_SHA").unwrap_or("unknown")
  1732. )
  1733. });
  1734. let effect = T(
  1735. &mut effect_slab,
  1736. &[D(tas!(b"seen")), D(tas!(b"block")), block_id],
  1737. );
  1738. effect_slab.set_root(effect);
  1739. let (swarm_tx, _) = mpsc::channel(1);
  1740. let message_tracker = Arc::new(Mutex::new(MessageTracker::new()));
  1741. let message_tracker_clone = Arc::clone(&message_tracker); // Clone the Arc, not the MessageTracker
  1742. let result = handle_effect(
  1743. effect_slab,
  1744. swarm_tx,
  1745. EquiXBuilder::new(),
  1746. PeerId::random(), // local peer ID (not relevant for this test)
  1747. vec![], // connected peers (not relevant for this test)
  1748. message_tracker_clone,
  1749. )
  1750. .await;
  1751. assert!(result.is_ok(), "handle_effect should succeed");
  1752. // Verify that the block id was added to the seen_blocks set
  1753. let tracker = message_tracker.lock().await;
  1754. let contains = tracker.seen_blocks.contains(&block_id_str);
  1755. assert!(contains, "Block ID should be marked as seen");
  1756. }
  1757. #[tokio::test]
  1758. #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
  1759. async fn test_seen_tx_effect() {
  1760. use equix::EquiXBuilder;
  1761. use tokio::sync::mpsc;
  1762. let mut effect_slab = NounSlab::new();
  1763. let tx_id = T(&mut effect_slab, &[D(1), D(2), D(3), D(4), D(5)]);
  1764. let tx_id_str = tip5_hash_to_base58(tx_id).unwrap_or_else(|_| {
  1765. panic!(
  1766. "Called `expect()` at {}:{} (git sha: {})",
  1767. file!(),
  1768. line!(),
  1769. option_env!("GIT_SHA").unwrap_or("unknown")
  1770. )
  1771. });
  1772. let effect = T(&mut effect_slab, &[D(tas!(b"seen")), D(tas!(b"tx")), tx_id]);
  1773. effect_slab.set_root(effect);
  1774. let (swarm_tx, _) = mpsc::channel(1);
  1775. let message_tracker = Arc::new(Mutex::new(MessageTracker::new()));
  1776. let message_tracker_clone = Arc::clone(&message_tracker); // Clone the Arc, not the MessageTracker
  1777. let result = handle_effect(
  1778. effect_slab,
  1779. swarm_tx,
  1780. EquiXBuilder::new(),
  1781. PeerId::random(), // local peer ID (not relevant for this test)
  1782. vec![], // connected peers (not relevant for this test)
  1783. message_tracker_clone,
  1784. )
  1785. .await;
  1786. assert!(result.is_ok(), "handle_effect should succeed");
  1787. // Verify that the tx id was added to the seen_blocks set
  1788. let tracker = message_tracker.lock().await;
  1789. let contains = tracker.seen_txs.contains(&tx_id_str);
  1790. assert!(contains, "tx ID should be marked as seen");
  1791. }
  1792. }
  1793. fn dial_initial_peers(
  1794. swarm: &mut Swarm<NockchainBehaviour>,
  1795. peers: &[Multiaddr],
  1796. ) -> Result<(), NockAppError> {
  1797. for peer in peers {
  1798. let peer = peer.clone();
  1799. swarm.dial(peer.clone()).map_err(|e| {
  1800. error!("Failed to dial initial peer {}: {}", peer, e);
  1801. NockAppError::OtherError
  1802. })?;
  1803. }
  1804. Ok(())
  1805. }