A powerful framework to manage a pack of co-processors with few lines of code
If you read the previous post on the DPU toolchain, you know that all the tools needed to write and test DPU programs are available. As a second step, developers can write an application that commands and controls a fleet of thousands of DPUs, accessing them through a collection of C functions that offer the bare minimum needed to pilot co-processors.
However, when the time comes to scale up to rich applications controlling a huge number of co-processors, such an API is sufficient but leaves a significant amount of work to the developer:
- Moving from C to a language with higher levels of abstraction
- Implementing the glue that maps DPUs (the devices) to their actual functionality (the programs)
- Ensuring that the program can be easily tested and integrated within a product
The Kelonia framework helps solve these problems by offering an environment that makes Processors In Memory (nearly) invisible from the application standpoint.
In this post, we invite you to discover how a simple DPU program becomes a part of a much larger software system, thanks to Kelonia.
Framework overview
Kelonia is designed by and for people who have agile development life-cycles in mind. As such, it isolates the different parts of the programming process, facilitates continuous testing and integration of individual bricks, and provides the framework needed to assemble them smoothly.
From the application standpoint, a “DPU” is an “object” representing both the physical device and the program: a fleet is a collection of components defining methods to trigger the activity on an associated DPU:
- As a functional co-processor: boot the DPU with the application supplied parameters and retrieve the computed results
- In an actor-driven design: send messages to specific actors
As we will see later, this abstract representation of DPUs makes it possible to stub their actual implementations, so that application development can proceed in parallel with the design of the DPU program. The only thing that both ends need to clearly agree on is the “interface”.
An interface is essentially the set of decisions made by the DPU program designer about how the program interacts with the host environment. For example, one may decide that “input parameters are posted into individual mailboxes”, “an input buffer resides at address X of the DRAM”, etc. Kelonia requires this contract to be defined with our Device Interface Definition Language (DIDL), which is translated into sequences of operations in a given target language by the UPMEM SDK tool dpugeni.
The framework itself is designed according to a well-known paradigm, with a technical layer on the one hand and a business layer on the other.
The technical layer (Kelonia core) is a collection of interfaces and implementations requiring an assembly procedure to create an abstraction of a DPU fleet. The business layer (Kelonia foundry) defines specific assemblies to cover the common use cases so that a user can create a fleet of DPUs in very few lines of code. We intend to deliver the framework source code so that developers needing specific assemblies not covered by our foundry can cook their own recipes.
Example
To illustrate how Kelonia works, let’s start with a simple example: use one DPU to count the occurrences of a character within a text.
We first design a DPU program that counts characters in a document and a Java function that loads a document into the UPMEM DPU DRAM and schedules the DPU to perform the count.
DPU Program
The DPU program is a functional co-processor, expecting the text to reside in DRAM and the host to provide the text length and the character to be counted via the shared system mailbox. On each new request, the program schedules 16 tasklets, each of which counts the character occurrences within its own portion of DRAM and writes the result back into its individual mailbox.
This implementation implies, of course, that the application sums all the results provided by individual tasklets to get the final one.
To summarize:
- 16 identical tasklets
- Read the document size (first word of the system mailbox) and the requested character (second word of the system mailbox)
- Count the number of occurrences of the requested character in a specific area of DRAM
- Write back the counter value into the first word of their respective mailbox
The source code can be found in the appendix of this post.
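To make the work split concrete before reading the C source, here is a small host-side model in Java of how the 16 tasklets share the document. It is only a sketch: the class and method names are ours, and unlike the appendix program, which compares whole 256-byte blocks, it stops at the document length, so both agree as long as the padding does not contain the requested character.

```java
import java.nio.charset.StandardCharsets;
import java.util.stream.IntStream;

// Host-side reference model of the tasklet work split; this is not DPU code.
class RakeModel {
    static final int NR_TASKLETS = 16;
    static final int BLOCK_SIZE = 256;

    // Returns one counter per tasklet, as the individual mailboxes would hold them.
    static int[] countPerTasklet(byte[] document, byte requested) {
        int[] counters = new int[NR_TASKLETS];
        for (int tasklet = 0; tasklet < NR_TASKLETS; tasklet++) {
            // Tasklet X starts at block X, then jumps 16 blocks ahead each time.
            for (int block = tasklet * BLOCK_SIZE; block < document.length;
                 block += NR_TASKLETS * BLOCK_SIZE) {
                int end = Math.min(block + BLOCK_SIZE, document.length);
                for (int i = block; i < end; i++) {
                    if (document[i] == requested) {
                        counters[tasklet]++;
                    }
                }
            }
        }
        return counters;
    }

    public static void main(String[] args) {
        byte[] text = "a banana and an avocado".getBytes(StandardCharsets.US_ASCII);
        int[] counters = countPerTasklet(text, (byte) 'a');
        // The host has to sum the per-tasklet counters to get the final result.
        System.out.println("total = " + IntStream.of(counters).sum()); // prints "total = 8"
    }
}
```

With a document shorter than 256 bytes, only tasklet 0 reports a non-zero counter; longer documents spread across the tasklets block by block.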
The interface
As we can see, the DPU program implicitly requires the host to comply with the interface defined above.
DIDL allows you to specify this contract in a form that is understandable by the application. It basically consists of declaring a function with two input parameters (the document length and the requested character) and a table of 16 results, then mapping those fields to DPU resources:
Character counter: Device Interface Description
function countCharacters(
    documentLength: in u32,
    requestedChar: in u8,
    counters: out u32[16]
) {
    // Document length and requested character are sent through the
    // system mailbox, shared by tasklets.
    documentLength <- sysmbox[0],
    requestedChar <- sysmbox[1],
    // Every tasklet computes the number of characters found on a specific
    // area of memory. The result is written back into the first word
    // of its mailbox.
    counters <- mbox[0..15][0]
}
The result produced by dpugeni is a Java interface and a class including three functions:
- The declared function:
- void countCharacters(int documentLength, char requestedChar, int counters[16])
- Two functions to access the DRAM, automatically created by dpugeni:
- void copyToDevice(int destination, byte[] data)
- byte[] copyFromDevice(int source, int length)
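The code generated by dpugeni is not shown in this post. Purely as an illustration of what the mapping in the interface description means on the host side, the sketch below packs the two inputs into system mailbox words and reads the 16 counters back from the first word of each tasklet mailbox. The class MailboxMapping and its methods are hypothetical names of ours, not part of the SDK.

```java
// Host-side view of the DIDL mapping; class and method names are illustrative only.
class MailboxMapping {
    static final int NR_TASKLETS = 16;

    // documentLength goes to sysmbox[0], requestedChar to the low byte of sysmbox[1].
    static int[] packRequest(int documentLength, byte requestedChar) {
        return new int[] {documentLength, requestedChar & 0xff};
    }

    // counters[i] is read from word 0 of the mailbox of tasklet i.
    static int[] unpackCounters(int[][] mailboxes) {
        int[] counters = new int[NR_TASKLETS];
        for (int tasklet = 0; tasklet < NR_TASKLETS; tasklet++) {
            counters[tasklet] = mailboxes[tasklet][0];
        }
        return counters;
    }

    public static void main(String[] args) {
        int[] request = packRequest(4096, (byte) 'a');
        System.out.println(request[0] + " " + request[1]); // prints "4096 97"
    }
}
```

The point of the generated interface is that the application never writes this plumbing by hand: it only sees the declared function and the two memory copy functions.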
Integration in Java
The Kelonia foundry provides a model to drive DPUs designed as a functional co-processor directly. This is called the basic model.
In the example below, we demonstrate how to drive a single DPU with the basic model.
The first thing to do is to get a “device description” of the DPU, as illustrated below. The first argument is an “instance” of the device, the second argument is the binary file containing the program executed by this DPU:
public void countCharacterWithOneDpu() throws DpuFoundryException, DpuLowLevelException {
DpuDeviceDescription charCountDevice =
new DpuDeviceDescription(CharcountInterface.Charcount.instance(), "charcount.bin");
From this device description, the Kelonia foundry can create a fleet, which in this very simple case includes only one device. The provided object is an assembly, which must be cleared when the program ends:
BasicDpuAssembly assembly = DpuFoundry.targeting(JAVA).build(BasicDpuAssemblyLine.usingDpu(charCountDevice));
try {
...
} finally {
assembly.clear();
}
A device interface lets the application send requests to the device through intermediate objects called workers. Workers are associated with tasks defined by the interface; in our case:
- CharcountInterface.Task.countCharacters invokes the co-processing function “countCharacters”
- CharcountInterface.Task.memoryCopy invokes the functions that copy data to and from the device’s DRAM
In practice, the two snippets below show how to obtain the workers:
CharcountInterface.CountCharactersWorker worker =
assembly.getWorker(CharcountInterface.Task.countCharacters);
CharcountInterface.MemoryCopyWorker copyWorker =
assembly.getWorker(CharcountInterface.Task.memoryCopy);
Once the application gets the workers, it can copy data into the DPU DRAM and invoke the co-processor’s function as if they were native Java functions:
byte[] myText = loadTextFromFile();
copyWorker.copyToDevice(0, myText);
System.out.println("starting task");
int[] counters = worker.countCharacters(myText.length, (byte) 'a');
System.out.println("found " + IntStream.of(counters).sum() + " occurrences of 'a'");
Using large fleets of DPUs
Now let’s have a look at how Kelonia facilitates the management of a basic assembly with a large number of DPUs. The code is similar, except that:
- The application provides the number of DPUs (nrDpus) to the foundry
- Workers are per function and per DPU (dpuNumber)
DpuDeviceDescription charCountDevice =
    new DpuDeviceDescription(CharcountInterface.Charcount.instance(), "charcount.bin");
final int nrDpus = ...;
BasicDpuAssembly assembly =
    DpuFoundry.targeting(JAVA).build(BasicDpuAssemblyLine.usingDpus(nrDpus, charCountDevice));
final CharcountInterface.CountCharactersWorker worker =
    assembly.getWorker(dpuNumber, CharcountInterface.Task.countCharacters);
final CharcountInterface.MemoryCopyWorker copyWorker =
    assembly.getWorker(dpuNumber, CharcountInterface.Task.memoryCopy);
For example, let’s consider a character counter processing a very large file. The application can schedule N DPUs to perform the operation using a very simple producer/consumer strategy:
- A DataProvider delivers subsequent blocks of 64MB upon request (function getMoreData)
- A thread-safe counter, CountResult, adds a given number of character occurrences each time its function add is called
- Each DPU is managed by an individual thread running a loop that:
- Gets a block of 64MB
- Copies that block into the DPU DRAM
- Schedules the DPU function countCharacters
- Adds the result to CountResult
- Threads are looping in parallel until the file is entirely read
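The two helpers DataProvider and CountResult are not listed in this post. A minimal sketch of what they could look like is given below, under a few assumptions of ours: the provider reads from an in-memory array with a configurable block size instead of a file with 64MB blocks, and a plain software loop stands in for the DPU so that the threading pattern can be run without any hardware.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

class ProducerConsumerDemo {
    // Runs nrThreads loops that pull blocks until the provider is exhausted.
    static long countWithThreads(byte[] data, int blockSize, int nrThreads, byte requested) {
        DataProvider provider = new DataProvider(data, blockSize);
        CountResult result = new CountResult();
        Thread[] threads = new Thread[nrThreads];
        for (int i = 0; i < nrThreads; i++) {
            threads[i] = new Thread(() -> {
                byte[] block;
                while ((block = provider.getMoreData()) != null) {
                    int found = 0;
                    for (byte b : block) {   // software stand-in for the DPU call
                        if (b == requested) {
                            found++;
                        }
                    }
                    result.add(found);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return result.getResultValue();
    }

    public static void main(String[] args) {
        byte[] data = "abracadabra".getBytes(StandardCharsets.US_ASCII);
        System.out.println(countWithThreads(data, 4, 3, (byte) 'a')); // prints 5
    }
}

// Hands out consecutive blocks of an in-memory source (hypothetical implementation).
class DataProvider {
    private final byte[] source;
    private final int blockSize;
    private int offset = 0;

    DataProvider(byte[] source, int blockSize) {
        this.source = source;
        this.blockSize = blockSize;
    }

    // Returns the next block, or null once everything has been handed out.
    // synchronized so that two threads never receive the same block.
    synchronized byte[] getMoreData() {
        if (offset >= source.length) {
            return null;
        }
        int end = Math.min(offset + blockSize, source.length);
        byte[] block = Arrays.copyOfRange(source, offset, end);
        offset = end;
        return block;
    }
}

// Thread-safe accumulator for the partial counts (hypothetical implementation).
class CountResult {
    private final AtomicLong total = new AtomicLong();

    void add(long occurrences) {
        total.addAndGet(occurrences);
    }

    long getResultValue() {
        return total.get();
    }
}
```

Returning null from getMoreData is what lets each thread leave its loop once the input is exhausted, which matches the null check used by the DPU threads in this post.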
Each thread implements the following function:
One thread driving a given DPU
private static Thread counterThread(int dpuNumber, DataProvider file, final CountResult result, BasicDpuAssembly assembly) {
    final CharcountInterface.CountCharactersWorker worker =
        assembly.getWorker(dpuNumber, CharcountInterface.Task.countCharacters);
    final CharcountInterface.MemoryCopyWorker copyWorker =
        assembly.getWorker(dpuNumber, CharcountInterface.Task.memoryCopy);
    Thread t = new Thread(() -> {
        try {
            byte[] myText;
            do {
                myText = file.getMoreData();
                if (myText != null) {
                    copyWorker.copyToDevice(0, myText);
                    int[] counters = worker.countCharacters(myText.length, (byte) 'a');
                    result.add(IntStream.of(counters).sum());
                }
            } while (myText != null);
        } catch (DpuLowLevelException e) {
            System.out.println("DPU " + dpuNumber + " caught " + e);
        }
    });
    t.start();
    return t;
}
The main routine can now schedule the threads and wait for the process to complete:
public void countCharacterWithMultipleDpu() throws DpuFoundryException {
    DpuDeviceDescription charCountDevice =
        new DpuDeviceDescription(CharcountInterface.Charcount.instance(), "charcount.bin");
    final int nrDpus = N;
    BasicDpuAssembly assembly =
        DpuFoundry.targeting(JAVA).build(BasicDpuAssemblyLine.usingDpus(nrDpus, charCountDevice));
    List<Thread> threads = new ArrayList<>();
    DataProvider provider = new DataProvider();
    CountResult result = new CountResult();
    try {
        for (int eachDpu = 0; eachDpu < nrDpus; eachDpu++) {
            threads.add(counterThread(eachDpu, provider, result, assembly));
        }
        threads.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException ignored) {
            }
        });
        System.out.println("Found " + result.getResultValue() + " 'a's");
    } finally {
        assembly.clear();
    }
}
Stubbing and mocking DPUs
Developing complex architectures often requires the ability to “plug or unplug” modules during the design and validation stages. In our case, this means being able to execute an application with or without DPUs.
A typical illustration is writing tests where DPUs are, temporarily or not, replaced by objects that mimic their behavior, commonly called “mock objects”.
We tried Kelonia with Mockito, a popular mocking framework, to demonstrate its compatibility with this practice. It works the same way with any similar framework, such as JMock or EasyMock.
The simplest way to mock DPUs with Kelonia is to override the interface generated by dpugeni, in order to return mocks of the requested workers, as shown below:
CharcountInterface charcountInterface = new CharcountInterface.StubbedCharcount() {
    @Override
    public Map<WorkerFactory, Set<WorkerTaskDescription>> workerFactories() {
        CharcountInterface.MemoryCopyWorker memoryWorker =
            mock(CharcountInterface.MemoryCopyWorker.class);
        CharcountInterface.CountCharactersWorker countWorker =
            mock(CharcountInterface.CountCharactersWorker.class);
        try {
            when(countWorker.countCharacters(anyInt(), anyByte()))
                .thenReturn(new int[]{1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144});
            Mockito.doNothing().when(memoryWorker).copyToDevice(anyInt(), any());
        } catch (DpuLowLevelException e) {
            // This would not happen
        }
        return new CharcountInterface.StubbedCharcount.WorkerFactoryBuilder()
            .withWorker(dpu -> countWorker, Task.countCharacters)
            .withWorker(dpu -> memoryWorker, Task.memoryCopy)
            .build();
    }
};
countDevice = new DpuDeviceDescription(charcountInterface, binary, DpuBackend.Stubbed);
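The same idea can be tried without any mocking library or Kelonia class on the classpath. The sketch below is an assumption-laden illustration: CountCharactersWorker here is our own reduced, hypothetical interface (one method, no checked exception), not the one generated by dpugeni. It shows that application logic written against the worker contract can be exercised with a canned answer, using the same values as the Mockito example above.

```java
import java.util.stream.IntStream;

class StubbedCounterDemo {
    // Application logic under test: it depends only on the worker contract.
    static int totalOccurrences(CountCharactersWorker worker, int length, byte requested) {
        return IntStream.of(worker.countCharacters(length, requested)).sum();
    }

    public static void main(String[] args) {
        // Hand-written stub: ignores its inputs and returns a fixed set of counters.
        CountCharactersWorker stub =
            (length, requested) -> new int[]{1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144};
        System.out.println(totalOccurrences(stub, 0, (byte) 'a')); // prints 376
    }
}

// Hypothetical worker contract, reduced to the single call the application needs.
interface CountCharactersWorker {
    int[] countCharacters(int documentLength, byte requestedChar);
}
```

A mocking framework adds call verification and argument matching on top of this, but the underlying substitution is the same.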
Maven integration
Integrating a framework today necessarily means integrating easily with standard build systems, such as Maven, Gradle or SBT.
Our Maven plugin simplifies the development and deployment of applications with DPUs. The plugin comes with a default file organization, which can be changed if needed:
- In src/main/dpu: the DPU program sources and device interface definitions
- In target/generated-sources: the interface classes generated by dpugeni, treated as source files belonging to the application like any other Java source
- In target/generated-resources: the DPU binary program that will be embedded as a resource file in the final jar
A typical use of this plugin in a Maven build is quite simple for people familiar with the tool:
<plugin>
    <groupId>com.upmem</groupId>
    <artifactId>dpu-build-maven-plugin</artifactId>
    <executions>
        <execution>
            <goals>
                <goal>dpu-build</goal>
            </goals>
            <configuration>
                <inputs>
                    <input>com/mycompany/myapp</input>
                </inputs>
            </configuration>
        </execution>
    </executions>
</plugin>
Kelonia is a powerful framework:
- Managing thousands of co-processors as simple software entities
- Designed for easy development of complex applications
- Unifying the build of individual bricks when using Maven
Kelonia is developing very fast and will soon integrate many more features.
We are definitely open to feedback and suggestions.
Please feel free to contact us.
Appendix: DPU program source code
The C code:
Character counter: DPU program
int charcount() {
    /* Get the document length and requested character from the system mailbox. */
    uint32_t *host_request = (uint32_t *) sys_mbox_recv();
    unsigned int document_len = host_request[0];
    uint32_t characters = host_request[1] & 0xff;

    /* Repeat the requested characters four times, so that we can optimize the search with cmpb4. */
    characters |= ((characters << 8) | (characters << 16) | (characters << 24));

    /* The number of occurrences found by this tasklet. */
    unsigned int result = 0;

    /*
     * Memory distribution using a rake: tasklet number X first parses
     * 256 bytes at address X*256. It then processes 256 bytes at
     * address (X*256 + 16*256) and so on.
     *
     * cache contains the current 256 bytes to fetch.
     */
    uint32_t *cache = dma_alloc(256);
    mram_addr_t my_block_address = (mram_addr_t) (me() << 8);
    for (; my_block_address < document_len; my_block_address += (16 << 8)) {
        mram_ll_read256(my_block_address, cache);

        /*
         * Compare every byte in the block with the requested character.
         *
         * We use the DPU instruction cmpb4 to compare 4 characters at a time.
         */
        int i, j;
        for (i = 0; i < 64; i++) {
            /* Compare 4 bytes at a time. */
            __builtin_cmpb4_rrr(j, cache[i], characters);
            /* And count how many ones are in the resulting comparison. */
            __builtin_cao_rr(j, j);
            result += j;
        }
    }

    mbox_send(&result, sizeof(result));
    return 0;
}
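For readers who want to follow the inner loop without a DPU at hand, the compare-and-count step can be emulated on the host. The Java sketch below rests on an assumption that the comments in the C code suggest but do not spell out: cmpb4 writes 0x01 into each byte lane where its two operands match and 0x00 elsewhere, so that counting the ones in the result gives the number of matching bytes. The class and method names are ours.

```java
// Software emulation of the compare-and-count step, under the assumption that
// cmpb4 yields 0x01 in each matching byte lane and 0x00 in the others.
class Cmpb4Model {
    // Repeats one byte in all four lanes of a 32-bit word.
    static int replicate(byte c) {
        int v = c & 0xff;
        return v | (v << 8) | (v << 16) | (v << 24);
    }

    // Lane-wise byte comparison of two 32-bit words.
    static int cmpb4(int a, int b) {
        int result = 0;
        for (int lane = 0; lane < 4; lane++) {
            int shift = lane * 8;
            if (((a >>> shift) & 0xff) == ((b >>> shift) & 0xff)) {
                result |= 1 << shift;
            }
        }
        return result;
    }

    // Number of bytes in word equal to c: one set bit per matching lane.
    static int countInWord(int word, byte c) {
        return Integer.bitCount(cmpb4(word, replicate(c)));
    }

    public static void main(String[] args) {
        // Bytes 'a', 'b', 'a', 'a' packed in one word: three of them match 'a'.
        System.out.println(countInWord(0x61626161, (byte) 'a')); // prints 3
    }
}
```

Replicating the requested character once, outside the loop, is what allows the DPU program to handle four bytes per comparison instead of one.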
The corresponding Runtime Environment script:
Character counter: RTE
//
// Copyright (c) 2014-2016 - uPmem
//
// Character count test program configuration

// Distribute the character counter on 16 tasklets.
// Each tasklet is in charge of a specific area of memory
// and writes back the number of character occurrences found
// within this area into its mailbox.
tasklet add 0..15 medium charcount 1

// The shared mailbox contains 2 words: the document length
// and the character to find.
sysmbox 2

save
quit