p2p_util.rs 21 KB


  1. use std::collections::{BTreeMap, BTreeSet};
  2. use std::net::{Ipv4Addr, Ipv6Addr};
  3. use std::str::FromStr;
  4. use libp2p::PeerId;
  5. use nockapp::noun::slab::NounSlab;
  6. use nockapp::{AtomExt, NockAppError, NounExt};
  7. use nockvm::noun::Noun;
  8. use nockvm_macros::tas;
  9. use tracing::{debug, warn};
  10. use crate::metrics::NockchainP2PMetrics;
  11. use crate::tip5_util::tip5_hash_to_base58;
// The warn logs are specifically constructed for fail2ban
// Changing these breaks the integration with the fail2ban regex
/// Emits a `warn`-level log line recording that `peer_id`, reached over the
/// IPv4 address `ip`, was blocked.
///
/// The exact message text is matched by an external fail2ban regex, so the
/// format string must not be reworded.
pub fn log_fail2ban_ipv4(peer_id: &PeerId, ip: &Ipv4Addr) {
    warn!("fail2ban: Blocked peer {peer_id} with IPv4 address: {ip}");
}
/// Emits a `warn`-level log line recording that `peer_id`, reached over the
/// IPv6 address `ip`, was blocked.
///
/// The message is consumed by the fail2ban integration; keep the format
/// string exactly as written.
pub fn log_fail2ban_ipv6(peer_id: &PeerId, ip: &Ipv6Addr) {
    warn!("fail2ban: Blocked peer {peer_id} with IPv6 address: {ip}");
}
/// Extension trait for building a [`PeerId`] from a noun.
pub trait PeerIdExt {
    /// Converts `noun` into a [`PeerId`].
    ///
    /// # Errors
    ///
    /// Returns a [`NockAppError`] when the noun cannot be converted.
    fn from_noun(noun: Noun) -> Result<PeerId, NockAppError>;
}
  23. impl PeerIdExt for PeerId {
  24. fn from_noun(noun: Noun) -> Result<PeerId, NockAppError> {
  25. let peer_id_bytes = noun.as_atom()?.to_bytes_until_nul()?;
  26. let peer_id_str = String::from_utf8(peer_id_bytes)?;
  27. PeerId::from_str(&peer_id_str).map_err(|_| NockAppError::OtherError)
  28. }
  29. }
/// This struct is used to track which peers sent us which block IDs.
/// `block_id_to_peers` is the one we really care about, since it's what we use
/// to figure out which peers to ban when we get a %liar-block-id effect.
/// But when we are removing peers, we don't want to have to iterate over
/// every block ID and check if the peer is in the set. So we also maintain
/// a `peer_to_block_ids` map.
pub struct MessageTracker {
    /// Base58 block ID -> set of peers that sent us that block ID.
    block_id_to_peers: BTreeMap<String, BTreeSet<PeerId>>,
    /// Reverse index of `block_id_to_peers`: peer -> base58 block IDs it sent us.
    peer_to_block_ids: BTreeMap<PeerId, BTreeSet<String>>,
    /// NOTE(review): presumably identifiers of blocks already seen; this set is
    /// only initialised (never read or written) in this file -- confirm with callers.
    pub seen_blocks: BTreeSet<String>,
    /// NOTE(review): presumably identifiers of transactions already seen; only
    /// initialised in this file -- confirm with callers.
    pub seen_txs: BTreeSet<String>,
    /// Slabs returned by `check_cache` for `%block %by-height` requests,
    /// keyed by the requested height.
    pub block_cache: BTreeMap<u64, NounSlab>,
    /// Slabs returned by `check_cache` for `%raw-tx %by-id` requests,
    /// keyed by the base58 form of the transaction ID.
    pub tx_cache: BTreeMap<String, NounSlab>,
}
  44. impl Default for MessageTracker {
  45. fn default() -> Self {
  46. Self::new()
  47. }
  48. }
  49. impl MessageTracker {
  50. pub fn new() -> Self {
  51. Self {
  52. block_id_to_peers: BTreeMap::new(),
  53. peer_to_block_ids: BTreeMap::new(),
  54. seen_blocks: BTreeSet::new(),
  55. seen_txs: BTreeSet::new(),
  56. block_cache: BTreeMap::new(),
  57. tx_cache: BTreeMap::new(),
  58. }
  59. }
  60. fn track_block_id_str_and_peer(&mut self, block_id_str: String, peer_id: PeerId) {
  61. self.block_id_to_peers
  62. .entry(block_id_str.clone())
  63. .or_default()
  64. .insert(peer_id);
  65. self.peer_to_block_ids
  66. .entry(peer_id)
  67. .or_default()
  68. .insert(block_id_str);
  69. }
  70. fn remove_block_id_str(&mut self, block_id: &str) {
  71. let Some(peers) = self.block_id_to_peers.remove(block_id) else {
  72. return;
  73. };
  74. for peer_id in peers {
  75. let Some(block_ids) = self.peer_to_block_ids.get_mut(&peer_id) else {
  76. continue;
  77. };
  78. block_ids.remove(block_id);
  79. if block_ids.is_empty() {
  80. self.peer_to_block_ids.remove(&peer_id);
  81. }
  82. }
  83. }
  84. /// Removes a peer from the tracker.
  85. /// done if a peer disconnects or is banned.
  86. pub fn remove_peer(&mut self, peer_id: &PeerId) {
  87. let Some(block_ids) = self.peer_to_block_ids.remove(peer_id) else {
  88. return;
  89. };
  90. for block_id in block_ids {
  91. let Some(peers) = self.block_id_to_peers.get_mut(&block_id) else {
  92. continue;
  93. };
  94. peers.remove(peer_id);
  95. if peers.is_empty() {
  96. self.block_id_to_peers.remove(&block_id);
  97. }
  98. }
  99. }
  100. /// Adds a block ID and peer to the tracker.
  101. /// implements [%track %add block-id peer-id] effect
  102. pub fn track_block_id_and_peer(
  103. &mut self,
  104. block_id: Noun,
  105. peer_id: PeerId,
  106. ) -> Result<(), NockAppError> {
  107. let block_id_str = tip5_hash_to_base58(block_id)?;
  108. self.track_block_id_str_and_peer(block_id_str, peer_id);
  109. Ok(())
  110. }
  111. /// Adds a peer to an existing block ID. Returns true if the block ID exists and the peer was added,
  112. /// false if the block ID doesn't exist in the tracker.
  113. pub fn add_peer_if_tracking_block_id(
  114. &mut self,
  115. block_id: Noun,
  116. peer_id: PeerId,
  117. ) -> Result<bool, NockAppError> {
  118. let block_id_str = tip5_hash_to_base58(block_id)?;
  119. if self.block_id_to_peers.contains_key(&block_id_str) {
  120. self.track_block_id_str_and_peer(block_id_str, peer_id);
  121. Ok(true)
  122. } else {
  123. Ok(false)
  124. }
  125. }
  126. /// Removes a block ID from the tracker.
  127. /// implements [%track %remove block-id] effect
  128. pub fn remove_block_id(&mut self, block_id: Noun) -> Result<(), NockAppError> {
  129. let block_id_str = tip5_hash_to_base58(block_id)?;
  130. self.remove_block_id_str(&block_id_str);
  131. Ok(())
  132. }
  133. /// Returns a list of peers that have sent us a given block ID.
  134. pub fn get_peers_for_block_id(&self, block_id: Noun) -> Vec<PeerId> {
  135. let Ok(block_id_str) = tip5_hash_to_base58(block_id) else {
  136. panic!("Invalid block ID");
  137. };
  138. self.block_id_to_peers
  139. .get(&block_id_str)
  140. .map(|peers| peers.iter().cloned().collect::<Vec<_>>())
  141. .unwrap_or_default()
  142. }
  143. /// Returns a list of block IDs that a given peer has sent us.
  144. pub fn get_block_ids_for_peer(&self, peer_id: PeerId) -> Vec<String> {
  145. self.peer_to_block_ids
  146. .get(&peer_id)
  147. .map(|block_ids| block_ids.iter().cloned().collect::<Vec<_>>())
  148. .unwrap_or_default()
  149. }
  150. /// Returns true if we are tracking a given block ID.
  151. pub fn is_tracking_block_id(&self, block_id: Noun) -> bool {
  152. let Ok(block_id_str) = tip5_hash_to_base58(block_id) else {
  153. return false;
  154. };
  155. self.block_id_to_peers.contains_key(&block_id_str)
  156. }
  157. pub fn is_tracking_peer(&self, peer_id: PeerId) -> bool {
  158. self.peer_to_block_ids.contains_key(&peer_id)
  159. }
  160. // Removes the block id from the MessageTracker maps and returns all the
  161. // peers who had sent us that block.
  162. pub fn process_bad_block_id(&mut self, block_id: Noun) -> Result<Vec<PeerId>, NockAppError> {
  163. let block_id_str = tip5_hash_to_base58(block_id)?;
  164. let peers_to_ban = self
  165. .block_id_to_peers
  166. .get(&block_id_str)
  167. .map(|peers| peers.iter().cloned().collect::<Vec<_>>())
  168. .unwrap_or_default();
  169. // Remove each peer that sent us this bad block
  170. for peer in &peers_to_ban {
  171. self.remove_peer(peer);
  172. }
  173. self.remove_block_id(block_id)?;
  174. Ok(peers_to_ban)
  175. }
  176. pub async fn check_cache(
  177. &mut self,
  178. request: &Noun,
  179. metrics: &NockchainP2PMetrics,
  180. ) -> Result<Option<NounSlab>, NockAppError> {
  181. let tag = request.as_cell()?.head().as_direct()?.data();
  182. if tag != tas!(b"request") {
  183. return Ok(None);
  184. }
  185. let request_body = request.as_cell()?.tail().as_cell()?;
  186. if request_body.head().eq_bytes(b"block") {
  187. let tail = request_body.tail();
  188. let kind = tail.as_cell()?.head();
  189. if !kind.eq_bytes(b"by-height") {
  190. return Ok(None);
  191. }
  192. let height = tail.as_cell()?.tail().as_direct()?.data();
  193. if let Some(cached_block) = self.block_cache.get(&height) {
  194. debug!("found cached block request by height={:?}", height);
  195. metrics.block_request_cache_hits.increment();
  196. Ok(Some(cached_block.clone()))
  197. } else {
  198. debug!("didn't find cached block request by height={:?}", height);
  199. metrics.block_request_cache_misses.increment();
  200. Ok(None)
  201. }
  202. } else if request_body.head().eq_bytes(b"raw-tx") {
  203. let tail = request_body.tail();
  204. let kind = tail.as_cell()?.head();
  205. if !kind.eq_bytes(b"by-id") {
  206. return Ok(None);
  207. }
  208. let tx_id = tail.as_cell()?.tail();
  209. let tx_id_str = tip5_hash_to_base58(tx_id)?;
  210. if let Some(cached_tx) = self.tx_cache.get(&tx_id_str) {
  211. debug!("found cached tx request by id={:?}", tx_id_str);
  212. metrics.tx_request_cache_hits.increment();
  213. return Ok(Some(cached_tx.clone()));
  214. } else {
  215. debug!("didn't find cached tx request by id={:?}", tx_id_str);
  216. metrics.tx_request_cache_misses.increment();
  217. return Ok(None);
  218. }
  219. } else {
  220. return Ok(None);
  221. }
  222. }
  223. }
  224. #[cfg(test)]
  225. mod tests {
  226. use nockapp::noun::slab::NounSlab;
  227. use nockvm::noun::{D, T};
  228. use super::*;
  229. #[test]
  230. #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
  231. fn test_message_tracker_basic() {
  232. let mut tracker = MessageTracker::new();
  233. let peer_id = PeerId::random();
  234. // Create a block ID as [1 2 3 4 5]
  235. let mut slab = NounSlab::new();
  236. let block_id_tuple = T(&mut slab, &[D(1), D(2), D(3), D(4), D(5)]);
  237. // Add the block ID
  238. tracker
  239. .track_block_id_and_peer(block_id_tuple, peer_id)
  240. .unwrap_or_else(|_| {
  241. panic!(
  242. "Called `expect()` at {}:{} (git sha: {})",
  243. file!(),
  244. line!(),
  245. option_env!("GIT_SHA").unwrap_or("unknown")
  246. )
  247. });
  248. // Get the block ID string
  249. let block_id_str = tip5_hash_to_base58(block_id_tuple).unwrap_or_else(|_| {
  250. panic!(
  251. "Called `expect()` at {}:{} (git sha: {})",
  252. file!(),
  253. line!(),
  254. option_env!("GIT_SHA").unwrap_or("unknown")
  255. )
  256. });
  257. // Verify it was added correctly
  258. assert!(tracker.block_id_to_peers.contains_key(&block_id_str));
  259. assert!(tracker.peer_to_block_ids.contains_key(&peer_id));
  260. // Remove the block ID
  261. tracker.remove_block_id(block_id_tuple).unwrap_or_else(|_| {
  262. panic!(
  263. "Called `expect()` at {}:{} (git sha: {})",
  264. file!(),
  265. line!(),
  266. option_env!("GIT_SHA").unwrap_or("unknown")
  267. )
  268. });
  269. // Verify it was removed
  270. assert!(!tracker.block_id_to_peers.contains_key(&block_id_str));
  271. assert!(!tracker.peer_to_block_ids.contains_key(&peer_id));
  272. }
  273. #[test]
  274. #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
  275. fn test_bad_block_id() {
  276. let mut tracker = MessageTracker::new();
  277. let peer_id = PeerId::random();
  278. // Create a block ID
  279. let mut slab = NounSlab::new();
  280. let block_id_tuple = T(&mut slab, &[D(1), D(2), D(3), D(4), D(5)]);
  281. // Track the block ID
  282. tracker
  283. .track_block_id_and_peer(block_id_tuple, peer_id)
  284. .unwrap_or_else(|_| {
  285. panic!(
  286. "Called `expect()` at {}:{} (git sha: {})",
  287. file!(),
  288. line!(),
  289. option_env!("GIT_SHA").unwrap_or("unknown")
  290. )
  291. });
  292. // Mark it as bad
  293. let peers_to_ban = tracker
  294. .process_bad_block_id(block_id_tuple)
  295. .unwrap_or_else(|_| {
  296. panic!(
  297. "Called `expect()` at {}:{} (git sha: {})",
  298. file!(),
  299. line!(),
  300. option_env!("GIT_SHA").unwrap_or("unknown")
  301. )
  302. });
  303. // Verify the peer is returned for banning
  304. assert_eq!(peers_to_ban.len(), 1);
  305. assert_eq!(peers_to_ban[0], peer_id);
  306. }
  307. #[test]
  308. fn test_peer_id_base58_roundtrip() {
  309. use nockvm::noun::Atom;
  310. // Generate a random PeerId
  311. let original_peer_id = PeerId::random();
  312. let base58_str = original_peer_id.to_base58();
  313. println!("Original base58: {}", base58_str);
  314. // Create a NounSlab and store the base58 string as an Atom
  315. let mut slab = NounSlab::new();
  316. let peer_id_atom = Atom::from_value(&mut slab, base58_str.as_bytes())
  317. .expect("Failed to create peer ID atom");
  318. // Use the from_noun method to convert back to PeerId
  319. let recovered_peer_id = PeerId::from_noun(peer_id_atom.as_noun()).unwrap_or_else(|_| {
  320. panic!(
  321. "Called `expect()` at {}:{} (git sha: {})",
  322. file!(),
  323. line!(),
  324. option_env!("GIT_SHA").unwrap_or("unknown")
  325. )
  326. });
  327. // Verify round trip
  328. assert_eq!(original_peer_id, recovered_peer_id);
  329. }
  330. #[test]
  331. #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
  332. fn test_add_peer_if_tracking_block_id() {
  333. let mut tracker = MessageTracker::new();
  334. let peer_id1 = PeerId::random();
  335. let peer_id2 = PeerId::random();
  336. // Create a block ID
  337. let mut slab = NounSlab::new();
  338. let block_id_tuple = T(&mut slab, &[D(1), D(2), D(3), D(4), D(5)]);
  339. // First, try to add a peer to a non-existent block ID
  340. let result = tracker
  341. .add_peer_if_tracking_block_id(block_id_tuple, peer_id1)
  342. .unwrap_or_else(|_| {
  343. panic!(
  344. "Called `expect()` at {}:{} (git sha: {})",
  345. file!(),
  346. line!(),
  347. option_env!("GIT_SHA").unwrap_or("unknown")
  348. )
  349. });
  350. assert!(!result); // Should return false since block ID doesn't exist
  351. // Now track the block ID with peer1
  352. tracker
  353. .track_block_id_and_peer(block_id_tuple, peer_id1)
  354. .unwrap_or_else(|_| {
  355. panic!(
  356. "Called `expect()` at {}:{} (git sha: {})",
  357. file!(),
  358. line!(),
  359. option_env!("GIT_SHA").unwrap_or("unknown")
  360. )
  361. });
  362. // Add peer2 to the existing block ID
  363. let result = tracker
  364. .add_peer_if_tracking_block_id(block_id_tuple, peer_id2)
  365. .unwrap_or_else(|_| {
  366. panic!(
  367. "Called `expect()` at {}:{} (git sha: {})",
  368. file!(),
  369. line!(),
  370. option_env!("GIT_SHA").unwrap_or("unknown")
  371. )
  372. });
  373. assert!(result); // Should return true since block ID exists
  374. // Verify both peers are associated with the block ID
  375. let peers = tracker.get_peers_for_block_id(block_id_tuple);
  376. assert_eq!(peers.len(), 2);
  377. assert!(peers.contains(&peer_id1));
  378. assert!(peers.contains(&peer_id2));
  379. }
  380. #[test]
  381. #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
  382. fn test_add_peer_if_tracking_block_id_then_remove() {
  383. let mut tracker = MessageTracker::new();
  384. let peer_id1 = PeerId::random();
  385. let peer_id2 = PeerId::random();
  386. // Create a block ID
  387. let mut slab = NounSlab::new();
  388. let block_id_tuple = T(&mut slab, &[D(1), D(2), D(3), D(4), D(5)]);
  389. let block_id_str = tip5_hash_to_base58(block_id_tuple).unwrap_or_else(|_| {
  390. panic!(
  391. "Called `expect()` at {}:{} (git sha: {})",
  392. file!(),
  393. line!(),
  394. option_env!("GIT_SHA").unwrap_or("unknown")
  395. )
  396. });
  397. // Track the block ID with peer1
  398. tracker
  399. .track_block_id_and_peer(block_id_tuple, peer_id1)
  400. .unwrap_or_else(|_| {
  401. panic!(
  402. "Called `expect()` at {}:{} (git sha: {})",
  403. file!(),
  404. line!(),
  405. option_env!("GIT_SHA").unwrap_or("unknown")
  406. )
  407. });
  408. // Add peer2 to the existing block ID
  409. let result = tracker
  410. .add_peer_if_tracking_block_id(block_id_tuple, peer_id2)
  411. .unwrap_or_else(|_| {
  412. panic!(
  413. "Called `expect()` at {}:{} (git sha: {})",
  414. file!(),
  415. line!(),
  416. option_env!("GIT_SHA").unwrap_or("unknown")
  417. )
  418. });
  419. assert!(result); // Should return true since block ID exists
  420. // Verify both peers are associated with the block ID
  421. let peers = tracker.get_peers_for_block_id(block_id_tuple);
  422. assert_eq!(peers.len(), 2);
  423. assert!(peers.contains(&peer_id1));
  424. assert!(peers.contains(&peer_id2));
  425. // Now remove the block ID
  426. tracker.remove_block_id(block_id_tuple).unwrap_or_else(|_| {
  427. panic!(
  428. "Called `expect()` at {}:{} (git sha: {})",
  429. file!(),
  430. line!(),
  431. option_env!("GIT_SHA").unwrap_or("unknown")
  432. )
  433. });
  434. // Verify the block ID is no longer tracked
  435. let peers_after_removal = tracker.get_peers_for_block_id(block_id_tuple);
  436. assert_eq!(peers_after_removal.len(), 0);
  437. // Verify the block ID is removed from block_id_to_peers
  438. assert!(!tracker.block_id_to_peers.contains_key(&block_id_str));
  439. // Verify the peers either don't exist in the map anymore or don't have this block ID
  440. // For peer_id1
  441. if let Some(block_ids) = tracker.peer_to_block_ids.get(&peer_id1) {
  442. assert!(!block_ids.contains(&block_id_str));
  443. }
  444. // For peer_id2
  445. if let Some(block_ids) = tracker.peer_to_block_ids.get(&peer_id2) {
  446. assert!(!block_ids.contains(&block_id_str));
  447. }
  448. }
  449. #[test]
  450. #[cfg_attr(miri, ignore)] // ibig has a memory leak so miri fails this test
  451. fn test_process_bad_block_id_removes_peers() {
  452. let mut tracker = MessageTracker::new();
  453. let peer_id1 = PeerId::random();
  454. let peer_id2 = PeerId::random();
  455. // Create a block ID
  456. let mut slab = NounSlab::new();
  457. let block_id_tuple = T(&mut slab, &[D(1), D(2), D(3), D(4), D(5)]);
  458. // Create another block ID that both peers will share
  459. let other_block_id = T(&mut slab, &[D(6), D(7), D(8), D(9), D(10)]);
  460. // Track both block IDs with both peers
  461. tracker
  462. .track_block_id_and_peer(block_id_tuple, peer_id1)
  463. .unwrap_or_else(|_| {
  464. panic!(
  465. "Called `expect()` at {}:{} (git sha: {})",
  466. file!(),
  467. line!(),
  468. option_env!("GIT_SHA").unwrap_or("unknown")
  469. )
  470. });
  471. tracker
  472. .add_peer_if_tracking_block_id(block_id_tuple, peer_id2)
  473. .unwrap_or_else(|_| {
  474. panic!(
  475. "Called `expect()` at {}:{} (git sha: {})",
  476. file!(),
  477. line!(),
  478. option_env!("GIT_SHA").unwrap_or("unknown")
  479. )
  480. });
  481. tracker
  482. .track_block_id_and_peer(other_block_id, peer_id1)
  483. .unwrap_or_else(|_| {
  484. panic!(
  485. "Called `expect()` at {}:{} (git sha: {})",
  486. file!(),
  487. line!(),
  488. option_env!("GIT_SHA").unwrap_or("unknown")
  489. )
  490. });
  491. tracker
  492. .add_peer_if_tracking_block_id(other_block_id, peer_id2)
  493. .unwrap_or_else(|_| {
  494. panic!(
  495. "Called `expect()` at {}:{} (git sha: {})",
  496. file!(),
  497. line!(),
  498. option_env!("GIT_SHA").unwrap_or("unknown")
  499. )
  500. });
  501. // Verify both peers are tracked
  502. assert!(tracker.is_tracking_peer(peer_id1));
  503. assert!(tracker.is_tracking_peer(peer_id2));
  504. // Process the bad block ID
  505. let banned_peers = tracker
  506. .process_bad_block_id(block_id_tuple)
  507. .unwrap_or_else(|_| {
  508. panic!(
  509. "Called `expect()` at {}:{} (git sha: {})",
  510. file!(),
  511. line!(),
  512. option_env!("GIT_SHA").unwrap_or("unknown")
  513. )
  514. });
  515. // Verify both peers were returned for banning
  516. assert_eq!(banned_peers.len(), 2);
  517. assert!(banned_peers.contains(&peer_id1));
  518. assert!(banned_peers.contains(&peer_id2));
  519. // Verify both peers are no longer tracked
  520. assert!(!tracker.is_tracking_peer(peer_id1));
  521. assert!(!tracker.is_tracking_peer(peer_id2));
  522. // Verify the other block ID is also no longer tracked
  523. // (since we removed the peers entirely)
  524. assert!(!tracker.is_tracking_block_id(other_block_id));
  525. }
  526. #[test]
  527. fn test_fail2ban_logging() {
  528. let peer_id: PeerId = libp2p::PeerId::from_bytes(&[0; 2]).unwrap();
  529. assert_eq!("11", peer_id.to_base58());
  530. let ipv4_addr = Ipv4Addr::new(192, 168, 1, 1);
  531. let ipv6_addr = Ipv6Addr::new(0x2001, 0x0db8, 0x0db8, 0x0db8, 0x0db8, 0x0db8, 0x0db8, 0x1);
  532. // Check the display representation of the IP addresses
  533. let ipv4_display = format!("{}", ipv4_addr);
  534. let ipv6_display = format!("{}", ipv6_addr);
  535. assert_eq!(ipv4_display, "192.168.1.1");
  536. assert_eq!(ipv6_display, "2001:db8:db8:db8:db8:db8:db8:1");
  537. }
  538. }