Files
1brc/zig/src/main.zig

176 lines
4.9 KiB
Zig

const std = @import("std");
const out = std.io.getStdOut().writer();
const WORKER_SIZE = 64;
const READ_SIZE = 8 * 1024;
const InfoToKeepTrack = struct {
min: f32,
max: f32,
mean: f64,
count: usize,
};
const DataHash = std.StringArrayHashMap(*InfoToKeepTrack);
fn parseFloat(num: []const u8) f32 {
const mult: f32 = if (num[0] == '-') -1 else 1;
const n = num[if (num[0] == '-') 1 else 0..];
var nat: u32 = 0;
var sub: u32 = 0;
var sub_count: f32 = 1;
var e = false;
for (n) |i| {
if (i == '.') {
e = true;
continue;
}
if (e) {
sub *= 10;
sub += i - 48;
sub_count *= 10;
} else {
nat *= 10;
nat += i - 48;
}
}
return mult * @as(f32, @floatFromInt(nat)) + @as(f32, @floatFromInt(sub)) / sub_count;
}
fn processLines(alloc: std.mem.Allocator, hashmap: *DataHash, buf: []u8) void {
var iter = std.mem.split(u8, buf, "\n");
while (iter.next()) |line| {
if (line.len == 0) {
continue;
}
const index = std.mem.indexOf(u8, line, ";").?;
const name = line[0..index];
//const number = std.fmt.parseFloat(f32, line[index + 1 ..]) catch unreachable;
const number = parseFloat(line[index + 1 ..]);
if (hashmap.get(name)) |v| {
var value = v;
value.count += 1;
value.max = @max(value.max, number);
value.min = @min(value.min, number);
value.mean += number;
} else {
const new_info = alloc.create(InfoToKeepTrack) catch unreachable;
new_info.count = 1;
new_info.max = number;
new_info.min = number;
new_info.mean = number;
const new_name = alloc.alloc(u8, name.len) catch unreachable;
std.mem.copyForwards(u8, new_name, name);
hashmap.put(new_name, new_info) catch unreachable;
}
}
}
fn count_fn(buf: []u8, alloc: std.mem.Allocator, finish: *bool, ghash: *DataHash, ghash_mutex: *std.Thread.Mutex, working: *bool) void {
var internal_hash_map = DataHash.init(alloc);
defer internal_hash_map.deinit();
while (!finish.* or working.*) {
if (working.*) {
const lastEnter = std.mem.lastIndexOf(u8, buf, "\n").? - 1;
processLines(alloc, &internal_hash_map, buf[0..lastEnter]);
working.* = false;
} else {
std.time.sleep(1);
}
}
ghash_mutex.lock();
for (internal_hash_map.keys(), internal_hash_map.values()) |k, iv| {
if (ghash.get(k)) |v| {
v.max = @max(v.max, iv.max);
v.min = @min(v.min, iv.min);
v.mean = v.mean + iv.mean;
v.count += iv.count;
alloc.destroy(iv);
alloc.free(k);
} else {
ghash.put(k, iv) catch unreachable;
}
}
ghash_mutex.unlock();
}
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{ .thread_safe = true }){};
defer _ = gpa.deinit();
const alloc = gpa.allocator();
var file = try std.fs.cwd().openFile("../measurements.txt", .{});
var buf: [WORKER_SIZE][READ_SIZE]u8 = undefined;
var threads: [WORKER_SIZE]std.Thread = undefined;
var threads_mux: [WORKER_SIZE]bool = undefined;
var finish = false;
var count = DataHash.init(alloc);
defer count.deinit();
var mutex = std.Thread.Mutex{};
for (0..WORKER_SIZE) |i| {
buf[i] = std.mem.zeroes([READ_SIZE]u8);
threads_mux[i] = false;
threads[i] = try std.Thread.spawn(.{}, count_fn, .{ &buf[i], alloc, &finish, &count, &mutex, &threads_mux[i] });
}
var round: usize = 0;
while (true) {
if (!threads_mux[round]) {
const read = try file.read(&buf[round]);
if (read < READ_SIZE) {
mutex.lock();
processLines(alloc, &count, buf[round][0..read]);
mutex.unlock();
break;
}
threads_mux[round] = true;
const lastEnter = std.mem.lastIndexOf(u8, &buf[round], "\n").? + 1;
try file.seekTo(try file.getPos() - (READ_SIZE - lastEnter));
}
round = (round + 1) % WORKER_SIZE;
}
blk: while (true) {
for (threads_mux) |b| {
if (b) {
std.time.sleep(1);
continue :blk;
}
}
break;
}
finish = true;
try out.print("Joining\n", .{});
for (0..WORKER_SIZE) |i| {
threads[i].join();
}
try out.print("Done joining\n", .{});
for (count.keys(), count.values()) |key, item| {
try out.print("'{s}': {d:.2}/{d:.2}/{d:.2}\n", .{ key, item.min, item.mean / @as(f64, @floatFromInt(item.count)), item.max });
alloc.destroy(item);
alloc.free(key);
}
//try out.print("Read {d} lines\n", .{count});
}