Sync main with upstream ai-dev
parent
67f14790f4
commit
16bcf34e4a
@ -0,0 +1,56 @@
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Serialization;
|
||||
|
||||
namespace OnlineMsgServer.Common
|
||||
{
|
||||
internal sealed class PeerRelayEnvelope
|
||||
{
|
||||
public const string OverlayName = "oms-peer/1";
|
||||
|
||||
public string Overlay { get; init; } = OverlayName;
|
||||
public string Kind { get; init; } = "";
|
||||
public string TargetKey { get; init; } = "";
|
||||
public string Payload { get; init; } = "";
|
||||
|
||||
private static readonly JsonSerializerOptions Options = new()
|
||||
{
|
||||
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
|
||||
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
|
||||
};
|
||||
|
||||
public string ToJsonString()
|
||||
{
|
||||
return JsonSerializer.Serialize(this, Options);
|
||||
}
|
||||
|
||||
public static bool TryParse(string? jsonString, out PeerRelayEnvelope envelope)
|
||||
{
|
||||
envelope = new PeerRelayEnvelope();
|
||||
if (string.IsNullOrWhiteSpace(jsonString))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
PeerRelayEnvelope? parsed = JsonSerializer.Deserialize<PeerRelayEnvelope>(jsonString, Options);
|
||||
if (parsed == null || !string.Equals(parsed.Overlay, OverlayName, StringComparison.Ordinal))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (string.IsNullOrWhiteSpace(parsed.Kind))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
envelope = parsed;
|
||||
return true;
|
||||
}
|
||||
catch
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,689 @@
|
||||
using System.IO;
|
||||
using System.Net.WebSockets;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using OnlineMsgServer.Common;
|
||||
using WebSocketSharp.Server;
|
||||
|
||||
namespace OnlineMsgServer.Core
|
||||
{
|
||||
internal static class PeerNetworkService
|
||||
{
|
||||
private static readonly object _lock = new();
|
||||
private static readonly Dictionary<string, PeerOutboundClient> _outboundPeers = [];
|
||||
|
||||
private static SecurityConfig _config = SecurityRuntime.Config;
|
||||
private static SeenMessageCache _seenCache = new(120);
|
||||
private static WebSocketSessionManager? _sessions;
|
||||
private static CancellationTokenSource? _cts;
|
||||
|
||||
public static void Initialize(SecurityConfig config, WebSocketSessionManager sessions)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
_config = config;
|
||||
_sessions = sessions;
|
||||
_seenCache = new SeenMessageCache(config.SeenCacheSeconds);
|
||||
}
|
||||
}
|
||||
|
||||
public static void Start()
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (_cts != null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_cts = new CancellationTokenSource();
|
||||
foreach (string peerUrl in _config.PeerUrls)
|
||||
{
|
||||
if (_outboundPeers.ContainsKey(peerUrl))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
PeerOutboundClient peerClient = new(peerUrl, BuildPeerDisplayName(peerUrl));
|
||||
_outboundPeers[peerUrl] = peerClient;
|
||||
peerClient.Start(_cts.Token);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void Stop()
|
||||
{
|
||||
CancellationTokenSource? cts;
|
||||
List<PeerOutboundClient> peers;
|
||||
|
||||
lock (_lock)
|
||||
{
|
||||
cts = _cts;
|
||||
_cts = null;
|
||||
peers = [.. _outboundPeers.Values];
|
||||
_outboundPeers.Clear();
|
||||
}
|
||||
|
||||
cts?.Cancel();
|
||||
foreach (PeerOutboundClient peer in peers)
|
||||
{
|
||||
peer.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
public static bool IsPeerUserName(string? userName)
|
||||
{
|
||||
return !string.IsNullOrWhiteSpace(userName) &&
|
||||
userName.StartsWith(_config.PeerUserPrefix, StringComparison.Ordinal);
|
||||
}
|
||||
|
||||
public static string GetPeerUserName()
|
||||
{
|
||||
string userName = $"{_config.PeerUserPrefix}{_config.PeerNodeName}".Trim();
|
||||
return userName.Length <= 64 ? userName : userName[..64];
|
||||
}
|
||||
|
||||
public static string GetVisibleUserName(string? userName)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(userName))
|
||||
{
|
||||
return "";
|
||||
}
|
||||
|
||||
string trimmed = userName.Trim();
|
||||
if (!IsPeerUserName(trimmed))
|
||||
{
|
||||
return trimmed;
|
||||
}
|
||||
|
||||
string visibleName = trimmed[_config.PeerUserPrefix.Length..].Trim();
|
||||
return string.IsNullOrWhiteSpace(visibleName) ? trimmed : visibleName;
|
||||
}
|
||||
|
||||
public static bool TryMarkSeen(string senderIdentity, string type, string key, string payload)
|
||||
{
|
||||
return _seenCache.TryMark(senderIdentity, type, key, payload);
|
||||
}
|
||||
|
||||
public static bool TryHandlePeerRelayForward(string wsid, string targetKey, SignedMessagePayload payload)
|
||||
{
|
||||
if (!UserService.IsPeerNodeSession(wsid))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!string.Equals(targetKey, RsaService.GetRsaPublickKey(), StringComparison.Ordinal))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!PeerRelayEnvelope.TryParse(payload.Payload, out PeerRelayEnvelope envelope))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
string sourcePublicKey = UserService.GetPeerPublicKeyBySessionId(wsid) ?? "";
|
||||
string sourceDisplayName = GetVisibleUserName(UserService.GetUserNameByID(wsid));
|
||||
ProcessPeerEnvelope(sourcePublicKey, sourceDisplayName, envelope);
|
||||
return true;
|
||||
}
|
||||
|
||||
public static void RelayForwardMiss(string targetKey, string payload, string? excludePeerPublicKey = null)
|
||||
{
|
||||
PeerRelayEnvelope envelope = new()
|
||||
{
|
||||
Kind = "forward",
|
||||
TargetKey = targetKey,
|
||||
Payload = payload
|
||||
};
|
||||
|
||||
RelayPeerEnvelope(envelope, excludePeerPublicKey);
|
||||
}
|
||||
|
||||
public static void RelayBroadcast(string payload, string? excludePeerPublicKey = null)
|
||||
{
|
||||
PeerRelayEnvelope envelope = new()
|
||||
{
|
||||
Kind = "broadcast",
|
||||
TargetKey = "",
|
||||
Payload = payload
|
||||
};
|
||||
|
||||
RelayPeerEnvelope(envelope, excludePeerPublicKey);
|
||||
}
|
||||
|
||||
public static void DeliverBroadcastToLocalClients(string senderName, string payload, string? excludeSessionId = null)
|
||||
{
|
||||
WebSocketSessionManager sessions = RequireSessions();
|
||||
Message response = new()
|
||||
{
|
||||
Type = "broadcast",
|
||||
Data = payload,
|
||||
Key = senderName
|
||||
};
|
||||
string jsonString = response.ToJsonString();
|
||||
|
||||
foreach (IWebSocketSession session in sessions.Sessions)
|
||||
{
|
||||
if (session.ID == excludeSessionId)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!UserService.IsAuthenticated(session.ID) || UserService.IsPeerNodeSession(session.ID))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
string? publicKey = UserService.GetUserPublicKeyByID(session.ID);
|
||||
if (string.IsNullOrWhiteSpace(publicKey))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
string encryptString = RsaService.EncryptForClient(publicKey, jsonString);
|
||||
session.Context.WebSocket.Send(encryptString);
|
||||
}
|
||||
}
|
||||
|
||||
public static bool DeliverForwardToLocalClient(string senderPublicKey, string targetPublicKey, string payload)
|
||||
{
|
||||
WebSocketSessionManager sessions = RequireSessions();
|
||||
List<User> userList = UserService.GetUserListByPublicKey(targetPublicKey, includePeerNodes: false);
|
||||
if (userList.Count == 0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
Message response = new()
|
||||
{
|
||||
Type = "forward",
|
||||
Data = payload,
|
||||
Key = senderPublicKey
|
||||
};
|
||||
string jsonString = response.ToJsonString();
|
||||
string encryptString = RsaService.EncryptForClient(targetPublicKey, jsonString);
|
||||
|
||||
foreach (IWebSocketSession session in sessions.Sessions)
|
||||
{
|
||||
if (userList.Exists(u => u.ID == session.ID))
|
||||
{
|
||||
session.Context.WebSocket.Send(encryptString);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private static void ProcessPeerEnvelope(string sourcePublicKey, string sourceDisplayName, PeerRelayEnvelope envelope)
|
||||
{
|
||||
if (!TryMarkSeen(sourcePublicKey, envelope.Kind, envelope.TargetKey, envelope.Payload))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
switch (envelope.Kind)
|
||||
{
|
||||
case "broadcast":
|
||||
DeliverBroadcastToLocalClients(sourceDisplayName, envelope.Payload);
|
||||
RelayPeerEnvelope(envelope, sourcePublicKey);
|
||||
break;
|
||||
case "forward":
|
||||
bool delivered = DeliverForwardToLocalClient(sourcePublicKey, envelope.TargetKey, envelope.Payload);
|
||||
if (!delivered)
|
||||
{
|
||||
RelayPeerEnvelope(envelope, sourcePublicKey);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
Log.Security("peer_envelope_invalid_kind", $"kind={envelope.Kind}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private static void RelayPeerEnvelope(PeerRelayEnvelope envelope, string? excludePeerPublicKey)
|
||||
{
|
||||
string payloadJson = envelope.ToJsonString();
|
||||
HashSet<string> sentPeerKeys = [];
|
||||
|
||||
foreach (PeerOutboundClient peer in SnapshotOutboundPeers())
|
||||
{
|
||||
string? remotePublicKey = peer.RemotePublicKey;
|
||||
if (!peer.IsAuthenticated || string.IsNullOrWhiteSpace(remotePublicKey))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (string.Equals(remotePublicKey, excludePeerPublicKey, StringComparison.Ordinal) ||
|
||||
!sentPeerKeys.Add(remotePublicKey))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
peer.TrySendRelayEnvelope(payloadJson);
|
||||
}
|
||||
|
||||
SendPeerEnvelopeToInboundPeers(payloadJson, sentPeerKeys, excludePeerPublicKey);
|
||||
}
|
||||
|
||||
private static void SendPeerEnvelopeToInboundPeers(string payloadJson, HashSet<string> sentPeerKeys, string? excludePeerPublicKey)
|
||||
{
|
||||
WebSocketSessionManager sessions = RequireSessions();
|
||||
Message response = new()
|
||||
{
|
||||
Type = "forward",
|
||||
Key = RsaService.GetRsaPublickKey(),
|
||||
Data = payloadJson
|
||||
};
|
||||
string jsonString = response.ToJsonString();
|
||||
|
||||
foreach (User user in UserService.GetAuthenticatedUsers(includePeerNodes: true))
|
||||
{
|
||||
if (!user.IsPeerNode || string.IsNullOrWhiteSpace(user.PublicKey))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (string.Equals(user.PublicKey, excludePeerPublicKey, StringComparison.Ordinal) ||
|
||||
!sentPeerKeys.Add(user.PublicKey))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
string encryptString = RsaService.EncryptForClient(user.PublicKey, jsonString);
|
||||
foreach (IWebSocketSession session in sessions.Sessions)
|
||||
{
|
||||
if (session.ID == user.ID)
|
||||
{
|
||||
session.Context.WebSocket.Send(encryptString);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static List<PeerOutboundClient> SnapshotOutboundPeers()
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
return [.. _outboundPeers.Values];
|
||||
}
|
||||
}
|
||||
|
||||
private static WebSocketSessionManager RequireSessions()
|
||||
{
|
||||
return _sessions ?? throw new InvalidOperationException("peer network sessions not initialized");
|
||||
}
|
||||
|
||||
private static string BuildPeerDisplayName(string peerUrl)
|
||||
{
|
||||
try
|
||||
{
|
||||
Uri uri = new(peerUrl);
|
||||
string displayName = $"{_config.PeerUserPrefix}{BuildGuestAlias(uri.Host)}";
|
||||
return displayName.Length <= 64 ? displayName : displayName[..64];
|
||||
}
|
||||
catch
|
||||
{
|
||||
return GetPeerUserName();
|
||||
}
|
||||
}
|
||||
|
||||
private static void HandlePeerSocketMessage(PeerOutboundClient peer, string text)
|
||||
{
|
||||
if (TryHandlePeerHello(peer, text))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
string plainText;
|
||||
try
|
||||
{
|
||||
plainText = RsaService.Decrypt(text);
|
||||
}
|
||||
catch
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
using JsonDocument doc = JsonDocument.Parse(plainText);
|
||||
JsonElement root = doc.RootElement;
|
||||
if (!root.TryGetProperty("type", out JsonElement typeElement) || typeElement.ValueKind != JsonValueKind.String)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
string type = typeElement.GetString() ?? "";
|
||||
switch (type)
|
||||
{
|
||||
case "auth_ok":
|
||||
peer.MarkAuthenticated();
|
||||
Log.Debug($"peer auth ok {peer.PeerUrl}");
|
||||
return;
|
||||
case "forward":
|
||||
case "broadcast":
|
||||
if (!root.TryGetProperty("data", out JsonElement dataElement))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
string payload = ExtractPayloadString(dataElement);
|
||||
if (PeerRelayEnvelope.TryParse(payload, out PeerRelayEnvelope envelope))
|
||||
{
|
||||
ProcessPeerEnvelope(peer.RemotePublicKey ?? "", GetVisibleUserName(peer.DisplayName), envelope);
|
||||
}
|
||||
return;
|
||||
default:
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
private static string BuildGuestAlias(string seed)
|
||||
{
|
||||
byte[] hash = SHA256.HashData(Encoding.UTF8.GetBytes(seed));
|
||||
int value = BitConverter.ToInt32(hash, 0) & int.MaxValue;
|
||||
return $"guest-{(value % 900000) + 100000:D6}";
|
||||
}
|
||||
|
||||
private static bool TryHandlePeerHello(PeerOutboundClient peer, string text)
|
||||
{
|
||||
try
|
||||
{
|
||||
using JsonDocument doc = JsonDocument.Parse(text);
|
||||
JsonElement root = doc.RootElement;
|
||||
if (!root.TryGetProperty("type", out JsonElement typeElement) ||
|
||||
typeElement.ValueKind != JsonValueKind.String ||
|
||||
!string.Equals(typeElement.GetString(), "publickey", StringComparison.Ordinal))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!root.TryGetProperty("data", out JsonElement dataElement) || dataElement.ValueKind != JsonValueKind.Object)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!dataElement.TryGetProperty("publicKey", out JsonElement publicKeyElement) ||
|
||||
publicKeyElement.ValueKind != JsonValueKind.String ||
|
||||
!dataElement.TryGetProperty("authChallenge", out JsonElement challengeElement) ||
|
||||
challengeElement.ValueKind != JsonValueKind.String)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
string remotePublicKey = publicKeyElement.GetString() ?? "";
|
||||
string challenge = challengeElement.GetString() ?? "";
|
||||
if (string.IsNullOrWhiteSpace(remotePublicKey) || string.IsNullOrWhiteSpace(challenge))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
peer.SetRemotePublicKey(remotePublicKey);
|
||||
SendPeerAuth(peer, remotePublicKey, challenge);
|
||||
return true;
|
||||
}
|
||||
catch
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private static void SendPeerAuth(PeerOutboundClient peer, string remotePublicKey, string challenge)
|
||||
{
|
||||
string localPublicKey = RsaService.GetRsaPublickKey();
|
||||
string userName = GetPeerUserName();
|
||||
long timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
|
||||
string nonce = SecurityValidator.CreateNonce();
|
||||
string signingInput = ClientRegistrationPayload.BuildSigningInput(userName, localPublicKey, challenge, timestamp, nonce);
|
||||
string signature = RsaService.Sign(signingInput);
|
||||
|
||||
Message request = new()
|
||||
{
|
||||
Type = "publickey",
|
||||
Key = userName,
|
||||
Data = new
|
||||
{
|
||||
publicKey = localPublicKey,
|
||||
challenge,
|
||||
timestamp,
|
||||
nonce,
|
||||
signature
|
||||
}
|
||||
};
|
||||
|
||||
string cipherText = RsaService.EncryptForClient(remotePublicKey, request.ToJsonString());
|
||||
peer.TrySendRaw(cipherText);
|
||||
}
|
||||
|
||||
private static string ExtractPayloadString(JsonElement dataElement)
|
||||
{
|
||||
return dataElement.ValueKind == JsonValueKind.String
|
||||
? dataElement.GetString() ?? ""
|
||||
: dataElement.GetRawText();
|
||||
}
|
||||
|
||||
private sealed class PeerOutboundClient(string peerUrl, string displayName) : IDisposable
|
||||
{
|
||||
private readonly object _socketLock = new();
|
||||
|
||||
private ClientWebSocket? _socket;
|
||||
private Task? _runTask;
|
||||
private CancellationToken _cancellationToken;
|
||||
|
||||
public string PeerUrl { get; } = peerUrl;
|
||||
public string DisplayName { get; } = displayName;
|
||||
public string? RemotePublicKey { get; private set; }
|
||||
public bool IsAuthenticated { get; private set; }
|
||||
|
||||
public void Start(CancellationToken cancellationToken)
|
||||
{
|
||||
_cancellationToken = cancellationToken;
|
||||
_runTask = Task.Run(RunAsync, cancellationToken);
|
||||
}
|
||||
|
||||
public void SetRemotePublicKey(string remotePublicKey)
|
||||
{
|
||||
RemotePublicKey = remotePublicKey;
|
||||
}
|
||||
|
||||
public void MarkAuthenticated()
|
||||
{
|
||||
IsAuthenticated = true;
|
||||
}
|
||||
|
||||
public bool TrySendRelayEnvelope(string relayPayload)
|
||||
{
|
||||
if (!IsAuthenticated || string.IsNullOrWhiteSpace(RemotePublicKey))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
long timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
|
||||
string nonce = SecurityValidator.CreateNonce();
|
||||
string targetKey = RemotePublicKey;
|
||||
string signature = RsaService.Sign(SignedMessagePayload.BuildSigningInput("forward", targetKey, relayPayload, timestamp, nonce));
|
||||
|
||||
Message request = new()
|
||||
{
|
||||
Type = "forward",
|
||||
Key = targetKey,
|
||||
Data = new
|
||||
{
|
||||
payload = relayPayload,
|
||||
timestamp,
|
||||
nonce,
|
||||
signature
|
||||
}
|
||||
};
|
||||
|
||||
string cipherText = RsaService.EncryptForClient(RemotePublicKey, request.ToJsonString());
|
||||
return TrySendRaw(cipherText);
|
||||
}
|
||||
|
||||
public bool TrySendRaw(string text)
|
||||
{
|
||||
ClientWebSocket? socket;
|
||||
lock (_socketLock)
|
||||
{
|
||||
socket = _socket;
|
||||
}
|
||||
|
||||
if (socket == null || socket.State != WebSocketState.Open)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
byte[] payload = Encoding.UTF8.GetBytes(text);
|
||||
socket.SendAsync(payload, WebSocketMessageType.Text, true, _cancellationToken)
|
||||
.GetAwaiter()
|
||||
.GetResult();
|
||||
return true;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Security("peer_send_failed", $"peer={PeerUrl} error={ex.Message}");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
ClientWebSocket? socket;
|
||||
lock (_socketLock)
|
||||
{
|
||||
socket = _socket;
|
||||
_socket = null;
|
||||
}
|
||||
|
||||
IsAuthenticated = false;
|
||||
RemotePublicKey = null;
|
||||
|
||||
if (socket == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
socket.Abort();
|
||||
}
|
||||
catch
|
||||
{
|
||||
// ignore
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
socket.Dispose();
|
||||
}
|
||||
catch
|
||||
{
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
private async Task RunAsync()
|
||||
{
|
||||
while (!_cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
ClientWebSocket socket = new();
|
||||
if (PeerUrl.StartsWith("wss://", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
socket.Options.RemoteCertificateValidationCallback = static (_, _, _, _) => true;
|
||||
}
|
||||
|
||||
lock (_socketLock)
|
||||
{
|
||||
_socket = socket;
|
||||
}
|
||||
|
||||
IsAuthenticated = false;
|
||||
RemotePublicKey = null;
|
||||
|
||||
try
|
||||
{
|
||||
await socket.ConnectAsync(new Uri(PeerUrl), _cancellationToken);
|
||||
Log.Debug($"peer open {PeerUrl}");
|
||||
await ReceiveLoopAsync(socket, _cancellationToken);
|
||||
}
|
||||
catch (OperationCanceledException) when (_cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Security("peer_connect_failed", $"peer={PeerUrl} error={ex}");
|
||||
}
|
||||
finally
|
||||
{
|
||||
string closeReason = "";
|
||||
try
|
||||
{
|
||||
closeReason = socket.CloseStatusDescription
|
||||
?? socket.CloseStatus?.ToString()
|
||||
?? "";
|
||||
}
|
||||
catch
|
||||
{
|
||||
// ignore
|
||||
}
|
||||
|
||||
Dispose();
|
||||
Log.Debug($"peer close {PeerUrl} {closeReason}");
|
||||
}
|
||||
|
||||
if (_cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
await Task.Delay(TimeSpan.FromSeconds(_config.PeerReconnectSeconds), _cancellationToken)
|
||||
.ContinueWith(_ => { }, TaskScheduler.Default);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ReceiveLoopAsync(ClientWebSocket socket, CancellationToken cancellationToken)
|
||||
{
|
||||
byte[] buffer = new byte[16 * 1024];
|
||||
using MemoryStream messageBuffer = new();
|
||||
|
||||
while (!cancellationToken.IsCancellationRequested && socket.State == WebSocketState.Open)
|
||||
{
|
||||
WebSocketReceiveResult result = await socket.ReceiveAsync(buffer, cancellationToken);
|
||||
if (result.MessageType == WebSocketMessageType.Close)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
if (result.Count > 0)
|
||||
{
|
||||
messageBuffer.Write(buffer, 0, result.Count);
|
||||
}
|
||||
|
||||
if (!result.EndOfMessage)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (result.MessageType != WebSocketMessageType.Text)
|
||||
{
|
||||
messageBuffer.SetLength(0);
|
||||
continue;
|
||||
}
|
||||
|
||||
string text = Encoding.UTF8.GetString(messageBuffer.GetBuffer(), 0, (int)messageBuffer.Length);
|
||||
messageBuffer.SetLength(0);
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(text))
|
||||
{
|
||||
HandlePeerSocketMessage(this, text);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,55 @@
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
|
||||
namespace OnlineMsgServer.Core
|
||||
{
|
||||
internal sealed class SeenMessageCache
|
||||
{
|
||||
private readonly object _lock = new();
|
||||
private readonly Dictionary<string, DateTime> _seenUntilUtc = [];
|
||||
private readonly int _ttlSeconds;
|
||||
|
||||
public SeenMessageCache(int ttlSeconds)
|
||||
{
|
||||
_ttlSeconds = Math.Max(ttlSeconds, 1);
|
||||
}
|
||||
|
||||
public bool TryMark(string senderIdentity, string type, string key, string payload)
|
||||
{
|
||||
string hash = ComputeHash(senderIdentity, type, key, payload);
|
||||
DateTime nowUtc = DateTime.UtcNow;
|
||||
|
||||
lock (_lock)
|
||||
{
|
||||
if (_seenUntilUtc.TryGetValue(hash, out DateTime untilUtc) && untilUtc > nowUtc)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
_seenUntilUtc[hash] = nowUtc.AddSeconds(_ttlSeconds);
|
||||
|
||||
List<string> expiredKeys = [];
|
||||
foreach (KeyValuePair<string, DateTime> item in _seenUntilUtc)
|
||||
{
|
||||
if (item.Value <= nowUtc)
|
||||
{
|
||||
expiredKeys.Add(item.Key);
|
||||
}
|
||||
}
|
||||
|
||||
foreach (string expiredKey in expiredKeys)
|
||||
{
|
||||
_seenUntilUtc.Remove(expiredKey);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private static string ComputeHash(string senderIdentity, string type, string key, string payload)
|
||||
{
|
||||
byte[] bytes = Encoding.UTF8.GetBytes(string.Join("\n", senderIdentity, type, key, payload));
|
||||
return Convert.ToHexString(SHA256.HashData(bytes));
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue