form.rs 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198
  1. #![allow(dead_code)]
  2. use crate::noun::slab::NounSlab;
  3. use blake3::{Hash, Hasher};
  4. use byteorder::{LittleEndian, WriteBytesExt};
  5. use nockvm::hamt::Hamt;
  6. use nockvm::interpreter::{self, interpret, Error, Mote};
  7. use nockvm::jets::cold::{Cold, Nounable};
  8. use nockvm::jets::hot::{HotEntry, URBIT_HOT_STATE};
  9. use nockvm::jets::nock::util::mook;
  10. use nockvm::mem::NockStack;
  11. use nockvm::mug::met3_usize;
  12. use nockvm::noun::{Atom, Cell, DirectAtom, IndirectAtom, Noun, Slots, D, T};
  13. use nockvm::trace::{path_to_cord, write_serf_trace_safe, TraceInfo};
  14. use nockvm_macros::tas;
  15. use std::any::Any;
  16. use std::fs::File;
  17. use std::path::PathBuf;
  18. use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
  19. use std::sync::Arc;
  20. use std::time::Instant;
  21. use tracing::{debug, error, info, warn};
  22. use crate::kernel::checkpoint::{Checkpoint, ExportedState, JamPaths, JammedCheckpoint};
  23. use crate::nockapp::wire::{wire_to_noun, WireRepr};
  24. use crate::noun::slam;
  25. use crate::utils::{create_context, current_da, NOCK_STACK_SIZE};
  26. use crate::{AtomExt, CrownError, NounExt, Result, ToBytesExt};
  27. use bincode::config::Configuration;
  28. use tokio::sync::{mpsc, oneshot};
  29. use tokio::time::Duration;
  /// Axis of the kernel state inside the Arvo core (read with `arvo.slot(STATE_AXIS)`).
  pub(crate) const STATE_AXIS: u64 = 6;
  /// Axis of the gate slammed by `Serf::load`.
  const LOAD_AXIS: u64 = 4;
  /// Axis of the gate slammed by `Serf::peek`.
  const PEEK_AXIS: u64 = 22;
  /// Axis of the gate slammed when poking (`Serf::do_poke`, `Serf::play_list`).
  const POKE_AXIS: u64 = 23;
  /// Snapshot version used when no checkpoint supplies one.
  const SNAPSHOT_VERSION: u32 = 0;
  /// NOTE(review): presumably the polling period used while waiting for the
  /// serf thread to finish — confirm against `SerfThread::finished`.
  const SERF_FINISHED_INTERVAL: Duration = Duration::from_millis(100);
  /// Stack size, in bytes, requested for the serf OS thread.
  const SERF_THREAD_STACK_SIZE: usize = 8 * 1024 * 1024; // 8MB
  /// Actions to request of the serf thread.
  ///
  /// Every variant except `Stop` carries a `result` oneshot sender on which
  /// `serf_loop` sends the outcome of the action.
  pub enum SerfAction {
      /// Extract this state into the serf
      LoadState {
          /// Serialized state bytes (JammedCheckpoint or ExportedState encoding).
          state: Vec<u8>,
          result: oneshot::Sender<Result<()>>,
      },
      /// Make a CheckPoint
      Checkpoint {
          result: oneshot::Sender<JammedCheckpoint>,
      },
      /// Get the state of the serf
      GetStateBytes {
          result: oneshot::Sender<Result<Vec<u8>>>,
      },
      /// Get the state noun of the kernel as a slab
      GetKernelStateSlab {
          result: oneshot::Sender<Result<NounSlab>>,
      },
      /// Get the cold state as a NounSlab
      GetColdStateSlab {
          result: oneshot::Sender<NounSlab>,
      },
      /// Run a peek
      Peek {
          /// The peek request, copied onto the serf's stack before it runs.
          ovo: NounSlab,
          result: oneshot::Sender<Result<NounSlab>>,
      },
      /// Run a poke
      //
      // TODO: send back the event number after each poke
      Poke {
          wire: WireRepr,
          /// The poke cause, copied onto the serf's stack before it runs.
          cause: NounSlab,
          result: oneshot::Sender<Result<NounSlab>>,
      },
      /// Stop the loop
      Stop,
  }
  /// Handle to the serf thread: its join handle, the channel used to submit
  /// `SerfAction`s to it, and the state it shares with the rest of the program.
  pub(crate) struct SerfThread {
      /// Join handle of the spawned serf thread.
      handle: std::thread::JoinHandle<()>,
      /// Sending side of the channel on which `SerfAction`s are submitted.
      action_sender: mpsc::Sender<SerfAction>,
      /// Jam persistence buffer paths.
      pub jam_paths: Arc<JamPaths>,
      /// Buffer toggle for writing to the jam buffer.
      pub buffer_toggle: Arc<AtomicBool>,
      /// Event counter shared with the serf (a clone of `Serf::event_num`).
      pub event_number: Arc<AtomicU64>,
  }
  85. impl SerfThread {
  86. async fn new(
  87. nock_stack_size: usize,
  88. jam_paths: Arc<JamPaths>,
  89. kernel_bytes: Vec<u8>,
  90. constant_hot_state: Vec<HotEntry>,
  91. trace: bool,
  92. ) -> Result<Self> {
  93. let jam_paths_cloned = jam_paths.clone();
  94. let (action_sender, action_receiver) = mpsc::channel(1);
  95. let (buffer_toggle_sender, buffer_toggle_receiver) = oneshot::channel();
  96. let (event_number_sender, event_number_receiver) = oneshot::channel();
  97. std::fs::create_dir_all(jam_paths.0.parent().unwrap_or_else(|| {
  98. panic!(
  99. "Panicked at {}:{} (git sha: {:?})",
  100. file!(),
  101. line!(),
  102. option_env!("GIT_SHA")
  103. )
  104. }))
  105. .unwrap_or_else(|err| {
  106. panic!(
  107. "Panicked with {err:?} at {}:{} (git sha: {:?})",
  108. file!(),
  109. line!(),
  110. option_env!("GIT_SHA")
  111. )
  112. });
  113. let handle = std::thread::Builder::new()
  114. .name("serf".to_string())
  115. .stack_size(SERF_THREAD_STACK_SIZE)
  116. .spawn(move || {
  117. let mut stack = NockStack::new(nock_stack_size, 0);
  118. let checkpoint = if jam_paths.checkpoint_exists() {
  119. info!("Found existing state - restoring from checkpoint");
  120. jam_paths.load_checkpoint(&mut stack).ok()
  121. } else {
  122. info!("No existing state found - initializing fresh state");
  123. None
  124. };
  125. let buffer_toggle = Arc::new(AtomicBool::new(
  126. checkpoint
  127. .as_ref()
  128. .map_or_else(|| false, |checkpoint| !checkpoint.buff_index),
  129. ));
  130. buffer_toggle_sender
  131. .send(buffer_toggle.clone())
  132. .expect("Could not send buffer toggle out of serf thread");
  133. let serf = Serf::new(stack, checkpoint, &kernel_bytes, &constant_hot_state, trace);
  134. event_number_sender
  135. .send(serf.event_num.clone())
  136. .expect("Could not send event number out of serf thread");
  137. serf_loop(serf, action_receiver, buffer_toggle);
  138. })?;
  139. let buffer_toggle = buffer_toggle_receiver.await?;
  140. let event_number = event_number_receiver.await?;
  141. Ok(SerfThread {
  142. buffer_toggle,
  143. handle,
  144. action_sender,
  145. jam_paths: jam_paths_cloned,
  146. event_number,
  147. })
  148. }
  149. // Future which completes when the serf thread finishes. Since rust threads only support polling or blocking joining, not notification,
  150. // we have to poll on a timer.
  151. pub(crate) async fn finished(&self) {
  152. let mut interval = tokio::time::interval(Duration::from_millis(100));
  153. loop {
  154. interval.tick().await;
  155. if self.handle.is_finished() {
  156. debug!("Serf finished");
  157. break;
  158. }
  159. }
  160. }
  161. pub(crate) async fn stop(&self) {
  162. self.action_sender
  163. .send(SerfAction::Stop)
  164. .await
  165. .expect("Failed to send stop action");
  166. self.finished().await;
  167. }
  /// Consumes the handle and blocks until the serf thread exits.
  ///
  /// Returns whatever `JoinHandle::join` returns: `Err` carries the panic
  /// payload if the serf thread panicked.
  pub(crate) fn join(self) -> Result<(), Box<dyn Any + Send + 'static>> {
      self.handle.join()
  }
  171. pub(crate) async fn get_kernel_state_slab(&self) -> Result<NounSlab> {
  172. let (result, result_fut) = oneshot::channel();
  173. self.action_sender
  174. .send(SerfAction::GetKernelStateSlab { result })
  175. .await?;
  176. result_fut.await?
  177. }
  178. pub(crate) async fn get_cold_state_slab(&self) -> Result<NounSlab> {
  179. let (result, result_fut) = oneshot::channel();
  180. self.action_sender
  181. .send(SerfAction::GetColdStateSlab { result })
  182. .await?;
  183. Ok(result_fut.await?)
  184. }
  185. pub(crate) async fn peek(&self, ovo: NounSlab) -> Result<NounSlab> {
  186. let (result, result_fut) = oneshot::channel();
  187. self.action_sender
  188. .send(SerfAction::Peek { ovo, result })
  189. .await?;
  190. result_fut.await?
  191. }
  192. pub(crate) async fn poke(&self, wire: WireRepr, cause: NounSlab) -> Result<NounSlab> {
  193. let (result, result_fut) = oneshot::channel();
  194. self.action_sender
  195. .send(SerfAction::Poke {
  196. wire,
  197. cause,
  198. result,
  199. })
  200. .await?;
  201. result_fut.await?
  202. }
  203. pub(crate) fn poke_sync(&self, wire: WireRepr, cause: NounSlab) -> Result<NounSlab> {
  204. let (result, result_fut) = oneshot::channel();
  205. self.action_sender.blocking_send(SerfAction::Poke {
  206. wire,
  207. cause,
  208. result,
  209. })?;
  210. result_fut.blocking_recv()?
  211. }
  212. pub(crate) fn peek_sync(&self, ovo: NounSlab) -> Result<NounSlab> {
  213. let (result, result_fut) = oneshot::channel();
  214. self.action_sender
  215. .blocking_send(SerfAction::Peek { ovo, result })?;
  216. result_fut.blocking_recv()?
  217. }
  218. pub(crate) async fn load_state_from_bytes(&self, state: Vec<u8>) -> Result<()> {
  219. let (result, result_fut) = oneshot::channel();
  220. self.action_sender
  221. .send(SerfAction::LoadState { state, result })
  222. .await?;
  223. result_fut.await?
  224. }
  225. pub(crate) async fn create_state_bytes(&self) -> Result<Vec<u8>> {
  226. let (result, result_fut) = oneshot::channel();
  227. self.action_sender
  228. .send(SerfAction::GetStateBytes { result })
  229. .await?;
  230. result_fut.await?
  231. }
  232. pub(crate) async fn checkpoint(&self) -> Result<JammedCheckpoint> {
  233. let (result, result_fut) = oneshot::channel();
  234. self.action_sender
  235. .send(SerfAction::Checkpoint { result })
  236. .await?;
  237. Ok(result_fut.await?)
  238. }
  239. }
  240. fn load_state_from_bytes(serf: &mut Serf, state_bytes: &[u8]) -> Result<()> {
  241. let noun = extract_state_from_bytes(serf.stack(), state_bytes)?;
  242. serf.load(noun)?;
  243. Ok(())
  244. }
  245. fn serf_loop(
  246. mut serf: Serf,
  247. mut action_receiver: mpsc::Receiver<SerfAction>,
  248. buffer_toggle: Arc<AtomicBool>,
  249. ) {
  250. loop {
  251. let Some(action) = action_receiver.blocking_recv() else {
  252. break;
  253. };
  254. match action {
  255. SerfAction::LoadState { state, result } => {
  256. let res = load_state_from_bytes(&mut serf, &state);
  257. result.send(res).expect("Could not send load state result");
  258. }
  259. SerfAction::Stop => {
  260. break;
  261. }
  262. SerfAction::GetStateBytes { result } => {
  263. let state_bytes = create_state_bytes(&mut serf);
  264. result
  265. .send(state_bytes)
  266. .expect("Could not send state bytes");
  267. }
  268. SerfAction::GetKernelStateSlab { result } => {
  269. let kernel_state_noun = serf.arvo.slot(STATE_AXIS);
  270. let kernel_state_slab = kernel_state_noun.map_or_else(
  271. |err| Err(CrownError::from(err)),
  272. |noun| {
  273. let mut slab = NounSlab::new();
  274. slab.copy_into(noun);
  275. Ok(slab)
  276. },
  277. );
  278. result
  279. .send(kernel_state_slab)
  280. .expect("Could not send kernel state slab");
  281. }
  282. SerfAction::GetColdStateSlab { result } => {
  283. let cold_state_noun = serf.context.cold.into_noun(serf.stack());
  284. let cold_state_slab = {
  285. let mut slab = NounSlab::new();
  286. slab.copy_into(cold_state_noun);
  287. slab
  288. };
  289. result
  290. .send(cold_state_slab)
  291. .expect("Could not send cold state slab");
  292. }
  293. SerfAction::Checkpoint { result } => {
  294. let checkpoint = create_checkpoint(&mut serf, buffer_toggle.clone());
  295. result.send(checkpoint).expect("Could not send checkpoint");
  296. }
  297. SerfAction::Peek { ovo, result } => {
  298. let ovo_noun = ovo.copy_to_stack(serf.stack());
  299. let noun_res = serf.peek(ovo_noun);
  300. let noun_slab_res = noun_res.map(|noun| {
  301. let mut slab = NounSlab::new();
  302. slab.copy_into(noun);
  303. slab
  304. });
  305. result
  306. .send(noun_slab_res)
  307. .expect("Failed to send peek result from serf thread");
  308. }
  309. SerfAction::Poke {
  310. wire,
  311. cause,
  312. result,
  313. } => {
  314. let cause_noun = cause.copy_to_stack(serf.stack());
  315. let noun_res = serf.poke(wire, cause_noun);
  316. let noun_slab_res = noun_res.map(|noun| {
  317. let mut slab = NounSlab::new();
  318. slab.copy_into(noun);
  319. slab
  320. });
  321. result
  322. .send(noun_slab_res)
  323. .expect("Failed to send poke result from serf thread");
  324. }
  325. }
  326. }
  327. }
  328. /// Extracts the kernel state from a jammed checkpoint or exported state
  329. fn extract_state_from_bytes(stack: &mut NockStack, state_bytes: &[u8]) -> Result<Noun> {
  330. // First try to decode as JammedCheckpoint
  331. match extract_from_checkpoint(stack, state_bytes) {
  332. Ok(noun) => {
  333. debug!("Successfully loaded state from JammedCheckpoint format");
  334. Ok(noun)
  335. }
  336. Err(e1) => {
  337. debug!("Failed to load as JammedCheckpoint: {}", e1);
  338. // Then try to decode as ExportedState
  339. match extract_from_exported_state(stack, state_bytes) {
  340. Ok(noun) => {
  341. debug!("Successfully loaded state from ExportedState format");
  342. Ok(noun)
  343. }
  344. Err(e2) => {
  345. warn!("Failed to load as ExportedState: {}", e2);
  346. warn!("State bytes format is not recognized");
  347. Err(CrownError::StateJamFormatError)
  348. }
  349. }
  350. }
  351. }
  352. }
  353. /// Extracts the kernel state from an ExportedState
  354. fn extract_from_exported_state(stack: &mut NockStack, state_jam: &[u8]) -> Result<Noun> {
  355. let config = bincode::config::standard();
  356. // Try to decode as ExportedState
  357. let (exported, _) = bincode::decode_from_slice::<ExportedState, Configuration>(
  358. state_jam, config,
  359. )
  360. .map_err(|e| {
  361. debug!("Failed to decode state jam as ExportedState: {}", e);
  362. CrownError::StateJamFormatError
  363. })?;
  364. // Verify the magic bytes
  365. if exported.magic_bytes != tas!(b"EXPJAM") {
  366. debug!("Invalid magic bytes for ExportedState: expected EXPJAM");
  367. return Err(CrownError::StateJamFormatError);
  368. }
  369. // Extract the kernel state from the jammed noun
  370. let noun = <Noun as NounExt>::cue_bytes(stack, &exported.jam.0).map_err(|e| {
  371. warn!("Failed to cue bytes from exported state jam: {:?}", e);
  372. CrownError::StateJamFormatError
  373. })?;
  374. debug!("Successfully extracted kernel state from ExportedState");
  375. Ok(noun)
  376. }
  377. /// Extracts the kernel state from a JammedCheckpoint
  378. fn extract_from_checkpoint(stack: &mut NockStack, state_jam: &[u8]) -> Result<Noun> {
  379. let config = bincode::config::standard();
  380. // Try to decode as JammedCheckpoint
  381. let (checkpoint, _) = bincode::decode_from_slice::<JammedCheckpoint, Configuration>(
  382. state_jam, config,
  383. )
  384. .map_err(|e| {
  385. debug!("Failed to decode state jam as JammedCheckpoint: {}", e);
  386. CrownError::StateJamFormatError
  387. })?;
  388. // Verify the magic bytes
  389. if checkpoint.magic_bytes != tas!(b"CHKJAM") {
  390. debug!("Invalid magic bytes for JammedCheckpoint: expected CHKJAM");
  391. return Err(CrownError::StateJamFormatError);
  392. }
  393. // Validate the checkpoint
  394. if !checkpoint.validate() {
  395. warn!("Checkpoint validation failed");
  396. return Err(CrownError::StateJamFormatError);
  397. }
  398. // Extract the kernel state from the jammed noun
  399. let cell = <Noun as NounExt>::cue_bytes(stack, &checkpoint.jam.0)
  400. .map_err(|e| {
  401. warn!("Failed to cue bytes from checkpoint jam: {:?}", e);
  402. CrownError::StateJamFormatError
  403. })?
  404. .as_cell()
  405. .map_err(|e| {
  406. warn!("Failed to convert noun to cell: {}", e);
  407. CrownError::StateJamFormatError
  408. })?;
  409. // The kernel state is the head of the cell
  410. debug!("Successfully extracted kernel state from JammedCheckpoint");
  411. Ok(cell.head())
  412. }
  413. /// Creates a serialized byte array from the current kernel state
  414. fn create_state_bytes(serf: &mut Serf) -> Result<Vec<u8>> {
  415. let version = serf.version;
  416. let ker_hash = serf.ker_hash;
  417. let event_num = serf.event_num.load(Ordering::SeqCst);
  418. let ker_state = serf.arvo.slot(STATE_AXIS).unwrap_or_else(|err| {
  419. panic!(
  420. "Panicked with {err:?} at {}:{} (git sha: {:?})",
  421. file!(),
  422. line!(),
  423. option_env!("GIT_SHA")
  424. )
  425. });
  426. let exported_state = ExportedState::new(serf.stack(), version, ker_hash, event_num, &ker_state);
  427. let encoded = exported_state
  428. .encode()
  429. .map_err(|_| CrownError::StateJamFormatError)?;
  430. Ok(encoded)
  431. }
  432. fn create_checkpoint(serf: &mut Serf, buffer_toggle: Arc<AtomicBool>) -> JammedCheckpoint {
  433. let version = serf.version;
  434. let ker_hash = serf.ker_hash;
  435. let event_num = serf.event_num.load(Ordering::SeqCst);
  436. let ker_state = serf.arvo.slot(STATE_AXIS).unwrap_or_else(|err| {
  437. panic!(
  438. "Panicked with {err:?} at {}:{} (git sha: {:?})",
  439. file!(),
  440. line!(),
  441. option_env!("GIT_SHA")
  442. )
  443. });
  444. let cold = serf.context.cold;
  445. let buff_index = buffer_toggle.load(Ordering::SeqCst);
  446. JammedCheckpoint::new(
  447. serf.stack(),
  448. version,
  449. buff_index,
  450. ker_hash,
  451. event_num,
  452. &cold,
  453. &ker_state,
  454. )
  455. }
  /// Represents a Sword kernel, containing a Serf and snapshot location.
  ///
  /// All kernel operations are forwarded to the serf thread held in `serf`.
  pub struct Kernel {
      /// The Serf managing the interface to the Sword.
      pub(crate) serf: SerfThread,
      /// Directory path for storing pma snapshots.
      pma_dir: Arc<PathBuf>,
      /// Atomic flag for terminating the kernel.
      pub terminator: Arc<AtomicBool>,
  }
  465. impl Kernel {
  466. /// Loads a kernel with a custom hot state.
  467. ///
  468. /// # Arguments
  469. ///
  470. /// * `snap_dir` - Directory for storing snapshots.
  471. /// * `kernel` - Byte slice containing the kernel as a jammed noun.
  472. /// * `hot_state` - Custom hot state entries.
  473. /// * `trace` - Whether to enable tracing.
  474. ///
  475. /// # Returns
  476. ///
  477. /// A new `Kernel` instance.
  478. pub async fn load_with_hot_state(
  479. pma_dir: PathBuf,
  480. jam_paths: JamPaths,
  481. kernel: &[u8],
  482. hot_state: &[HotEntry],
  483. trace: bool,
  484. ) -> Result<Self> {
  485. let jam_paths_arc = Arc::new(jam_paths);
  486. let kernel_vec = Vec::from(kernel);
  487. let hot_state_vec = Vec::from(hot_state);
  488. let pma_dir_arc = Arc::new(pma_dir);
  489. let serf = SerfThread::new(
  490. NOCK_STACK_SIZE, jam_paths_arc, kernel_vec, hot_state_vec, trace,
  491. )
  492. .await?;
  493. let terminator = Arc::new(AtomicBool::new(false));
  494. Ok(Self {
  495. serf,
  496. pma_dir: pma_dir_arc,
  497. terminator,
  498. })
  499. }
  500. /// Loads a kernel with default hot state.
  501. ///
  502. /// # Arguments
  503. ///
  504. /// * `snap_dir` - Directory for storing snapshots.
  505. /// * `kernel` - Byte slice containing the kernel code.
  506. /// * `trace` - Whether to enable tracing.
  507. ///
  508. /// # Returns
  509. ///
  510. /// A new `Kernel` instance.
  511. pub async fn load(
  512. pma_dir: PathBuf,
  513. jam_paths: JamPaths,
  514. kernel: &[u8],
  515. trace: bool,
  516. ) -> Result<Self> {
  517. Self::load_with_hot_state(pma_dir, jam_paths, kernel, &Vec::new(), trace).await
  518. }
  519. /// Loads a kernel with state from jammed bytes
  520. pub async fn load_with_kernel_state(
  521. pma_dir: PathBuf,
  522. jam_paths: JamPaths,
  523. kernel_jam: &[u8],
  524. state_bytes: &[u8],
  525. hot_state: &[HotEntry],
  526. trace: bool,
  527. ) -> Result<Self> {
  528. let kernel =
  529. Self::load_with_hot_state(pma_dir, jam_paths, kernel_jam, hot_state, trace).await?;
  530. match kernel
  531. .serf
  532. .load_state_from_bytes(Vec::from(state_bytes))
  533. .await
  534. {
  535. Ok(_) => {
  536. debug!("Successfully loaded state from bytes");
  537. Ok(kernel)
  538. }
  539. Err(e) => {
  540. error!("Failed to load state from state bytes: {}", e);
  541. error!("The state bytes format is not recognized. It should be either a JammedCheckpoint or an ExportedState.");
  542. Err(e)
  543. }
  544. }
  545. }
  /// Produces a checkpoint of the kernel state.
  ///
  /// Delegates to `SerfThread::checkpoint`.
  pub async fn checkpoint(&self) -> Result<JammedCheckpoint> {
      self.serf.checkpoint().await
  }
  /// Pokes the kernel with `cause` on `wire` by delegating to
  /// `SerfThread::poke`; `self` and `cause` are skipped in the tracing span.
  #[tracing::instrument(name = "nockapp::Kernel::poke", skip(self, cause))]
  pub async fn poke(&self, wire: WireRepr, cause: NounSlab) -> Result<NounSlab> {
      self.serf.poke(wire, cause).await
  }
  /// Blocking variant of `poke`; delegates to `SerfThread::poke_sync`.
  pub fn poke_sync(&self, wire: WireRepr, cause: NounSlab) -> Result<NounSlab> {
      self.serf.poke_sync(wire, cause)
  }
  /// Blocking variant of `peek`; delegates to `SerfThread::peek_sync`.
  pub fn peek_sync(&self, ovo: NounSlab) -> Result<NounSlab> {
      self.serf.peek_sync(ovo)
  }
  /// Peeks the kernel with the request `ovo` by delegating to
  /// `SerfThread::peek`; all arguments are skipped in the tracing span.
  // NOTE(review): takes `&mut self` although the delegated call only needs
  // `&self` — confirm whether exclusive access is intentional.
  #[tracing::instrument(name = "nockapp::Kernel::peek", skip_all)]
  pub async fn peek(&mut self, ovo: NounSlab) -> Result<NounSlab> {
      self.serf.peek(ovo).await
  }
  /// Serializes the current kernel state to bytes; delegates to
  /// `SerfThread::create_state_bytes`.
  pub async fn create_state_bytes(&self) -> Result<Vec<u8>> {
      self.serf.create_state_bytes().await
  }
  567. }
  /// Represents the Serf, which maintains context and provides an interface to
  /// the Sword.
  pub struct Serf {
      /// Version number of snapshot
      pub version: u32,
      /// Hash of boot kernel (blake3 over the kernel bytes passed to `Serf::new`).
      pub ker_hash: Hash,
      /// The current Arvo state.
      pub arvo: Noun,
      /// The interpreter context.
      pub context: interpreter::Context,
      /// The current event number, shared with `SerfThread::event_number`.
      pub event_num: Arc<AtomicU64>,
  }
  582. impl Serf {
  583. /// Creates a new Serf instance.
  584. ///
  585. /// # Arguments
  586. ///
  587. /// * `stack` - The Nock stack.
  588. /// * `checkpoint` - Optional checkpoint to restore from.
  589. /// * `kernel_bytes` - Byte slice containing the kernel code.
  590. /// * `constant_hot_state` - Custom hot state entries.
  591. /// * `trace` - Bool indicating whether to enable nockvm tracing.
  592. ///
  593. /// # Returns
  594. ///
  595. /// A new `Serf` instance.
  596. fn new(
  597. mut stack: NockStack,
  598. checkpoint: Option<Checkpoint>,
  599. kernel_bytes: &[u8],
  600. constant_hot_state: &[HotEntry],
  601. trace: bool,
  602. ) -> Self {
  603. let hot_state = [URBIT_HOT_STATE, constant_hot_state].concat();
  604. let (cold, event_num_raw) = checkpoint.as_ref().map_or_else(
  605. || (Cold::new(&mut stack), 0),
  606. |snapshot| (snapshot.cold, snapshot.event_num),
  607. );
  608. let event_num = Arc::new(AtomicU64::new(event_num_raw));
  609. let trace_info = if trace {
  610. let file = File::create("trace.json").expect("Cannot create trace file trace.json");
  611. let pid = std::process::id();
  612. let process_start = std::time::Instant::now();
  613. Some(TraceInfo {
  614. file,
  615. pid,
  616. process_start,
  617. })
  618. } else {
  619. None
  620. };
  621. let mut context = create_context(stack, &hot_state, cold, trace_info);
  622. let version = checkpoint
  623. .as_ref()
  624. .map_or_else(|| SNAPSHOT_VERSION, |snapshot| snapshot.version);
  625. let mut arvo = {
  626. let kernel_trap = Noun::cue_bytes_slice(&mut context.stack, kernel_bytes)
  627. .expect("invalid kernel jam");
  628. let fol = T(&mut context.stack, &[D(9), D(2), D(0), D(1)]);
  629. if context.trace_info.is_some() {
  630. let start = Instant::now();
  631. let arvo = interpret(&mut context, kernel_trap, fol).unwrap_or_else(|err| {
  632. panic!(
  633. "Panicked with {err:?} at {}:{} (git sha: {:?})",
  634. file!(),
  635. line!(),
  636. option_env!("GIT_SHA")
  637. )
  638. });
  639. write_serf_trace_safe(&mut context, "boot", start);
  640. arvo
  641. } else {
  642. interpret(&mut context, kernel_trap, fol).unwrap_or_else(|err| {
  643. panic!(
  644. "Panicked with {err:?} at {}:{} (git sha: {:?})",
  645. file!(),
  646. line!(),
  647. option_env!("GIT_SHA")
  648. )
  649. })
  650. }
  651. };
  652. let mut hasher = Hasher::new();
  653. hasher.update(kernel_bytes);
  654. let ker_hash = hasher.finalize();
  655. let mut serf = Self {
  656. version,
  657. ker_hash,
  658. arvo,
  659. context,
  660. event_num,
  661. };
  662. if let Some(checkpoint) = checkpoint {
  663. if ker_hash != checkpoint.ker_hash {
  664. info!(
  665. "Kernel hash changed: {:?} -> {:?}",
  666. checkpoint.ker_hash, ker_hash
  667. );
  668. }
  669. arvo = serf.load(checkpoint.ker_state).expect("serf: load failed");
  670. }
  671. unsafe {
  672. serf.event_update(event_num_raw, arvo);
  673. serf.preserve_event_update_leftovers();
  674. }
  675. serf
  676. }
  677. /// Performs a peek operation on the Arvo state.
  678. ///
  679. /// # Arguments
  680. ///
  681. /// * `ovo` - The peek request noun.
  682. ///
  683. /// # Returns
  684. ///
  685. /// Result containing the peeked data or an error.
  686. #[tracing::instrument(skip_all)]
  687. pub fn peek(&mut self, ovo: Noun) -> Result<Noun> {
  688. if self.context.trace_info.is_some() {
  689. let trace_name = "peek";
  690. let start = Instant::now();
  691. let slam_res = self.slam(PEEK_AXIS, ovo);
  692. write_serf_trace_safe(&mut self.context, trace_name, start);
  693. slam_res
  694. } else {
  695. self.slam(PEEK_AXIS, ovo)
  696. }
  697. }
  698. /// Generates a goof (error) noun.
  699. ///
  700. /// # Arguments
  701. ///
  702. /// * `mote` - The error mote.
  703. /// * `traces` - Trace information.
  704. ///
  705. /// # Returns
  706. ///
  707. /// A noun representing the error.
  708. pub fn goof(&mut self, mote: Mote, traces: Noun) -> Noun {
  709. let tone = Cell::new(&mut self.context.stack, D(2), traces);
  710. let tang = mook(&mut self.context, tone, false)
  711. .expect("serf: goof: +mook crashed on bail")
  712. .tail();
  713. T(&mut self.context.stack, &[D(mote as u64), tang])
  714. }
  715. /// Performs a load operation on the Arvo state.
  716. ///
  717. /// # Arguments
  718. ///
  719. /// * `old` - The state to load.
  720. ///
  721. /// # Returns
  722. ///
  723. /// Result containing the loaded kernel or an error.
  724. pub fn load(&mut self, old: Noun) -> Result<Noun> {
  725. match self.soft(old, LOAD_AXIS, Some("load".to_string())) {
  726. Ok(res) => Ok(res),
  727. Err(goof) => {
  728. self.print_goof(goof);
  729. Err(CrownError::SerfLoadError)
  730. }
  731. }
  732. }
  733. pub fn print_goof(&mut self, goof: Noun) {
  734. let tang = goof
  735. .as_cell()
  736. .expect("print goof: expected goof to be a cell")
  737. .tail();
  738. tang.list_iter().for_each(|tank: Noun| {
  739. // TODO: Slogger should be emitting Results in case of failure
  740. self.context.slogger.slog(&mut self.context.stack, 1, tank);
  741. });
  742. }
  743. /// Performs a poke operation on the Arvo state.
  744. ///
  745. /// # Arguments
  746. ///
  747. /// * `job` - The poke job noun.
  748. ///
  749. /// # Returns
  750. ///
  751. /// Result containing the poke response or an error.
  752. #[tracing::instrument(level = "info", skip_all)]
  753. pub fn do_poke(&mut self, job: Noun) -> Result<Noun> {
  754. match self.soft(job, POKE_AXIS, Some("poke".to_string())) {
  755. Ok(res) => {
  756. let cell = res.as_cell().expect("serf: poke: +slam returned atom");
  757. let mut fec = cell.head();
  758. let eve = self.event_num.load(Ordering::SeqCst);
  759. unsafe {
  760. self.event_update(eve + 1, cell.tail());
  761. self.stack().preserve(&mut fec);
  762. self.preserve_event_update_leftovers();
  763. }
  764. Ok(fec)
  765. }
  766. Err(goof) => self.poke_swap(job, goof),
  767. }
  768. }
  769. /// Slams (applies) a gate at a specific axis of Arvo.
  770. ///
  771. /// # Arguments
  772. ///
  773. /// * `axis` - The axis to slam.
  774. /// * `ovo` - The sample noun.
  775. ///
  776. /// # Returns
  777. ///
  778. /// Result containing the slammed result or an error.
  779. pub fn slam(&mut self, axis: u64, ovo: Noun) -> Result<Noun> {
  780. let arvo = self.arvo;
  781. slam(&mut self.context, arvo, axis, ovo)
  782. }
  783. /// Performs a "soft" computation, handling errors gracefully.
  784. ///
  785. /// # Arguments
  786. ///
  787. /// * `ovo` - The input noun.
  788. /// * `axis` - The axis to slam.
  789. /// * `trace_name` - Optional name for tracing.
  790. ///
  791. /// # Returns
  792. ///
  793. /// Result containing the computed noun or an error noun.
  794. fn soft(&mut self, ovo: Noun, axis: u64, trace_name: Option<String>) -> Result<Noun, Noun> {
  795. let slam_res = if self.context.trace_info.is_some() {
  796. let start = Instant::now();
  797. let slam_res = self.slam(axis, ovo);
  798. write_serf_trace_safe(
  799. &mut self.context,
  800. trace_name.as_ref().unwrap_or_else(|| {
  801. panic!(
  802. "Panicked at {}:{} (git sha: {:?})",
  803. file!(),
  804. line!(),
  805. option_env!("GIT_SHA")
  806. )
  807. }),
  808. start,
  809. );
  810. slam_res
  811. } else {
  812. self.slam(axis, ovo)
  813. };
  814. match slam_res {
  815. Ok(res) => Ok(res),
  816. Err(error) => match error {
  817. CrownError::InterpreterError(e) => {
  818. let (mote, traces) = match e.0 {
  819. Error::Deterministic(mote, traces)
  820. | Error::NonDeterministic(mote, traces) => (mote, traces),
  821. Error::ScryBlocked(_) | Error::ScryCrashed(_) => {
  822. panic!("serf: soft: .^ invalid outside of virtual Nock")
  823. }
  824. };
  825. Err(self.goof(mote, traces))
  826. }
  827. _ => Err(D(0)),
  828. },
  829. }
  830. }
  831. /// Plays a list of events.
  832. ///
  833. /// # Arguments
  834. ///
  835. /// * `lit` - The list of events to play.
  836. ///
  837. /// # Returns
  838. ///
  839. /// Result containing the final Arvo state or an error.
  840. fn play_list(&mut self, mut lit: Noun) -> Result<Noun> {
  841. let mut eve = self.event_num.load(Ordering::SeqCst);
  842. while let Ok(cell) = lit.as_cell() {
  843. let ovo = cell.head();
  844. lit = cell.tail();
  845. let trace_name = if self.context.trace_info.is_some() {
  846. Some(format!("play [{}]", eve))
  847. } else {
  848. None
  849. };
  850. match self.soft(ovo, POKE_AXIS, trace_name) {
  851. Ok(res) => {
  852. let arvo = res.as_cell()?.tail();
  853. eve += 1;
  854. unsafe {
  855. self.event_update(eve, arvo);
  856. self.context.stack.preserve(&mut lit);
  857. self.preserve_event_update_leftovers();
  858. }
  859. }
  860. Err(goof) => {
  861. return Err(CrownError::KernelError(Some(goof)));
  862. }
  863. }
  864. }
  865. Ok(self.arvo)
  866. }
  867. /// Handles a poke error by swapping in a new event.
  868. ///
  869. /// # Arguments
  870. ///
  871. /// * `job` - The original poke job.
  872. /// * `goof` - The error noun.
  873. ///
  874. /// # Returns
  875. ///
  876. /// Result containing the new event or an error.
  877. fn poke_swap(&mut self, job: Noun, goof: Noun) -> Result<Noun> {
  878. let stack = &mut self.context.stack;
  879. self.context.cache = Hamt::<Noun>::new(stack);
  880. let job_cell = job.as_cell().expect("serf: poke: job not a cell");
  881. // job data is job without event_num
  882. let job_data = job_cell
  883. .tail()
  884. .as_cell()
  885. .expect("serf: poke: data not a cell");
  886. // job input is job without event_num or wire
  887. let job_input = job_data.tail();
  888. let wire = T(stack, &[D(0), D(tas!(b"arvo")), D(0)]);
  889. let crud = DirectAtom::new_panic(tas!(b"crud"));
  890. let event_num = D(self.event_num.load(Ordering::SeqCst) + 1);
  891. let mut ovo = T(stack, &[event_num, wire, goof, job_input]);
  892. let trace_name = if self.context.trace_info.is_some() {
  893. Some(Self::poke_trace_name(
  894. &mut self.context.stack,
  895. wire,
  896. crud.as_atom(),
  897. ))
  898. } else {
  899. None
  900. };
  901. match self.soft(ovo, POKE_AXIS, trace_name) {
  902. Ok(res) => {
  903. let cell = res.as_cell().expect("serf: poke: crud +slam returned atom");
  904. let mut fec = cell.head();
  905. let eve = self.event_num.load(Ordering::SeqCst);
  906. unsafe {
  907. self.event_update(eve + 1, cell.tail());
  908. self.context.stack.preserve(&mut ovo);
  909. self.context.stack.preserve(&mut fec);
  910. self.preserve_event_update_leftovers();
  911. }
  912. Ok(self.poke_bail(eve, eve, ovo, fec))
  913. }
  914. Err(goof_crud) => {
  915. let stack = &mut self.context.stack;
  916. let lud = T(stack, &[goof_crud, goof, D(0)]);
  917. Ok(self.poke_bail_noun(lud))
  918. }
  919. }
  920. }
  921. /// Generates a trace name for a poke operation.
  922. ///
  923. /// # Arguments
  924. ///
  925. /// * `stack` - The Nock stack.
  926. /// * `wire` - The wire noun.
  927. /// * `vent` - The vent atom.
  928. ///
  929. /// # Returns
  930. ///
  931. /// A string representing the trace name.
  932. fn poke_trace_name(stack: &mut NockStack, wire: Noun, vent: Atom) -> String {
  933. let wpc = path_to_cord(stack, wire);
  934. let wpc_len = met3_usize(wpc);
  935. let wpc_bytes = &wpc.as_ne_bytes()[0..wpc_len];
  936. let wpc_str = match std::str::from_utf8(wpc_bytes) {
  937. Ok(valid) => valid,
  938. Err(error) => {
  939. let (valid, _) = wpc_bytes.split_at(error.valid_up_to());
  940. unsafe { std::str::from_utf8_unchecked(valid) }
  941. }
  942. };
  943. let vc_len = met3_usize(vent);
  944. let vc_bytes = &vent.as_ne_bytes()[0..vc_len];
  945. let vc_str = match std::str::from_utf8(vc_bytes) {
  946. Ok(valid) => valid,
  947. Err(error) => {
  948. let (valid, _) = vc_bytes.split_at(error.valid_up_to());
  949. unsafe { std::str::from_utf8_unchecked(valid) }
  950. }
  951. };
  952. format!("poke [{} {}]", wpc_str, vc_str)
  953. }
  954. /// Performs a poke operation with a given cause.
  955. ///
  956. /// # Arguments
  957. ///
  958. /// * `wire` - The wire noun.
  959. /// * `cause` - The cause noun.
  960. ///
  961. /// # Returns
  962. ///
  963. /// Result containing the poke response or an error.
  964. #[tracing::instrument(level = "info", skip_all, fields(
  965. wire_source = wire.source
  966. ))]
  967. pub fn poke(&mut self, wire: WireRepr, cause: Noun) -> Result<Noun> {
  968. let random_bytes = rand::random::<u64>();
  969. let bytes = random_bytes.as_bytes()?;
  970. let eny: Atom = Atom::from_bytes(&mut self.context.stack, &bytes);
  971. let our = <nockvm::noun::Atom as AtomExt>::from_value(&mut self.context.stack, 0)?; // Using 0 as default value
  972. let now: Atom = unsafe {
  973. let mut t_vec: Vec<u8> = vec![];
  974. t_vec.write_u128::<LittleEndian>(current_da().0)?;
  975. IndirectAtom::new_raw_bytes(&mut self.context.stack, 16, t_vec.as_slice().as_ptr())
  976. .normalize_as_atom()
  977. };
  978. let event_num = D(self.event_num.load(Ordering::SeqCst) + 1);
  979. let base_wire_noun = wire_to_noun(&mut self.context.stack, &wire);
  980. let wire = T(&mut self.context.stack, &[D(tas!(b"poke")), base_wire_noun]);
  981. let poke = T(
  982. &mut self.context.stack,
  983. &[event_num, wire, eny.as_noun(), our.as_noun(), now.as_noun(), cause],
  984. );
  985. self.do_poke(poke)
  986. }
  987. /// Updates the Serf's state after an event.
  988. ///
  989. /// # Arguments
  990. ///
  991. /// * `new_event_num` - The new event number.
  992. /// * `new_arvo` - The new Arvo state.
  993. ///
  994. /// # Safety
  995. ///
  996. /// This function is unsafe because it modifies the Serf's state directly.
  997. #[tracing::instrument(level = "info", skip_all)]
  998. pub unsafe fn event_update(&mut self, new_event_num: u64, new_arvo: Noun) {
  999. self.arvo = new_arvo;
  1000. self.event_num.store(new_event_num, Ordering::SeqCst);
  1001. self.context.cache = Hamt::new(&mut self.context.stack);
  1002. self.context.scry_stack = D(0);
  1003. }
  1004. /// Preserves leftovers after an event update.
  1005. ///
  1006. /// # Safety
  1007. ///
  1008. /// This function is unsafe because it modifies the Serf's state directly.
  1009. #[tracing::instrument(level = "info", skip_all)]
  1010. pub unsafe fn preserve_event_update_leftovers(&mut self) {
  1011. let stack = &mut self.context.stack;
  1012. stack.preserve(&mut self.context.warm);
  1013. stack.preserve(&mut self.context.hot);
  1014. stack.preserve(&mut self.context.cache);
  1015. stack.preserve(&mut self.context.cold);
  1016. stack.preserve(&mut self.arvo);
  1017. stack.flip_top_frame(0);
  1018. }
  1019. /// Returns a mutable reference to the Nock stack.
  1020. ///
  1021. /// # Returns
  1022. ///
  1023. /// A mutable reference to the `NockStack`.
  1024. pub fn stack(&mut self) -> &mut NockStack {
  1025. &mut self.context.stack
  1026. }
  1027. /// Creates a poke swap noun.
  1028. ///
  1029. /// # Arguments
  1030. ///
  1031. /// * `eve` - The event number.
  1032. /// * `mug` - The mug value.
  1033. /// * `ovo` - The original noun.
  1034. /// * `fec` - The effect noun.
  1035. ///
  1036. /// # Returns
  1037. ///
  1038. /// A noun representing the poke swap.
  1039. pub fn poke_bail(&mut self, eve: u64, mug: u64, ovo: Noun, fec: Noun) -> Noun {
  1040. T(
  1041. self.stack(),
  1042. &[D(tas!(b"poke")), D(tas!(b"swap")), D(eve), D(mug), ovo, fec],
  1043. )
  1044. }
  1045. /// Creates a poke bail noun.
  1046. ///
  1047. /// # Arguments
  1048. ///
  1049. /// * `lud` - The lud noun.
  1050. ///
  1051. /// # Returns
  1052. ///
  1053. /// A noun representing the poke bail.
  1054. pub fn poke_bail_noun(&mut self, lud: Noun) -> Noun {
  1055. T(self.stack(), &[D(tas!(b"poke")), D(tas!(b"bail")), lud])
  1056. }
  1057. }
  1058. fn slot(noun: Noun, axis: u64) -> Result<Noun> {
  1059. Ok(noun.slot(axis)?)
  1060. }
  #[cfg(test)]
  mod tests {
      use super::*;
      use std::fs;
      use std::path::Path;
      use tempfile::TempDir;

      /// Loads a kernel from `<crate>/../assets/<jam>` into a fresh temporary
      /// snapshot directory.
      ///
      /// The `TempDir` is returned with the kernel so the caller decides how
      /// long the directory is kept.
      async fn setup_kernel(jam: &str) -> (Kernel, TempDir) {
          let temp_dir = TempDir::new().expect("Failed to create temp directory");
          let snap_dir = temp_dir.path().to_path_buf();
          let jam_paths = JamPaths::new(&snap_dir);

          let mut jam_path = Path::new(env!("CARGO_MANIFEST_DIR")).to_path_buf();
          jam_path.push("..");
          jam_path.push("assets");
          jam_path.push(jam);

          let jam_bytes = match fs::read(jam_path) {
              Ok(bytes) => bytes,
              Err(_) => panic!("Failed to read {} file", jam),
          };

          let loaded = Kernel::load(snap_dir, jam_paths, &jam_bytes, false).await;
          let kernel = loaded.expect("Could not load kernel");
          (kernel, temp_dir)
      }

      // Convert this to an integration test and feed it the kernel.jam from Choo in CI/CD
      // https://www.youtube.com/watch?v=4m1EFMoRFvY
      // NOTE: `setup_kernel` is async, so the examples below must await it from
      // an async test before they can exercise anything.
      // #[test]
      // #[cfg_attr(miri, ignore)]
      // fn test_kernel_boot() {
      //     let _ = setup_kernel("dumb.jam");
      // }

      // To test your own kernel, place a `kernel.jam` file in the `assets` directory
      // and uncomment the following test:
      //
      // #[test]
      // fn test_custom_kernel() {
      //     let (kernel, _temp_dir) = setup_kernel("kernel.jam");
      //     // Add your custom assertions here to test the kernel's behavior
      // }
  }