Skip to content

Commit 65a2746

Browse files
committed
Finished example of LLAMA cloud
1 parent fe5432d commit 65a2746

File tree

1 file changed

+188
-2
lines changed

1 file changed

+188
-2
lines changed

src/KernelMemory.Extensions.ConsoleTest/Samples/CustomParsersSample.cs

Lines changed: 188 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,16 @@
55
using Microsoft.KernelMemory;
66
using Microsoft.KernelMemory.Context;
77
using Microsoft.KernelMemory.DataFormats;
8+
using Microsoft.KernelMemory.Diagnostics;
89
using Microsoft.KernelMemory.DocumentStorage.DevTools;
910
using Microsoft.KernelMemory.FileSystem.DevTools;
1011
using Microsoft.KernelMemory.Handlers;
1112
using Microsoft.KernelMemory.MemoryStorage.DevTools;
13+
using Microsoft.KernelMemory.Pipeline;
14+
using System.Security.Cryptography;
15+
using System.Text;
16+
using System.Linq;
17+
using HandlebarsDotNet.Extensions;
1218

1319
namespace SemanticMemory.Samples;
1420

@@ -48,6 +54,9 @@ public async Task RunSample(string fileToParse)
4854
TextPartitioningHandler textPartitioning = new("partition", orchestrator);
4955
await orchestrator.AddHandlerAsync(textPartitioning);
5056

57+
CustomSamplePartitioningHandler customMarkdownPartition = new("markdownpartition", orchestrator);
58+
await orchestrator.AddHandlerAsync(customMarkdownPartition);
59+
5160
GenerateEmbeddingsHandler textEmbedding = new("gen_embeddings", orchestrator);
5261
await orchestrator.AddHandlerAsync(textEmbedding);
5362

@@ -66,11 +75,12 @@ public async Task RunSample(string fileToParse)
6675
new TagCollection { { "example", "books" } })
6776
.AddUploadFile(fileName, fileName, fileToParse)
6877
.Then("extract")
69-
.Then("partition")
78+
//.Then("partition")
79+
.Then("markdownpartition")
7080
.Then("gen_embeddings")
7181
.Then("save_records");
7282

73-
contextProvider.AddLLamaCloudParserOptions(fileName, "This is a manual for Dreame vacuum cleaner, I need you to extract a series of sections that can be useful for an helpdesk to answer user questions. You will create sections where each sections contains a question and an answer taken from the text.");
83+
contextProvider.AddLLamaCloudParserOptions(fileName, "This is a manual for Dreame vacuum cleaner, I need you to extract a series of sections that can be useful for an helpdesk to answer user questions. You will create sections where each sections contains a question and an answer taken from the text. Each question will be separated with ---");
7484

7585
var pipeline = pipelineBuilder.Build();
7686
await orchestrator.RunPipelineAsync(pipeline);
@@ -162,3 +172,179 @@ private static IKernelMemoryBuilder CreateBasicKernelMemoryBuilder(
162172
return kernelMemoryBuilder;
163173
}
164174
}
175+
176+
public sealed class CustomSamplePartitioningHandler : IPipelineStepHandler
{
    private readonly IPipelineOrchestrator _orchestrator;

    // Fixed: logger category was ILogger<TextPartitioningHandler> (copy/paste from the
    // built-in handler), which misattributed this handler's log entries.
    private readonly ILogger<CustomSamplePartitioningHandler> _log;

    /// <inheritdoc />
    public string StepName { get; }

    /// <summary>
    /// Handler responsible for partitioning markdown text into small chunks, splitting
    /// on lines that start with "---" (the separator the cloud parser is instructed to
    /// emit between question/answer sections).
    /// Note: stepName and other params are injected with DI.
    /// </summary>
    /// <param name="stepName">Pipeline step for which the handler will be invoked</param>
    /// <param name="orchestrator">Current orchestrator used by the pipeline, giving access to content and other helps.</param>
    /// <param name="loggerFactory">Application logger factory</param>
    public CustomSamplePartitioningHandler(
        string stepName,
        IPipelineOrchestrator orchestrator,
        ILoggerFactory? loggerFactory = null)
    {
        this.StepName = stepName;
        this._orchestrator = orchestrator;

        this._log = (loggerFactory ?? DefaultLogger.Factory).CreateLogger<CustomSamplePartitioningHandler>();
        this._log.LogInformation("Handler '{0}' ready", stepName);
    }

    /// <summary>
    /// Scans every file already extracted by previous steps, partitions the markdown
    /// ones on "---" separator lines, and registers each non-empty segment as a new
    /// generated text partition of the pipeline.
    /// </summary>
    /// <param name="pipeline">Pipeline currently being processed</param>
    /// <param name="cancellationToken">Async task cancellation token</param>
    /// <returns>Success and the updated pipeline (files that are not markdown extracted text are skipped)</returns>
    public async Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync(
        DataPipeline pipeline, CancellationToken cancellationToken = default)
    {
        this._log.LogDebug("Markdown question Partitioning text, pipeline '{0}/{1}'", pipeline.Index, pipeline.DocumentId);

        if (pipeline.Files.Count == 0)
        {
            this._log.LogWarning("Pipeline '{0}/{1}': there are no files to process, moving to next pipeline step.", pipeline.Index, pipeline.DocumentId);
            return (ReturnType.Success, pipeline);
        }

        foreach (DataPipeline.FileDetails uploadedFile in pipeline.Files)
        {
            // Track new files being generated (cannot edit originalFile.GeneratedFiles while looping it)
            Dictionary<string, DataPipeline.GeneratedFileDetails> newFiles = [];

            foreach (KeyValuePair<string, DataPipeline.GeneratedFileDetails> generatedFile in uploadedFile.GeneratedFiles)
            {
                var file = generatedFile.Value;
                if (file.AlreadyProcessedBy(this))
                {
                    this._log.LogTrace("File {0} already processed by this handler", file.Name);
                    continue;
                }

                // Partition only the original text
                if (file.ArtifactType != DataPipeline.ArtifactTypes.ExtractedText)
                {
                    this._log.LogTrace("Skipping file {0} (not original text)", file.Name);
                    continue;
                }

                // Use a different partitioning strategy depending on the file type
                BinaryData partitionContent = await this._orchestrator.ReadFileAsync(pipeline, file.Name, cancellationToken).ConfigureAwait(false);
                string partitionsMimeType = MimeTypes.MarkDown;

                // Skip empty partitions. Also: partitionContent.ToString() throws an exception if there are no bytes.
                if (partitionContent.IsEmpty) { continue; }

                int partition = 1;
                switch (file.MimeType)
                {
                    case MimeTypes.MarkDown:
                    {
                        this._log.LogDebug("Partitioning MarkDown file {0}", file.Name);
                        string content = partitionContent.ToString();

                        var sb = new StringBuilder(1024);
                        using (var reader = new StringReader(content))
                        {
                            string? line;
                            while ((line = reader.ReadLine()) != null)
                            {
                                if (string.IsNullOrWhiteSpace(line))
                                {
                                    continue;
                                }

                                // Ordinal comparison: "---" is a literal delimiter, not linguistic text (CA1310).
                                if (line.StartsWith("---", StringComparison.Ordinal))
                                {
                                    partition = await this.AddSegment(pipeline, uploadedFile, newFiles, partitionsMimeType, partition, sb, cancellationToken).ConfigureAwait(false);
                                    sb.Clear();
                                    continue;
                                }

                                sb.AppendLine(line);
                            }
                        }

                        // Write remaining content if any
                        if (sb.Length > 0)
                        {
                            await this.AddSegment(pipeline, uploadedFile, newFiles, partitionsMimeType, partition, sb, cancellationToken).ConfigureAwait(false);
                        }

                        break;
                    }

                    default:
                        this._log.LogWarning("File {0} cannot be partitioned, type '{1}' not supported", file.Name, file.MimeType);
                        // Don't partition other files
                        continue;
                }
            }

            // Add new files to pipeline status
            foreach (var file in newFiles)
            {
                uploadedFile.GeneratedFiles.Add(file.Key, file.Value);
            }
        }

        return (ReturnType.Success, pipeline);
    }

    /// <summary>
    /// Persists the accumulated segment text as a new partition file and records its
    /// metadata in <paramref name="newFiles"/>. Empty/whitespace segments are skipped
    /// without consuming a partition number.
    /// </summary>
    /// <returns>The partition number to use for the next segment</returns>
    private async Task<int> AddSegment(DataPipeline pipeline, DataPipeline.FileDetails uploadedFile, Dictionary<string, DataPipeline.GeneratedFileDetails> newFiles, string partitionsMimeType, int partition, StringBuilder sb, CancellationToken cancellationToken)
    {
        if (sb.Length == 0)
        {
            //do not increment partition, an empty segment is not a segment.
            return partition;
        }

        var text = sb.ToString().Trim('\n', '\r', ' ');

        //is empty after trimming?
        if (string.IsNullOrWhiteSpace(text))
        {
            //do not increment partition, an empty segment is not a segment.
            return partition;
        }

        var destFile = uploadedFile.GetPartitionFileName(partition);

        // Fixed: serialize the trimmed text. The original wrote new BinaryData(sb.ToString()),
        // silently discarding the trim performed above, and reported Size from the untrimmed buffer.
        var textData = new BinaryData(text);
        await this._orchestrator.WriteFileAsync(pipeline, destFile, textData, cancellationToken).ConfigureAwait(false);

        var destFileDetails = new DataPipeline.GeneratedFileDetails
        {
            Id = Guid.NewGuid().ToString("N"),
            ParentId = uploadedFile.Id,
            Name = destFile,
            Size = text.Length,
            MimeType = partitionsMimeType,
            ArtifactType = DataPipeline.ArtifactTypes.TextPartition,
            PartitionNumber = partition,
            SectionNumber = 1,
            Tags = pipeline.Tags,
            ContentSHA256 = textData.CalculateSHA256(),
        };
        newFiles.Add(destFile, destFileDetails);
        destFileDetails.MarkProcessedBy(this);
        partition++;
        return partition;
    }
}
342+
343+
internal static class BinaryDataExtensions
{
    /// <summary>
    /// Computes the SHA-256 digest of the payload and returns it as a lowercase
    /// hexadecimal string (64 characters).
    /// </summary>
    /// <param name="binaryData">Content to hash</param>
    /// <returns>Lowercase hex representation of the SHA-256 hash</returns>
    public static string CalculateSHA256(this BinaryData binaryData)
    {
        byte[] digest = SHA256.HashData(binaryData.ToMemory().Span);

        // Format each byte as two lowercase hex digits.
        var hex = new StringBuilder(digest.Length * 2);
        foreach (byte value in digest)
        {
            hex.Append(value.ToString("x2"));
        }

        return hex.ToString();
    }
}

0 commit comments

Comments
 (0)