作者:read-late
项目:bazi
// PeerVolumeAllow grants the peer identified by req.Pub access to the
// volume named req.VolumeName.
//
// A malformed public key, an unknown peer, or an unknown volume name is
// reported as codes.InvalidArgument. Any other database failure is logged
// and reported as codes.Internal without exposing the details to the
// caller.
func (c controlRPC) PeerVolumeAllow(ctx context.Context, req *wire.PeerVolumeAllowRequest) (*wire.PeerVolumeAllowResponse, error) {
	var pub peer.PublicKey
	if err := pub.UnmarshalBinary(req.Pub); err != nil {
		return nil, grpc.Errorf(codes.InvalidArgument, "bad peer public key: %v", err)
	}

	// Look up both the peer and the volume inside one transaction so the
	// permission is recorded only if both exist.
	allowVolume := func(tx *db.Tx) error {
		p, err := tx.Peers().Get(&pub)
		if err != nil {
			return err
		}
		v, err := tx.Volumes().GetByName(req.VolumeName)
		if err != nil {
			return err
		}
		return p.Volumes().Allow(v)
	}
	if err := c.app.DB.Update(allowVolume); err != nil {
		switch err {
		case db.ErrPeerNotFound:
			return nil, grpc.Errorf(codes.InvalidArgument, "peer not found")
		case db.ErrVolNameNotFound:
			// A mistyped volume name is a caller mistake, not a database
			// failure; report it the same way VolumeSync does.
			return nil, grpc.Errorf(codes.InvalidArgument, "%v", err)
		}
		log.Printf("db error: allowing peer volume: %v", err)
		return nil, grpc.Errorf(codes.Internal, "database error")
	}
	return &wire.PeerVolumeAllowResponse{}, nil
}
作者:som-snyt
项目:bazi
// openStorage opens the key-value store described by backend.
//
// Recognized forms:
//
//   - "local": a kvfiles store in the "chunks" directory under app.DataDir
//   - a string starting with "/": a kvfiles store at that path
//   - "peerkey:KEY": a kvpeer store on the peer whose public key is KEY
//
// Any other value yields an "unknown storage backend" error.
func (app *App) openStorage(backend string) (kv.KV, error) {
	switch backend {
	case "local":
		kvpath := filepath.Join(app.DataDir, "chunks")
		return kvfiles.Open(kvpath)
	}

	if backend != "" && backend[0] == '/' {
		return kvfiles.Open(backend)
	}

	idx := strings.IndexByte(backend, ':')
	if idx != -1 {
		scheme, rest := backend[:idx], backend[idx+1:]
		switch scheme {
		case "peerkey":
			var key peer.PublicKey
			if err := key.Set(rest); err != nil {
				return nil, err
			}
			p, err := app.DialPeer(&key)
			if err != nil {
				return nil, err
			}
			store, err := kvpeer.Open(p)
			if err != nil {
				// Nothing else refers to the connection at this point,
				// so release it instead of leaking it.
				p.Close()
				return nil, err
			}
			// TODO Close: on success the peer connection is handed to
			// the store and is never closed here.
			return store, nil
		}
	}
	return nil, errors.New("unknown storage backend")
}
作者:read-late
项目:bazi
// TestPublicKeySetBadShort checks that Set rejects a string that is too
// short to be a public key, and that the error carries the expected
// message.
func TestPublicKeySetBadShort(t *testing.T) {
	const want = "not a valid public key: wrong size"

	var key peer.PublicKey
	err := key.Set("42")
	if err == nil {
		t.Fatal("expected an error from Set")
	}
	if got := err.Error(); got != want {
		t.Errorf("wrong error message: %q != %q", got, want)
	}
}
作者:read-late
项目:bazi
// TestPublicKeyString checks that String renders a public key as the
// lowercase hex encoding of its bytes.
func TestPublicKeyString(t *testing.T) {
	const want = "4d0e625eff41006a18b3bfda35b140fcad9178fe6f6caf74f5059c35f0fe21af"

	key := peer.PublicKey{
		0x4d, 0x0e, 0x62, 0x5e, 0xff, 0x41, 0x00, 0x6a,
		0x18, 0xb3, 0xbf, 0xda, 0x35, 0xb1, 0x40, 0xfc,
		0xad, 0x91, 0x78, 0xfe, 0x6f, 0x6c, 0xaf, 0x74,
		0xf5, 0x05, 0x9c, 0x35, 0xf0, 0xfe, 0x21, 0xaf,
	}
	got := key.String()
	if got != want {
		t.Errorf("wrong pubkey output: %q != %q", got, want)
	}
}
作者:read-late
项目:bazi
// TestPublicKeySet checks that Set decodes a hex-encoded public key into
// the expected bytes.
func TestPublicKeySet(t *testing.T) {
	var pub peer.PublicKey
	if err := pub.Set("4d0e625eff41006a18b3bfda35b140fcad9178fe6f6caf74f5059c35f0fe21af"); err != nil {
		// Comparing the value after a failed Set would only add a
		// second, misleading failure.
		t.Fatalf("Set: %v", err)
	}
	want := [ed25519.PublicKeySize]byte{
		0x4d, 0x0e, 0x62, 0x5e, 0xff, 0x41, 0x00, 0x6a,
		0x18, 0xb3, 0xbf, 0xda, 0x35, 0xb1, 0x40, 0xfc,
		0xad, 0x91, 0x78, 0xfe, 0x6f, 0x6c, 0xaf, 0x74,
		0xf5, 0x05, 0x9c, 0x35, 0xf0, 0xfe, 0x21, 0xaf,
	}
	if g, e := pub, want; g != e {
		// Format both sides as plain hex of the raw bytes; slicing keeps
		// fmt from formatting the two operands differently.
		t.Errorf("wrong pubkey value: %x != %x", g[:], e[:])
	}
}
作者:read-late
项目:bazi
// item builds the Peer for the cursor entry with key k; the entry's value
// is ignored. A nil key yields a nil Peer.
//
// It panics if the key does not name a nested bucket or cannot be decoded
// as a public key, as either indicates a corrupt database.
func (c *PeersCursor) item(k, _ []byte) *Peer {
	if k == nil {
		return nil
	}

	b := c.c.Bucket().Bucket(k)
	if b == nil {
		panic("db peer corrupt, not a bucket")
	}

	key := new(peer.PublicKey)
	if err := key.UnmarshalBinary(k); err != nil {
		panic("db peer corrupt: " + err.Error())
	}
	return &Peer{b: b, pub: key}
}
作者:read-late
项目:bazi
// PeerAdd records the public key in req.Pub as a known peer.
//
// A malformed key, or a key equal to this node's own signing key, is
// rejected with codes.InvalidArgument. Database failures are logged and
// reported as codes.Internal.
func (c controlRPC) PeerAdd(ctx context.Context, req *wire.PeerAddRequest) (*wire.PeerAddResponse, error) {
	var key peer.PublicKey
	if err := key.UnmarshalBinary(req.Pub); err != nil {
		return nil, grpc.Errorf(codes.InvalidArgument, "bad peer public key: %v", err)
	}
	if bytes.Equal(key[:], c.app.Keys.Sign.Pub[:]) {
		return nil, grpc.Errorf(codes.InvalidArgument, "cannot add self as peer")
	}

	err := c.app.DB.Update(func(tx *db.Tx) error {
		// Only the error matters here; the created peer is not needed.
		_, err := tx.Peers().Make(&key)
		return err
	})
	if err != nil {
		log.Printf("db update error: put public key %x: %v", key[:], err)
		return nil, grpc.Errorf(codes.Internal, "database error")
	}
	return &wire.PeerAddResponse{}, nil
}
作者:som-snyt
项目:bazi
// PeerLocationSet stores req.Netloc as the location of the peer
// identified by req.Pub.
//
// A malformed key or an unknown peer is reported as
// codes.InvalidArgument. Other database failures are logged and reported
// as codes.Internal.
func (c controlRPC) PeerLocationSet(ctx context.Context, req *wire.PeerLocationSetRequest) (*wire.PeerLocationSetResponse, error) {
	var key peer.PublicKey
	if err := key.UnmarshalBinary(req.Pub); err != nil {
		return nil, grpc.Errorf(codes.InvalidArgument, "bad peer public key: %v", err)
	}

	err := c.app.DB.Update(func(tx *db.Tx) error {
		p, err := tx.Peers().Get(&key)
		if err != nil {
			return err
		}
		return p.Locations().Set(req.Netloc)
	})
	switch err {
	case nil:
		return &wire.PeerLocationSetResponse{}, nil
	case db.ErrPeerNotFound:
		return nil, grpc.Errorf(codes.InvalidArgument, "peer not found")
	default:
		log.Printf("db error: setting peer addr: %v", err)
		return nil, grpc.Errorf(codes.Internal, "database error")
	}
}
作者:som-snyt
项目:bazi
// VolumeSync pulls directory contents for req.Path of the volume named
// req.VolumeName from the peer identified by req.Pub and feeds them into
// the local volume's SyncReceive.
//
// An unknown volume name or a malformed public key is reported as
// codes.InvalidArgument. An error status in the peer's first stream item
// is reported as codes.FailedPrecondition. A peer that ends the stream
// without sending any item is reported as codes.Internal. Other errors
// are returned as-is.
func (c controlRPC) VolumeSync(ctx context.Context, req *wire.VolumeSyncRequest) (*wire.VolumeSyncResponse, error) {
	// Resolve the volume name to its ID first; the ID is what gets sent
	// to the peer.
	var volID db.VolumeID
	loadVolume := func(tx *db.Tx) error {
		v, err := tx.Volumes().GetByName(req.VolumeName)
		if err != nil {
			if err == db.ErrVolNameNotFound {
				return grpc.Errorf(codes.InvalidArgument, "%v", err)
			}
			return err
		}
		v.VolumeID(&volID)
		return nil
	}
	if err := c.app.DB.View(loadVolume); err != nil {
		return nil, err
	}

	var pub peer.PublicKey
	if err := pub.UnmarshalBinary(req.Pub); err != nil {
		return nil, grpc.Errorf(codes.InvalidArgument, "bad peer public key: %v", err)
	}

	client, err := c.app.DialPeer(&pub)
	if err != nil {
		return nil, err
	}
	defer client.Close()

	volIDBuf, err := volID.MarshalBinary()
	if err != nil {
		return nil, err
	}
	peerReq := &wirepeer.VolumeSyncPullRequest{
		VolumeID: volIDBuf,
		Path:     req.Path,
	}
	stream, err := client.VolumeSyncPull(ctx, peerReq)
	if err != nil {
		return nil, err
	}

	// The first item carries the status and the peer list, so it is
	// required. On io.EOF no item was received and first is nil; bail out
	// instead of dereferencing it below.
	first, err := stream.Recv()
	if err != nil {
		if err == io.EOF {
			return nil, grpc.Errorf(codes.Internal, "peer closed sync stream without sending a response")
		}
		return nil, err
	}

	switch first.Error {
	case wirepeer.VolumeSyncPullItem_SUCCESS:
		// nothing
	case wirepeer.VolumeSyncPullItem_NOT_A_DIRECTORY:
		// TODO maybe we should handle the path not being a dir, somehow
		return nil, grpc.Errorf(codes.FailedPrecondition, "path to sync is not a directory")
	default:
		return nil, grpc.Errorf(codes.FailedPrecondition, "peer gave error: %v", first.Error.String())
	}

	// recv hands out the children of the already-received first item
	// once, then continues reading further items from the stream.
	recv := func() ([]*wirepeer.Dirent, error) {
		if first.Children != nil {
			tmp := first.Children
			first.Children = nil
			return tmp, nil
		}
		item, err := stream.Recv()
		if err != nil {
			return nil, err
		}
		return item.Children, nil
	}

	ref, err := c.app.GetVolume(&volID)
	if err != nil {
		return nil, err
	}
	defer ref.Close()

	if err := ref.FS().SyncReceive(ctx, req.Path, first.Peers, recv); err != nil {
		return nil, err
	}
	return &wire.VolumeSyncResponse{}, nil
}
作者:read-late
项目:bazi
// VolumeConnect connects to a volume offered by the peer identified by
// req.Pub. It asks the peer for the ID of its volume req.VolumeName, then
// adds a volume named req.LocalVolumeName with that ID, the backend
// req.Backend and the sharing key req.SharingKeyName to the database, and
// allows the peer on it.
//
// A malformed public key, an invalid backend, an invalid volume or
// sharing key name, or a peer missing from the database is reported as
// codes.InvalidArgument; an existing volume name as codes.AlreadyExists;
// an unknown sharing key as codes.FailedPrecondition. Other errors are
// returned as-is.
func (c controlRPC) VolumeConnect(ctx context.Context, req *wire.VolumeConnectRequest) (*wire.VolumeConnectResponse, error) {
	var pub peer.PublicKey
	if err := pub.UnmarshalBinary(req.Pub); err != nil {
		return nil, grpc.Errorf(codes.InvalidArgument, "bad peer public key: %v", err)
	}

	// Reject a bad backend before contacting the peer.
	if err := c.app.ValidateKV(req.Backend); err != nil {
		return nil, grpc.Errorf(codes.InvalidArgument, "invalid backend: %q", req.Backend)
	}

	client, err := c.app.DialPeer(&pub)
	if err != nil {
		return nil, err
	}
	defer client.Close()

	presp, err := client.VolumeConnect(ctx, &wirepeer.VolumeConnectRequest{
		VolumeName: req.VolumeName,
	})
	if err != nil {
		return nil, err
	}
	var volID db.VolumeID
	if err := volID.UnmarshalBinary(presp.VolumeID); err != nil {
		return nil, err
	}

	// Add the volume and grant the peer access in a single transaction.
	volumeConnect := func(tx *db.Tx) error {
		sharingKey, err := tx.SharingKeys().Get(req.SharingKeyName)
		if err != nil {
			return err
		}
		v, err := tx.Volumes().Add(req.LocalVolumeName, &volID, req.Backend, sharingKey)
		if err != nil {
			return err
		}
		p, err := tx.Peers().Get(&pub)
		if err != nil {
			return err
		}
		return p.Volumes().Allow(v)
	}
	if err := c.app.DB.Update(volumeConnect); err != nil {
		switch err {
		case db.ErrVolNameInvalid:
			return nil, grpc.Errorf(codes.InvalidArgument, "%v", err)
		case db.ErrVolNameExist:
			return nil, grpc.Errorf(codes.AlreadyExists, "%v", err)
		case db.ErrSharingKeyNameInvalid:
			return nil, grpc.Errorf(codes.InvalidArgument, "%v", err)
		case db.ErrSharingKeyNotFound:
			return nil, grpc.Errorf(codes.FailedPrecondition, "%v", err)
		case db.ErrPeerNotFound:
			// Same mapping as the other peer handlers, rather than
			// returning the raw database error to the caller.
			return nil, grpc.Errorf(codes.InvalidArgument, "peer not found")
		}
		return nil, err
	}
	return &wire.VolumeConnectResponse{}, nil
}