mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-10-30 23:52:26 -05:00 
			
		
		
		
	
		
			
	
	
		
			679 lines
		
	
	
	
		
			17 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			679 lines
		
	
	
	
		
			17 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
|  | // Copyright 2011 Google Inc. All rights reserved. | ||
|  | // Use of this source code is governed by the Apache 2.0 | ||
|  | // license that can be found in the LICENSE file. | ||
|  | 
 | ||
|  | // +build !appengine | ||
|  | 
 | ||
|  | package internal | ||
|  | 
 | ||
|  | import ( | ||
|  | 	"bytes" | ||
|  | 	"errors" | ||
|  | 	"fmt" | ||
|  | 	"io/ioutil" | ||
|  | 	"log" | ||
|  | 	"net" | ||
|  | 	"net/http" | ||
|  | 	"net/url" | ||
|  | 	"os" | ||
|  | 	"runtime" | ||
|  | 	"strconv" | ||
|  | 	"strings" | ||
|  | 	"sync" | ||
|  | 	"sync/atomic" | ||
|  | 	"time" | ||
|  | 
 | ||
|  | 	"github.com/golang/protobuf/proto" | ||
|  | 	netcontext "golang.org/x/net/context" | ||
|  | 
 | ||
|  | 	basepb "google.golang.org/appengine/internal/base" | ||
|  | 	logpb "google.golang.org/appengine/internal/log" | ||
|  | 	remotepb "google.golang.org/appengine/internal/remote_api" | ||
|  | ) | ||
|  | 
 | ||
|  | const ( | ||
|  | 	apiPath             = "/rpc_http" | ||
|  | 	defaultTicketSuffix = "/default.20150612t184001.0" | ||
|  | ) | ||
|  | 
 | ||
|  | var ( | ||
|  | 	// Incoming headers. | ||
|  | 	ticketHeader       = http.CanonicalHeaderKey("X-AppEngine-API-Ticket") | ||
|  | 	dapperHeader       = http.CanonicalHeaderKey("X-Google-DapperTraceInfo") | ||
|  | 	traceHeader        = http.CanonicalHeaderKey("X-Cloud-Trace-Context") | ||
|  | 	curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace") | ||
|  | 	userIPHeader       = http.CanonicalHeaderKey("X-AppEngine-User-IP") | ||
|  | 	remoteAddrHeader   = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr") | ||
|  | 	devRequestIdHeader = http.CanonicalHeaderKey("X-Appengine-Dev-Request-Id") | ||
|  | 
 | ||
|  | 	// Outgoing headers. | ||
|  | 	apiEndpointHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint") | ||
|  | 	apiEndpointHeaderValue = []string{"app-engine-apis"} | ||
|  | 	apiMethodHeader        = http.CanonicalHeaderKey("X-Google-RPC-Service-Method") | ||
|  | 	apiMethodHeaderValue   = []string{"/VMRemoteAPI.CallRemoteAPI"} | ||
|  | 	apiDeadlineHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline") | ||
|  | 	apiContentType         = http.CanonicalHeaderKey("Content-Type") | ||
|  | 	apiContentTypeValue    = []string{"application/octet-stream"} | ||
|  | 	logFlushHeader         = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count") | ||
|  | 
 | ||
|  | 	apiHTTPClient = &http.Client{ | ||
|  | 		Transport: &http.Transport{ | ||
|  | 			Proxy:               http.ProxyFromEnvironment, | ||
|  | 			Dial:                limitDial, | ||
|  | 			MaxIdleConns:        1000, | ||
|  | 			MaxIdleConnsPerHost: 10000, | ||
|  | 			IdleConnTimeout:     90 * time.Second, | ||
|  | 		}, | ||
|  | 	} | ||
|  | 
 | ||
|  | 	defaultTicketOnce     sync.Once | ||
|  | 	defaultTicket         string | ||
|  | 	backgroundContextOnce sync.Once | ||
|  | 	backgroundContext     netcontext.Context | ||
|  | ) | ||
|  | 
 | ||
|  | func apiURL() *url.URL { | ||
|  | 	host, port := "appengine.googleapis.internal", "10001" | ||
|  | 	if h := os.Getenv("API_HOST"); h != "" { | ||
|  | 		host = h | ||
|  | 	} | ||
|  | 	if p := os.Getenv("API_PORT"); p != "" { | ||
|  | 		port = p | ||
|  | 	} | ||
|  | 	return &url.URL{ | ||
|  | 		Scheme: "http", | ||
|  | 		Host:   host + ":" + port, | ||
|  | 		Path:   apiPath, | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | func handleHTTP(w http.ResponseWriter, r *http.Request) { | ||
|  | 	c := &context{ | ||
|  | 		req:       r, | ||
|  | 		outHeader: w.Header(), | ||
|  | 		apiURL:    apiURL(), | ||
|  | 	} | ||
|  | 	r = r.WithContext(withContext(r.Context(), c)) | ||
|  | 	c.req = r | ||
|  | 
 | ||
|  | 	stopFlushing := make(chan int) | ||
|  | 
 | ||
|  | 	// Patch up RemoteAddr so it looks reasonable. | ||
|  | 	if addr := r.Header.Get(userIPHeader); addr != "" { | ||
|  | 		r.RemoteAddr = addr | ||
|  | 	} else if addr = r.Header.Get(remoteAddrHeader); addr != "" { | ||
|  | 		r.RemoteAddr = addr | ||
|  | 	} else { | ||
|  | 		// Should not normally reach here, but pick a sensible default anyway. | ||
|  | 		r.RemoteAddr = "127.0.0.1" | ||
|  | 	} | ||
|  | 	// The address in the headers will most likely be of these forms: | ||
|  | 	//	123.123.123.123 | ||
|  | 	//	2001:db8::1 | ||
|  | 	// net/http.Request.RemoteAddr is specified to be in "IP:port" form. | ||
|  | 	if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil { | ||
|  | 		// Assume the remote address is only a host; add a default port. | ||
|  | 		r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80") | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// Start goroutine responsible for flushing app logs. | ||
|  | 	// This is done after adding c to ctx.m (and stopped before removing it) | ||
|  | 	// because flushing logs requires making an API call. | ||
|  | 	go c.logFlusher(stopFlushing) | ||
|  | 
 | ||
|  | 	executeRequestSafely(c, r) | ||
|  | 	c.outHeader = nil // make sure header changes aren't respected any more | ||
|  | 
 | ||
|  | 	stopFlushing <- 1 // any logging beyond this point will be dropped | ||
|  | 
 | ||
|  | 	// Flush any pending logs asynchronously. | ||
|  | 	c.pendingLogs.Lock() | ||
|  | 	flushes := c.pendingLogs.flushes | ||
|  | 	if len(c.pendingLogs.lines) > 0 { | ||
|  | 		flushes++ | ||
|  | 	} | ||
|  | 	c.pendingLogs.Unlock() | ||
|  | 	flushed := make(chan struct{}) | ||
|  | 	go func() { | ||
|  | 		defer close(flushed) | ||
|  | 		// Force a log flush, because with very short requests we | ||
|  | 		// may not ever flush logs. | ||
|  | 		c.flushLog(true) | ||
|  | 	}() | ||
|  | 	w.Header().Set(logFlushHeader, strconv.Itoa(flushes)) | ||
|  | 
 | ||
|  | 	// Avoid nil Write call if c.Write is never called. | ||
|  | 	if c.outCode != 0 { | ||
|  | 		w.WriteHeader(c.outCode) | ||
|  | 	} | ||
|  | 	if c.outBody != nil { | ||
|  | 		w.Write(c.outBody) | ||
|  | 	} | ||
|  | 	// Wait for the last flush to complete before returning, | ||
|  | 	// otherwise the security ticket will not be valid. | ||
|  | 	<-flushed | ||
|  | } | ||
|  | 
 | ||
|  | func executeRequestSafely(c *context, r *http.Request) { | ||
|  | 	defer func() { | ||
|  | 		if x := recover(); x != nil { | ||
|  | 			logf(c, 4, "%s", renderPanic(x)) // 4 == critical | ||
|  | 			c.outCode = 500 | ||
|  | 		} | ||
|  | 	}() | ||
|  | 
 | ||
|  | 	http.DefaultServeMux.ServeHTTP(c, r) | ||
|  | } | ||
|  | 
 | ||
|  | func renderPanic(x interface{}) string { | ||
|  | 	buf := make([]byte, 16<<10) // 16 KB should be plenty | ||
|  | 	buf = buf[:runtime.Stack(buf, false)] | ||
|  | 
 | ||
|  | 	// Remove the first few stack frames: | ||
|  | 	//   this func | ||
|  | 	//   the recover closure in the caller | ||
|  | 	// That will root the stack trace at the site of the panic. | ||
|  | 	const ( | ||
|  | 		skipStart  = "internal.renderPanic" | ||
|  | 		skipFrames = 2 | ||
|  | 	) | ||
|  | 	start := bytes.Index(buf, []byte(skipStart)) | ||
|  | 	p := start | ||
|  | 	for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ { | ||
|  | 		p = bytes.IndexByte(buf[p+1:], '\n') + p + 1 | ||
|  | 		if p < 0 { | ||
|  | 			break | ||
|  | 		} | ||
|  | 	} | ||
|  | 	if p >= 0 { | ||
|  | 		// buf[start:p+1] is the block to remove. | ||
|  | 		// Copy buf[p+1:] over buf[start:] and shrink buf. | ||
|  | 		copy(buf[start:], buf[p+1:]) | ||
|  | 		buf = buf[:len(buf)-(p+1-start)] | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// Add panic heading. | ||
|  | 	head := fmt.Sprintf("panic: %v\n\n", x) | ||
|  | 	if len(head) > len(buf) { | ||
|  | 		// Extremely unlikely to happen. | ||
|  | 		return head | ||
|  | 	} | ||
|  | 	copy(buf[len(head):], buf) | ||
|  | 	copy(buf, head) | ||
|  | 
 | ||
|  | 	return string(buf) | ||
|  | } | ||
|  | 
 | ||
|  | // context represents the context of an in-flight HTTP request. | ||
|  | // It implements the appengine.Context and http.ResponseWriter interfaces. | ||
|  | type context struct { | ||
|  | 	req *http.Request | ||
|  | 
 | ||
|  | 	outCode   int | ||
|  | 	outHeader http.Header | ||
|  | 	outBody   []byte | ||
|  | 
 | ||
|  | 	pendingLogs struct { | ||
|  | 		sync.Mutex | ||
|  | 		lines   []*logpb.UserAppLogLine | ||
|  | 		flushes int | ||
|  | 	} | ||
|  | 
 | ||
|  | 	apiURL *url.URL | ||
|  | } | ||
|  | 
 | ||
|  | var contextKey = "holds a *context" | ||
|  | 
 | ||
|  | // jointContext joins two contexts in a superficial way. | ||
|  | // It takes values and timeouts from a base context, and only values from another context. | ||
|  | type jointContext struct { | ||
|  | 	base       netcontext.Context | ||
|  | 	valuesOnly netcontext.Context | ||
|  | } | ||
|  | 
 | ||
|  | func (c jointContext) Deadline() (time.Time, bool) { | ||
|  | 	return c.base.Deadline() | ||
|  | } | ||
|  | 
 | ||
|  | func (c jointContext) Done() <-chan struct{} { | ||
|  | 	return c.base.Done() | ||
|  | } | ||
|  | 
 | ||
|  | func (c jointContext) Err() error { | ||
|  | 	return c.base.Err() | ||
|  | } | ||
|  | 
 | ||
|  | func (c jointContext) Value(key interface{}) interface{} { | ||
|  | 	if val := c.base.Value(key); val != nil { | ||
|  | 		return val | ||
|  | 	} | ||
|  | 	return c.valuesOnly.Value(key) | ||
|  | } | ||
|  | 
 | ||
|  | // fromContext returns the App Engine context or nil if ctx is not | ||
|  | // derived from an App Engine context. | ||
|  | func fromContext(ctx netcontext.Context) *context { | ||
|  | 	c, _ := ctx.Value(&contextKey).(*context) | ||
|  | 	return c | ||
|  | } | ||
|  | 
 | ||
|  | func withContext(parent netcontext.Context, c *context) netcontext.Context { | ||
|  | 	ctx := netcontext.WithValue(parent, &contextKey, c) | ||
|  | 	if ns := c.req.Header.Get(curNamespaceHeader); ns != "" { | ||
|  | 		ctx = withNamespace(ctx, ns) | ||
|  | 	} | ||
|  | 	return ctx | ||
|  | } | ||
|  | 
 | ||
|  | func toContext(c *context) netcontext.Context { | ||
|  | 	return withContext(netcontext.Background(), c) | ||
|  | } | ||
|  | 
 | ||
|  | func IncomingHeaders(ctx netcontext.Context) http.Header { | ||
|  | 	if c := fromContext(ctx); c != nil { | ||
|  | 		return c.req.Header | ||
|  | 	} | ||
|  | 	return nil | ||
|  | } | ||
|  | 
 | ||
|  | func ReqContext(req *http.Request) netcontext.Context { | ||
|  | 	return req.Context() | ||
|  | } | ||
|  | 
 | ||
|  | func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context { | ||
|  | 	return jointContext{ | ||
|  | 		base:       parent, | ||
|  | 		valuesOnly: req.Context(), | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | // DefaultTicket returns a ticket used for background context or dev_appserver. | ||
|  | func DefaultTicket() string { | ||
|  | 	defaultTicketOnce.Do(func() { | ||
|  | 		if IsDevAppServer() { | ||
|  | 			defaultTicket = "testapp" + defaultTicketSuffix | ||
|  | 			return | ||
|  | 		} | ||
|  | 		appID := partitionlessAppID() | ||
|  | 		escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1) | ||
|  | 		majVersion := VersionID(nil) | ||
|  | 		if i := strings.Index(majVersion, "."); i > 0 { | ||
|  | 			majVersion = majVersion[:i] | ||
|  | 		} | ||
|  | 		defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID()) | ||
|  | 	}) | ||
|  | 	return defaultTicket | ||
|  | } | ||
|  | 
 | ||
|  | func BackgroundContext() netcontext.Context { | ||
|  | 	backgroundContextOnce.Do(func() { | ||
|  | 		// Compute background security ticket. | ||
|  | 		ticket := DefaultTicket() | ||
|  | 
 | ||
|  | 		c := &context{ | ||
|  | 			req: &http.Request{ | ||
|  | 				Header: http.Header{ | ||
|  | 					ticketHeader: []string{ticket}, | ||
|  | 				}, | ||
|  | 			}, | ||
|  | 			apiURL: apiURL(), | ||
|  | 		} | ||
|  | 		backgroundContext = toContext(c) | ||
|  | 
 | ||
|  | 		// TODO(dsymonds): Wire up the shutdown handler to do a final flush. | ||
|  | 		go c.logFlusher(make(chan int)) | ||
|  | 	}) | ||
|  | 
 | ||
|  | 	return backgroundContext | ||
|  | } | ||
|  | 
 | ||
|  | // RegisterTestRequest registers the HTTP request req for testing, such that | ||
|  | // any API calls are sent to the provided URL. It returns a closure to delete | ||
|  | // the registration. | ||
|  | // It should only be used by aetest package. | ||
|  | func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) { | ||
|  | 	c := &context{ | ||
|  | 		req:    req, | ||
|  | 		apiURL: apiURL, | ||
|  | 	} | ||
|  | 	ctx := withContext(decorate(req.Context()), c) | ||
|  | 	req = req.WithContext(ctx) | ||
|  | 	c.req = req | ||
|  | 	return req, func() {} | ||
|  | } | ||
|  | 
 | ||
|  | var errTimeout = &CallError{ | ||
|  | 	Detail:  "Deadline exceeded", | ||
|  | 	Code:    int32(remotepb.RpcError_CANCELLED), | ||
|  | 	Timeout: true, | ||
|  | } | ||
|  | 
 | ||
|  | func (c *context) Header() http.Header { return c.outHeader } | ||
|  | 
 | ||
|  | // Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status | ||
|  | // codes do not permit a response body (nor response entity headers such as | ||
|  | // Content-Length, Content-Type, etc). | ||
|  | func bodyAllowedForStatus(status int) bool { | ||
|  | 	switch { | ||
|  | 	case status >= 100 && status <= 199: | ||
|  | 		return false | ||
|  | 	case status == 204: | ||
|  | 		return false | ||
|  | 	case status == 304: | ||
|  | 		return false | ||
|  | 	} | ||
|  | 	return true | ||
|  | } | ||
|  | 
 | ||
|  | func (c *context) Write(b []byte) (int, error) { | ||
|  | 	if c.outCode == 0 { | ||
|  | 		c.WriteHeader(http.StatusOK) | ||
|  | 	} | ||
|  | 	if len(b) > 0 && !bodyAllowedForStatus(c.outCode) { | ||
|  | 		return 0, http.ErrBodyNotAllowed | ||
|  | 	} | ||
|  | 	c.outBody = append(c.outBody, b...) | ||
|  | 	return len(b), nil | ||
|  | } | ||
|  | 
 | ||
|  | func (c *context) WriteHeader(code int) { | ||
|  | 	if c.outCode != 0 { | ||
|  | 		logf(c, 3, "WriteHeader called multiple times on request.") // error level | ||
|  | 		return | ||
|  | 	} | ||
|  | 	c.outCode = code | ||
|  | } | ||
|  | 
 | ||
|  | func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) { | ||
|  | 	hreq := &http.Request{ | ||
|  | 		Method: "POST", | ||
|  | 		URL:    c.apiURL, | ||
|  | 		Header: http.Header{ | ||
|  | 			apiEndpointHeader: apiEndpointHeaderValue, | ||
|  | 			apiMethodHeader:   apiMethodHeaderValue, | ||
|  | 			apiContentType:    apiContentTypeValue, | ||
|  | 			apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)}, | ||
|  | 		}, | ||
|  | 		Body:          ioutil.NopCloser(bytes.NewReader(body)), | ||
|  | 		ContentLength: int64(len(body)), | ||
|  | 		Host:          c.apiURL.Host, | ||
|  | 	} | ||
|  | 	if info := c.req.Header.Get(dapperHeader); info != "" { | ||
|  | 		hreq.Header.Set(dapperHeader, info) | ||
|  | 	} | ||
|  | 	if info := c.req.Header.Get(traceHeader); info != "" { | ||
|  | 		hreq.Header.Set(traceHeader, info) | ||
|  | 	} | ||
|  | 
 | ||
|  | 	tr := apiHTTPClient.Transport.(*http.Transport) | ||
|  | 
 | ||
|  | 	var timedOut int32 // atomic; set to 1 if timed out | ||
|  | 	t := time.AfterFunc(timeout, func() { | ||
|  | 		atomic.StoreInt32(&timedOut, 1) | ||
|  | 		tr.CancelRequest(hreq) | ||
|  | 	}) | ||
|  | 	defer t.Stop() | ||
|  | 	defer func() { | ||
|  | 		// Check if timeout was exceeded. | ||
|  | 		if atomic.LoadInt32(&timedOut) != 0 { | ||
|  | 			err = errTimeout | ||
|  | 		} | ||
|  | 	}() | ||
|  | 
 | ||
|  | 	hresp, err := apiHTTPClient.Do(hreq) | ||
|  | 	if err != nil { | ||
|  | 		return nil, &CallError{ | ||
|  | 			Detail: fmt.Sprintf("service bridge HTTP failed: %v", err), | ||
|  | 			Code:   int32(remotepb.RpcError_UNKNOWN), | ||
|  | 		} | ||
|  | 	} | ||
|  | 	defer hresp.Body.Close() | ||
|  | 	hrespBody, err := ioutil.ReadAll(hresp.Body) | ||
|  | 	if hresp.StatusCode != 200 { | ||
|  | 		return nil, &CallError{ | ||
|  | 			Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody), | ||
|  | 			Code:   int32(remotepb.RpcError_UNKNOWN), | ||
|  | 		} | ||
|  | 	} | ||
|  | 	if err != nil { | ||
|  | 		return nil, &CallError{ | ||
|  | 			Detail: fmt.Sprintf("service bridge response bad: %v", err), | ||
|  | 			Code:   int32(remotepb.RpcError_UNKNOWN), | ||
|  | 		} | ||
|  | 	} | ||
|  | 	return hrespBody, nil | ||
|  | } | ||
|  | 
 | ||
|  | func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error { | ||
|  | 	if ns := NamespaceFromContext(ctx); ns != "" { | ||
|  | 		if fn, ok := NamespaceMods[service]; ok { | ||
|  | 			fn(in, ns) | ||
|  | 		} | ||
|  | 	} | ||
|  | 
 | ||
|  | 	if f, ctx, ok := callOverrideFromContext(ctx); ok { | ||
|  | 		return f(ctx, service, method, in, out) | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// Handle already-done contexts quickly. | ||
|  | 	select { | ||
|  | 	case <-ctx.Done(): | ||
|  | 		return ctx.Err() | ||
|  | 	default: | ||
|  | 	} | ||
|  | 
 | ||
|  | 	c := fromContext(ctx) | ||
|  | 	if c == nil { | ||
|  | 		// Give a good error message rather than a panic lower down. | ||
|  | 		return errNotAppEngineContext | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// Apply transaction modifications if we're in a transaction. | ||
|  | 	if t := transactionFromContext(ctx); t != nil { | ||
|  | 		if t.finished { | ||
|  | 			return errors.New("transaction context has expired") | ||
|  | 		} | ||
|  | 		applyTransaction(in, &t.transaction) | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// Default RPC timeout is 60s. | ||
|  | 	timeout := 60 * time.Second | ||
|  | 	if deadline, ok := ctx.Deadline(); ok { | ||
|  | 		timeout = deadline.Sub(time.Now()) | ||
|  | 	} | ||
|  | 
 | ||
|  | 	data, err := proto.Marshal(in) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	ticket := c.req.Header.Get(ticketHeader) | ||
|  | 	// Use a test ticket under test environment. | ||
|  | 	if ticket == "" { | ||
|  | 		if appid := ctx.Value(&appIDOverrideKey); appid != nil { | ||
|  | 			ticket = appid.(string) + defaultTicketSuffix | ||
|  | 		} | ||
|  | 	} | ||
|  | 	// Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver. | ||
|  | 	if ticket == "" { | ||
|  | 		ticket = DefaultTicket() | ||
|  | 	} | ||
|  | 	if dri := c.req.Header.Get(devRequestIdHeader); IsDevAppServer() && dri != "" { | ||
|  | 		ticket = dri | ||
|  | 	} | ||
|  | 	req := &remotepb.Request{ | ||
|  | 		ServiceName: &service, | ||
|  | 		Method:      &method, | ||
|  | 		Request:     data, | ||
|  | 		RequestId:   &ticket, | ||
|  | 	} | ||
|  | 	hreqBody, err := proto.Marshal(req) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	hrespBody, err := c.post(hreqBody, timeout) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	res := &remotepb.Response{} | ||
|  | 	if err := proto.Unmarshal(hrespBody, res); err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 	if res.RpcError != nil { | ||
|  | 		ce := &CallError{ | ||
|  | 			Detail: res.RpcError.GetDetail(), | ||
|  | 			Code:   *res.RpcError.Code, | ||
|  | 		} | ||
|  | 		switch remotepb.RpcError_ErrorCode(ce.Code) { | ||
|  | 		case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED: | ||
|  | 			ce.Timeout = true | ||
|  | 		} | ||
|  | 		return ce | ||
|  | 	} | ||
|  | 	if res.ApplicationError != nil { | ||
|  | 		return &APIError{ | ||
|  | 			Service: *req.ServiceName, | ||
|  | 			Detail:  res.ApplicationError.GetDetail(), | ||
|  | 			Code:    *res.ApplicationError.Code, | ||
|  | 		} | ||
|  | 	} | ||
|  | 	if res.Exception != nil || res.JavaException != nil { | ||
|  | 		// This shouldn't happen, but let's be defensive. | ||
|  | 		return &CallError{ | ||
|  | 			Detail: "service bridge returned exception", | ||
|  | 			Code:   int32(remotepb.RpcError_UNKNOWN), | ||
|  | 		} | ||
|  | 	} | ||
|  | 	return proto.Unmarshal(res.Response, out) | ||
|  | } | ||
|  | 
 | ||
|  | func (c *context) Request() *http.Request { | ||
|  | 	return c.req | ||
|  | } | ||
|  | 
 | ||
|  | func (c *context) addLogLine(ll *logpb.UserAppLogLine) { | ||
|  | 	// Truncate long log lines. | ||
|  | 	// TODO(dsymonds): Check if this is still necessary. | ||
|  | 	const lim = 8 << 10 | ||
|  | 	if len(*ll.Message) > lim { | ||
|  | 		suffix := fmt.Sprintf("...(length %d)", len(*ll.Message)) | ||
|  | 		ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix) | ||
|  | 	} | ||
|  | 
 | ||
|  | 	c.pendingLogs.Lock() | ||
|  | 	c.pendingLogs.lines = append(c.pendingLogs.lines, ll) | ||
|  | 	c.pendingLogs.Unlock() | ||
|  | } | ||
|  | 
 | ||
|  | var logLevelName = map[int64]string{ | ||
|  | 	0: "DEBUG", | ||
|  | 	1: "INFO", | ||
|  | 	2: "WARNING", | ||
|  | 	3: "ERROR", | ||
|  | 	4: "CRITICAL", | ||
|  | } | ||
|  | 
 | ||
|  | func logf(c *context, level int64, format string, args ...interface{}) { | ||
|  | 	if c == nil { | ||
|  | 		panic("not an App Engine context") | ||
|  | 	} | ||
|  | 	s := fmt.Sprintf(format, args...) | ||
|  | 	s = strings.TrimRight(s, "\n") // Remove any trailing newline characters. | ||
|  | 	c.addLogLine(&logpb.UserAppLogLine{ | ||
|  | 		TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3), | ||
|  | 		Level:         &level, | ||
|  | 		Message:       &s, | ||
|  | 	}) | ||
|  | 	// Only duplicate log to stderr if not running on App Engine second generation | ||
|  | 	if !IsSecondGen() { | ||
|  | 		log.Print(logLevelName[level] + ": " + s) | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | // flushLog attempts to flush any pending logs to the appserver. | ||
|  | // It should not be called concurrently. | ||
|  | func (c *context) flushLog(force bool) (flushed bool) { | ||
|  | 	c.pendingLogs.Lock() | ||
|  | 	// Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious. | ||
|  | 	n, rem := 0, 30<<20 | ||
|  | 	for ; n < len(c.pendingLogs.lines); n++ { | ||
|  | 		ll := c.pendingLogs.lines[n] | ||
|  | 		// Each log line will require about 3 bytes of overhead. | ||
|  | 		nb := proto.Size(ll) + 3 | ||
|  | 		if nb > rem { | ||
|  | 			break | ||
|  | 		} | ||
|  | 		rem -= nb | ||
|  | 	} | ||
|  | 	lines := c.pendingLogs.lines[:n] | ||
|  | 	c.pendingLogs.lines = c.pendingLogs.lines[n:] | ||
|  | 	c.pendingLogs.Unlock() | ||
|  | 
 | ||
|  | 	if len(lines) == 0 && !force { | ||
|  | 		// Nothing to flush. | ||
|  | 		return false | ||
|  | 	} | ||
|  | 
 | ||
|  | 	rescueLogs := false | ||
|  | 	defer func() { | ||
|  | 		if rescueLogs { | ||
|  | 			c.pendingLogs.Lock() | ||
|  | 			c.pendingLogs.lines = append(lines, c.pendingLogs.lines...) | ||
|  | 			c.pendingLogs.Unlock() | ||
|  | 		} | ||
|  | 	}() | ||
|  | 
 | ||
|  | 	buf, err := proto.Marshal(&logpb.UserAppLogGroup{ | ||
|  | 		LogLine: lines, | ||
|  | 	}) | ||
|  | 	if err != nil { | ||
|  | 		log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err) | ||
|  | 		rescueLogs = true | ||
|  | 		return false | ||
|  | 	} | ||
|  | 
 | ||
|  | 	req := &logpb.FlushRequest{ | ||
|  | 		Logs: buf, | ||
|  | 	} | ||
|  | 	res := &basepb.VoidProto{} | ||
|  | 	c.pendingLogs.Lock() | ||
|  | 	c.pendingLogs.flushes++ | ||
|  | 	c.pendingLogs.Unlock() | ||
|  | 	if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil { | ||
|  | 		log.Printf("internal.flushLog: Flush RPC: %v", err) | ||
|  | 		rescueLogs = true | ||
|  | 		return false | ||
|  | 	} | ||
|  | 	return true | ||
|  | } | ||
|  | 
 | ||
|  | const ( | ||
|  | 	// Log flushing parameters. | ||
|  | 	flushInterval      = 1 * time.Second | ||
|  | 	forceFlushInterval = 60 * time.Second | ||
|  | ) | ||
|  | 
 | ||
|  | func (c *context) logFlusher(stop <-chan int) { | ||
|  | 	lastFlush := time.Now() | ||
|  | 	tick := time.NewTicker(flushInterval) | ||
|  | 	for { | ||
|  | 		select { | ||
|  | 		case <-stop: | ||
|  | 			// Request finished. | ||
|  | 			tick.Stop() | ||
|  | 			return | ||
|  | 		case <-tick.C: | ||
|  | 			force := time.Now().Sub(lastFlush) > forceFlushInterval | ||
|  | 			if c.flushLog(force) { | ||
|  | 				lastFlush = time.Now() | ||
|  | 			} | ||
|  | 		} | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | func ContextForTesting(req *http.Request) netcontext.Context { | ||
|  | 	return toContext(&context{req: req}) | ||
|  | } |