nc.rs 79 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995
  1. use std::collections::HashMap;
  2. use std::mem::size_of;
  3. use std::str::FromStr;
  4. use std::sync::Arc;
  5. use bytes::Bytes;
  6. use crown::nockapp::driver::{IODriverFn, NockAppHandle, PokeResult};
  7. use crown::nockapp::wire::{Wire, WireRepr};
  8. use crown::nockapp::NockAppError;
  9. use crown::noun::slab::NounSlab;
  10. use crown::utils::make_tas;
  11. use crown::utils::scry::*;
  12. use crown::{AtomExt, NounExt};
  13. use either::{Either, Left, Right};
  14. use futures::{Future, StreamExt};
  15. use libp2p::identify::Event::Received;
  16. use libp2p::request_response::Event::*;
  17. use libp2p::request_response::Message::*;
  18. use libp2p::request_response::{self};
  19. use libp2p::swarm::SwarmEvent;
  20. use libp2p::{PeerId, Swarm};
  21. use serde_bytes::ByteBuf;
  22. use sword::noun::{Atom, Noun, D, T};
  23. use sword_macros::tas;
  24. use tokio::sync::{mpsc, Mutex};
  25. use tokio::task::{AbortHandle, JoinError, JoinSet};
  26. use tracing::{debug, error, info, instrument, trace, warn};
  27. use crate::metrics::NockchainP2PMetrics;
  28. use crate::p2p::*;
  29. use crate::p2p_util::{MessageTracker, PeerIdExt};
  30. use crate::tip5_util::tip5_hash_to_base58;
  31. //TODO This wire is a placeholder for now. The libp2p driver is entangled with the other types of nockchain pokes
  32. //for historical reasons, and should be disentangled in the future.
  33. pub enum NockchainWire {
  34. Local,
  35. }
  36. impl Wire for NockchainWire {
  37. const VERSION: u64 = 1;
  38. const SOURCE: &'static str = "nc";
  39. }
  40. pub enum Libp2pWire {
  41. Gossip(PeerId),
  42. Response(PeerId),
  43. }
  44. impl Libp2pWire {
  45. fn verb(&self) -> &'static str {
  46. match self {
  47. Libp2pWire::Gossip(_) => "gossip",
  48. Libp2pWire::Response(_) => "response",
  49. }
  50. }
  51. fn peer_id(&self) -> &PeerId {
  52. match self {
  53. Libp2pWire::Gossip(peer_id) => peer_id,
  54. Libp2pWire::Response(peer_id) => peer_id,
  55. }
  56. }
  57. }
  58. impl Wire for Libp2pWire {
  59. const VERSION: u64 = 1;
  60. const SOURCE: &'static str = "libp2p";
  61. fn to_wire(&self) -> WireRepr {
  62. let tags = vec![self.verb().into(), "peer-id".into(), self.peer_id().to_base58().into()];
  63. WireRepr::new(Libp2pWire::SOURCE, Libp2pWire::VERSION, tags)
  64. }
  65. }
  66. enum EffectType {
  67. Gossip,
  68. Request,
  69. LiarPeer,
  70. LiarBlockId,
  71. Track,
  72. Seen,
  73. Unknown,
  74. }
  75. impl EffectType {
  76. fn from_noun_slab(noun_slab: &NounSlab) -> Self {
  77. let Ok(effect_cell) = (unsafe { noun_slab.root().as_cell() }) else {
  78. return EffectType::Unknown;
  79. };
  80. let head = effect_cell.head();
  81. let Ok(atom) = head.as_atom() else {
  82. return EffectType::Unknown;
  83. };
  84. let bytes = atom
  85. .to_bytes_until_nul()
  86. .expect("failed to strip null bytes");
  87. match bytes.as_slice() {
  88. b"gossip" => EffectType::Gossip,
  89. b"request" => EffectType::Request,
  90. b"liar-peer" => EffectType::LiarPeer,
  91. b"liar-block-id" => EffectType::LiarBlockId,
  92. b"track" => EffectType::Track,
  93. b"seen" => EffectType::Seen,
  94. _ => EffectType::Unknown,
  95. }
  96. }
  97. }
  98. struct TrackedJoinSet<T> {
  99. inner: JoinSet<T>,
  100. tasks: HashMap<String, AbortHandle>,
  101. }
  102. impl<T: 'static> TrackedJoinSet<T> {
  103. fn new() -> Self {
  104. Self {
  105. inner: JoinSet::new(),
  106. tasks: HashMap::new(),
  107. }
  108. }
  109. fn spawn(&mut self, name: String, task: impl Future<Output = T> + Send + 'static)
  110. where
  111. T: Send + 'static,
  112. {
  113. let handle = self.inner.spawn(task);
  114. self.tasks.insert(name, handle);
  115. }
  116. async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
  117. let result = self.inner.join_next().await;
  118. if result.is_some() {
  119. // Remove the completed task from our tracking
  120. self.tasks.retain(|_, v| !v.is_finished());
  121. }
  122. result
  123. }
  124. // Keep this around for debugging
  125. #[allow(dead_code)]
  126. fn get_running_tasks(&self) -> Vec<String> {
  127. self.tasks.keys().cloned().collect()
  128. }
  129. }
  /// One mining key configuration, parsed from a `share,m:key1,key2,key3` string.
  #[derive(Debug, Clone)]
  pub struct MiningKeyConfig {
      /// First number of the `share,m` prefix.
      pub share: u64,
      /// Second number of the `share,m` prefix.
      // NOTE(review): presumably the signature threshold over `keys` — confirm with the kernel.
      pub m: u64,
      /// The comma-separated keys after the `:`. `from_str` never yields an empty entry.
      pub keys: Vec<String>,
  }

  impl FromStr for MiningKeyConfig {
      type Err = String;

      /// Parses `"share,m:key1,key2,key3"`.
      ///
      /// Whitespace around each field is ignored.
      ///
      /// # Errors
      ///
      /// Returns a message when the string does not contain exactly one `:`,
      /// when the prefix is not exactly two comma-separated fields, when either
      /// number is not a valid `u64`, or when any key is empty.
      fn from_str(s: &str) -> Result<Self, Self::Err> {
          const FORMAT_ERR: &str = "Invalid format. Expected 'share,m:key1,key2,key3'";
          const SHARE_M_ERR: &str = "Invalid share,m format";

          // Exactly one ':' separates the numeric prefix from the key list.
          let Some((prefix, key_list)) = s.split_once(':') else {
              return Err(FORMAT_ERR.to_string());
          };
          if key_list.contains(':') {
              return Err(FORMAT_ERR.to_string());
          }

          // Exactly one ',' separates `share` from `m`.
          let Some((share, m)) = prefix.split_once(',') else {
              return Err(SHARE_M_ERR.to_string());
          };
          if m.contains(',') {
              return Err(SHARE_M_ERR.to_string());
          }

          let share = share.trim().parse::<u64>().map_err(|e| e.to_string())?;
          let m = m.trim().parse::<u64>().map_err(|e| e.to_string())?;

          let keys: Vec<String> = key_list
              .split(',')
              .map(|key| key.trim().to_string())
              .collect();
          // `"".split(',')` yields one empty item, so this also rejects a missing key list.
          if keys.iter().any(String::is_empty) {
              return Err("Invalid keys: every key must be non-empty".to_string());
          }

          Ok(MiningKeyConfig { share, m, keys })
      }
  }
  154. #[instrument(skip(swarm, equix_builder))]
  155. pub fn make_libp2p_driver(
  156. mut swarm: Swarm<NockchainBehaviour>,
  157. equix_builder: equix::EquiXBuilder,
  158. mining_config: Option<Vec<MiningKeyConfig>>,
  159. init_complete_tx: Option<tokio::sync::oneshot::Sender<()>>,
  160. ) -> IODriverFn {
  161. Box::new(|mut handle| {
  162. let metrics = NockchainP2PMetrics::register(gnort::global_metrics_registry())
  163. .expect("Failed to register metrics!");
  164. Box::pin(async move {
  165. let (swarm_tx, mut swarm_rx) = mpsc::channel::<SwarmAction>(1000); // number needs to be high enough to send gossips to peers
  166. let mut join_set = TrackedJoinSet::<Result<(), NockAppError>>::new();
  167. let message_tracker = Arc::new(Mutex::new(MessageTracker::new()));
  168. send_init_stark_config_poke(&handle).await?;
  169. if let Some(configs) = mining_config {
  170. if configs.len() == 1
  171. && configs[0].share == 1
  172. && configs[0].m == 1
  173. && configs[0].keys.len() == 1
  174. {
  175. // Simple case - use set_mining_key
  176. set_mining_key(&handle, configs[0].keys[0].clone()).await?;
  177. } else {
  178. // Advanced case - use set_mining_key_advanced
  179. set_mining_key_advanced(&handle, configs).await?;
  180. }
  181. enable_mining(&handle, true).await?;
  182. } else {
  183. enable_mining(&handle, false).await?;
  184. }
  185. if let Some(tx) = init_complete_tx {
  186. let _ = tx.send(());
  187. debug!("libp2p driver initialization complete signal sent");
  188. }
  189. loop {
  190. tokio::select! {
  191. Ok(noun_slab) = handle.next_effect() => {
  192. let _span = tracing::trace_span!("broadcast").entered();
  193. let swarm_tx_clone = swarm_tx.clone();
  194. let equix_builder_clone = equix_builder.clone();
  195. let local_peer_id = *swarm.local_peer_id();
  196. let connected_peers: Vec<PeerId> = swarm.connected_peers().cloned().collect();
  197. let message_tracker_clone = Arc::clone(&message_tracker); // Clone the Arc, not the MessageTracker
  198. join_set.spawn("handle_effect".to_string(), async move {
  199. handle_effect(noun_slab, swarm_tx_clone, equix_builder_clone, local_peer_id, connected_peers, message_tracker_clone).await
  200. });
  201. },
  202. Some(event) = swarm.next() => {
  203. match event {
  204. SwarmEvent::NewListenAddr { address, .. } => {
  205. info!("SEvent: Listening on {address:?}");
  206. },
  207. SwarmEvent::Behaviour(NockchainEvent::Identify(Received { connection_id: _, peer_id, info })) => {
  208. trace!("SEvent: identify_received");
  209. identify_received(&mut swarm, peer_id, info)?;
  210. },
  211. SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
  212. info!("SEvent: {peer_id} is new friend via: {endpoint:?}");
  213. },
  214. SwarmEvent::ConnectionClosed { peer_id, endpoint, cause, .. } => {
  215. info!("SEvent: friendship ended with {peer_id} via: {endpoint:?}. cause: {cause:?}");
  216. // Clean up the message tracker when a peer disconnects
  217. let mut tracker = message_tracker.lock().await;
  218. tracker.remove_peer(&peer_id);
  219. },
  220. SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, error, .. } => {
  221. error!("SEvent: Failed incoming connection from {} to {}: {}",
  222. send_back_addr, local_addr, error);
  223. },
  224. SwarmEvent::Behaviour(NockchainEvent::RequestResponse(Message { connection_id: _, peer, message })) => {
  225. trace!("SEvent: received RequestResponse");
  226. let _span = tracing::debug_span!("SwarmEvent::Behavior(NockchainEvent::RequestResponse(…))").entered();
  227. let swarm_tx_clone = swarm_tx.clone();
  228. let mut equix_builder_clone = equix_builder.clone();
  229. let local_peer_id = *swarm.local_peer_id();
  230. // We have to dup and move a handle back into `handle` to propitiate the borrow checker
  231. let (orig_handle, request_response_handle) = handle.dup();
  232. handle = orig_handle;
  233. let metrics = metrics.clone();
  234. let message_tracker_clone = Arc::clone(&message_tracker); // Clone the Arc, not the MessageTracker
  235. join_set.spawn("handle_request_response".to_string(), async move {
  236. handle_request_response(peer, message, swarm_tx_clone, &mut equix_builder_clone, local_peer_id, request_response_handle, metrics, message_tracker_clone).await
  237. });
  238. },
  239. SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
  240. error!("Failed outgoing connection to {:?}: {}", peer_id, error);
  241. },
  242. SwarmEvent::IncomingConnection {
  243. local_addr,
  244. send_back_addr,
  245. connection_id,
  246. ..
  247. } => {
  248. debug!("SEvent: Incoming connection from {local_addr:?} to {send_back_addr:?} with {connection_id:?}");
  249. },
  250. SwarmEvent::Dialing { peer_id, connection_id } => {
  251. debug!("SEvent: Dialing {peer_id:?} {connection_id}");
  252. },
  253. _ => {
  254. // Handle other swarm events
  255. trace!("SEvent: other swarm event {:?}", event);
  256. }
  257. }
  258. },
  259. Some(swarm_action) = swarm_rx.recv() => {
  260. // We do this because Swarm doesn't implement Send, and so we can't pass it into the tasks
  261. // being spawned in the match cases above.
  262. match swarm_action {
  263. SwarmAction::SendRequest { peer_id, request } => {
  264. trace!("SAction: SendRequest: {peer_id}");
  265. let _ = swarm.behaviour_mut().request_response.send_request(&peer_id, request);
  266. },
  267. SwarmAction::SendResponse { channel, response } => {
  268. trace!("SAction: SendResponse");
  269. let _ = swarm.behaviour_mut().request_response.send_response(channel, response);
  270. },
  271. SwarmAction::BlockPeer { peer_id } => {
  272. warn!("SAction: Blocking peer {peer_id}");
  273. swarm.behaviour_mut().allow_block_list.block_peer(peer_id);
  274. // Disconnect the peer if they're currently connected
  275. let _ = swarm.disconnect_peer_id(peer_id);
  276. },
  277. }
  278. },
  279. Some(result) = join_set.join_next() => {
  280. if let Err(e) = result {
  281. error!("Task error: {:?}", e);
  282. }
  283. },
  284. }
  285. }
  286. })
  287. })
  288. }
  289. #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
  290. /// Network struct (in serde/CBOR) for requests
  291. pub enum NockchainRequest {
  292. /// Request a block or TX from another node, carry PoW
  293. Request {
  294. pow: equix::SolutionByteArray,
  295. nonce: u64,
  296. message: ByteBuf,
  297. },
  298. /// Gossip a block or TX to another node
  299. Gossip { message: ByteBuf },
  300. }
  301. impl NockchainRequest {
  302. /// Make a new "request" which gossips a block or a TX
  303. fn new_gossip(message: &NounSlab) -> NockchainRequest {
  304. let message_bytes = ByteBuf::from(message.jam().as_ref());
  305. NockchainRequest::Gossip {
  306. message: message_bytes,
  307. }
  308. }
  309. /// Make a new request for a block or a TX
  310. fn new_request(
  311. builder: &mut equix::EquiXBuilder,
  312. local_peer_id: &libp2p::PeerId,
  313. remote_peer_id: &libp2p::PeerId,
  314. message: &NounSlab,
  315. ) -> NockchainRequest {
  316. let message_bytes = ByteBuf::from(message.jam().as_ref());
  317. let local_peer_bytes = (*local_peer_id).to_bytes();
  318. let remote_peer_bytes = (*remote_peer_id).to_bytes();
  319. let mut pow_buf = Vec::with_capacity(
  320. size_of::<u64>()
  321. + local_peer_bytes.len()
  322. + remote_peer_bytes.len()
  323. + message_bytes.len(),
  324. );
  325. pow_buf.extend_from_slice(&[0; size_of::<u64>()][..]);
  326. pow_buf.extend_from_slice(&local_peer_bytes[..]);
  327. pow_buf.extend_from_slice(&remote_peer_bytes[..]);
  328. pow_buf.extend_from_slice(&message_bytes[..]);
  329. let mut nonce = 0u64;
  330. let sol_bytes = loop {
  331. {
  332. let nonce_buf = &mut pow_buf[0..size_of::<u64>()];
  333. nonce_buf.copy_from_slice(&nonce.to_le_bytes()[..]);
  334. }
  335. if let Ok(sols) = builder.solve(&pow_buf[..]) {
  336. if !sols.is_empty() {
  337. break sols[0].to_bytes();
  338. }
  339. }
  340. nonce += 1;
  341. };
  342. NockchainRequest::Request {
  343. pow: sol_bytes,
  344. nonce,
  345. message: message_bytes,
  346. }
  347. }
  348. /// Verify the EquiX PoW attached to a request
  349. fn verify_pow(
  350. &self,
  351. builder: &mut equix::EquiXBuilder,
  352. local_peer_id: &libp2p::PeerId,
  353. remote_peer_id: &libp2p::PeerId,
  354. ) -> Result<(), equix::Error> {
  355. match self {
  356. NockchainRequest::Request {
  357. pow,
  358. nonce,
  359. message,
  360. } => {
  361. // This looks backwards, but it's because which node is local and which is remote
  362. // is swapped between generation at the sender and verification at the receiver.
  363. let local_peer_bytes = (*remote_peer_id).to_bytes();
  364. let remote_peer_bytes = (*local_peer_id).to_bytes();
  365. let nonce_bytes = nonce.to_le_bytes();
  366. let mut pow_buf = Vec::with_capacity(
  367. size_of::<u64>()
  368. + local_peer_bytes.len()
  369. + remote_peer_bytes.len()
  370. + message.len(),
  371. );
  372. pow_buf.extend_from_slice(&nonce_bytes[..]);
  373. pow_buf.extend_from_slice(&local_peer_bytes[..]);
  374. pow_buf.extend_from_slice(&remote_peer_bytes[..]);
  375. pow_buf.extend_from_slice(&message[..]);
  376. builder.verify_bytes(&pow_buf[..], pow)
  377. }
  378. NockchainRequest::Gossip { message: _ } => Ok(()),
  379. }
  380. }
  381. }
  382. #[derive(Debug, serde::Serialize, serde::Deserialize)]
  383. /// Responses to Nockchain requests
  384. pub enum NockchainResponse {
  385. /// The requested block or raw-tx
  386. Result { message: ByteBuf },
  387. /// If the request was a gossip, no actual response is needed
  388. Ack,
  389. }
  390. impl NockchainResponse {
  391. fn new_response_result(message: impl AsRef<[u8]>) -> NockchainResponse {
  392. let message_bytes: &[u8] = message.as_ref();
  393. let message_bytebuf = ByteBuf::from(message_bytes.to_vec());
  394. NockchainResponse::Result {
  395. message: message_bytebuf,
  396. }
  397. }
  398. }
  399. #[instrument(skip(handle, pubkey))]
  400. async fn set_mining_key(
  401. handle: &NockAppHandle,
  402. pubkey: String,
  403. ) -> Result<PokeResult, NockAppError> {
  404. let mut set_mining_key_slab = NounSlab::new();
  405. let set_mining_key = Atom::from_value(&mut set_mining_key_slab, "set-mining-key")
  406. .expect("Failed to create set-mining-key atom");
  407. let pubkey_cord =
  408. Atom::from_value(&mut set_mining_key_slab, pubkey).expect("Failed to create pubkey atom");
  409. let set_mining_key_poke = T(
  410. &mut set_mining_key_slab,
  411. &[D(tas!(b"command")), set_mining_key.as_noun(), pubkey_cord.as_noun()],
  412. );
  413. set_mining_key_slab.set_root(set_mining_key_poke);
  414. let wire = NockchainWire::Local;
  415. handle.poke(wire.to_wire(), set_mining_key_slab).await
  416. }
  417. async fn set_mining_key_advanced(
  418. handle: &NockAppHandle,
  419. configs: Vec<MiningKeyConfig>,
  420. ) -> Result<PokeResult, NockAppError> {
  421. let mut set_mining_key_slab = NounSlab::new();
  422. let set_mining_key_adv = Atom::from_value(&mut set_mining_key_slab, "set-mining-key-advanced")
  423. .expect("Failed to create set-mining-key-advanced atom");
  424. // Create the list of configs
  425. let mut config_list = Vec::new();
  426. for config in configs {
  427. // Create the list of keys
  428. let mut key_list = Vec::new();
  429. for key in config.keys {
  430. let key_atom =
  431. Atom::from_value(&mut set_mining_key_slab, key).expect("Failed to create key atom");
  432. key_list.push(key_atom.as_noun());
  433. }
  434. let keys_cell = T(&mut set_mining_key_slab, &key_list);
  435. // Create the config tuple [share m keys]
  436. let config_tuple = T(
  437. &mut set_mining_key_slab,
  438. &[D(config.share), D(config.m), keys_cell],
  439. );
  440. config_list.push(config_tuple);
  441. }
  442. let configs_cell = T(&mut set_mining_key_slab, &config_list);
  443. let set_mining_key_poke = T(
  444. &mut set_mining_key_slab,
  445. &[D(tas!(b"command")), set_mining_key_adv.as_noun(), configs_cell],
  446. );
  447. set_mining_key_slab.set_root(set_mining_key_poke);
  448. let wire = NockchainWire::Local;
  449. handle.poke(wire.to_wire(), set_mining_key_slab).await
  450. }
  451. //TODO add %set-mining-key-multisig poke
  452. #[instrument(skip(handle))]
  453. async fn enable_mining(handle: &NockAppHandle, enable: bool) -> Result<PokeResult, NockAppError> {
  454. let mut enable_mining_slab = NounSlab::new();
  455. let enable_mining = Atom::from_value(&mut enable_mining_slab, "enable-mining")
  456. .expect("Failed to create enable-mining atom");
  457. let enable_mining_poke = T(
  458. &mut enable_mining_slab,
  459. &[D(tas!(b"command")), enable_mining.as_noun(), D(if enable { 0 } else { 1 })],
  460. );
  461. enable_mining_slab.set_root(enable_mining_poke);
  462. let wire = NockchainWire::Local;
  463. handle.poke(wire.to_wire(), enable_mining_slab).await
  464. }
  465. #[instrument(skip(handle))]
  466. async fn send_init_stark_config_poke(handle: &NockAppHandle) -> Result<PokeResult, NockAppError> {
  467. let mut init_stark_config_slab = NounSlab::new();
  468. let tag = make_tas(&mut init_stark_config_slab, "init-stark-config").as_noun();
  469. let poke = T(
  470. &mut init_stark_config_slab,
  471. &[D(tas!(b"command")), tag, D(0)],
  472. );
  473. init_stark_config_slab.set_root(poke);
  474. let wire = NockchainWire::Local;
  475. handle.poke(wire.to_wire(), init_stark_config_slab).await
  476. }
  477. async fn handle_effect(
  478. noun_slab: NounSlab,
  479. swarm_tx: mpsc::Sender<SwarmAction>,
  480. equix_builder: equix::EquiXBuilder,
  481. local_peer_id: PeerId,
  482. connected_peers: Vec<PeerId>,
  483. message_tracker: Arc<Mutex<MessageTracker>>,
  484. ) -> Result<(), NockAppError> {
  485. match EffectType::from_noun_slab(&noun_slab) {
  486. EffectType::Gossip => {
  487. // remove the %gossip head
  488. let mut tail_slab = NounSlab::new();
  489. tail_slab.copy_into(unsafe { noun_slab.root().as_cell()?.tail() });
  490. // Check if this is a heard-block gossip
  491. let gossip_noun = unsafe { tail_slab.root() };
  492. if let Ok(cell) = gossip_noun.as_cell() {
  493. if cell.head().eq_bytes(b"heard-block") {
  494. trace!("Gossip effect for heard-block, clearing block cache");
  495. let mut tracker = message_tracker.lock().await;
  496. tracker.block_cache.clear();
  497. }
  498. }
  499. let gossip_request = NockchainRequest::new_gossip(&tail_slab);
  500. for peer_id in connected_peers.clone() {
  501. let gossip_request_clone = gossip_request.clone();
  502. swarm_tx
  503. .send(SwarmAction::SendRequest {
  504. peer_id,
  505. request: gossip_request_clone,
  506. })
  507. .await
  508. .map_err(|_e| NockAppError::OtherError)?;
  509. }
  510. }
  511. EffectType::Request => {
  512. // Extract request details to check if it's a peer-specific request
  513. let request_cell = unsafe { noun_slab.root().as_cell()? };
  514. let request_body = request_cell.tail().as_cell()?;
  515. let request_type = request_body.head().as_direct()?;
  516. let target_peers = if request_type.data() == tas!(b"block") {
  517. let block_cell = request_body.tail().as_cell()?;
  518. if block_cell.head().eq_bytes(b"elders") {
  519. // Extract peer ID from elders request
  520. let elders_cell = block_cell.tail().as_cell()?;
  521. let peer_id_atom = elders_cell.tail().as_atom()?;
  522. if let Ok(bytes) = peer_id_atom.to_bytes_until_nul() {
  523. if let Ok(peer_id) = PeerId::from_bytes(&bytes) {
  524. vec![peer_id]
  525. } else {
  526. connected_peers.clone()
  527. }
  528. } else {
  529. connected_peers.clone()
  530. }
  531. } else {
  532. connected_peers.clone()
  533. }
  534. } else {
  535. connected_peers.clone()
  536. };
  537. for peer_id in target_peers {
  538. let local_peer_id_clone = local_peer_id;
  539. let mut equix_builder_clone = equix_builder.clone();
  540. let request = NockchainRequest::new_request(
  541. &mut equix_builder_clone, &local_peer_id_clone, &peer_id, &noun_slab,
  542. );
  543. swarm_tx
  544. .send(SwarmAction::SendRequest { peer_id, request })
  545. .await
  546. .map_err(|_e| NockAppError::OtherError)?;
  547. }
  548. }
  549. EffectType::LiarPeer => {
  550. let effect_cell = unsafe { noun_slab.root().as_cell()? };
  551. let peer_id_atom = effect_cell.tail().as_atom().map_err(|_| {
  552. NockAppError::IoError(std::io::Error::new(
  553. std::io::ErrorKind::Other,
  554. "Expected peer ID atom in liar-peer effect",
  555. ))
  556. })?;
  557. let bytes = peer_id_atom
  558. .to_bytes_until_nul()
  559. .expect("failed to strip null bytes");
  560. let peer_id_str = String::from_utf8(bytes).map_err(|_| {
  561. NockAppError::IoError(std::io::Error::new(
  562. std::io::ErrorKind::Other,
  563. "Invalid UTF-8 in peer ID",
  564. ))
  565. })?;
  566. let peer_id = PeerId::from_str(&peer_id_str).map_err(|_| {
  567. NockAppError::IoError(std::io::Error::new(
  568. std::io::ErrorKind::Other,
  569. "Invalid peer ID format",
  570. ))
  571. })?;
  572. swarm_tx
  573. .send(SwarmAction::BlockPeer { peer_id })
  574. .await
  575. .map_err(|_| NockAppError::OtherError)?;
  576. }
  577. EffectType::LiarBlockId => {
  578. let effect_cell = unsafe { noun_slab.root().as_cell()? };
  579. let block_id = effect_cell.tail();
  580. //add the bad block ID
  581. let mut tracker = message_tracker.lock().await;
  582. let peers_to_ban = tracker.process_bad_block_id(block_id)?;
  583. // Ban each peer that sent this block
  584. for peer_id in peers_to_ban {
  585. swarm_tx
  586. .send(SwarmAction::BlockPeer { peer_id })
  587. .await
  588. .map_err(|_| NockAppError::OtherError)?;
  589. }
  590. }
  591. EffectType::Track => {
  592. let effect_cell = unsafe { noun_slab.root().as_cell()? };
  593. let track_cell = effect_cell.tail().as_cell()?;
  594. let action = track_cell.head();
  595. if action.eq_bytes(b"add") {
  596. // Handle [%track %add block-id peer-id]
  597. let data_cell = track_cell.tail().as_cell()?;
  598. let block_id = data_cell.head();
  599. let peer_id_atom = data_cell.tail().as_atom()?;
  600. // Convert peer_id from base58 string to PeerId
  601. let Ok(peer_id) = PeerId::from_noun(peer_id_atom.as_noun()) else {
  602. return Err(NockAppError::OtherError);
  603. };
  604. // Add to message tracker
  605. let mut tracker = message_tracker.lock().await;
  606. tracker.track_block_id_and_peer(block_id, peer_id)?;
  607. } else if action.eq_bytes(b"remove") {
  608. // Handle [%track %remove block-id]
  609. let block_id = track_cell.tail();
  610. // Remove from message tracker
  611. let mut tracker = message_tracker.lock().await;
  612. tracker.remove_block_id(block_id)?;
  613. } else {
  614. return Err(NockAppError::IoError(std::io::Error::new(
  615. std::io::ErrorKind::Other,
  616. "Invalid track action",
  617. )));
  618. }
  619. }
  620. EffectType::Seen => {
  621. let effect_cell = unsafe { noun_slab.root().as_cell()? };
  622. let seen_cell = effect_cell.tail().as_cell()?;
  623. let seen_type = seen_cell.head();
  624. if seen_type.eq_bytes(b"block") {
  625. let block_id = seen_cell.tail().as_cell()?;
  626. let mut tracker = message_tracker.lock().await;
  627. let block_id_str = tip5_hash_to_base58(block_id.as_noun())
  628. .expect("failed to convert block ID to base58");
  629. debug!("seen block id: {:?}", &block_id_str);
  630. tracker.seen_blocks.insert(block_id_str);
  631. } else if seen_type.eq_bytes(b"tx") {
  632. let tx_id = seen_cell.tail().as_cell()?;
  633. let mut tracker = message_tracker.lock().await;
  634. let tx_id_str = tip5_hash_to_base58(tx_id.as_noun())
  635. .expect("failed to convert tx ID to base58");
  636. tracker.seen_txs.insert(tx_id_str);
  637. }
  638. }
  639. EffectType::Unknown => {
  640. // This isn't unexpected - any effect that this driver doesn't handle
  641. // will hit this case.
  642. }
  643. }
  644. Ok(())
  645. }
  646. // TODO: Wrap some of this up.
  647. #[allow(clippy::too_many_arguments)]
  648. async fn handle_request_response(
  649. peer: PeerId,
  650. message: request_response::Message<NockchainRequest, NockchainResponse>,
  651. swarm_tx: mpsc::Sender<SwarmAction>,
  652. equix_builder: &mut equix::EquiXBuilder,
  653. local_peer_id: PeerId,
  654. nockapp: NockAppHandle,
  655. metrics: NockchainP2PMetrics,
  656. message_tracker: Arc<Mutex<MessageTracker>>,
  657. ) -> Result<(), NockAppError> {
  658. trace!("handle_request_response peer: {peer}");
  659. match message {
  660. Request {
  661. request, channel, ..
  662. } => {
  663. let Ok(()) = request.verify_pow(equix_builder, &local_peer_id, &peer) else {
  664. warn!("bad libp2p powork from {peer}, blocking!");
  665. swarm_tx
  666. .send(SwarmAction::BlockPeer { peer_id: peer })
  667. .await
  668. .map_err(|_| NockAppError::OtherError)?;
  669. return Ok(());
  670. };
  671. trace!("handle_request_response: powork verified");
  672. let mut request_slab = NounSlab::new();
  673. match request {
  674. NockchainRequest::Request {
  675. pow: _,
  676. nonce: _,
  677. message,
  678. } => {
  679. trace!("handle_request_response: Request received");
  680. let message_bytes = Bytes::from(message.to_vec());
  681. let request_noun = request_slab.cue_into(message_bytes)?;
  682. let (scry_res_slab, cache_hit) = if let Ok(Some(cache_result)) = {
  683. let mut tracker = message_tracker.lock().await;
  684. tracker.check_cache(&request_noun, &metrics).await
  685. } {
  686. debug!("found cached response for request");
  687. (cache_result, true)
  688. } else {
  689. let scry_slab = request_to_scry_slab(&request_noun)?;
  690. let Some(scry_res_slab) = (match nockapp.try_peek(scry_slab).await {
  691. Ok(Some(res_slab)) => {
  692. metrics.requests_peeked_some.increment();
  693. Some(res_slab)
  694. }
  695. Ok(None) => {
  696. metrics.requests_peeked_none.increment();
  697. trace!(
  698. "No data found for incoming request from: {}, request type: {:?}",
  699. peer,
  700. request_noun.as_cell()?.tail().as_cell().map(|c| c.head())
  701. );
  702. None
  703. }
  704. Err(NockAppError::MPSCFullError(act)) => {
  705. metrics.requests_dropped.increment();
  706. trace!(
  707. "handle_request_response: Request dropped due to backpressure"
  708. );
  709. Err(NockAppError::MPSCFullError(act))?
  710. }
  711. Err(err) => {
  712. metrics.requests_erred.increment();
  713. trace!("handle_request_response: Error getting response");
  714. Err(err)?
  715. }
  716. }) else {
  717. return Ok(());
  718. };
  719. (scry_res_slab, false)
  720. };
  721. let Ok(request_cell) = request_noun.as_cell() else {
  722. error!("request noun not a cell");
  723. return Err(NockAppError::OtherError);
  724. };
  725. let Ok(scry_tag) = request_cell.tail().as_cell()?.head().as_direct() else {
  726. error!("request tag axis not an atom");
  727. return Err(NockAppError::OtherError);
  728. };
  729. trace!("handle_request_response: request cell parsed");
  730. let mut res_slab = NounSlab::new();
  731. let response = match scry_tag.data() {
  732. tas!(b"block") => {
  733. trace!("handle_request_response: block tag");
  734. let scry_res = unsafe { scry_res_slab.root() };
  735. // Extract the request type from under %block
  736. let request_type = request_cell
  737. .tail()
  738. .as_cell()
  739. .and_then(|c| c.tail().as_cell())
  740. .map(|c| c.head().eq_bytes(b"elders"))
  741. .map_err(|_| {
  742. NockAppError::IoError(std::io::Error::new(
  743. std::io::ErrorKind::Other,
  744. "invalid block request structure",
  745. ))
  746. })?;
  747. // Use heard-elders for elders requests, heard-block otherwise
  748. let heard_type = if request_type {
  749. "heard-elders"
  750. } else {
  751. "heard-block"
  752. };
  753. match create_scry_response(scry_res, heard_type, &mut res_slab) {
  754. Left(()) => {
  755. trace!(
  756. "No data found for incoming block request, type: {}",
  757. heard_type
  758. );
  759. return Ok(());
  760. }
  761. Right(result) => {
  762. // cache response
  763. if !cache_hit && heard_type == "heard-block" {
  764. let height = request_cell
  765. .tail()
  766. .as_cell()?
  767. .tail()
  768. .as_cell()?
  769. .tail()
  770. .as_direct()?
  771. .data();
  772. let mut tracker = message_tracker.lock().await;
  773. tracker.block_cache.insert(height, scry_res_slab.clone());
  774. debug!("cacheing block request by height={:?}", height);
  775. }
  776. result?
  777. }
  778. }
  779. }
  780. tas!(b"raw-tx") => {
  781. trace!("handle_request_response: raw-tx tag");
  782. let scry_res = unsafe { scry_res_slab.root() };
  783. match create_scry_response(scry_res, "heard-tx", &mut res_slab) {
  784. Left(()) => {
  785. trace!("No data found for incoming raw-tx request");
  786. return Ok(());
  787. }
  788. Right(result) => {
  789. if !cache_hit {
  790. let tx_id = request_cell.tail().as_cell()?.tail();
  791. let tx_id_str = tip5_hash_to_base58(tx_id)?;
  792. let mut tracker = message_tracker.lock().await;
  793. debug!("cacheing tx request by id={:?}", tx_id_str);
  794. tracker.tx_cache.insert(tx_id_str, scry_res_slab.clone());
  795. }
  796. result?
  797. }
  798. }
  799. }
  800. tag => {
  801. error!("Unknown request tag: {:?}", tag);
  802. return Err(NockAppError::OtherError);
  803. }
  804. };
  805. swarm_tx
  806. .send(SwarmAction::SendResponse { channel, response })
  807. .await
  808. .map_err(|_| NockAppError::OtherError)?;
  809. }
  810. NockchainRequest::Gossip { message } => {
  811. trace!("handle_request_response: Gossip received");
  812. let message_bytes = Bytes::from(message.to_vec());
  813. let request_noun = request_slab.cue_into(message_bytes)?;
  814. trace!("handle_request_response: Gossip noun parsed");
  815. let head = request_noun.as_cell()?.head();
  816. if head.eq_bytes(b"heard-block") {
  817. let page = request_noun.as_cell()?.tail();
  818. let block_id = page.as_cell()?.head();
  819. let block_id_str = tip5_hash_to_base58(block_id)?;
  820. let tracker = message_tracker.lock().await;
  821. if tracker.seen_blocks.contains(&block_id_str) {
  822. debug!("Block already seen, not processing: {:?}", block_id_str);
  823. metrics.block_seen_cache_hits.increment();
  824. return Ok(());
  825. } else {
  826. debug!("block not seen, processing: {:?}", block_id_str);
  827. metrics.block_seen_cache_misses.increment();
  828. }
  829. }
  830. if head.eq_bytes(b"heard-tx") {
  831. let raw_tx = request_noun.as_cell()?.tail();
  832. let tx_id = raw_tx.as_cell()?.head();
  833. let tracker = message_tracker.lock().await;
  834. let tx_id_str = tip5_hash_to_base58(tx_id)?;
  835. if tracker.seen_txs.contains(&tx_id_str) {
  836. debug!("Tx already seen, not processing: {:?}", tx_id_str);
  837. metrics.tx_seen_cache_hits.increment();
  838. return Ok(());
  839. } else {
  840. debug!("tx not seen, processing: {:?}", tx_id_str);
  841. metrics.tx_seen_cache_misses.increment();
  842. }
  843. }
  844. let request_fact = prepend_tas(&mut request_slab, "fact", request_noun)?;
  845. request_slab.set_root(request_fact);
  846. let wire = Libp2pWire::Gossip(peer);
  847. match nockapp.try_poke(wire.to_wire(), request_slab).await {
  848. Ok(PokeResult::Ack) => {
  849. metrics.gossip_acked.increment();
  850. }
  851. Ok(PokeResult::Nack) => {
  852. metrics.gossip_nacked.increment();
  853. trace!("handle_request_response: gossip poke nacked");
  854. return Ok(());
  855. }
  856. Err(NockAppError::MPSCFullError(act)) => {
  857. metrics.gossip_dropped.increment();
  858. trace!(
  859. "handle_request_response: gossip poke dropped due to backpressure"
  860. );
  861. return Err(NockAppError::MPSCFullError(act));
  862. }
  863. Err(err) => {
  864. metrics.gossip_erred.increment();
  865. trace!("handle_request_response: Poke errored");
  866. return Err(err);
  867. }
  868. };
  869. trace!("handle_request_response: Poke successful");
  870. let response = NockchainResponse::Ack;
  871. swarm_tx
  872. .send(SwarmAction::SendResponse { channel, response })
  873. .await
  874. .map_err(|_| NockAppError::OtherError)?;
  875. }
  876. }
  877. }
  878. Response { response, .. } => match response {
  879. NockchainResponse::Result { message } => {
  880. trace!("handle_request_response: Response result received");
  881. let mut response_slab = NounSlab::new();
  882. let message_bytes = Bytes::from(message.to_vec());
  883. let response_noun = response_slab.cue_into(message_bytes)?;
  884. trace!("Received response from peer");
  885. let response_fact = prepend_tas(&mut response_slab, "fact", response_noun)?;
  886. response_slab.set_root(response_fact);
  887. let wire = Libp2pWire::Response(peer);
  888. match nockapp.try_poke(wire.to_wire(), response_slab).await {
  889. Ok(PokeResult::Ack) => {
  890. metrics.responses_acked.increment();
  891. }
  892. Ok(PokeResult::Nack) => {
  893. metrics.responses_nacked.increment();
  894. trace!("handle_request_response: Poke failed");
  895. return Ok(());
  896. }
  897. Err(NockAppError::MPSCFullError(act)) => {
  898. trace!("handle_request_response: Response dropped due to backpressure.");
  899. metrics.responses_dropped.increment();
  900. return Err(NockAppError::MPSCFullError(act));
  901. }
  902. Err(_) => {
  903. trace!("handle_request_response: Error poking with response");
  904. metrics.responses_erred.increment();
  905. trace!("Error sending poke")
  906. }
  907. }
  908. trace!("handle_request_response: Poke successful");
  909. }
  910. NockchainResponse::Ack => {
  911. trace!("Received acknowledgement from peer {}", peer);
  912. }
  913. },
  914. }
  915. Ok(())
  916. }
  917. /// Converts a request noun into a scry path that can be used to query the Nockchain state.
  918. ///
  919. /// The request noun is expected to be in the format:
  920. /// `[%request [type data]]` where:
  921. /// - `type` can be either "block" or "raw-tx"
  922. /// - For "block" type:
  923. /// - `data` can be either `[%by-height height]` or `[%elders block-id peer-id]`
  924. /// - For "raw-tx" type:
  925. /// - `data` must be `[%by-id id]`
  926. ///
  927. /// # Arguments
  928. /// * `noun` - The request noun to convert
  929. ///
  930. /// # Returns
  931. /// * `Ok(NounSlab)` - A noun slab containing the constructed scry path
  932. /// * `Err(NockAppError)` - If the request format is invalid or unknown
  933. ///
  934. /// # Examples
  935. /// For a block by height request:
  936. /// [%request [%block [%by-height 123]]] -> [%heavy-n 123 0]
  937. /// For a block by id request:
  938. /// [%request [%block [%elders [1 2 3 4 5] abcDEF]]] -> [%elders base58-block-id peer-id 0]
  939. /// For a raw transaction request:
  940. /// [%request [%raw-tx [%by-id [1 2 3 4 5]]]] -> [%raw-transaction base58-tx-id 0]
  941. fn request_to_scry_slab(noun: &Noun) -> Result<NounSlab, NockAppError> {
  942. let Ok(request) = noun.as_cell() else {
  943. return Err(NockAppError::IoError(std::io::Error::new(
  944. std::io::ErrorKind::Other,
  945. "Unknown request - not a cell",
  946. )));
  947. };
  948. let Ok(tag) = request.head().as_direct() else {
  949. return Err(NockAppError::IoError(std::io::Error::new(
  950. std::io::ErrorKind::Other,
  951. "Unknown request - not a direct",
  952. )));
  953. };
  954. if tag.data() != tas!(b"request") {
  955. return Err(NockAppError::IoError(std::io::Error::new(
  956. std::io::ErrorKind::Other,
  957. "Unknown request - not a request",
  958. )));
  959. }
  960. let mut scry_path_slab = NounSlab::new();
  961. let request_body = request.tail().as_cell()?;
  962. if request_body.head().eq_bytes(b"block") {
  963. let Ok(tail_cell) = request_body.tail().as_cell() else {
  964. return Err(NockAppError::IoError(std::io::Error::new(
  965. std::io::ErrorKind::Other,
  966. "Invalid block request",
  967. )));
  968. };
  969. if tail_cell.head().eq_bytes(b"by-height") && tail_cell.tail().is_atom() {
  970. let pax = T(
  971. &mut scry_path_slab,
  972. &[D(tas!(b"heavy-n")), tail_cell.tail(), D(0)],
  973. );
  974. scry_path_slab.set_root(pax);
  975. info!(
  976. "block by-height: {:?}",
  977. sword::noun::DebugPath(&pax.as_cell()?)
  978. );
  979. return Ok(scry_path_slab);
  980. } else if tail_cell.head().eq_bytes(b"elders") {
  981. let Ok(elders_cell) = tail_cell.tail().as_cell() else {
  982. return Err(NockAppError::IoError(std::io::Error::new(
  983. std::io::ErrorKind::Other,
  984. "Invalid elders request",
  985. )));
  986. };
  987. let block_id_b58 = tip5_hash_to_base58(elders_cell.head())?;
  988. let block_id_atom =
  989. Atom::from_value(&mut scry_path_slab, block_id_b58).unwrap_or_else(|_| {
  990. panic!(
  991. "Called `expect()` at {}:{} (git sha: {})",
  992. file!(),
  993. line!(),
  994. option_env!("GIT_SHA").unwrap_or("unknown")
  995. )
  996. });
  997. let peer_id = elders_cell.tail();
  998. let pax = T(
  999. &mut scry_path_slab,
  1000. &[D(tas!(b"elders")), block_id_atom.as_noun(), peer_id, D(0)],
  1001. );
  1002. info!(
  1003. "block elders: {:?}",
  1004. sword::noun::DebugPath(&pax.as_cell()?)
  1005. );
  1006. scry_path_slab.set_root(pax);
  1007. return Ok(scry_path_slab);
  1008. }
  1009. } else if request_body.head().eq_bytes(b"raw-tx") {
  1010. let Ok(tail_cell) = request_body.tail().as_cell() else {
  1011. return Err(NockAppError::IoError(std::io::Error::new(
  1012. std::io::ErrorKind::Other,
  1013. "Invalid raw-tx request",
  1014. )));
  1015. };
  1016. if tail_cell.head().eq_bytes(b"by-id") {
  1017. let tx_id_b58 = tip5_hash_to_base58(tail_cell.tail())?;
  1018. let tx_id_atom =
  1019. Atom::from_value(&mut scry_path_slab, tx_id_b58).unwrap_or_else(|_| {
  1020. panic!(
  1021. "Called `expect()` at {}:{} (git sha: {})",
  1022. file!(),
  1023. line!(),
  1024. option_env!("GIT_SHA").unwrap_or("unknown")
  1025. )
  1026. });
  1027. let raw_tx_tag = make_tas(&mut scry_path_slab, "raw-transaction").as_noun();
  1028. let pax = T(
  1029. &mut scry_path_slab,
  1030. &[raw_tx_tag, tx_id_atom.as_noun(), D(0)],
  1031. );
  1032. info!("tx by-id: {:?}", sword::noun::DebugPath(&pax.as_cell()?));
  1033. scry_path_slab.set_root(pax);
  1034. return Ok(scry_path_slab);
  1035. }
  1036. }
  1037. // log the head of the request body
  1038. Err(NockAppError::IoError(std::io::Error::new(
  1039. std::io::ErrorKind::Other,
  1040. format!("Unknown request - {:?}", request_body.head()),
  1041. )))
  1042. }
  1043. /// Creates a response to a scry request by processing the scry result noun
  1044. ///
  1045. /// # Arguments
  1046. /// * `scry_res` - The noun containing the scry result to process
  1047. /// * `heard_type` - The type of request that was heard (as a string)
  1048. /// * `res_slab` - Mutable reference to the noun slab for storing the response
  1049. ///
  1050. /// # Returns
  1051. /// Either:
  1052. /// - `Left(())` if the scry path was bad or nothing was found
  1053. /// - `Right(Ok(NockchainResponse))` containing the successful response
  1054. /// - `Right(Err(NockAppError))` if there was an error processing the result
  1055. fn create_scry_response(
  1056. scry_res: &Noun,
  1057. heard_type: &str,
  1058. res_slab: &mut NounSlab,
  1059. ) -> Either<(), Result<NockchainResponse, NockAppError>> {
  1060. match ScryResult::from(scry_res) {
  1061. ScryResult::BadPath => {
  1062. warn!("Bad scry path");
  1063. Left(())
  1064. }
  1065. ScryResult::Nothing => {
  1066. trace!("Nothing found at scry path");
  1067. Left(())
  1068. }
  1069. ScryResult::Some(x) => {
  1070. if let Ok(response_noun) = prepend_tas(res_slab, heard_type, x) {
  1071. res_slab.set_root(response_noun);
  1072. Right(Ok(NockchainResponse::new_response_result(res_slab.jam())))
  1073. } else {
  1074. error!("Failed to prepend tas to response noun");
  1075. Right(Err(NockAppError::OtherError))
  1076. }
  1077. }
  1078. ScryResult::Invalid => {
  1079. error!("Invalid scry result");
  1080. Right(Err(NockAppError::OtherError))
  1081. }
  1082. }
  1083. }
  1084. /// Prepends a @tas to a Noun.
  1085. ///
  1086. /// # Arguments
  1087. /// * `slab` - The NounSlab containing the noun
  1088. /// * `noun` - The Noun to prepend @tas to
  1089. ///
  1090. /// # Returns
  1091. /// The noun with @tas prepended
  1092. fn prepend_tas(slab: &mut NounSlab, tas_str: &str, noun: Noun) -> Result<Noun, NockAppError> {
  1093. let tas_atom = Atom::from_value(slab, tas_str)?;
  1094. Ok(T(slab, &[tas_atom.as_noun(), noun]))
  1095. }
  1096. #[cfg(test)]
  1097. mod tests {
  1098. use crown::noun::slab::NounSlab;
  1099. use sword::noun::{D, T};
  1100. use sword_macros::tas;
  1101. use super::*;
  #[test]
  #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
  fn test_request_to_scry_slab() {
      // Unwraps a `Result`, panicking with the call site and git sha on `Err`.
      macro_rules! expect_ok {
          ($res:expr) => {
              $res.unwrap_or_else(|_| {
                  panic!(
                      "Called `expect()` at {}:{} (git sha: {})",
                      file!(),
                      line!(),
                      option_env!("GIT_SHA").unwrap_or("unknown")
                  )
              })
          };
      }
      // Same as `expect_ok!`, but the panic message also carries the error.
      macro_rules! expect_ok_dbg {
          ($res:expr) => {
              $res.unwrap_or_else(|err| {
                  panic!(
                      "Panicked with {err:?} at {}:{} (git sha: {:?})",
                      file!(),
                      line!(),
                      option_env!("GIT_SHA")
                  )
              })
          };
      }

      // Block by-height request: expect [%heavy-n height 0]
      {
          let height = 123u64;
          let mut slab = NounSlab::new();
          let by_height_tag = make_tas(&mut slab, "by-height");
          let by_height = T(&mut slab, &[by_height_tag.as_noun(), D(height)]);
          let block = T(&mut slab, &[D(tas!(b"block")), by_height]);
          let request = T(&mut slab, &[D(tas!(b"request")), block]);
          slab.set_root(request);

          let scry_slab = expect_ok!(request_to_scry_slab(unsafe { slab.root() }));
          let scry = unsafe { scry_slab.root() };
          assert!(scry.is_cell());
          let scry_cell = expect_ok!(scry.as_cell());
          assert!(scry_cell.head().eq_bytes(b"heavy-n"));

          // The tail holds the height followed by the terminating 0
          let rest = expect_ok!(scry_cell.tail().as_cell());
          let height_atom = expect_ok!(rest.head().as_atom());
          assert_eq!(expect_ok_dbg!(height_atom.as_u64()), height);
          let end_atom = expect_ok!(rest.tail().as_atom());
          assert_eq!(expect_ok_dbg!(end_atom.as_u64()), 0);
      }

      // Invalid request (not a cell)
      {
          let mut slab = NounSlab::new();
          slab.set_root(D(123));
          assert!(request_to_scry_slab(unsafe { slab.root() }).is_err());
      }

      // Elders request: expect [%elders block_id_b58 peer_id 0]
      {
          let mut slab = NounSlab::new();
          // Block ID is the 5-tuple [1 2 3 4 5]
          let five_tuple = T(&mut slab, &[D(1), D(2), D(3), D(4), D(5)]);
          // Random peer ID, stored as its base58 text
          let peer_id = PeerId::random();
          let peer_id_atom = expect_ok!(Atom::from_value(&mut slab, peer_id.to_base58()));
          let elders_data = T(&mut slab, &[five_tuple, peer_id_atom.as_noun()]);
          let elders = T(&mut slab, &[D(tas!(b"elders")), elders_data]);
          let block = T(&mut slab, &[D(tas!(b"block")), elders]);
          let request = T(&mut slab, &[D(tas!(b"request")), block]);
          slab.set_root(request);

          let scry_slab = expect_ok!(request_to_scry_slab(unsafe { slab.root() }));
          let scry = unsafe { scry_slab.root() };
          assert!(scry.is_cell());
          let scry_cell = expect_ok!(scry.as_cell());
          // Check %elders tag
          assert!(scry_cell.head().eq_bytes(b"elders"));

          // Check block ID (should be base58 encoded)
          let rest = expect_ok!(scry_cell.tail().as_cell());
          let block_id_atom = expect_ok!(rest.head().as_atom());
          let block_id_bytes = expect_ok!(block_id_atom.to_bytes_until_nul());
          let block_id_str = expect_ok!(String::from_utf8(block_id_bytes));
          let expected_b58 = expect_ok!(tip5_hash_to_base58(five_tuple));
          assert_eq!(block_id_str, expected_b58);

          // Check peer ID
          let peer_cell = expect_ok!(rest.tail().as_cell());
          let peer_atom = expect_ok!(peer_cell.head().as_atom());
          let peer_bytes = expect_ok!(peer_atom.to_bytes_until_nul());
          let peer_str = expect_ok!(String::from_utf8(peer_bytes));
          assert_eq!(peer_str, peer_id.to_base58());

          // Check final 0
          assert_eq!(expect_ok_dbg!(peer_cell.tail().as_direct()).data(), 0);
      }

      // Invalid elders request (not a cell)
      {
          let mut slab = NounSlab::new();
          let invalid_request = T(
              &mut slab,
              &[D(tas!(b"request")), D(tas!(b"block")), D(tas!(b"elders"))],
          );
          slab.set_root(invalid_request);
          assert!(request_to_scry_slab(unsafe { slab.root() }).is_err());
      }
  }
  #[test]
  #[cfg_attr(miri, ignore)] // equix uses a foreign function so miri fails this test
  fn test_equix_pow_verification() {
      use equix::EquiXBuilder;

      // `EquiXBuilder::new()` is infallible
      let mut builder = EquiXBuilder::new();
      let local_peer_id = PeerId::random();
      let remote_peer_id = PeerId::random();
      let message = ByteBuf::from(vec![1, 2, 3, 4, 5]);

      // Wrap the test message in a slab so a request can be built from it
      let mut message_slab = NounSlab::new();
      let message_noun = Atom::from_value(&mut message_slab, &message[..])
          .expect("Failed to create message atom")
          .as_noun();
      message_slab.set_root(message_noun);

      // Request carrying a correct proof of work
      let valid_request = NockchainRequest::new_request(
          &mut builder,
          &local_peer_id,
          &remote_peer_id,
          &message_slab,
      );
      let NockchainRequest::Request { pow, nonce, .. } = &valid_request else {
          panic!("Expected Request variant");
      };

      // The verifier sees the peers in swapped order relative to the sender
      let verified = valid_request.verify_pow(&mut builder, &remote_peer_id, &local_peer_id);
      assert!(verified.is_ok(), "Valid PoW should verify successfully");

      // A tampered nonce must be rejected
      let tampered_request = NockchainRequest::Request {
          pow: *pow,
          nonce: nonce + 1,
          message: message.clone(),
      };
      let tampered = tampered_request.verify_pow(&mut builder, &remote_peer_id, &local_peer_id);
      assert!(tampered.is_err(), "Tampered nonce should fail verification");

      // Un-swapped peers must be rejected
      let wrong_order = valid_request.verify_pow(&mut builder, &local_peer_id, &remote_peer_id);
      assert!(
          wrong_order.is_err(),
          "Wrong peer order should fail verification"
      );

      // Gossip requests always verify successfully
      let gossip_request = NockchainRequest::Gossip {
          message: message.clone(),
      };
      let gossip = gossip_request.verify_pow(&mut builder, &remote_peer_id, &local_peer_id);
      assert!(
          gossip.is_ok(),
          "Gossip requests should always verify successfully"
      );
  }
  1400. #[tokio::test]
  1401. #[cfg_attr(miri, ignore)]
  1402. async fn test_liar_peer_effect() {
  1403. use equix::EquiXBuilder;
  1404. use tokio::sync::mpsc;
  1405. // Create a test peer ID and convert to base58
  1406. let peer_id = PeerId::random();
  1407. let peer_id_base58 = peer_id.to_base58();
  1408. // Create the liar-peer effect noun
  1409. let mut effect_slab = NounSlab::new();
  1410. let liar_peer_atom = Atom::from_value(&mut effect_slab, "liar-peer")
  1411. .expect("Failed to create liar-peer atom");
  1412. let peer_id_atom = Atom::from_value(&mut effect_slab, peer_id_base58)
  1413. .expect("Failed to create peer ID atom");
  1414. let effect = T(
  1415. &mut effect_slab,
  1416. &[liar_peer_atom.as_noun(), peer_id_atom.as_noun()],
  1417. );
  1418. effect_slab.set_root(effect);
  1419. // Create channel to receive SwarmAction
  1420. let (swarm_tx, mut swarm_rx) = mpsc::channel(1);
  1421. // Call handle_effect with the liar-peer effect
  1422. let result = handle_effect(
  1423. effect_slab,
  1424. swarm_tx,
  1425. EquiXBuilder::new(),
  1426. PeerId::random(), // local peer ID (not relevant for this test)
  1427. vec![], // connected peers (not relevant for this test)
  1428. Arc::new(Mutex::new(MessageTracker::new())),
  1429. )
  1430. .await;
  1431. // Verify the function succeeded
  1432. assert!(result.is_ok(), "handle_effect should succeed");
  1433. // Verify that a BlockPeer action was sent with the correct peer ID
  1434. match swarm_rx.recv().await {
  1435. Some(SwarmAction::BlockPeer {
  1436. peer_id: blocked_peer,
  1437. }) => {
  1438. assert_eq!(blocked_peer, peer_id, "Wrong peer ID was blocked");
  1439. }
  1440. other => panic!("Expected BlockPeer action, got {:?}", other),
  1441. }
  1442. // Verify no more actions were sent
  1443. assert!(swarm_rx.try_recv().is_err(), "Should only send one action");
  1444. }
  1445. #[tokio::test]
  1446. #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
  1447. async fn test_track_add_effect() {
  1448. use equix::EquiXBuilder;
  1449. use tokio::sync::mpsc;
  1450. // Create test peer ID
  1451. let peer_id = PeerId::random();
  1452. let peer_id_base58 = peer_id.to_base58();
  1453. // Create the track add effect noun
  1454. let mut effect_slab = NounSlab::new();
  1455. let track_atom = make_tas(&mut effect_slab, "track");
  1456. let add_atom = make_tas(&mut effect_slab, "add");
  1457. // Create block ID as [1 2 3 4 5]
  1458. let block_id_tuple = T(&mut effect_slab, &[D(1), D(2), D(3), D(4), D(5)]);
  1459. let peer_id_atom = Atom::from_value(&mut effect_slab, peer_id_base58)
  1460. .expect("Failed to create peer ID atom");
  1461. // Build the noun structure: [%track %add block-id peer-id]
  1462. let data_cell = T(&mut effect_slab, &[block_id_tuple, peer_id_atom.as_noun()]);
  1463. let add_cell = T(&mut effect_slab, &[add_atom.as_noun(), data_cell]);
  1464. let track_cell = T(&mut effect_slab, &[track_atom.as_noun(), add_cell]);
  1465. effect_slab.set_root(track_cell);
  1466. // Create message tracker and other required components
  1467. let message_tracker = Arc::new(Mutex::new(MessageTracker::new()));
  1468. let (swarm_tx, _swarm_rx) = mpsc::channel(1);
  1469. // Call handle_effect with the track add effect
  1470. let result = handle_effect(
  1471. effect_slab.clone(), // test fails if we don't clone
  1472. swarm_tx,
  1473. EquiXBuilder::new(),
  1474. PeerId::random(), // local peer ID (not relevant for this test)
  1475. vec![], // connected peers (not relevant for this test)
  1476. message_tracker.clone(),
  1477. )
  1478. .await;
  1479. // Verify the function succeeded
  1480. assert!(result.is_ok(), "handle_effect should succeed");
  1481. // Get the expected block ID string (base58 of [1 2 3 4 5])
  1482. let block_id_str = tip5_hash_to_base58(block_id_tuple).unwrap_or_else(|_| {
  1483. panic!(
  1484. "Called `expect()` at {}:{} (git sha: {})",
  1485. file!(),
  1486. line!(),
  1487. option_env!("GIT_SHA").unwrap_or("unknown")
  1488. )
  1489. });
  1490. // Verify the message tracker state
  1491. let tracker = message_tracker.lock().await;
  1492. // Check block_id_to_peers mapping
  1493. let peers = tracker.get_peers_for_block_id(block_id_tuple);
  1494. assert!(
  1495. peers.contains(&peer_id),
  1496. "Peer ID should be associated with block ID"
  1497. );
  1498. // Check peer_to_block_ids mapping
  1499. let block_ids = tracker.get_block_ids_for_peer(peer_id);
  1500. assert!(
  1501. block_ids.contains(&block_id_str),
  1502. "Block ID should be associated with peer ID"
  1503. );
  1504. }
  1505. #[tokio::test]
  1506. #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
  1507. async fn test_track_remove_effect() {
  1508. use equix::EquiXBuilder;
  1509. use tokio::sync::mpsc;
  1510. // Create test peer ID
  1511. let peer_id = PeerId::random();
  1512. // Create a message tracker and add an entry that we'll later remove
  1513. let message_tracker = Arc::new(Mutex::new(MessageTracker::new()));
  1514. // Create block ID as [1 2 3 4 5]
  1515. let mut setup_slab = NounSlab::new();
  1516. let block_id_tuple = T(&mut setup_slab, &[D(1), D(2), D(3), D(4), D(5)]);
  1517. {
  1518. let mut tracker = message_tracker.lock().await;
  1519. tracker
  1520. .track_block_id_and_peer(block_id_tuple, peer_id)
  1521. .unwrap_or_else(|_| {
  1522. panic!(
  1523. "Called `expect()` at {}:{} (git sha: {})",
  1524. file!(),
  1525. line!(),
  1526. option_env!("GIT_SHA").unwrap_or("unknown")
  1527. )
  1528. });
  1529. // Verify it was added correctly
  1530. assert!(tracker.is_tracking_block_id(block_id_tuple));
  1531. assert!(tracker.is_tracking_peer(peer_id));
  1532. }
  1533. // Now create the track remove effect noun
  1534. let mut effect_slab = NounSlab::new();
  1535. let track_atom = make_tas(&mut effect_slab, "track");
  1536. let remove_atom = make_tas(&mut effect_slab, "remove");
  1537. // Copy the block ID tuple to the effect slab
  1538. let block_id_tuple_in_effect = T(&mut effect_slab, &[D(1), D(2), D(3), D(4), D(5)]);
  1539. // Build the noun structure: [%track %remove block-id]
  1540. let remove_cell = T(
  1541. &mut effect_slab,
  1542. &[remove_atom.as_noun(), block_id_tuple_in_effect],
  1543. );
  1544. let track_cell = T(&mut effect_slab, &[track_atom.as_noun(), remove_cell]);
  1545. effect_slab.set_root(track_cell);
  1546. // Create channel for SwarmAction (not used in this test)
  1547. let (swarm_tx, _swarm_rx) = mpsc::channel(1);
  1548. // Call handle_effect with the track remove effect
  1549. let result = handle_effect(
  1550. effect_slab,
  1551. swarm_tx,
  1552. EquiXBuilder::new(),
  1553. PeerId::random(), // local peer ID (not relevant for this test)
  1554. vec![], // connected peers (not relevant for this test)
  1555. message_tracker.clone(),
  1556. )
  1557. .await;
  1558. // Verify the function succeeded
  1559. assert!(result.is_ok(), "handle_effect should succeed");
  1560. // Verify the message tracker state after removal
  1561. let tracker = message_tracker.lock().await;
  1562. // Check that the block ID was removed from block_id_to_peers
  1563. assert!(
  1564. !tracker.is_tracking_block_id(block_id_tuple),
  1565. "Block ID should be removed"
  1566. );
  1567. // Check that the peer's entry in peer_to_block_ids is also removed
  1568. // (since this was the only block ID associated with the peer)
  1569. assert!(
  1570. !tracker.is_tracking_peer(peer_id),
  1571. "Peer ID should be removed since it has no more block IDs"
  1572. );
  1573. }
  1574. #[tokio::test]
  1575. #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
  1576. async fn test_liar_block_id_effect() {
  1577. use equix::EquiXBuilder;
  1578. use tokio::sync::mpsc;
  1579. println!("Starting test_liar_block_id_effect");
  1580. // Create test peer IDs
  1581. let bad_peer1 = PeerId::random();
  1582. let bad_peer2 = PeerId::random();
  1583. let good_peer = PeerId::random();
  1584. println!(
  1585. "Created peer_ids: bad1={}, bad2={}, good={}",
  1586. bad_peer1, bad_peer2, good_peer
  1587. );
  1588. // Create a message tracker and add entries
  1589. let message_tracker = Arc::new(Mutex::new(MessageTracker::new()));
  1590. // Create block IDs
  1591. let mut setup_slab = NounSlab::new();
  1592. // Bad block ID as [1 2 3 4 5]
  1593. let bad_block_id = T(&mut setup_slab, &[D(1), D(2), D(3), D(4), D(5)]);
  1594. // Good block ID as [6 7 8 9 10]
  1595. let good_block_id = T(&mut setup_slab, &[D(6), D(7), D(8), D(9), D(10)]);
  1596. println!("Created block_ids");
  1597. {
  1598. let mut tracker = message_tracker.lock().await;
  1599. println!("Tracking block_ids and peers");
  1600. // Associate bad_peer1 with the bad block
  1601. tracker
  1602. .track_block_id_and_peer(bad_block_id, bad_peer1)
  1603. .unwrap_or_else(|_| {
  1604. panic!(
  1605. "Called `expect()` at {}:{} (git sha: {})",
  1606. file!(),
  1607. line!(),
  1608. option_env!("GIT_SHA").unwrap_or("unknown")
  1609. )
  1610. });
  1611. // Associate bad_peer2 with the bad block
  1612. tracker
  1613. .add_peer_if_tracking_block_id(bad_block_id, bad_peer2)
  1614. .unwrap_or_else(|_| {
  1615. panic!(
  1616. "Called `expect()` at {}:{} (git sha: {})",
  1617. file!(),
  1618. line!(),
  1619. option_env!("GIT_SHA").unwrap_or("unknown")
  1620. )
  1621. });
  1622. // Associate good_peer with a different block
  1623. tracker
  1624. .track_block_id_and_peer(good_block_id, good_peer)
  1625. .unwrap_or_else(|_| {
  1626. panic!(
  1627. "Called `expect()` at {}:{} (git sha: {})",
  1628. file!(),
  1629. line!(),
  1630. option_env!("GIT_SHA").unwrap_or("unknown")
  1631. )
  1632. });
  1633. // Verify tracking is working
  1634. assert!(tracker.is_tracking_block_id(bad_block_id));
  1635. assert!(tracker.is_tracking_block_id(good_block_id));
  1636. assert!(tracker.is_tracking_peer(bad_peer1));
  1637. assert!(tracker.is_tracking_peer(bad_peer2));
  1638. assert!(tracker.is_tracking_peer(good_peer));
  1639. println!("Verified tracking is working");
  1640. }
  1641. // Now create the liar-block-id effect noun for the bad block
  1642. let mut effect_slab = NounSlab::new();
  1643. let liar_block_id_atom = Atom::from_value(&mut effect_slab, "liar-block-id")
  1644. .expect("Failed to create liar-block-id atom");
  1645. // Copy the bad block ID tuple to the effect slab
  1646. let bad_block_id_in_effect = T(&mut effect_slab, &[D(1), D(2), D(3), D(4), D(5)]);
  1647. // Build the noun structure: [%liar-block-id bad-block-id]
  1648. let effect = T(
  1649. &mut effect_slab,
  1650. &[liar_block_id_atom.as_noun(), bad_block_id_in_effect],
  1651. );
  1652. effect_slab.set_root(effect);
  1653. println!("Created liar-block-id effect");
  1654. // Create channel for SwarmAction
  1655. let (swarm_tx, mut swarm_rx) = mpsc::channel(10); // Increased capacity for multiple actions
  1656. println!("Created swarm channel");
  1657. // Call handle_effect with the liar-block-id effect
  1658. println!("Calling handle_effect");
  1659. let result = handle_effect(
  1660. effect_slab,
  1661. swarm_tx,
  1662. EquiXBuilder::new(),
  1663. PeerId::random(), // local peer ID (not relevant for this test)
  1664. vec![], // connected peers (not relevant for this test)
  1665. message_tracker.clone(),
  1666. )
  1667. .await;
  1668. println!("handle_effect result: {:?}", result);
  1669. // Verify the function succeeded
  1670. assert!(result.is_ok(), "handle_effect should succeed");
  1671. println!("Verified handle_effect succeeded");
  1672. // Collect all the block actions
  1673. let mut blocked_peers = Vec::new();
  1674. while let Ok(action) = swarm_rx.try_recv() {
  1675. match action {
  1676. SwarmAction::BlockPeer { peer_id } => {
  1677. println!("Received BlockPeer action for peer: {}", peer_id);
  1678. blocked_peers.push(peer_id);
  1679. }
  1680. other => {
  1681. println!("Unexpected action received: {:?}", other);
  1682. panic!("Expected BlockPeer action, got {:?}", other);
  1683. }
  1684. }
  1685. }
  1686. // Verify both bad peers were blocked
  1687. assert_eq!(
  1688. blocked_peers.len(),
  1689. 2,
  1690. "Should have blocked exactly 2 peers"
  1691. );
  1692. assert!(
  1693. blocked_peers.contains(&bad_peer1),
  1694. "bad_peer1 should be blocked"
  1695. );
  1696. assert!(
  1697. blocked_peers.contains(&bad_peer2),
  1698. "bad_peer2 should be blocked"
  1699. );
  1700. assert!(
  1701. !blocked_peers.contains(&good_peer),
  1702. "good_peer should not be blocked"
  1703. );
  1704. println!("Verified correct peers were blocked");
  1705. // Verify the bad block ID was removed from the tracker
  1706. {
  1707. let tracker = message_tracker.lock().await;
  1708. // Bad block should be removed
  1709. assert!(
  1710. !tracker.is_tracking_block_id(bad_block_id),
  1711. "Bad block ID should be removed"
  1712. );
  1713. // Good block should still be tracked
  1714. assert!(
  1715. tracker.is_tracking_block_id(good_block_id),
  1716. "Good block ID should still be tracked"
  1717. );
  1718. // Bad peers should be removed
  1719. assert!(
  1720. !tracker.is_tracking_peer(bad_peer1),
  1721. "bad_peer1 should be removed from tracker"
  1722. );
  1723. assert!(
  1724. !tracker.is_tracking_peer(bad_peer2),
  1725. "bad_peer2 should be removed from tracker"
  1726. );
  1727. // Good peer should still be tracked
  1728. assert!(
  1729. tracker.is_tracking_peer(good_peer),
  1730. "good_peer should still be tracked"
  1731. );
  1732. println!("Verified tracker state is correct after processing effect");
  1733. }
  1734. }
  1735. #[tokio::test]
  1736. #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
  1737. async fn test_seen_block_effect() {
  1738. use equix::EquiXBuilder;
  1739. use tokio::sync::mpsc;
  1740. let mut effect_slab = NounSlab::new();
  1741. let block_id = T(&mut effect_slab, &[D(1), D(2), D(3), D(4), D(5)]);
  1742. let block_id_str = tip5_hash_to_base58(block_id).unwrap_or_else(|_| {
  1743. panic!(
  1744. "Called `expect()` at {}:{} (git sha: {})",
  1745. file!(),
  1746. line!(),
  1747. option_env!("GIT_SHA").unwrap_or("unknown")
  1748. )
  1749. });
  1750. let effect = T(
  1751. &mut effect_slab,
  1752. &[D(tas!(b"seen")), D(tas!(b"block")), block_id],
  1753. );
  1754. effect_slab.set_root(effect);
  1755. let (swarm_tx, _) = mpsc::channel(1);
  1756. let message_tracker = Arc::new(Mutex::new(MessageTracker::new()));
  1757. let message_tracker_clone = Arc::clone(&message_tracker); // Clone the Arc, not the MessageTracker
  1758. let result = handle_effect(
  1759. effect_slab,
  1760. swarm_tx,
  1761. EquiXBuilder::new(),
  1762. PeerId::random(), // local peer ID (not relevant for this test)
  1763. vec![], // connected peers (not relevant for this test)
  1764. message_tracker_clone,
  1765. )
  1766. .await;
  1767. assert!(result.is_ok(), "handle_effect should succeed");
  1768. // Verify that the block id was added to the seen_blocks set
  1769. let tracker = message_tracker.lock().await;
  1770. let contains = tracker.seen_blocks.contains(&block_id_str);
  1771. assert!(contains, "Block ID should be marked as seen");
  1772. }
  1773. #[tokio::test]
  1774. #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
  1775. async fn test_seen_tx_effect() {
  1776. use equix::EquiXBuilder;
  1777. use tokio::sync::mpsc;
  1778. let mut effect_slab = NounSlab::new();
  1779. let tx_id = T(&mut effect_slab, &[D(1), D(2), D(3), D(4), D(5)]);
  1780. let tx_id_str = tip5_hash_to_base58(tx_id).unwrap_or_else(|_| {
  1781. panic!(
  1782. "Called `expect()` at {}:{} (git sha: {})",
  1783. file!(),
  1784. line!(),
  1785. option_env!("GIT_SHA").unwrap_or("unknown")
  1786. )
  1787. });
  1788. let effect = T(&mut effect_slab, &[D(tas!(b"seen")), D(tas!(b"tx")), tx_id]);
  1789. effect_slab.set_root(effect);
  1790. let (swarm_tx, _) = mpsc::channel(1);
  1791. let message_tracker = Arc::new(Mutex::new(MessageTracker::new()));
  1792. let message_tracker_clone = Arc::clone(&message_tracker); // Clone the Arc, not the MessageTracker
  1793. let result = handle_effect(
  1794. effect_slab,
  1795. swarm_tx,
  1796. EquiXBuilder::new(),
  1797. PeerId::random(), // local peer ID (not relevant for this test)
  1798. vec![], // connected peers (not relevant for this test)
  1799. message_tracker_clone,
  1800. )
  1801. .await;
  1802. assert!(result.is_ok(), "handle_effect should succeed");
  1803. // Verify that the tx id was added to the seen_blocks set
  1804. let tracker = message_tracker.lock().await;
  1805. let contains = tracker.seen_txs.contains(&tx_id_str);
  1806. assert!(contains, "tx ID should be marked as seen");
  1807. }
  1808. }